Introducción

Este documento explicará cómo configurarse para utilizar Human Task Manager(HTM) con IPF.

Paso 0: Dependencia

Agregue las siguientes dependencias a su aplicación pom.xml:

<!--Allows for your application to create tasks in HTM-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-request-reply-connector-starter</artifactId>
</dependency>
<!--Allows for your app to receive notifications (over Kafka) when tasks are marked as completed-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-notification-kafka-starter</artifactId>
</dependency>

<!--Optional dependency, allows for your application to create tasks in HTM by sending Kafka messages-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-async-connector-kafka-starter</artifactId>
</dependency>
<!--Optional dependency, allows for your application to receive tasks in HTM-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-receive-connector-core-starter</artifactId>
</dependency>
Explícito HTM las versiones no son necesarias si está importando el IPF BOM

Paso 1: Registre un controlador para ser notificado de las tareas

Cuando notifications se reciben a través de Kafka que las tareas han sido marcadas como completadas, el Receive Connector que recibe estos mensajes buscará una interfaz llamada HtmClientReceiveNotificationPort, que tiene la siguiente firma:

public interface HtmClientReceiveNotificationPort {
    default ProcessingContext determineContextFor(TransportMessage message) {
        var messageHeaders = message.getMessageHeaders();
        var builder = ProcessingContext.builder()
                .associationId(messageHeaders.getHeader("associationId").map(Object::toString).orElse(null))
                .unitOfWorkId(messageHeaders.getHeader("uowId").map(Object::toString).orElse(null));
        messageHeaders.getHeader("requestId").map(Object::toString).ifPresent(builder::clientRequestId);
        messageHeaders.getHeader("processingEntity").map(Object::toString).ifPresent(builder::clientRequestId);
        return builder.build();
    }
    CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification);
}

Si no hay una implementación disponible para esta interfaz, el contexto de Spring no se iniciará y fallará con el siguiente error:

Caused by: org.springframework.beans.factory. NoSuchBeanDefinitionException: No qualifying bean of type 'com.iconsolutions.ipf.htm.client.receive.sdk. HtmClientReceiveNotificationPort' available: expected at least 1 bean which qualifies as autowire candidate

Solo debe implementar receiveNotification según lo siguiente:

public class HtmTaskClosedNotificationReceiveAdapter implements HtmClientReceiveNotificationPort {

    @Override
    public CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification) {
        return HtmclientmodelDomain.humanTaskManager().handle(
                        new TaskClosedNotificationInput. Builder(receivingContext.getProcessingContext().getAssociationId().getValue())
                                .withTaskClosedResult(toTaskClosedNotification(taskClosedNotification))
                                .build())
                .thenAccept(done -> log.info("Completed {}", done));
    }
}

El ejemplo anterior nos muestra llamando a un IPF flow para continuar su ejecución; este es el tipo típico de uso para una aplicación que utiliza HTM pero cualquier otra implementación puede ser conectada aquí en su lugar.

Paso 2: Kafka Configuración

Receive connector s

El Kafka Receive Connector está configurado de la siguiente manera:

akka {
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}

    }
  }
}

htm {
  kafka {
    consumer {
      topic = HTM_TASK_CLOSED_NOTIFICATION
      restart-settings = ${common-flow-restart-settings}
      kafka-clients {
        group.id = htm-task-closed-notification-group
      }
    }
  }
}

Si desea utilizar Kafka para registrarse HTM tareas,Kafka Send Connector se utiliza para ello, y por defecto está configurado de la siguiente manera:

ipf.htm.async.register-task {
  type = kafka

  kafka {
    producer {
      topic = HTM_REGISTER_TASK
    }
  }
}

De manera similar, si usted desea utilizar Kafka para cancelar HTM tareas,Kafka Send Connector se utiliza para ello, y por defecto está configurado de la siguiente manera:

ipf.htm.async.cancel-task {
  type = kafka

  kafka {
    producer {
      topic = HTM_CANCEL_TASK
    }
  }
}

Paso 3:HTM Configuración del Cliente

Receive Connector s

Receive connector s, para apoyar el registro y la cancelación HTM las tareas se suministran en el `htm-receive-connector-core-starter` módulo.

Cuando el ipf.htm.async.register-task.enabled está configurado para true, un conector de recepción para procesar solicitudes de tareas registradas está disponible.

De manera similar, cuando ipf.htm.async.cancel-task.enabled está configurado para true, un conector receptor para procesar solicitudes de cancelación de tareas está disponible.

HTTP Conectores

Mientras notifications de tareas completadas se reciben como Kafka mensajes, solicitudes para crear mensajes en HTM se realizan generalmente a través de HTTP (registrando HTM las tareas también podrían realizarse a través de Kafka).

Por defecto, todos HTM Los conectores de solicitud-respuesta están deshabilitados por defecto y deben ser habilitados explícitamente a través de la configuración:

ipf.htm.request-reply.starter {
  task-summaries.enabled = true
  task-details.enabled = true
  task-history.enabled = true
  register-task.enabled = true
  assign-task.enabled = true
  execute-task.enabled = true
  approve-task.enabled = true
  reject-task.enabled = true
  cancel-task.enabled = true
  execute-bulk-tasks.enabled = true
  bulk-details.enabled = true
}

El HTTP la configuración del cliente también debe ser anulada. Estos son los valores predeterminados:

ipf.htm.request-reply.starter {
  http {
    client {
      host = "localhost"
      endpoint-url = "/tasks"
      port = 8080
    }
  }
}

Finalmente, estos Conectores vienen con configuraciones de resiliencia proporcionadas, las cuales también pueden ser anuladas. Aquí están los valores predeterminados:

ipf.htm.request-reply.starter {
  call-timeout = 2s
  resiliency-settings {
    enabled = true
    minimum-number-of-calls = 50
    max-attempts = 5
    reset-timeout = 1s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
    retry-on-failure-when = true
    retry-on-result-when = false
    retryable-status-codes = []
  }
}

Indexación

HTM tiene una configuración predeterminada para crear automáticamente MongoDB índices en la inicialización.

La creación de índices predeterminados puede ser desactivada con lo siguiente:

ipf.htm.mongodb.create-indexes=false

Los índices pueden ser deshabilitados globalmente con:

ipf.mongodb.create-indexes=false

Para deshabilitar la indexación globalmente pero mantenerla para HTM, aplique lo siguiente, manteniendo el orden:

ipf.mongodb.create-indexes=false
ipf.htm.mongodb.create-indexes=true

Compromiso de Quórum

El quórum de confirmación puede ser controlado de manera similar con:

ipf.htm.mongodb.commit-quorum=1

O bien anulado globalmente con:

ipf.mongodb.commit-quorum=1

Para establecer un quórum de confirmación diferente a nivel global para el HTM uno(s), aplique lo siguiente, manteniendo el orden:

ipf.mongodb.commit-quorum="votingMembers"
htm.task-history-repository.commit-quorum=1
htm.task-idempotency-cache.commit-quorum=1
htm.task-repository.commit-quorum=1

Paso 4: Ejecute Varios HTM Operaciones

Las operaciones del conector están disponibles a través de la htm-client-sdk-request-reply-connector-starter dependencia que usted añadió en el paso 0. Esta dependencia crea un HtmClientSdk implementación para usted. La interfaz es:

public interface HtmClientSdk {
    CompletionStage<Response<RegisterTaskResponse>> registerTask(ProcessingContext processingContext, RegisterTaskRequest request);
    CompletionStage<Response<ResponseModel<CancelTaskResponse>>> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest request);
    CompletionStage<Response<GetTaskSummariesResponse>> getTaskSummaries(ProcessingContext processingContext, GetTaskSummariesParams getTaskSummariesParams);
    CompletionStage<Response<ResponseModel<GetTaskDetailsResponse>>> getTaskDetails(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<GetTaskHistoryResponse>>> getTaskHistory(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<AssignTaskResponse>>> assignTask(ProcessingContext processingContext, WrapperRequest<AssignTaskRequest> assignTaskRequest);
    CompletionStage<Response<ResponseModel<ExecuteTaskResponse>>> executeTask(ProcessingContext processingContext, WrapperRequest<ExecuteTaskRequest> executeTaskRequest);
    CompletionStage<Response<ResponseModel<ApproveTaskResponse>>> approveTask(ProcessingContext processingContext, WrapperRequest<ApproveTaskRequest> approveTaskRequest);
    CompletionStage<Response<ResponseModel<RejectTaskResponse>>> rejectTask(ProcessingContext processingContext, WrapperRequest<RejectTaskRequest> rejectTaskRequest);
    CompletionStage<Response<ResponseModel<BulkProcessingAcceptedResponse>>> executeBulkTasks(ProcessingContext processingContext, ExecuteBulkRequest executeBulkRequest);
    CompletionStage<Response<ResponseModel<GetBulkDetailsResponse>>> getBulkDetails(ProcessingContext processingContext, String bulkId);
}

Tenga en cuenta que para utilizar diversas funciones, estas deben ser habilitadas primero según el paso 3 anterior.

Finalmente, puede inyectar esta dependencia como un Spring bean(por ejemplo, utilizando @Autowire) y utilícelo en su código:

import com.iconsolutions.ipf.htm.adapter. HumanTaskManagerAdapter;

public class MyHtmClient {

    private final HtmClientSdk htmClientSdk;

    public MyHtmClient(HtmClientSdk htmClientSdk) {
        this.htmClientSdk = htmClientSdk;
    }

    public void createTask() {
        htmClientSdk.registerTask(..)
            .toCompletableFuture().join();
    }
}

Gestionando HTM tareas a través de Kafka

Registro y Cancelación HTM las tareas también podrían realizarse a través de Kafka. Para poder hacer esto, es necesario añadir htm-client-sdk-async-connector-kafka-starter dependencia (como ya se mencionó en el paso 0):

Esta dependencia expone HtmAsyncClientSdk Spring Bean, teniendo un registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest) método, y un cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest cancelTaskRequest) método.

El registerTask el método debe ser llamado para registrar una tarea de manera asíncrona; devuelve el TaskId de HTM Tarea registrada como respuesta.

Asimismo, el cancelTask El método debe ser llamado para cancelar una tarea de manera asíncrona; toma el TaskId que se va a cancelar como parámetro y lo devuelve una vez que se ha ejecutado con éxito.

Ejemplos de uso en el código del cliente:

public class MyAsyncHtmRequestSender {

    private final HtmAsyncClientSdk htmAsyncClientSdk;

    public MyAsyncHtmRequestSender(HtmAsyncClientSdk htmAsyncClientSdk) {
        this.htmAsyncClientSdk = htmAsyncClientSdk;
    }

    public CompletionStage<String> registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest) {
        return htmAsyncClientSdk.registerTask(processingContext, registerTaskRequest);
    }

    public CompletionStage<String> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest cancelTaskRequest) {
        return htmAsyncClientSdk.cancelTask(processingContext, taskId, registerTaskRequest);
    }
}

Paso 5: Despliegue

HTM Los conectores pueden configurarse para realizar reintentos y utilizar el patrón de cortocircuito para proteger el HTM HTTP servidor (para más información sobre cómo configurar esto, consulte el xref:#connectors[sección de conectores]).

Configuración que se requiere para el despliegue a Kubernetes:

 akka {
   remote.artery.canonical.hostname = ${POD_IP}
 # Use Kubernetes API to discover the cluster
   discovery {
     kubernetes-api {
       pod-label-selector = "app=%s"
     }
   }

   management {
 # use the Kubernetes API to create the cluster
     cluster.bootstrap {
       contact-point-discovery {
         discovery-method          = kubernetes-api
         service-name              = ${AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME}
         required-contact-point-nr = 1
         required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
       }
     }
 # available from Akka management >= 1.0.0
     health-checks {
       readiness-path  = "health/ready"
       liveness-path   = "health/alive"
     }
   }

   actor {
     provider = cluster
   }

   cluster {
     seed-nodes = []
     downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
     split-brain-resolver {
       active-strategy = keep-majority
       stable-after = 20s
     }
     sharding {
       remember-entities = off
       handoff-timeout = 8s
       least-shard-allocation-strategy.rebalance-absolute-limit = 20
       rebalance-interval = 2s
       number-of-shards = 100
     }
   }
}

Event configuración del procesador

Por defecto, el Human Task Manager la aplicación registra 4 event procesadores:

  • Tarea leer lado event procesador

  • Historial de tareas-lado de lectura event procesador

  • Ejecución masiva event procesador

  • IPF Processing Data Salida

Estos procesadores registran su propio event flujos de procesador, que consumen más Unidades de Solicitud (RUs) en Azure CosmosDB para MongoDB.
Para ahorrar algunos RUs en Azure CosmosDB, el Delegating Event El procesador puede ser utilizado.

En este caso solo uno event el procesador consume events de la revista y luego delegue events a otros registrados event procesadores.

La desventaja de la Delegación Event El procesador se puede encontrar en su manejo de errores. Si un event no logra procesar en ninguno de los delegados event procesadores, el mismo event será reintentado por todos los delegados event procesadores, incluso si fue procesado con éxito por otros. Sin embargo, cuando hay tres independientes event procesadores, el que falla event solo será reintentado por el event procesador donde se realizó el procesamiento anterior failed.

Puede habilitar la Delegación Event Procesador utilizando la siguiente configuración:

ipf.htm.event-processor.delegating.enabled = true

¡Felicidades! Ahora está creando y siendo notificado de tareas humanas en IPF.

Cambio seguro hacia y desde la delegación event El procesamiento es una tarea compleja que requiere tiempo de inactividad y una migración cuidadosa de los desplazamientos comprometidos en el mongoOffsets colección. Es recomendable realizar algunas pruebas en entornos de prueba antes de intentar una en producción.