Primeros pasos
Este documento explicará cómo configurarse para usar Human Task Manager (HTM) con IPF.
Paso 0: Dependencia
Agregue las siguientes dependencias al pom.xml de su aplicación:
<!--Allows for your application to create tasks in HTM-->
<dependency>
<groupId>com.iconsolutions.ipf.htm</groupId>
<artifactId>htm-client-sdk-request-reply-connector-starter</artifactId>
</dependency>
<!--Allows for your app to receive notifications (over Kafka) when tasks are marked as completed-->
<dependency>
<groupId>com.iconsolutions.ipf.htm</groupId>
<artifactId>htm-client-sdk-notification-kafka-starter</artifactId>
</dependency>
<!--Optional dependency, allows for your application to create tasks in HTM by sending Kafka messages-->
<dependency>
<groupId>com.iconsolutions.ipf.htm</groupId>
<artifactId>htm-client-sdk-async-connector-kafka-starter</artifactId>
</dependency>
| Las versiones explícitas de HTM no son necesarias si está importando el IPF BOM |
Paso 1: Registre un manejador para ser notificado de tareas
Cuando se reciben notificaciones por Kafka de que las tareas han sido marcadas como completadas, el Receive Connector que recibe estos mensajes buscará una interfaz llamada HtmClientReceiveNotificationPort, que tiene la siguiente firma:
public interface HtmClientReceiveNotificationPort {
default ProcessingContext determineContextFor(TransportMessage message) {
var messageHeaders = message.getMessageHeaders();
var builder = ProcessingContext.builder()
.associationId(messageHeaders.getHeader("associationId").map(Object::toString).orElse(null))
.unitOfWorkId(messageHeaders.getHeader("uowId").map(Object::toString).orElse(null));
messageHeaders.getHeader("requestId").map(Object::toString).ifPresent(builder::clientRequestId);
messageHeaders.getHeader("processingEntity").map(Object::toString).ifPresent(builder::clientRequestId);
return builder.build();
}
CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification);
}
Si no hay una implementación disponible para esta interfaz, el contexto de Spring no se iniciará y fallará con el siguiente error:
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.iconsolutions.ipf.htm.client.receive.sdk.HtmClientReceiveNotificationPort' available: expected at least 1 bean which qualifies as autowire candidate
Solo debería necesitar implementar receiveNotification como se muestra a continuación:
public class HtmTaskClosedNotificationReceiveAdapter implements HtmClientReceiveNotificationPort {
@Override
public CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification) {
return HtmclientmodelDomain.humanTaskManager().handle(
new TaskClosedNotificationInput.Builder(receivingContext.getProcessingContext().getAssociationId().getValue())
.withTaskClosedResult(toTaskClosedNotification(taskClosedNotification))
.build())
.thenAccept(done -> log.info("Completed {}", done));
}
}
El ejemplo anterior muestra que llamamos a un flujo de IPF para continuar su ejecución; este es el tipo típico de uso para una aplicación que utiliza HTM, pero cualquier otra implementación puede conectarse aquí en su lugar.
Paso 2: Configuración de Kafka
El Kafka Receive Connector se configura de la siguiente manera:
akka {
kafka {
consumer {
kafka-clients {
bootstrap.servers = "kafka:9092"
}
restart-settings = ${common-flow-restart-settings}
}
}
}
htm {
kafka {
consumer {
topic = HTM_TASK_CLOSED_NOTIFICATION
restart-settings = ${common-flow-restart-settings}
kafka-clients {
group.id = htm-task-closed-notification-group
}
}
}
}
Si desea usar Kafka para registrar tareas de HTM, se utiliza Kafka Send Connector para ello, y por defecto está configurado de la siguiente manera:
ipf.htm.async.register-task {
type = kafka
kafka {
producer {
topic = HTM_REGISTER_TASK
}
}
}
Paso 3: Configuración del cliente HTM
Conectores HTTP
Si bien las notificaciones de tareas completadas se reciben como mensajes de Kafka, las solicitudes para crear mensajes en HTM generalmente se realizan a través de HTTP (el registro de tareas de HTM también podría realizarse a través de Kafka).
De forma predeterminada, todos los HTM Request-Reply Connectors están deshabilitados y deben habilitarse explícitamente mediante configuración:
ipf.htm.request-reply.starter {
task-summaries.enabled = true
task-details.enabled = true
task-history.enabled = true
register-task.enabled = true
assign-task.enabled = true
execute-task.enabled = true
approve-task.enabled = true
reject-task.enabled = true
cancel-task.enabled = true
execute-bulk-tasks.enabled = true
bulk-details.enabled = true
}
La configuración del cliente HTTP también debe sobrescribirse. Estos son los valores predeterminados:
ipf.htm.request-reply.starter {
http {
client {
host = "localhost"
endpoint-url = "/tasks"
port = 8080
}
}
}
Finalmente, estos Connectors vienen con configuraciones de resiliencia proporcionadas, que también pueden sobrescribirse. Estos son los valores predeterminados:
ipf.htm.request-reply.starter {
call-timeout = 2s
resiliency-settings {
enabled = true
minimum-number-of-calls = 50
max-attempts = 5
reset-timeout = 1s
initial-retry-wait-duration = 1s
backoff-multiplier = 2
retry-on-failure-when = true
retry-on-result-when = false
retryable-status-codes = []
}
}
Indexación
HTM tiene una configuración predeterminada para crear automáticamente índices de MongoDB en la inicialización.
La creación de índices predeterminados puede deshabilitarse con lo siguiente:
ipf.htm.mongodb.create-indexes=false
Los índices pueden deshabilitarse globalmente con:
ipf.mongodb.create-indexes=false
Para deshabilitar la indexación globalmente pero mantenerla para HTM, aplique lo siguiente, manteniendo el orden:
ipf.mongodb.create-indexes=false
ipf.htm.mongodb.create-indexes=true
Commit Quorum
El commit quorum puede controlarse de manera similar con:
ipf.htm.mongodb.commit-quorum=1
O sobrescribirse globalmente con:
ipf.mongodb.commit-quorum=1
Para establecer un commit quorum global diferente al de HTM, aplique lo siguiente, manteniendo el orden:
ipf.mongodb.commit-quorum="votingMembers"
htm.task-history-repository.commit-quorum=1
htm.task-idempotency-cache.commit-quorum=1
htm.task-repository.commit-quorum=1
Paso 4: Ejecutar varias operaciones de HTM
Las operaciones del conector están disponibles a través de la dependencia htm-client-sdk-request-reply-connector-starter que agregó en el paso 0. Esta dependencia crea una implementación de HtmClientSdk para usted.
La interfaz es:
public interface HtmClientSdk {
CompletionStage<Response<RegisterTaskResponse>> registerTask(ProcessingContext processingContext, RegisterTaskRequest request);
CompletionStage<Response<ResponseModel<CancelTaskResponse>>> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest request);
CompletionStage<Response<GetTaskSummariesResponse>> getTaskSummaries(ProcessingContext processingContext, GetTaskSummariesParams getTaskSummariesParams);
CompletionStage<Response<ResponseModel<GetTaskDetailsResponse>>> getTaskDetails(ProcessingContext processingContext, String taskId);
CompletionStage<Response<ResponseModel<GetTaskHistoryResponse>>> getTaskHistory(ProcessingContext processingContext, String taskId);
CompletionStage<Response<ResponseModel<AssignTaskResponse>>> assignTask(ProcessingContext processingContext, WrapperRequest<AssignTaskRequest> assignTaskRequest);
CompletionStage<Response<ResponseModel<ExecuteTaskResponse>>> executeTask(ProcessingContext processingContext, WrapperRequest<ExecuteTaskRequest> executeTaskRequest);
CompletionStage<Response<ResponseModel<ApproveTaskResponse>>> approveTask(ProcessingContext processingContext, WrapperRequest<ApproveTaskRequest> approveTaskRequest);
CompletionStage<Response<ResponseModel<RejectTaskResponse>>> rejectTask(ProcessingContext processingContext, WrapperRequest<RejectTaskRequest> rejectTaskRequest);
CompletionStage<Response<ResponseModel<BulkProcessingAcceptedResponse>>> executeBulkTasks(ProcessingContext processingContext, ExecuteBulkRequest executeBulkRequest);
CompletionStage<Response<ResponseModel<GetBulkDetailsResponse>>> getBulkDetails(ProcessingContext processingContext, String bulkId);
}
Tenga en cuenta que, para usar varias funciones, primero deben habilitarse según el paso 3 anterior.
Finalmente, puede inyectar esta dependencia como un bean de Spring (por ejemplo, usando @Autowire) y usarla en su código:
import com.iconsolutions.ipf.htm.adapter.HumanTaskManagerAdapter;
public class MyHtmClient {
private final HtmClientSdk htmClientSdk;
public MyHtmClient(HtmClientSdk htmClientSdk) {
this.htmClientSdk = htmClientSdk;
}
public void createTask() {
htmClientSdk.registerTask(...)
.toCompletableFuture().join();
}
}
Registrar tareas de HTM vía Kafka
El registro de tareas de HTM también podría realizarse a través de Kafka.
Para poder hacer esto, es necesario agregar la dependencia htm-client-sdk-async-connector-kafka-starter (como ya se mencionó en el paso 0):
Esta dependencia expone el Spring Bean HtmAsyncClientSdk, que tiene un método registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest). Este método debe llamarse para registrar una tarea de forma asíncrona; devuelve el TaskId de la tarea de HTM que se está registrando como respuesta. Un ejemplo de uso en el código del cliente:
public class MyAsyncHtmRequestSender {
private final HtmAsyncClientSdk htmAsyncClientSdk;
public MyAsyncHtmRequestSender(HtmAsyncClientSdk htmAsyncClientSdk) {
this.htmAsyncClientSdk = htmAsyncClientSdk;
}
public CompletionStage<String> registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest) {
return htmAsyncClientSdk.registerTask(processingContext, registerTaskRequest);
}
}
Paso 5: Despliegue
Los conectores de HTM se pueden configurar para realizar reintentos y utilizar el patrón de circuit breaker para proteger el servidor HTM HTTP (para obtener más información sobre cómo configurar esto, consulte la sección de conectores).
Configuración requerida para el despliegue en Kubernetes:
akka {
remote.artery.canonical.hostname = ${POD_IP}
# Use Kubernetes API to discover the cluster
discovery {
kubernetes-api {
pod-label-selector = "app=%s"
}
}
management {
# use the Kubernetes API to create the cluster
cluster.bootstrap {
contact-point-discovery {
discovery-method = kubernetes-api
service-name = ${AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME}
required-contact-point-nr = 1
required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
}
}
# available from Akka management >= 1.0.0
health-checks {
readiness-path = "health/ready"
liveness-path = "health/alive"
}
}
actor {
provider = cluster
}
cluster {
seed-nodes = []
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver {
active-strategy = keep-majority
stable-after = 20s
}
sharding {
remember-entities = off
handoff-timeout = 8s
least-shard-allocation-strategy.rebalance-absolute-limit = 20
rebalance-interval = 2s
number-of-shards = 100
}
}
}
Configuración del event processor
De forma predeterminada, la aplicación Human Task Manager registra 4 event processors:
-
Task read side event processor
-
Task history read side event processor
-
Bulk execution event processor
-
IPF Processing Data Egress
| Estos processors registran sus propios event processor streams, lo que consume más Request Units (RUs) en Azure CosmosDB for MongoDB. |
| Para ahorrar algunas RUs en Azure CosmosDB, se puede usar el Delegating Event Processor. |
En este caso, solo un event processor consume eventos del journal y luego delega eventos a otros event processors registrados.
La desventaja del Delegating Event Processor puede encontrarse en su manejo de errores. Si un evento falla al procesarse en cualquiera de los event processors delegados, el mismo evento se reintentará por todos los event processors delegados, incluso si fue procesado correctamente por otros. Sin embargo, cuando hay tres event processors independientes, el evento que falla solo será reintentado por el event processor donde falló el procesamiento previo.
Puede habilitar el Delegating Event Processor utilizando la siguiente configuración:
ipf.htm.event-processor.delegating.enabled = true
¡Felicidades! Ahora está creando y siendo notificado de tareas humanas en IPF.
Cambiar de manera segura hacia y desde el delegating event processor es una tarea compleja que requiere tiempo de inactividad y una migración cuidadosa de los offsets confirmados en la colección mongoOffsets.
Se recomienda hacer algunas ejecuciones de práctica en entornos de prueba antes de intentar hacerlo en producción.
|