Introducción

Este documento explicará cómo configurarse para utilizar Human Task Manager(HTM) con IPF.

Paso 0: Dependencia

Agregue las siguientes dependencias a su aplicación pom.xml:

<!--Allows for your application to create tasks in HTM-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-request-reply-connector-starter</artifactId>
</dependency>
<!--Allows for your app to receive notifications (over Kafka) when tasks are marked as completed-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-notification-kafka-starter</artifactId>
</dependency>

<!--Optional dependency, allows for your application to create tasks in HTM by sending Kafka messages-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-async-connector-kafka-starter</artifactId>
</dependency>
<!--Optional dependency, allows for your application to receive tasks in HTM-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-receive-connector-core-starter</artifactId>
</dependency>
Explícito HTM las versiones no son requeridas si usted está importando el IPF BOM

Paso 1: Registre un controlador para ser notificado de las tareas

Cuando se reciben notificaciones a través de Kafka que las tareas han sido marcadas como completadas, el Receive Connector que recibe estos mensajes buscará una interfaz llamada HtmClientReceiveNotificationPort, que tiene la siguiente firma:

public interface HtmClientReceiveNotificationPort {
    default ProcessingContext determineContextFor(TransportMessage message) {
        var messageHeaders = message.getMessageHeaders();
        var builder = ProcessingContext.builder()
                .associationId(messageHeaders.getHeader("associationId").map(Object::toString).orElse(null))
                .unitOfWorkId(messageHeaders.getHeader("uowId").map(Object::toString).orElse(null));
        messageHeaders.getHeader("requestId").map(Object::toString).ifPresent(builder::clientRequestId);
        messageHeaders.getHeader("processingEntity").map(Object::toString).ifPresent(builder::clientRequestId);
        return builder.build();
    }
    CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification);
}

Si no hay una implementación disponible para esta interfaz, el Spring el contexto no se iniciará y fallará con el siguiente error:

Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.iconsolutions.ipf.htm.client.receive.sdk.HtmClientReceiveNotificationPort' available: expected at least 1 bean which qualifies as autowire candidate

Solo debe implementar receiveNotification según lo siguiente:

public class HtmTaskClosedNotificationReceiveAdapter implements HtmClientReceiveNotificationPort {

    @Override
    public CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification) {
        return HtmclientmodelDomain.humanTaskManager().handle(
                        new TaskClosedNotificationInput.Builder(receivingContext.getProcessingContext().getAssociationId().getValue())
                                .withTaskClosedResult(toTaskClosedNotification(taskClosedNotification))
                                .build())
                .thenAccept(done -> log.info("Completed {}", done));
    }
}

El ejemplo anterior nos muestra llamando a un IPF flow para continuar su ejecución; este es el tipo típico de uso para una aplicación que utiliza HTM pero cualquier otra implementación puede ser conectada aquí en su lugar.

Paso 2: Kafka Configuración

Conectores de recepción

El Kafka Receive Connector está configurado de la siguiente manera:

akka {
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}

    }
  }
}

htm {
  kafka {
    consumer {
      topic = HTM_TASK_CLOSED_NOTIFICATION
      restart-settings = ${common-flow-restart-settings}
      kafka-clients {
        group.id = htm-task-closed-notification-group
      }
    }
  }
}

Si desea utilizar Kafka para registrarse HTM tareas,Kafka Send Connector se utiliza para ello, y por defecto está configurado de la siguiente manera:

ipf.htm.async.register-task {
  type = kafka

  kafka {
    producer {
      topic = HTM_REGISTER_TASK
    }
  }
}

De manera similar, si usted desea utilizar Kafka para cancelar HTM tareas,Kafka Send Connector se utiliza para ello, y por defecto está configurado de la siguiente manera:

ipf.htm.async.cancel-task {
  type = kafka

  kafka {
    producer {
      topic = HTM_CANCEL_TASK
    }
  }
}

Paso 3:HTM Configuración del Cliente

Kafka Conectores

Además de recibir notificaciones de tareas completadas,Kafka puede ser utilizado para registrar y cancelar tareas.

Por defecto, ambos Kafka los conectores están habilitados. Para desactivarlos, simplemente establezca el indicador habilitado apropiado en false.

ipf.htm.async {
  register-task.enabled = true
  cancel-task.enabled = true
}

HTTP Conectores

Mientras que las notificaciones de tareas completadas siempre se reciben como Kafka mensajes, solicitudes para crear y cancelar tareas en HTM se puede hacer a través de ambos HTTP y Kafka, pero todas las demás operaciones están solamente disponibles a través de HTTP.

Por defecto, todos HTM Los conectores de solicitud-respuesta están deshabilitados y deben ser habilitados explícitamente a través de la configuración:

ipf.htm.request-reply.starter {
  task-summaries.enabled = true
  task-details.enabled = true
  task-history.enabled = true
  register-task.enabled = true
  assign-task.enabled = true
  execute-task.enabled = true
  approve-task.enabled = true
  reject-task.enabled = true
  cancel-task.enabled = true
  execute-bulk-tasks.enabled = true
  bulk-details.enabled = true
}

El HTTP la configuración del cliente también debe ser anulada. Estos son los valores predeterminados:

ipf.htm.request-reply.starter {
  http {
    client {
      host = "localhost"
      endpoint-url = "/tasks"
      port = 8080
    }
  }
}

Finalmente, estos Conectores vienen con configuraciones de resiliencia proporcionadas, las cuales también pueden ser anuladas. Aquí están los valores predeterminados:

ipf.htm.request-reply.starter {
  call-timeout = 2s
  resiliency-settings {
    enabled = true
    minimum-number-of-calls = 50
    max-attempts = 5
    reset-timeout = 1s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
    retry-on-failure-when = true
    retry-on-result-when = false
    retryable-status-codes = []
  }
}

Indexación

HTMtiene una configuración predeterminada para crear automáticamente MongoDB índices en la inicialización.

La creación de índices predeterminados puede ser desactivada con lo siguiente:

ipf.htm.mongodb.create-indexes=false

Los índices pueden ser deshabilitados globalmente con:

ipf.mongodb.create-indexes=false

Para deshabilitar la indexación globalmente pero mantenerla para HTM, aplique lo siguiente, manteniendo el orden:

ipf.mongodb.create-indexes=false
ipf.htm.mongodb.create-indexes=true

Compromiso de Quórum

El quórum de compromiso puede ser controlado de manera similar con:

ipf.htm.mongodb.commit-quorum=1

O bien anulado globalmente con:

ipf.mongodb.commit-quorum=1

Para establecer un quórum de confirmación diferente a nivel global para el HTM uno(s), aplique lo siguiente, manteniendo el orden:

ipf.mongodb.commit-quorum="votingMembers"
htm.task-history-repository.commit-quorum=1
htm.task-idempotency-cache.commit-quorum=1
htm.task-repository.commit-quorum=1

Paso 4: Ejecute Varios HTM Operaciones

Las operaciones del conector están disponibles a través de la htm-client-sdk-request-reply-connector-starter dependencia que usted agregó en el paso 0. Esta dependencia crea un HtmClientSdk implementación para usted. La interfaz es:

public interface HtmClientSdk {
    CompletionStage<Response<RegisterTaskResponse>> registerTask(ProcessingContext processingContext, RegisterTaskRequest request);
    CompletionStage<Response<ResponseModel<CancelTaskResponse>>> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest request);
    CompletionStage<Response<GetTaskSummariesResponse>> getTaskSummaries(ProcessingContext processingContext, GetTaskSummariesParams getTaskSummariesParams);
    CompletionStage<Response<ResponseModel<GetTaskDetailsResponse>>> getTaskDetails(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<GetTaskHistoryResponse>>> getTaskHistory(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<AssignTaskResponse>>> assignTask(ProcessingContext processingContext, WrapperRequest<AssignTaskRequest> assignTaskRequest);
    CompletionStage<Response<ResponseModel<ExecuteTaskResponse>>> executeTask(ProcessingContext processingContext, WrapperRequest<ExecuteTaskRequest> executeTaskRequest);
    CompletionStage<Response<ResponseModel<ApproveTaskResponse>>> approveTask(ProcessingContext processingContext, WrapperRequest<ApproveTaskRequest> approveTaskRequest);
    CompletionStage<Response<ResponseModel<RejectTaskResponse>>> rejectTask(ProcessingContext processingContext, WrapperRequest<RejectTaskRequest> rejectTaskRequest);
    CompletionStage<Response<ResponseModel<BulkProcessingAcceptedResponse>>> executeBulkTasks(ProcessingContext processingContext, ExecuteBulkRequest executeBulkRequest);
    CompletionStage<Response<ResponseModel<GetBulkDetailsResponse>>> getBulkDetails(ProcessingContext processingContext, String bulkId);
}

Tenga en cuenta que para utilizar diversas funciones, estas deben ser habilitadas primero según el paso 3 anterior.

Finalmente, puede inyectar esta dependencia como un Spring bean(por ejemplo, utilizando @Autowire) y utilícelo en su código:

import com.iconsolutions.ipf.htm.adapter.HumanTaskManagerAdapter;

public class MyHtmClient {

    private final HtmClientSdk htmClientSdk;

    public MyHtmClient(HtmClientSdk htmClientSdk) {
        this.htmClientSdk = htmClientSdk;
    }

    public void createTask() {
        htmClientSdk.registerTask(...)
            .toCompletableFuture().join();
    }
}

Gestionando HTM tareas a través de Kafka

Registro y Cancelación HTM las tareas también podrían realizarse a través de Kafka. Para poder hacer esto, es necesario añadir htm-client-sdk-async-connector-kafka-starter dependencia (como ya se mencionó en el paso 0):

Esta dependencia expone HtmAsyncClientSdk Spring Bean, teniendo un registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest) método, y un cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest cancelTaskRequest) método.

El registerTask método debe ser llamado para registrar una tarea de manera asíncrona; devuelve el TaskId of HTM Tarea registrada como respuesta.

Asimismo, el cancelTask método debe ser llamado para cancelar una tarea de manera asíncrona; toma el TaskId para ser cancelado como un parámetro, y lo devuelve una vez que se ha ejecutado con éxito.

Ejemplos de uso en el código del cliente:

public class MyAsyncHtmRequestSender {

    private final HtmAsyncClientSdk htmAsyncClientSdk;

    public MyAsyncHtmRequestSender(HtmAsyncClientSdk htmAsyncClientSdk) {
        this.htmAsyncClientSdk = htmAsyncClientSdk;
    }

    public CompletionStage<String> registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest) {
        return htmAsyncClientSdk.registerTask(processingContext, registerTaskRequest);
    }

    public CompletionStage<String> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest cancelTaskRequest) {
        return htmAsyncClientSdk.cancelTask(processingContext, taskId, registerTaskRequest);
    }
}

Paso 5: Despliegue

HTMLos conectores pueden configurarse para realizar reintentos y utilizar el patrón de cortacircuito para proteger el HTM HTTP servidor (para más información sobre cómo configurar esto, consulte el sección de configuración del cliente).

Configuración que se requiere para el despliegue a Kubernetes:

 akka {
   remote.artery.canonical.hostname = ${POD_IP}
 # Use Kubernetes API to discover the cluster
   discovery {
     kubernetes-api {
       pod-label-selector = "app=%s"
     }
   }

   management {
 # use the Kubernetes API to create the cluster
     cluster.bootstrap {
       contact-point-discovery {
         discovery-method          = kubernetes-api
         service-name              = ${AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME}
         required-contact-point-nr = 1
         required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
       }
     }
 # available from Akka management >= 1.0.0
     health-checks {
       readiness-path  = "health/ready"
       liveness-path   = "health/alive"
     }
   }

   actor {
     provider = cluster
   }

   cluster {
     seed-nodes = []
     downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
     split-brain-resolver {
       active-strategy = keep-majority
       stable-after = 20s
     }
     sharding {
       remember-entities = off
       handoff-timeout = 8s
       least-shard-allocation-strategy.rebalance-absolute-limit = 20
       rebalance-interval = 2s
       number-of-shards = 100
     }
   }
}

Event configuración del procesador

Por defecto, el Human Task Manager la aplicación registra 4 procesadores de eventos:

  • Tarea de lectura del procesador de eventos secundarios

  • Procesador de eventos del lado de lectura del historial de tareas

  • Procesador de eventos de ejecución masiva

  • IPF Processing Data Salida

Estos procesadores registran sus propios flujos de procesadores de eventos, lo que consume más Unidades de Solicitud (RUs) en Azure. CosmosDB para MongoDB.
Para ahorrar algunos RUs en Azure CosmosDB, el Delegante Event El procesador puede ser utilizado.

En este caso, solo un procesador de eventos consume eventos del diario y luego delega los eventos a otros procesadores de eventos registrados.

La desventaja de la Delegación Event El procesador se puede encontrar en su manejo de errores. Si un evento no se procesa en ninguno de los procesadores de eventos delegados, el mismo evento será reintentado por todos los procesadores de eventos delegados, incluso si fue procesado con éxito por otros. Sin embargo, cuando hay tres procesadores de eventos independientes, el evento fallido solo será reintentado por el procesador de eventos donde falló el procesamiento anterior.

Puede habilitar la delegación Event Procesador utilizando la siguiente configuración:

ipf.htm.event-processor.delegating.enabled = true

¡Felicidades! Ahora está creando y siendo notificado de tareas humanas en IPF.

Cambiar de forma segura hacia y desde el procesador de eventos delegante es una tarea compleja que requiere tiempo de inactividad y una migración cuidadosa de los desplazamientos comprometidos en el mongoOffsets colección. Es recomendable realizar algunas pruebas en entornos de prueba antes de intentar una en producción.