Documentation for a newer release is available. View Latest
Esta página no está disponible actualmente en Español. Si lo necesita, póngase en contacto con el servicio de asistencia de Icon (correo electrónico)

Message Association

In the context of connectors, association is a hypernym (umbrella term) for correlation and identification. Correlation is used to relate a response to a message sent asynchronously by another process, whereas identification is used to create an identity for a new message which has no relation to any previous messages sent before.

Correlation

Correlation Identifier Extraction

When sending or receiving a message that requires correlation, an implementation of CorrelationIdExtractor or ConnectorMessageCorrelationIdExtractor must be provided.

These are both functional interfaces, where the CorrelationIdExtractor can extract from the target message type and the ConnectorMessageCorrelationIdExtractor has access to headers passed from the transport layer which could be used to store the correlation identifier. In both cases we return an instance of CorrelationId that is used to either persist or fetch the context.

@FunctionalInterface
public interface ConnectorMessageCorrelationIdExtractor<T> {
    CorrelationId extract(ConnectorMessage<T> connectorMessage);
}

@FunctionalInterface
public interface CorrelationIdExtractor<T> {
    CorrelationId extract(T payload);

    static <T> ConnectorMessageCorrelationIdExtractor<T>
    forConnectorMessage(CorrelationIdExtractor<T> correlationIdExtractor) {
        return connectorMessage ->
                correlationIdExtractor.extract(connectorMessage.getTargetMessage());
    }
}
It is highly recommended to use extractors that provide unique correlation IDs per request-reply pair. The default correlation service implementation will match a response to the latest request made with the same correlation ID, but in rare scenarios that match may not be correct due to clock drift between the servers that created the correlation entries.

Send Correlation

Messages sent using a sending connector must pass a ProcessingContext and can optionally pass a SupportingContext too, see the Send Connector documentation for more details around this.

Both these context objects will be persisted via an implementation of CorrelationService, whose purpose is store the context into a datastore against a correlation identifier. The default datastore implementation used for the correlation service is MongoDB, though this can be swapped out with any other repository, e.g. Redis, PostgreSQL.

Receive Correlation

When messages are received in response to another message sent in a separate process, provided the receiving connector has been configured with a correlation extractor and a correlation service, correlation will be performed.

The correlation identifier will be extracted from the received message and is used to retrieve the context which was persisted during the send request. If the context is found it is passed through to later processing stages, otherwise an exception is thrown and passed to error handlers.

If the received message contains a processing context, e.g. as a result of the identification stage, any processing context contained in the correlation will be merged with the message context. See the Context Merging section for more information.

Time To Live (TTL)

The MongoDB correlation service by default will persist entries for 3600 seconds (60 minutes) based on the creationDate field. It does this by creating an index with a TTL on the MongoDB collection. Once the time has expired entries will be evicted from the collection and no longer available. If you want to change the field or the expiry period you can set the following properties in your application.conf file:

ipf.connector.correlation-expiry = 3600
ipf.connector.correlation-timestamp-field-name = "creationDate"

Identification

Identification is typically used for initiating requests, i.e. not responses to messages sent previously. In this case they need to generate a ProcessingContext to be used going forward, since there is no context to fetch from the correlation service.

In some cases the message might be correlated in some way to previous messages and the generated ProcessingContext may need to reflect this.

Identification is also supported in response scenarios where a correlation is also likely to exist, e.g. the response message contains a processing context. In this case, identification builds a processing context from the response message, and this is merged with the correlation context. See the Context Merging section for more information.

Setting up identification for a receiving connector only requires an implementation of ProcessingContextExtractor. This makes identification flexible as the context can be either generated randomly or be derived from the received message.

@FunctionalInterface
public interface ProcessingContextExtractor<T> {
    ProcessingContext extract(ConnectorMessage<T> connectorMessage);
}

Helper Class

The InitiatingProcessingContextExtractor is an implementation of ProcessingContextExtractor which aims to reduce boilerplate, especially in the case where you want to either fully or partially generate random values for processing context fields.

For the simplest case the generateRandom static factory method can be used. Regardless of the input it will generate a ProcessingContext with a randomly generated UnitOfWorkId; and unknown ProcessingEntity and ClientRequestId.

ProcessingContextExtractor<TestObject> extractor = InitiatingProcessingContextExtractor.generateRandom();

Alternatively, the extractor can be constructed using the builder pattern and individual extractors for each field can be supplied, otherwise the defaults will be used.

var extractor = InitiatingProcessingContextExtractor.<TestObject>builder()
        .unitOfWorkIdExtractor(message -> UnitOfWorkId.of(message.getTargetMessage().unitOfWorkId))
        .clientRequestIdExtractor(message -> ClientRequestId.of(message.getTargetMessage().clientRequestId))
        .processingEntityExtractor(message -> ProcessingEntity.of(message.getTargetMessage().processingEntity))
        .build();

Since it is often the case that the ProcessingEntity does not change between request and remains static, there is a static factory method defined to supply a static ProcessingEntity to cut down on the boilerplate code.

private static final ProcessingEntity PROCESSING_ENTITY = ProcessingEntity.of("processingEntity");
var extractor = InitiatingProcessingContextExtractor.<TestObject>builder()
        .processingEntityExtractor(staticSupplier(PROCESSING_ENTITY))
        .build();

Context Merging

When both identification and correlation are applied to a receive connector where the message being received is a response to a previous request, the context stored on the original request correlation, and the context on the received message, are merged together.

Example:

  1. System A sends a request to System B, and a correlation is stored containing the current processing context

  2. System B sends a response to System A, with a processing context containing different values (same uowId)

  3. The context being received is merged into the context stored on the correlation

Diagram

The correlation context is used as the base, and the received message context is merged into it. Both contexts must have the same unitOfWorkId or an exception is thrown. The following rules are used when merging processing context fields.

  1. The checkpoint is merged when it is not null

  2. The clientRequestId is merged when it is not UNKNOWN

  3. The processingEntity is merged when it is not UNKNOWN

  4. The associationId is NOT merged, even if the correlation value is UNKNOWN

When does merging not happen?

If the correlation stage is disabled, or skipped for the received message, the processing context from the received message is used in its entirety. No merging takes place.

If the identification stage is disabled, the processing context from the correlation is used in its entirety. No merging takes place.