Documentation for a newer release is available. View Latest
Esta página no está disponible actualmente en Español. Si lo necesita, póngase en contacto con el servicio de asistencia de Icon (correo electrónico)

Getting Started

This document will explain how to get set up with using Human Task Manager (HTM) with IPF.

Step 0: Dependency

Add the following dependencies to your application’s pom.xml:

<!--Allows for your application to create tasks in HTM-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-request-reply-connector-starter</artifactId>
</dependency>
<!--Allows for your app to receive notifications (over Kafka) when tasks are marked as completed-->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-notification-kafka-starter</artifactId>
</dependency>
Explicit HTM versions are not required if you are importing the IPF BOM

Step 1: Register a handler to be notified of tasks

When notifications are received over Kafka that tasks have been marked as completed, the Receive Connector that receives these messages will look for an interface called HtmClientReceiveNotificationPort, which has the following signature:

public interface HtmClientReceiveNotificationPort {
    default ProcessingContext determineContextFor(TransportMessage message) {
        var messageHeaders = message.getMessageHeaders();
        var builder = ProcessingContext.builder()
                .associationId(messageHeaders.getHeader("associationId").map(Object::toString).orElse(null))
                .unitOfWorkId(messageHeaders.getHeader("uowId").map(Object::toString).orElse(null));
        messageHeaders.getHeader("requestId").map(Object::toString).ifPresent(builder::clientRequestId);
        messageHeaders.getHeader("processingEntity").map(Object::toString).ifPresent(builder::clientRequestId);
        return builder.build();
    }
    CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification);
}

If no implementation is available for this interface, the Spring context will not start and will fail with the following error:

Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.iconsolutions.ipf.htm.client.receive.sdk.HtmClientReceiveNotificationPort' available: expected at least 1 bean which qualifies as autowire candidate

You should only need to implement receiveNotification as per the below:

public class HtmTaskClosedNotificationReceiveAdapter implements HtmClientReceiveNotificationPort {

    @Override
    public CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification) {
        return HtmclientmodelDomain.humanTaskManager().handle(
                        new TaskClosedNotificationInput.Builder(receivingContext.getProcessingContext().getAssociationId().getValue())
                                .withTaskClosedResult(toTaskClosedNotification(taskClosedNotification))
                                .build())
                .thenAccept(done -> log.info("Completed {}", done));
    }
}

The example above shows us calling an IPF flow to continue its execution; this is the typical kind of usage for an application that uses HTM but any other implementation can be plugged in here instead.

Step 2: Kafka Configuration

The Kafka Receive Connector is configured as follows:

akka {
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}

    }
  }
}

htm {
  kafka {
    consumer {
      topic = HTM_TASK_CLOSED_NOTIFICATION
      restart-settings = ${common-flow-restart-settings}
      kafka-clients {
        group.id = htm-task-closed-notification-group
      }
    }
  }
}

Step 3: HTM Client Configuration

Connectors

While notifications of completed tasks are received as Kafka messages, requests to create messages in HTM are done via HTTP.

By default, all HTM Request-Reply Connectors are disabled by default and need to be explicitly enabled through config:

ipf.htm.request-reply.starter {
  task-summaries.enabled = true
  task-details.enabled = true
  task-history.enabled = true
  register-task.enabled = true
  assign-task.enabled = true
  execute-task.enabled = true
  approve-task.enabled = true
  reject-task.enabled = true
  cancel-task.enabled = true
  execute-bulk-tasks.enabled = true
  bulk-details.enabled = true
}

The HTTP client’s configuration should be overridden as well. These are the defaults:

ipf.htm.request-reply.starter {
  http {
    client {
      host = "localhost"
      endpoint-url = "/tasks"
      port = 8080
    }
  }
}

Finally, these Connectors come with provided resiliency settings, which can also be overridden. Here are the defaults:

ipf.htm.request-reply.starter {
  call-timeout = 2s
  resiliency-settings {
    enabled = true
    minimum-number-of-calls = 50
    max-attempts = 5
    reset-timeout = 1s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
    retry-on-failure-when = true
    retry-on-result-when = false
    retryable-status-codes = []
  }
}

Indexing

HTM has default configuration to automatically create MongoDB indexes on initialisation.

The creation of default indexes can be disabled with the following:

ipf.htm.mongodb.create-indexes=false

Indexes can be disabled globally with:

ipf.mongodb.create-indexes=false

To disable indexing globally but to retain it for HTM, apply the following, retaining the order:

ipf.mongodb.create-indexes=false
ipf.htm.mongodb.create-indexes=true

Commit Quorum

The commit quorum can similarly be controlled with:

ipf.htm.mongodb.commit-quorum=1

Or overridden globally with:

ipf.mongodb.commit-quorum=1

To set a different commit quorum globally to the HTM one(s), apply the following, retaining the order:

ipf.mongodb.commit-quorum="votingMembers"
htm.task-history-repository.commit-quorum=1
htm.task-idempotency-cache.commit-quorum=1
htm.task-repository.commit-quorum=1

Step 4: Execute Various HTM Operations

Connector operations are available via the htm-client-sdk-request-reply-connector-starter dependency you added in step 0. This dependency creates a HtmClientSdk implementation for you. The interface is:

public interface HtmClientSdk {
    CompletionStage<Response<RegisterTaskResponse>> registerTask(ProcessingContext processingContext, RegisterTaskRequest request);
    CompletionStage<Response<ResponseModel<CancelTaskResponse>>> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest request);
    CompletionStage<Response<GetTaskSummariesResponse>> getTaskSummaries(ProcessingContext processingContext, GetTaskSummariesParams getTaskSummariesParams);
    CompletionStage<Response<ResponseModel<GetTaskDetailsResponse>>> getTaskDetails(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<GetTaskHistoryResponse>>> getTaskHistory(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<AssignTaskResponse>>> assignTask(ProcessingContext processingContext, WrapperRequest<AssignTaskRequest> assignTaskRequest);
    CompletionStage<Response<ResponseModel<ExecuteTaskResponse>>> executeTask(ProcessingContext processingContext, WrapperRequest<ExecuteTaskRequest> executeTaskRequest);
    CompletionStage<Response<ResponseModel<ApproveTaskResponse>>> approveTask(ProcessingContext processingContext, WrapperRequest<ApproveTaskRequest> approveTaskRequest);
    CompletionStage<Response<ResponseModel<RejectTaskResponse>>> rejectTask(ProcessingContext processingContext, WrapperRequest<RejectTaskRequest> rejectTaskRequest);
    CompletionStage<Response<ResponseModel<BulkProcessingAcceptedResponse>>> executeBulkTasks(ProcessingContext processingContext, ExecuteBulkRequest executeBulkRequest);
    CompletionStage<Response<ResponseModel<GetBulkDetailsResponse>>> getBulkDetails(ProcessingContext processingContext, String bulkId);
}

Note that in order to use various functions they need to first be enabled as per step 3 above.

Finally, you can inject this dependency as a Spring bean (for example, using @Autowire) and use it in your code:

import com.iconsolutions.ipf.htm.adapter.HumanTaskManagerAdapter;

public class MyHtmClient {

    private final HtmClientSdk htmClientSdk;

    public MyHtmClient(HtmClientSdk htmClientSdk) {
        this.htmClientSdk = htmClientSdk;
    }

    public void createTask() {
        htmClientSdk.registerTask(...)
            .toCompletableFuture().join();
    }
}

Step 5: Deployment

HTM connectors can be configured to do retries and use the circuit breaker pattern to protect the HTM Http server (for more information on how to configure this, visit the above connector docs link).

Configuration that is required for deployment to Kubernetes:

 akka {
   remote.artery.canonical.hostname = ${POD_IP}
 # Use Kubernetes API to discover the cluster
   discovery {
     kubernetes-api {
       pod-label-selector = "app=%s"
     }
   }

   management {
 # use the Kubernetes API to create the cluster
     cluster.bootstrap {
       contact-point-discovery {
         discovery-method          = kubernetes-api
         service-name              = ${AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME}
         required-contact-point-nr = 1
         required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
       }
     }
 # available from Akka management >= 1.0.0
     health-checks {
       readiness-path  = "health/ready"
       liveness-path   = "health/alive"
     }
   }

   actor {
     provider = cluster
   }

   cluster {
     seed-nodes = []
     downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
     split-brain-resolver {
       active-strategy = keep-majority
       stable-after = 20s
     }
     sharding {
       remember-entities = off
       handoff-timeout = 8s
       least-shard-allocation-strategy.rebalance-absolute-limit = 20
       rebalance-interval = 2s
       number-of-shards = 100
     }
   }
}

Event processor configuration

By default, the Human Task Manager application registers 4 event processors:

  • Task read side event processor

  • Task history read side event processor

  • Bulk execution event processor

  • IPF Processing Data Egress

These processors register their own event processor streams, which consumes more Request Units (RUs) on Azure CosmosDB for MongoDB.
To save some RUs on Azure CosmosDB, the Delegating Event Processor can be used.

In this case only one event processor consumes events from journal and then delegate events to other registered event processors.

The disadvantage of the Delegating Event Processor can be found in its handling of errors. If an event fails to process in any of the delegated event processors, the same event will be retried by all delegated event processors, even if it was successfully processed by others. However, when there are three independent event processors, the failing event will only be retried by the event processor where the previous processing failed.

You can enable the Delegating Event Processor using the following configuration:

ipf.htm.event-processor.delegating.enabled = true

Congratulations! You are now creating and being notified of human tasks in IPF.

Safely switching to and from the delegating event processor is a complex task that requires downtime and a careful migration of the committed offsets in the mongoOffsets collection. It is advisable to do a few practice runs on test environments before attempting one in production.