Solicitud-Respuesta Asincrónica

¿Cómo puedo correlacionar mensajes de mi propio formato interno a algún dominio externo, si las respuestas son asíncronas?

Este ejemplo de conector se conecta a un servicio ficticio que toma el nombre de una persona y devuelve su edad. El servicio en el otro extremo recibe solicitudes en una cola de solicitudes y envía respuestas de vuelta en una cola de respuestas con un ID de correlación para ayudar al consumidor a determinar a qué solicitud corresponde esta respuesta.

Diagram
Esta guía también está disponible en el separado connector-samples Repositorio de Git aquí.

Correlación

Utilizamos un servicio de correlación en memoria para almacenar mappings de nuestro concepto de ID de flujo "interno" al concepto de "ID de correlación" externo.

Puede haber múltiples razones por las cuales no podemos utilizar nuestro propio ID interno como el ID de correlación para un servicio externo (de ahí la necesidad de un servicio de ID de correlación).

  • No deseo publicar mi ID interno al mundo exterior.

  • Limitaciones técnicas, por ejemplo, los IDs de correlación de sistemas externos deben seguir un formato específico.

  • Existen múltiples interacciones con este sistema externo para un único ID interno, lo que haría que la correlación no sea única por cada invocación individual.

Configuración del conector

Hemos configurado un SendConnector y par de ReceiveConnector. Primero el SendConnector.

        var sendConnector = new SendConnector
                .Builder<AgeRequest, TheirAgeRequest>("OurSender")
                .withActorSystem(actorSystem)
                .withCorrelationService(correlationService)
                .withConnectorTransport(sendingTransport)
                .withMessageLogger(logger())
                .withCorrelationIdExtractor(request -> CorrelationId.of(request.getCorrelationId())) (1)
                .withDomainToTargetTypeConverter(this::createTheirAgeRequest) (2)
                .withSendTransportMessageConverter(toJson()) (3)
                .build();
1 Contando el SendConnector dónde buscar en el objetivo message type para el ID de correlación que se debe guardar en el servicio de ID de correlación.
2 Creando una solicitud de dominio externo con un número aleatorio como el ID de correlación externo.
3 Convirtiendo el tipo POJO del dominio externo a un JSON representación.

Y aquí está el ReceiveConnector.

        new ReceiveConnector
                .Builder<TheirAgeResponse>("OurReceiver")
                .withActorSystem(actorSystem)
                .withConnectorTransport(receivingTransport)
                .withMessageLogger(logger())
                .withCorrelationService(correlationService)
                .withCorrelationIdExtractor(response -> CorrelationId.of(response.getCorrelationId())) (1)
                .withReceiveTransportMessageConverter(fromJson(TheirAgeResponse.class)) (2)
                .withManualStart(false)
                .withReceiveHandler((receivingContext, response) -> {
                    var processingContext = receivingContext.getProcessingContext();
                    ageMap.put(processingContext.getUnitOfWorkId(), response.getAge()); (3)
                    return CompletableFuture.completedFuture(null);
                })
                .build();
1 Función para indicar al ReceiveConnector dónde en el mensaje de respuesta buscar el ID de correlación.
2 Convirtiendo el TransportMessage from JSON a un TheirAgeResponse POJO.
3 Población de la edad en el mapa para nuestro ID de flujo.

Ejercicio

En este momento, este ejemplo utiliza un almacenamiento en memoria Map implementación de CorrelationIdService para almacenar y recuperar identificadores de correlación. Si esta aplicación se implementara en un entorno distribuido, entonces no funcionaría en una configuración de múltiples nodos.

Puede haber notado que este ejemplo comienza un ActiveMQ contenedor usando Testcontainers. Intente lo siguiente.

  • Indique a Testcontainers que también inicie un contenedor de base de datos (RDBMS, MongoDB, etc).

  • Crear un nuevo CorrelationIdService implementación que guarda y almacena los IDs de correlación hacia y desde su nuevo creó un contenedor de base de datos y lo conectó a la prueba.

  • Crear un nuevo MessageLogger implementación que guarda los mensajes registrados en esta base de datos y lo conecta a la prueba.