Documentation for a newer release is available. View Latest

Primeros pasos

Este documento explicará cómo configurarse para usar Human Task Manager (HTM) con IPF.

Paso 0: Dependencia

Agregue las siguientes dependencias al pom.xml de su aplicación:

<!--Allows for your application to create tasks in HTM-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-request-reply-connector-starter</artifactId>
</dependency>
<!--Allows for your app to receive notifications (over Kafka) when tasks are marked as completed-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-notification-kafka-starter</artifactId>
</dependency>

<!--Optional dependency, allows for your application to create tasks in HTM by sending Kafka messages-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-async-connector-kafka-starter</artifactId>
</dependency>
Las versiones explícitas de HTM no son necesarias si está importando el IPF BOM

Paso 1: Registre un manejador para ser notificado de tareas

Cuando se reciben notificaciones por Kafka de que las tareas han sido marcadas como completadas, el Receive Connector que recibe estos mensajes buscará una interfaz llamada HtmClientReceiveNotificationPort, que tiene la siguiente firma:

public interface HtmClientReceiveNotificationPort {
    default ProcessingContext determineContextFor(TransportMessage message) {
        var messageHeaders = message.getMessageHeaders();
        var builder = ProcessingContext.builder()
                .associationId(messageHeaders.getHeader("associationId").map(Object::toString).orElse(null))
                .unitOfWorkId(messageHeaders.getHeader("uowId").map(Object::toString).orElse(null));
        messageHeaders.getHeader("requestId").map(Object::toString).ifPresent(builder::clientRequestId);
        messageHeaders.getHeader("processingEntity").map(Object::toString).ifPresent(builder::clientRequestId);
        return builder.build();
    }
    CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification);
}

Si no hay una implementación disponible para esta interfaz, el contexto de Spring no se iniciará y fallará con el siguiente error:

Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.iconsolutions.ipf.htm.client.receive.sdk.HtmClientReceiveNotificationPort' available: expected at least 1 bean which qualifies as autowire candidate

Solo debería necesitar implementar receiveNotification como se muestra a continuación:

public class HtmTaskClosedNotificationReceiveAdapter implements HtmClientReceiveNotificationPort {

    @Override
    public CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification) {
        return HtmclientmodelDomain.humanTaskManager().handle(
                        new TaskClosedNotificationInput.Builder(receivingContext.getProcessingContext().getAssociationId().getValue())
                                .withTaskClosedResult(toTaskClosedNotification(taskClosedNotification))
                                .build())
                .thenAccept(done -> log.info("Completed {}", done));
    }
}

El ejemplo anterior muestra que llamamos a un flujo de IPF para continuar su ejecución; este es el tipo típico de uso para una aplicación que utiliza HTM, pero cualquier otra implementación puede conectarse aquí en su lugar.

Paso 2: Configuración de Kafka

El Kafka Receive Connector se configura de la siguiente manera:

akka {
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}

    }
  }
}

htm {
  kafka {
    consumer {
      topic = HTM_TASK_CLOSED_NOTIFICATION
      restart-settings = ${common-flow-restart-settings}
      kafka-clients {
        group.id = htm-task-closed-notification-group
      }
    }
  }
}

Si desea usar Kafka para registrar tareas de HTM, se utiliza Kafka Send Connector para ello, y por defecto está configurado de la siguiente manera:

ipf.htm.async.register-task {
  type = kafka

  kafka {
    producer {
      topic = HTM_REGISTER_TASK
    }
  }
}

Paso 3: Configuración del cliente HTM

Conectores HTTP

Si bien las notificaciones de tareas completadas se reciben como mensajes de Kafka, las solicitudes para crear mensajes en HTM generalmente se realizan a través de HTTP (el registro de tareas de HTM también podría realizarse a través de Kafka).

De forma predeterminada, todos los HTM Request-Reply Connectors están deshabilitados y deben habilitarse explícitamente mediante configuración:

ipf.htm.request-reply.starter {
  task-summaries.enabled = true
  task-details.enabled = true
  task-history.enabled = true
  register-task.enabled = true
  assign-task.enabled = true
  execute-task.enabled = true
  approve-task.enabled = true
  reject-task.enabled = true
  cancel-task.enabled = true
  execute-bulk-tasks.enabled = true
  bulk-details.enabled = true
}

La configuración del cliente HTTP también debe sobrescribirse. Estos son los valores predeterminados:

ipf.htm.request-reply.starter {
  http {
    client {
      host = "localhost"
      endpoint-url = "/tasks"
      port = 8080
    }
  }
}

Finalmente, estos Connectors vienen con configuraciones de resiliencia proporcionadas, que también pueden sobrescribirse. Estos son los valores predeterminados:

ipf.htm.request-reply.starter {
  call-timeout = 2s
  resiliency-settings {
    enabled = true
    minimum-number-of-calls = 50
    max-attempts = 5
    reset-timeout = 1s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
    retry-on-failure-when = true
    retry-on-result-when = false
    retryable-status-codes = []
  }
}

Indexación

HTM tiene una configuración predeterminada para crear automáticamente índices de MongoDB en la inicialización.

La creación de índices predeterminados puede deshabilitarse con lo siguiente:

ipf.htm.mongodb.create-indexes=false

Los índices pueden deshabilitarse globalmente con:

ipf.mongodb.create-indexes=false

Para deshabilitar la indexación globalmente pero mantenerla para HTM, aplique lo siguiente, manteniendo el orden:

ipf.mongodb.create-indexes=false
ipf.htm.mongodb.create-indexes=true

Commit Quorum

El commit quorum puede controlarse de manera similar con:

ipf.htm.mongodb.commit-quorum=1

O sobrescribirse globalmente con:

ipf.mongodb.commit-quorum=1

Para establecer un commit quorum global diferente al de HTM, aplique lo siguiente, manteniendo el orden:

ipf.mongodb.commit-quorum="votingMembers"
htm.task-history-repository.commit-quorum=1
htm.task-idempotency-cache.commit-quorum=1
htm.task-repository.commit-quorum=1

Paso 4: Ejecutar varias operaciones de HTM

Las operaciones del conector están disponibles a través de la dependencia htm-client-sdk-request-reply-connector-starter que agregó en el paso 0. Esta dependencia crea una implementación de HtmClientSdk para usted. La interfaz es:

public interface HtmClientSdk {
    CompletionStage<Response<RegisterTaskResponse>> registerTask(ProcessingContext processingContext, RegisterTaskRequest request);
    CompletionStage<Response<ResponseModel<CancelTaskResponse>>> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest request);
    CompletionStage<Response<GetTaskSummariesResponse>> getTaskSummaries(ProcessingContext processingContext, GetTaskSummariesParams getTaskSummariesParams);
    CompletionStage<Response<ResponseModel<GetTaskDetailsResponse>>> getTaskDetails(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<GetTaskHistoryResponse>>> getTaskHistory(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<AssignTaskResponse>>> assignTask(ProcessingContext processingContext, WrapperRequest<AssignTaskRequest> assignTaskRequest);
    CompletionStage<Response<ResponseModel<ExecuteTaskResponse>>> executeTask(ProcessingContext processingContext, WrapperRequest<ExecuteTaskRequest> executeTaskRequest);
    CompletionStage<Response<ResponseModel<ApproveTaskResponse>>> approveTask(ProcessingContext processingContext, WrapperRequest<ApproveTaskRequest> approveTaskRequest);
    CompletionStage<Response<ResponseModel<RejectTaskResponse>>> rejectTask(ProcessingContext processingContext, WrapperRequest<RejectTaskRequest> rejectTaskRequest);
    CompletionStage<Response<ResponseModel<BulkProcessingAcceptedResponse>>> executeBulkTasks(ProcessingContext processingContext, ExecuteBulkRequest executeBulkRequest);
    CompletionStage<Response<ResponseModel<GetBulkDetailsResponse>>> getBulkDetails(ProcessingContext processingContext, String bulkId);
}

Tenga en cuenta que, para usar varias funciones, primero deben habilitarse según el paso 3 anterior.

Finalmente, puede inyectar esta dependencia como un bean de Spring (por ejemplo, usando @Autowire) y usarla en su código:

import com.iconsolutions.ipf.htm.adapter.HumanTaskManagerAdapter;

public class MyHtmClient {

    private final HtmClientSdk htmClientSdk;

    public MyHtmClient(HtmClientSdk htmClientSdk) {
        this.htmClientSdk = htmClientSdk;
    }

    public void createTask() {
        htmClientSdk.registerTask(...)
            .toCompletableFuture().join();
    }
}

Registrar tareas de HTM vía Kafka

El registro de tareas de HTM también podría realizarse a través de Kafka. Para poder hacer esto, es necesario agregar la dependencia htm-client-sdk-async-connector-kafka-starter (como ya se mencionó en el paso 0):

Esta dependencia expone el Spring Bean HtmAsyncClientSdk, que tiene un método registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest). Este método debe llamarse para registrar una tarea de forma asíncrona; devuelve el TaskId de la tarea de HTM que se está registrando como respuesta. Un ejemplo de uso en el código del cliente:

public class MyAsyncHtmRequestSender {

    private final HtmAsyncClientSdk htmAsyncClientSdk;

    public MyAsyncHtmRequestSender(HtmAsyncClientSdk htmAsyncClientSdk) {
        this.htmAsyncClientSdk = htmAsyncClientSdk;
    }

    public CompletionStage<String> registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest) {
        return htmAsyncClientSdk.registerTask(processingContext, registerTaskRequest);
    }
}

Paso 5: Despliegue

Los conectores de HTM se pueden configurar para realizar reintentos y utilizar el patrón de circuit breaker para proteger el servidor HTM HTTP (para obtener más información sobre cómo configurar esto, consulte la sección de conectores).

Configuración requerida para el despliegue en Kubernetes:

 akka {
   remote.artery.canonical.hostname = ${POD_IP}
 # Use Kubernetes API to discover the cluster
   discovery {
     kubernetes-api {
       pod-label-selector = "app=%s"
     }
   }

   management {
 # use the Kubernetes API to create the cluster
     cluster.bootstrap {
       contact-point-discovery {
         discovery-method          = kubernetes-api
         service-name              = ${AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME}
         required-contact-point-nr = 1
         required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
       }
     }
 # available from Akka management >= 1.0.0
     health-checks {
       readiness-path  = "health/ready"
       liveness-path   = "health/alive"
     }
   }

   actor {
     provider = cluster
   }

   cluster {
     seed-nodes = []
     downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
     split-brain-resolver {
       active-strategy = keep-majority
       stable-after = 20s
     }
     sharding {
       remember-entities = off
       handoff-timeout = 8s
       least-shard-allocation-strategy.rebalance-absolute-limit = 20
       rebalance-interval = 2s
       number-of-shards = 100
     }
   }
}

Configuración del event processor

De forma predeterminada, la aplicación Human Task Manager registra 4 event processors:

  • Task read side event processor

  • Task history read side event processor

  • Bulk execution event processor

  • IPF Processing Data Egress

Estos processors registran sus propios event processor streams, lo que consume más Request Units (RUs) en Azure CosmosDB for MongoDB.
Para ahorrar algunas RUs en Azure CosmosDB, se puede usar el Delegating Event Processor.

En este caso, solo un event processor consume eventos del journal y luego delega eventos a otros event processors registrados.

La desventaja del Delegating Event Processor puede encontrarse en su manejo de errores. Si un evento falla al procesarse en cualquiera de los event processors delegados, el mismo evento se reintentará por todos los event processors delegados, incluso si fue procesado correctamente por otros. Sin embargo, cuando hay tres event processors independientes, el evento que falla solo será reintentado por el event processor donde falló el procesamiento previo.

Puede habilitar el Delegating Event Processor utilizando la siguiente configuración:

ipf.htm.event-processor.delegating.enabled = true

¡Felicidades! Ahora está creando y siendo notificado de tareas humanas en IPF.

Cambiar de manera segura hacia y desde el delegating event processor es una tarea compleja que requiere tiempo de inactividad y una migración cuidadosa de los offsets confirmados en la colección mongoOffsets. Se recomienda hacer algunas ejecuciones de práctica en entornos de prueba antes de intentar hacerlo en producción.