Introducción
Este documento explicará cómo configurarse para utilizar Human Task Manager(HTM) con IPF.
Paso 0: Dependencia
Agregue las siguientes dependencias a su aplicación pom.xml:
<!--Allows for your application to create tasks in HTM-->
<dependency>
<groupId>com.iconsolutions.ipf.htm</groupId>
<artifactId>htm-client-sdk-request-reply-connector-starter</artifactId>
</dependency>
<!--Allows for your app to receive notifications (over Kafka) when tasks are marked as completed-->
<dependency>
<groupId>com.iconsolutions.ipf.htm</groupId>
<artifactId>htm-client-sdk-notification-kafka-starter</artifactId>
</dependency>
<!--Optional dependency, allows for your application to create tasks in HTM by sending Kafka messages-->
<dependency>
<groupId>com.iconsolutions.ipf.htm</groupId>
<artifactId>htm-client-sdk-async-connector-kafka-starter</artifactId>
</dependency>
<!--Optional dependency, allows for your application to receive tasks in HTM-->
<dependency>
<groupId>com.iconsolutions.ipf.htm</groupId>
<artifactId>htm-receive-connector-core-starter</artifactId>
</dependency>
| Explícito HTM las versiones no son requeridas si usted está importando el IPF BOM |
Paso 1: Registre un controlador para ser notificado de las tareas
Cuando se reciben notificaciones a través de Kafka que las tareas han sido marcadas como completadas, el Receive Connector que recibe estos mensajes buscará una interfaz llamada HtmClientReceiveNotificationPort, que tiene la siguiente firma:
public interface HtmClientReceiveNotificationPort {
default ProcessingContext determineContextFor(TransportMessage message) {
var messageHeaders = message.getMessageHeaders();
var builder = ProcessingContext.builder()
.associationId(messageHeaders.getHeader("associationId").map(Object::toString).orElse(null))
.unitOfWorkId(messageHeaders.getHeader("uowId").map(Object::toString).orElse(null));
messageHeaders.getHeader("requestId").map(Object::toString).ifPresent(builder::clientRequestId);
messageHeaders.getHeader("processingEntity").map(Object::toString).ifPresent(builder::clientRequestId);
return builder.build();
}
CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification);
}
Si no hay una implementación disponible para esta interfaz, el Spring el contexto no se iniciará y fallará con el siguiente error:
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.iconsolutions.ipf.htm.client.receive.sdk.HtmClientReceiveNotificationPort' available: expected at least 1 bean which qualifies as autowire candidate
Solo debe implementar receiveNotification según lo siguiente:
public class HtmTaskClosedNotificationReceiveAdapter implements HtmClientReceiveNotificationPort {
@Override
public CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification) {
return HtmclientmodelDomain.humanTaskManager().handle(
new TaskClosedNotificationInput.Builder(receivingContext.getProcessingContext().getAssociationId().getValue())
.withTaskClosedResult(toTaskClosedNotification(taskClosedNotification))
.build())
.thenAccept(done -> log.info("Completed {}", done));
}
}
El ejemplo anterior nos muestra llamando a un IPF flow para continuar su ejecución; este es el tipo típico de uso para una aplicación que utiliza HTM pero cualquier otra implementación puede ser conectada aquí en su lugar.
Paso 2: Kafka Configuración
Conectores de recepción
El Kafka Receive Connector está configurado de la siguiente manera:
akka {
kafka {
consumer {
kafka-clients {
bootstrap.servers = "kafka:9092"
}
restart-settings = ${common-flow-restart-settings}
}
}
}
htm {
kafka {
consumer {
topic = HTM_TASK_CLOSED_NOTIFICATION
restart-settings = ${common-flow-restart-settings}
kafka-clients {
group.id = htm-task-closed-notification-group
}
}
}
}
Si desea utilizar Kafka para registrarse HTM tareas,Kafka Send Connector se utiliza para ello, y por defecto está configurado de la siguiente manera:
ipf.htm.async.register-task {
type = kafka
kafka {
producer {
topic = HTM_REGISTER_TASK
}
}
}
De manera similar, si usted desea utilizar Kafka para cancelar HTM tareas,Kafka Send Connector se utiliza para ello, y por defecto está configurado de la siguiente manera:
ipf.htm.async.cancel-task {
type = kafka
kafka {
producer {
topic = HTM_CANCEL_TASK
}
}
}
Paso 3:HTM Configuración del Cliente
Kafka Conectores
Además de recibir notificaciones de tareas completadas,Kafka puede ser utilizado para registrar y cancelar tareas.
Por defecto, ambos Kafka los conectores están habilitados.
Para desactivarlos, simplemente establezca el indicador habilitado apropiado en false.
ipf.htm.async {
register-task.enabled = true
cancel-task.enabled = true
}
HTTP Conectores
Mientras que las notificaciones de tareas completadas siempre se reciben como Kafka mensajes, solicitudes para crear y cancelar tareas en HTM se puede hacer a través de ambos HTTP y Kafka, pero todas las demás operaciones están solamente disponibles a través de HTTP.
Por defecto, todos HTM Los conectores de solicitud-respuesta están deshabilitados y deben ser habilitados explícitamente a través de la configuración:
ipf.htm.request-reply.starter {
task-summaries.enabled = true
task-details.enabled = true
task-history.enabled = true
register-task.enabled = true
assign-task.enabled = true
execute-task.enabled = true
approve-task.enabled = true
reject-task.enabled = true
cancel-task.enabled = true
execute-bulk-tasks.enabled = true
bulk-details.enabled = true
}
El HTTP la configuración del cliente también debe ser anulada. Estos son los valores predeterminados:
ipf.htm.request-reply.starter {
http {
client {
host = "localhost"
endpoint-url = "/tasks"
port = 8080
}
}
}
Finalmente, estos Conectores vienen con configuraciones de resiliencia proporcionadas, las cuales también pueden ser anuladas. Aquí están los valores predeterminados:
ipf.htm.request-reply.starter {
call-timeout = 2s
resiliency-settings {
enabled = true
minimum-number-of-calls = 50
max-attempts = 5
reset-timeout = 1s
initial-retry-wait-duration = 1s
backoff-multiplier = 2
retry-on-failure-when = true
retry-on-result-when = false
retryable-status-codes = []
}
}
Indexación
HTMtiene una configuración predeterminada para crear automáticamente MongoDB índices en la inicialización.
La creación de índices predeterminados puede ser desactivada con lo siguiente:
ipf.htm.mongodb.create-indexes=false
Los índices pueden ser deshabilitados globalmente con:
ipf.mongodb.create-indexes=false
Para deshabilitar la indexación globalmente pero mantenerla para HTM, aplique lo siguiente, manteniendo el orden:
ipf.mongodb.create-indexes=false
ipf.htm.mongodb.create-indexes=true
Compromiso de Quórum
El quórum de compromiso puede ser controlado de manera similar con:
ipf.htm.mongodb.commit-quorum=1
O bien anulado globalmente con:
ipf.mongodb.commit-quorum=1
Para establecer un quórum de confirmación diferente a nivel global para el HTM uno(s), aplique lo siguiente, manteniendo el orden:
ipf.mongodb.commit-quorum="votingMembers"
htm.task-history-repository.commit-quorum=1
htm.task-idempotency-cache.commit-quorum=1
htm.task-repository.commit-quorum=1
Paso 4: Ejecute Varios HTM Operaciones
Las operaciones del conector están disponibles a través de la htm-client-sdk-request-reply-connector-starter dependencia que usted agregó en el paso 0. Esta dependencia crea un HtmClientSdk implementación para usted.
La interfaz es:
public interface HtmClientSdk {
CompletionStage<Response<RegisterTaskResponse>> registerTask(ProcessingContext processingContext, RegisterTaskRequest request);
CompletionStage<Response<ResponseModel<CancelTaskResponse>>> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest request);
CompletionStage<Response<GetTaskSummariesResponse>> getTaskSummaries(ProcessingContext processingContext, GetTaskSummariesParams getTaskSummariesParams);
CompletionStage<Response<ResponseModel<GetTaskDetailsResponse>>> getTaskDetails(ProcessingContext processingContext, String taskId);
CompletionStage<Response<ResponseModel<GetTaskHistoryResponse>>> getTaskHistory(ProcessingContext processingContext, String taskId);
CompletionStage<Response<ResponseModel<AssignTaskResponse>>> assignTask(ProcessingContext processingContext, WrapperRequest<AssignTaskRequest> assignTaskRequest);
CompletionStage<Response<ResponseModel<ExecuteTaskResponse>>> executeTask(ProcessingContext processingContext, WrapperRequest<ExecuteTaskRequest> executeTaskRequest);
CompletionStage<Response<ResponseModel<ApproveTaskResponse>>> approveTask(ProcessingContext processingContext, WrapperRequest<ApproveTaskRequest> approveTaskRequest);
CompletionStage<Response<ResponseModel<RejectTaskResponse>>> rejectTask(ProcessingContext processingContext, WrapperRequest<RejectTaskRequest> rejectTaskRequest);
CompletionStage<Response<ResponseModel<BulkProcessingAcceptedResponse>>> executeBulkTasks(ProcessingContext processingContext, ExecuteBulkRequest executeBulkRequest);
CompletionStage<Response<ResponseModel<GetBulkDetailsResponse>>> getBulkDetails(ProcessingContext processingContext, String bulkId);
}
Tenga en cuenta que para utilizar diversas funciones, estas deben ser habilitadas primero según el paso 3 anterior.
Finalmente, puede inyectar esta dependencia como un Spring bean(por ejemplo, utilizando @Autowire) y utilícelo en su código:
import com.iconsolutions.ipf.htm.adapter.HumanTaskManagerAdapter;
public class MyHtmClient {
private final HtmClientSdk htmClientSdk;
public MyHtmClient(HtmClientSdk htmClientSdk) {
this.htmClientSdk = htmClientSdk;
}
public void createTask() {
htmClientSdk.registerTask(...)
.toCompletableFuture().join();
}
}
Gestionando HTM tareas a través de Kafka
Registro y Cancelación HTM las tareas también podrían realizarse a través de Kafka.
Para poder hacer esto, es necesario añadir htm-client-sdk-async-connector-kafka-starter dependencia (como ya se mencionó en el paso 0):
Esta dependencia expone HtmAsyncClientSdk Spring Bean, teniendo un registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest) método, y un cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest cancelTaskRequest) método.
El registerTask método debe ser llamado para registrar una tarea de manera asíncrona; devuelve el TaskId of HTM Tarea registrada como respuesta.
Asimismo, el cancelTask método debe ser llamado para cancelar una tarea de manera asíncrona; toma el TaskId para ser cancelado como un parámetro, y lo devuelve una vez que se ha ejecutado con éxito.
Ejemplos de uso en el código del cliente:
public class MyAsyncHtmRequestSender {
private final HtmAsyncClientSdk htmAsyncClientSdk;
public MyAsyncHtmRequestSender(HtmAsyncClientSdk htmAsyncClientSdk) {
this.htmAsyncClientSdk = htmAsyncClientSdk;
}
public CompletionStage<String> registerTask(ProcessingContext processingContext, RegisterTaskRequest registerTaskRequest) {
return htmAsyncClientSdk.registerTask(processingContext, registerTaskRequest);
}
public CompletionStage<String> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest cancelTaskRequest) {
return htmAsyncClientSdk.cancelTask(processingContext, taskId, registerTaskRequest);
}
}
Paso 5: Despliegue
HTMLos conectores pueden configurarse para realizar reintentos y utilizar el patrón de cortacircuito para proteger el HTM HTTP servidor (para más información sobre cómo configurar esto, consulte el sección de configuración del cliente).
Configuración que se requiere para el despliegue a Kubernetes:
akka {
remote.artery.canonical.hostname = ${POD_IP}
# Use Kubernetes API to discover the cluster
discovery {
kubernetes-api {
pod-label-selector = "app=%s"
}
}
management {
# use the Kubernetes API to create the cluster
cluster.bootstrap {
contact-point-discovery {
discovery-method = kubernetes-api
service-name = ${AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME}
required-contact-point-nr = 1
required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
}
}
# available from Akka management >= 1.0.0
health-checks {
readiness-path = "health/ready"
liveness-path = "health/alive"
}
}
actor {
provider = cluster
}
cluster {
seed-nodes = []
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver {
active-strategy = keep-majority
stable-after = 20s
}
sharding {
remember-entities = off
handoff-timeout = 8s
least-shard-allocation-strategy.rebalance-absolute-limit = 20
rebalance-interval = 2s
number-of-shards = 100
}
}
}
Event configuración del procesador
Por defecto, el Human Task Manager la aplicación registra 4 procesadores de eventos:
-
Tarea de lectura del procesador de eventos secundarios
-
Procesador de eventos del lado de lectura del historial de tareas
-
Procesador de eventos de ejecución masiva
-
IPF Processing Data Salida
| Estos procesadores registran sus propios flujos de procesadores de eventos, lo que consume más Unidades de Solicitud (RUs) en Azure. CosmosDB para MongoDB. |
| Para ahorrar algunos RUs en Azure CosmosDB, el Delegante Event El procesador puede ser utilizado. |
En este caso, solo un procesador de eventos consume eventos del diario y luego delega los eventos a otros procesadores de eventos registrados.
La desventaja de la Delegación Event El procesador se puede encontrar en su manejo de errores. Si un evento no se procesa en ninguno de los procesadores de eventos delegados, el mismo evento será reintentado por todos los procesadores de eventos delegados, incluso si fue procesado con éxito por otros. Sin embargo, cuando hay tres procesadores de eventos independientes, el evento fallido solo será reintentado por el procesador de eventos donde falló el procesamiento anterior.
Puede habilitar la delegación Event Procesador utilizando la siguiente configuración:
ipf.htm.event-processor.delegating.enabled = true
¡Felicidades! Ahora está creando y siendo notificado de tareas humanas en IPF.
Cambiar de forma segura hacia y desde el procesador de eventos delegante es una tarea compleja que requiere tiempo de inactividad y una migración cuidadosa de los desplazamientos comprometidos en el mongoOffsets colección.
Es recomendable realizar algunas pruebas en entornos de prueba antes de intentar una en producción.
|