Getting Started
This document explains how to set up and use the Human Task Manager (HTM) with IPF.
Step 0: Dependencies
Add the following dependencies to your application’s pom.xml:
<!-- Allows your application to create tasks in HTM -->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-request-reply-connector-starter</artifactId>
</dependency>
<!-- Allows your application to receive notifications (over Kafka) when tasks are marked as completed -->
<dependency>
    <groupId>com.iconsolutions.ipf.htm</groupId>
    <artifactId>htm-client-sdk-notification-kafka-starter</artifactId>
</dependency>
| Explicit HTM versions are not required if you are importing the IPF BOM |
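If you are not already importing the BOM, it can be added to your pom.xml's dependencyManagement section. The coordinates below are illustrative placeholders; use the ones supplied with your IPF distribution:
<dependencyManagement>
    <dependencies>
        <!-- Placeholder coordinates; substitute the BOM from your IPF distribution -->
        <dependency>
            <groupId>com.iconsolutions.ipf</groupId>
            <artifactId>ipf-bom</artifactId>
            <version>${ipf.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>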
Step 1: Register a handler to be notified of tasks
When a notification that a task has been marked as completed is received over Kafka, the Receive Connector that consumes these messages looks for an implementation of the HtmClientReceiveNotificationPort interface, which has the following signature:
public interface HtmClientReceiveNotificationPort {

    default ProcessingContext determineContextFor(TransportMessage message) {
        var messageHeaders = message.getMessageHeaders();
        var builder = ProcessingContext.builder()
                .associationId(messageHeaders.getHeader("associationId").map(Object::toString).orElse(null))
                .unitOfWorkId(messageHeaders.getHeader("uowId").map(Object::toString).orElse(null));
        messageHeaders.getHeader("requestId").map(Object::toString).ifPresent(builder::clientRequestId);
        messageHeaders.getHeader("processingEntity").map(Object::toString).ifPresent(builder::processingEntity);
        return builder.build();
    }

    CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification);
}
If no implementation of this interface is available, the Spring context will fail to start with the following error:
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.iconsolutions.ipf.htm.client.receive.sdk.HtmClientReceiveNotificationPort' available: expected at least 1 bean which qualifies as autowire candidate
You should only need to implement receiveNotification, as shown below:
public class HtmTaskClosedNotificationReceiveAdapter implements HtmClientReceiveNotificationPort {

    @Override
    public CompletionStage<Void> receiveNotification(ReceivingContext receivingContext, TaskClosedNotification taskClosedNotification) {
        // Resume the IPF flow that is waiting on this task, correlating by association id
        return HtmclientmodelDomain.humanTaskManager().handle(
                        new TaskClosedNotificationInput.Builder(receivingContext.getProcessingContext().getAssociationId().getValue())
                                .withTaskClosedResult(toTaskClosedNotification(taskClosedNotification))
                                .build())
                .thenAccept(done -> log.info("Completed {}", done));
    }
}
The example above calls an IPF flow to continue its execution; this is the typical usage pattern for an application that uses HTM, but any other implementation can be plugged in here instead. The implementation must be registered as a Spring bean so the Receive Connector can discover it; one way to do this is shown below.
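A minimal sketch of exposing the adapter as a Spring bean, assuming it is not already picked up by component scanning (the configuration class name is illustrative):
import com.iconsolutions.ipf.htm.client.receive.sdk.HtmClientReceiveNotificationPort;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class HtmNotificationConfiguration {

    // Expose the adapter so the Receive Connector can autowire an
    // HtmClientReceiveNotificationPort implementation at startup
    @Bean
    HtmClientReceiveNotificationPort htmClientReceiveNotificationPort() {
        return new HtmTaskClosedNotificationReceiveAdapter();
    }
}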
Step 2: Kafka Configuration
The Kafka Receive Connector is configured as follows:
akka {
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}
    }
  }
}

htm {
  kafka {
    consumer {
      topic = HTM_TASK_CLOSED_NOTIFICATION
      restart-settings = ${common-flow-restart-settings}
      kafka-clients {
        group.id = htm-task-closed-notification-group
      }
    }
  }
}
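The broker list can also be supplied from the environment using HOCON's optional substitution syntax, as the deployment configuration in Step 5 does for other values (the variable name below is an example):
akka.kafka.consumer.kafka-clients {
  bootstrap.servers = "kafka:9092"
  bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
}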
Step 3: HTM Client Configuration
Connectors
While notifications of completed tasks are received as Kafka messages, requests to create tasks in HTM are made over HTTP.
All HTM Request-Reply Connectors are disabled by default and need to be explicitly enabled through config:
ipf.htm.request-reply.starter {
  task-summaries.enabled = true
  task-details.enabled = true
  task-history.enabled = true
  register-task.enabled = true
  assign-task.enabled = true
  execute-task.enabled = true
  approve-task.enabled = true
  reject-task.enabled = true
  cancel-task.enabled = true
  execute-bulk-tasks.enabled = true
  bulk-details.enabled = true
}
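You only need to enable the operations your application actually calls. For example, an application that just registers tasks and fetches their details might use:
ipf.htm.request-reply.starter {
  register-task.enabled = true
  task-details.enabled = true
}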
The HTTP client’s configuration should be overridden as well. These are the defaults:
ipf.htm.request-reply.starter {
  http {
    client {
      host = "localhost"
      endpoint-url = "/tasks"
      port = 8080
    }
  }
}
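For example, if the HTM server is reachable inside your cluster under a hypothetical service name htm-server, the override might look like:
ipf.htm.request-reply.starter {
  http.client {
    host = "htm-server"
    port = 8080
  }
}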
Finally, these Connectors ship with default resiliency settings, which can also be overridden. Here are the defaults:
ipf.htm.request-reply.starter {
  call-timeout = 2s
  resiliency-settings {
    enabled = true
    minimum-number-of-calls = 50
    max-attempts = 5
    reset-timeout = 1s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
    retry-on-failure-when = true
    retry-on-result-when = false
    retryable-status-codes = []
  }
}
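For example, to also retry calls that the HTM server answers with HTTP 503 (assuming status codes are given as a list, as the empty default suggests):
ipf.htm.request-reply.starter {
  resiliency-settings {
    retryable-status-codes = [503]
  }
}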
Indexing
HTM has default configuration to automatically create MongoDB indexes on initialisation.
The creation of default indexes can be disabled with the following:
ipf.htm.mongodb.create-indexes=false
Indexes can be disabled globally with:
ipf.mongodb.create-indexes=false
To disable indexing globally but retain it for HTM, apply the following, retaining the order:
ipf.mongodb.create-indexes=false
ipf.htm.mongodb.create-indexes=true
Commit Quorum
The commit quorum can similarly be controlled with:
ipf.htm.mongodb.commit-quorum=1
Or overridden globally with:
ipf.mongodb.commit-quorum=1
To set a global commit quorum that differs from the HTM one(s), apply the following, retaining the order:
ipf.mongodb.commit-quorum="votingMembers"
htm.task-history-repository.commit-quorum=1
htm.task-idempotency-cache.commit-quorum=1
htm.task-repository.commit-quorum=1
Step 4: Execute Various HTM Operations
Connector operations are available via the htm-client-sdk-request-reply-connector-starter dependency you added in Step 0. This dependency creates an HtmClientSdk implementation for you.
The interface is:
public interface HtmClientSdk {
    CompletionStage<Response<RegisterTaskResponse>> registerTask(ProcessingContext processingContext, RegisterTaskRequest request);
    CompletionStage<Response<ResponseModel<CancelTaskResponse>>> cancelTask(ProcessingContext processingContext, String taskId, CancelTaskRequest request);
    CompletionStage<Response<GetTaskSummariesResponse>> getTaskSummaries(ProcessingContext processingContext, GetTaskSummariesParams getTaskSummariesParams);
    CompletionStage<Response<ResponseModel<GetTaskDetailsResponse>>> getTaskDetails(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<GetTaskHistoryResponse>>> getTaskHistory(ProcessingContext processingContext, String taskId);
    CompletionStage<Response<ResponseModel<AssignTaskResponse>>> assignTask(ProcessingContext processingContext, WrapperRequest<AssignTaskRequest> assignTaskRequest);
    CompletionStage<Response<ResponseModel<ExecuteTaskResponse>>> executeTask(ProcessingContext processingContext, WrapperRequest<ExecuteTaskRequest> executeTaskRequest);
    CompletionStage<Response<ResponseModel<ExecuteTaskResponse>>> approveTask(ProcessingContext processingContext, WrapperRequest<ApproveTaskRequest> approveTaskRequest);
    CompletionStage<Response<ResponseModel<AssignTaskResponse>>> rejectTask(ProcessingContext processingContext, WrapperRequest<ApproveTaskRequest> rejectTaskRequest);
    CompletionStage<Response<ResponseModel<BulkProcessingAcceptedResponse>>> executeBulkTasks(ProcessingContext processingContext, ExecuteBulkRequest executeBulkRequest);
    CompletionStage<Response<ResponseModel<GetBulkDetailsResponse>>> getBulkDetails(ProcessingContext processingContext, String bulkId);
}
Note that each operation must first be enabled as described in Step 3 above.
Finally, you can inject this dependency as a Spring bean (for example, using @Autowired or constructor injection) and use it in your code:
public class MyHtmClient {

    private final HtmClientSdk htmClientSdk;

    public MyHtmClient(HtmClientSdk htmClientSdk) {
        this.htmClientSdk = htmClientSdk;
    }

    public void createTask() {
        htmClientSdk.registerTask(...)
                .toCompletableFuture().join();
    }
}
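As a slightly fuller sketch, a method like the one below (added to MyHtmClient, for example) fetches the details of a previously registered task asynchronously. The correlation values and the logger are illustrative; use the identifiers from your own flow:
public CompletionStage<Void> logTaskDetails(String taskId) {
    // Placeholder correlation values for the ProcessingContext
    var processingContext = ProcessingContext.builder()
            .associationId("example-association-id")
            .unitOfWorkId("example-uow-id")
            .build();
    // Non-blocking alternative to toCompletableFuture().join()
    return htmClientSdk.getTaskDetails(processingContext, taskId)
            .thenAccept(response -> log.info("Task details response: {}", response));
}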
Step 5: Deployment
HTM connectors can be configured to retry requests and to use the circuit breaker pattern to protect the HTM HTTP server (see the resiliency settings in Step 3 above).
Configuration that is required for deployment to Kubernetes:
akka {
  remote.artery.canonical.hostname = ${POD_IP}

  # Use the Kubernetes API to discover the cluster
  discovery {
    kubernetes-api {
      pod-label-selector = "app=%s"
    }
  }

  management {
    # Use the Kubernetes API to bootstrap the cluster
    cluster.bootstrap {
      contact-point-discovery {
        discovery-method = kubernetes-api
        service-name = ${AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME}
        required-contact-point-nr = 1
        required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
      }
    }

    # Available from Akka Management >= 1.0.0
    health-checks {
      readiness-path = "health/ready"
      liveness-path = "health/alive"
    }
  }

  actor {
    provider = cluster
  }

  cluster {
    seed-nodes = []
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    split-brain-resolver {
      active-strategy = keep-majority
      stable-after = 20s
    }
    sharding {
      remember-entities = off
      handoff-timeout = 8s
      least-shard-allocation-strategy.rebalance-absolute-limit = 20
      rebalance-interval = 2s
      number-of-shards = 100
    }
  }
}
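The POD_IP and AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME environment variables referenced above must be supplied by your deployment. A sketch of how this is commonly done in a Kubernetes Deployment manifest (the service name is a placeholder):
env:
  - name: POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP           # Downward API: the pod's own IP
  - name: AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME
    value: my-htm-application             # placeholder service name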
Event processor configuration
By default, the Human Task Manager application registers 4 event processors:
- Task read side event processor
- Task history read side event processor
- Bulk execution event processor
- IPF Processing Data Egress
| These processors register their own event processor streams, which consume more Request Units (RUs) on Azure Cosmos DB for MongoDB. |
| To save some RUs on Azure Cosmos DB, the Delegating Event Processor can be used. |
In this case, only one event processor consumes events from the journal and then delegates them to the other registered event processors.
The disadvantage of the Delegating Event Processor is its error handling: if an event fails in any of the delegated event processors, the same event is retried by all of them, even those that processed it successfully. With independent event processors, a failing event is only retried by the processor where the previous processing failed.
You can enable the Delegating Event Processor using the following configuration:
ipf.htm.event-processor.delegating.enabled = true
| Safely switching to and from the Delegating Event Processor is a complex task that requires downtime and a careful migration of the committed offsets in the mongoOffsets collection. It is advisable to do a few practice runs in test environments before attempting one in production. |
Congratulations! You are now creating and being notified of human tasks in IPF.