Documentation for a newer release is available. View Latest

Request-Reply Asíncrono

¿Cómo correlaciono mensajes desde mi propio formato interno hacia algún dominio externo, si las respuestas son asíncronas?

Este ejemplo de Connector se conecta a un servicio simulado que toma el nombre de una persona y devuelve su edad. El servicio del otro extremo recibe solicitudes en una cola de solicitudes y envía respuestas de vuelta en una cola de respuestas con un correlation ID para ayudar al consumidor a determinar a qué solicitud pertenece esta respuesta.

Diagram
Esta guía también está disponible en el repositorio Git separado connector-samples aquí.

Correlación

Usamos un correlation service en memoria para almacenar mapeos desde nuestro concepto de ID de flujo "interno" al concepto externo de "correlation ID".

Puede haber múltiples razones por las que no podamos usar nuestro propio ID interno como correlation ID para un servicio externo (de ahí la necesidad de un correlation ID service).

  • No quiero publicar mi ID interno al mundo exterior.

  • Limitaciones técnicas, por ejemplo, los correlation IDs de sistemas externos deben seguir un formato específico.

  • Hay múltiples interacciones con este sistema externo para un único ID interno mío, lo que haría que la correlación no fuera única por invocación individual.

Configuración del Connector

Hemos configurado un par SendConnector y ReceiveConnector. Primero el SendConnector.

        var sendConnector = new SendConnector
                .Builder<AgeRequest, TheirAgeRequest>("OurSender")
                .withActorSystem(actorSystem)
                .withCorrelationService(correlationService)
                .withConnectorTransport(sendingTransport)
                .withMessageLogger(logger())
                .withCorrelationIdExtractor(request -> CorrelationId.of(request.getCorrelationId())) (1)
                .withDomainToTargetTypeConverter(this::createTheirAgeRequest) (2)
                .withSendTransportMessageConverter(toJson()) (3)
                .build();
1 Indicando al SendConnector dónde buscar, en el tipo de mensaje de destino, el correlation ID que se guardará en el Correlation ID service.
2 Creando una solicitud de dominio externo con un número aleatorio como Correlation ID externo.
3 Convirtiendo el tipo POJO de dominio externo a una representación JSON.

Y aquí está el ReceiveConnector.

        new ReceiveConnector
                .Builder<TheirAgeResponse>("OurReceiver")
                .withActorSystem(actorSystem)
                .withConnectorTransport(receivingTransport)
                .withMessageLogger(logger())
                .withCorrelationService(correlationService)
                .withCorrelationIdExtractor(response -> CorrelationId.of(response.getCorrelationId())) (1)
                .withReceiveTransportMessageConverter(fromJson(TheirAgeResponse.class)) (2)
                .withManualStart(false)
                .withReceiveHandler((receivingContext, response) -> {
                    var processingContext = receivingContext.getProcessingContext();
                    ageMap.put(processingContext.getUnitOfWorkId(), response.getAge()); (3)
                    return CompletableFuture.completedFuture(null);
                })
                .build();
1 Función para indicar al ReceiveConnector dónde, en el mensaje de respuesta, buscar el correlation ID.
2 Convirtiendo el TransportMessage de JSON a un POJO TheirAgeResponse.
3 Rellenando la edad en el mapa para nuestro flow ID.

Ejercicio

Por el momento, este ejemplo usa una implementación Map en memoria de CorrelationIdService para almacenar y recuperar correlation IDs. Si esta aplicación se desplegara en un entorno distribuido, entonces no funcionaría en una configuración multinodo.

Habrá notado que este ejemplo inicia un contenedor de ActiveMQ usando Testcontainers. Pruebe lo siguiente.

  • Indique a Testcontainers que también inicie un contenedor de base de datos (RDBMS, MongoDB, etc.).

  • Cree una nueva implementación de CorrelationIdService que guarde y recupere correlation IDs hacia y desde su nuevo contenedor de base de datos, e inyéctela en la prueba.

  • Cree una nueva implementación de MessageLogger que guarde los mensajes registrados en esta base de datos, e inyéctela en la prueba.