
Asynchronous Request-Reply

How do I correlate messages from my own internal format to some external domain, if responses are async?

This Connector example connects to a fake service which takes a person’s name and returns their age. The service on the other end takes requests on a request queue, and sends responses back on a response queue with a correlation ID to help the consumer determine which request this response is for.

Diagram
This guide is also available in the separate connector-samples Git repository here.

Correlation

We use an in-memory correlation service to store mappings from our "internal" flow ID concept to the external "correlation ID" concept.

There might be multiple reasons why we cannot use our own internal ID as the correlation ID for an external service (hence the need for a correlation ID service).

  • We may not want to expose our internal ID to the outside world.

  • Technical limitations, e.g. the external system’s correlation IDs must follow a specific format.

  • A single internal ID may span multiple interactions with the external system, so reusing it as the correlation ID would make correlation non-unique per individual invocation.
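
To make the mapping concrete, here is a minimal sketch of what an in-memory correlation store could look like. The class name `InMemoryCorrelationStore` and its methods are hypothetical and chosen for illustration only; they are not the connector library's correlation service API, and plain strings stand in for the ID types.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an in-memory correlation store; NOT the connector library's API. */
final class InMemoryCorrelationStore {

    // External correlation ID -> internal flow ID.
    private final Map<String, String> flowIdByCorrelationId = new ConcurrentHashMap<>();

    /** Send side: remember which internal flow an outgoing request belongs to. */
    void save(String correlationId, String flowId) {
        String previous = flowIdByCorrelationId.putIfAbsent(correlationId, flowId);
        if (previous != null) {
            // Each invocation needs its own correlation ID, so a clash is treated as an error.
            throw new IllegalStateException("Correlation ID already in use: " + correlationId);
        }
    }

    /** Receive side: resolve the internal flow ID; empty if the response matches no known request. */
    Optional<String> findFlowId(String correlationId) {
        return Optional.ofNullable(flowIdByCorrelationId.get(correlationId));
    }
}
```

Keying the map by the external correlation ID, rather than by the internal ID, is what allows several invocations for the same flow to be outstanding at once: each one gets its own entry pointing back to the same flow ID.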

Connector Setup

We have set up a SendConnector and ReceiveConnector pair. First the SendConnector.

        var sendConnector = new SendConnector
                .Builder<AgeRequest, TheirAgeRequest>("OurSender")
                .withActorSystem(actorSystem)
                .withCorrelationService(correlationService)
                .withConnectorTransport(sendingTransport)
                .withMessageLogger(logger())
                .withCorrelationIdExtractor(request -> CorrelationId.of(request.getCorrelationId())) // (1)
                .withDomainToTargetTypeConverter(this::createTheirAgeRequest) // (2)
                .withSendTransportMessageConverter(toJson()) // (3)
                .build();
1 Telling the SendConnector where to look in the target message type for the correlation ID to save in the Correlation ID service.
2 Creating an external domain request with a random number as the external Correlation ID.
3 Converting the external domain POJO type to a JSON representation.
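
The body of `createTheirAgeRequest` is not shown in this guide. As a rough sketch of callout 2, a converter of that kind could look like the following. The field layout of `AgeRequest` and `TheirAgeRequest` and the `AgeRequestConverter` holder class are assumptions made for illustration; only `getCorrelationId()` is implied by the builder code above.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical internal request: carries only the person's name. */
final class AgeRequest {
    final String name;

    AgeRequest(String name) {
        this.name = name;
    }
}

/** Hypothetical external request: the name plus the correlation ID the service echoes back. */
final class TheirAgeRequest {
    private final String correlationId;
    private final String name;

    TheirAgeRequest(String correlationId, String name) {
        this.correlationId = correlationId;
        this.name = name;
    }

    String getCorrelationId() {
        return correlationId;
    }

    String getName() {
        return name;
    }
}

/** Hypothetical holder for the conversion function (an assumption, not the sample's code). */
final class AgeRequestConverter {
    static TheirAgeRequest createTheirAgeRequest(AgeRequest request) {
        // A fresh non-negative random number per invocation serves as the external correlation ID.
        long random = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
        return new TheirAgeRequest(Long.toString(random), request.name);
    }
}
```

A random number is convenient for a sample, but collisions are possible in principle, so a real system might prefer an ID scheme with a uniqueness guarantee.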

And here’s the ReceiveConnector.

        new ReceiveConnector
                .Builder<TheirAgeResponse>("OurReceiver")
                .withActorSystem(actorSystem)
                .withConnectorTransport(receivingTransport)
                .withMessageLogger(logger())
                .withCorrelationService(correlationService)
                .withCorrelationIdExtractor(response -> CorrelationId.of(response.getCorrelationId())) // (1)
                .withReceiveTransportMessageConverter(fromJson(TheirAgeResponse.class)) // (2)
                .withManualStart(false)
                .withReceiveHandler((receivingContext, response) -> {
                    var processingContext = receivingContext.getProcessingContext();
                    ageMap.put(processingContext.getUnitOfWorkId(), response.getAge()); // (3)
                    return CompletableFuture.completedFuture(null);
                })
                .build();
1 Function to tell the ReceiveConnector where in the response message to look for the correlation ID.
2 Converting the TransportMessage from JSON to a TheirAgeResponse POJO.
3 Populating the age in the map for our flow ID.

Exercise

At the moment this example uses an in-memory Map implementation of CorrelationIdService to store and retrieve correlation IDs. This would not work in a multi-node deployment: each node would hold its own map, so a response could arrive on a node that never saw the original request.

You may have noticed that this sample starts an ActiveMQ container using Testcontainers. Try the following.

  • Tell Testcontainers to also start a database container (RDBMS, MongoDB, etc).

  • Create a new CorrelationIdService implementation which saves correlation IDs to, and retrieves them from, your newly created database container, and wire it into the test.

  • Create a new MessageLogger implementation which saves logged messages to this database, and wire it into the test.