Asociación de Mensajes

En el contexto de conectores, asociación es un hipónimo (término general) para correlación e identificación. La correlación se utiliza para relacionar una respuesta con un mensaje enviado de manera asíncrona por otro proceso, mientras que la identificación se utiliza para crear una identidad para un nuevo mensaje que no tiene relación con ningún mensaje anterior enviado.

Correlación

Extracción del Identificador de Correlación

Al enviar o recibir un mensaje que requiere correlación, una implementación de CorrelationIdExtractor or ConnectorMessageCorrelationIdExtractor debe ser proporcionado.

Estos son ambos interfaces funcionales, donde el CorrelationIdExtractor puede extraer del objetivo message type y el ConnectorMessageCorrelationIdExtractor tiene acceso a los encabezados transmitidos desde la capa de transporte que podrían ser utilizados para almacenar el identificador de correlación. En ambos casos, devolvemos una instancia de CorrelationId que se utiliza para persistir o recuperar el contexto.

@FunctionalInterface
public interface ConnectorMessageCorrelationIdExtractor<T> {
    CorrelationId extract(ConnectorMessage<T> connectorMessage);
}

@FunctionalInterface
public interface CorrelationIdExtractor<T> {
    CorrelationId extract(T payload);

    static <T> ConnectorMessageCorrelationIdExtractor<T>
    forConnectorMessage(CorrelationIdExtractor<T> correlationIdExtractor) {
        return connectorMessage ->
                correlationIdExtractor.extract(connectorMessage.getTargetMessage());
    }
}
Se recomienda encarecidamente utilizar extractores que proporcionen identificadores de correlación únicos por cada par de solicitud-respuesta. La implementación del servicio de correlación por defecto emparejará una respuesta con la última solicitud realizada con el mismo identificador de correlación, pero en escenarios raros, esa coincidencia puede no ser correcta debido a la deriva del reloj entre los servidores que crearon las entradas de correlación.

Enviar Correlación

Mensajes enviados utilizando un sending connector debe pasar un ProcessingContext y puede opcionalmente pasar un SupportingContext también, vea el Send Connector documentación para más detalles al respecto.

Ambos estos objetos de contexto serán persistidos a través de una implementación de CorrelationService, cuyo propósito es almacenar el contexto en un almacén de datos contra un identificador de correlación. La implementación de almacenamiento de datos predeterminada utilizada para el servicio de correlación es MongoDB, aunque esto puede ser reemplazado por cualquier otro repositorio, por ejemplo, Redis, PostgreSQL.

Recibir Correlación

Cuando se reciben mensajes en respuesta a otro mensaje enviado en un proceso separado, siempre que el conector receptor haya sido configurado con un extractor de correlación y un servicio de correlación, se realizará la correlación.

El identificador de correlación se extraerá del mensaje recibido y se utilizará para recuperar el contexto que se persistió durante la solicitud de envío. Si se encuentra el contexto, se pasa a las etapas de procesamiento posteriores; de lo contrario, se lanza una excepción y se pasa a los controladores de errores.

Si el mensaje recibido contiene un contexto de procesamiento, por ejemplo, como resultado de la etapa de identificación, cualquier contexto de procesamiento contenido en la correlación se fusionará con el contexto del mensaje. Consulte el [Context Merging] sección para más información.

Tiempo de Vida (TTL)

El MongoDB El servicio de correlación, por defecto, persistirá las entradas durante 3600 segundos (60 minutos) basándose en el campo creation Date. Esto se realiza creando un índice con un TTL en el MongoDB colección. Una vez que el tiempo haya expirado, las entradas serán expulsadas de la colección y ya no estarán disponibles. Si desea cambiar el campo o el período de expiración, puede establecer las siguientes propiedades en su application.conf archivo:

ipf.connector.correlation.time-to-live = 3600
ipf.connector.correlation.timestamp-field-name = "creationDate"

Identificación

La identificación se utiliza típicamente para iniciar solicitudes, es decir, no para respuestas a mensajes enviados previamente. En este caso, deben generar un ProcessingContext que se utilizará en adelante, ya que no hay contexto del cual obtener información del servicio de correlación.

En algunos casos, el mensaje puede estar correlacionado de alguna manera con mensajes anteriores y el ProcessingContext generado puede necesitar reflejar esto.

La identificación también se admite en escenarios de respuesta donde es probable que exista una correlación, por ejemplo, el mensaje de respuesta contiene un contexto de procesamiento. En este caso, la identificación construye un contexto de procesamiento a partir del mensaje de respuesta, y este se fusiona con el contexto de correlación. Consulte el [Context Merging] sección para más información.

Configurar la identificación para un conector de recepción solo requiere una implementación de ProcessingContextExtractor. Esto hace que la identificación sea flexible, ya que el contexto puede ser generado aleatoriamente o derivarse del mensaje recibido.

@FunctionalInterface
public interface ProcessingContextExtractor<T> {
    ProcessingContext extract(ConnectorMessage<T> connectorMessage);
}

Clase Ayudante

El InitiatingProcessingContextExtractor es una implementación de ProcessingContextExtractor que tiene como objetivo reducir el código repetitivo, especialmente en el caso en que usted desea generar valores aleatorios de manera total o parcial para los campos del contexto de procesamiento.

Para el caso más simple, el generateRandom se puede utilizar un método de fábrica estático. Independientemente de la entrada, generará un ProcessingContext con un generado aleatoriamente. UnitOfWorkId; y ProcessingEntity desconocido y ClientRequestId.

ProcessingContextExtractor<TestObject> extractor = InitiatingProcessingContextExtractor.generateRandom();

Alternativamente, el extractor puede ser construido utilizando el patrón de constructor y se pueden suministrar extractores individuales para cada campo; de lo contrario, se utilizarán los valores predeterminados.

var extractor = InitiatingProcessingContextExtractor.<TestObject>builder()
        .unitOfWorkIdExtractor(message -> UnitOfWorkId.of(message.getTargetMessage().unitOfWorkId))
        .clientRequestIdExtractor(message -> ClientRequestId.of(message.getTargetMessage().clientRequestId))
        .processingEntityExtractor(message -> ProcessingEntity.of(message.getTargetMessage().processingEntity))
        .build();

Dado que a menudo ocurre que la ProcessingEntity no cambia entre solicitudes y permanece estática, se ha definido un método de fábrica estático para proporcionar una ProcessingEntity estática y reducir el código repetitivo.

private static final ProcessingEntity PROCESSING_ENTITY = ProcessingEntity.of("processingEntity");
var extractor = InitiatingProcessingContextExtractor.<TestObject>builder()
        .processingEntityExtractor(staticSupplier(PROCESSING_ENTITY))
        .build();

Asociación de Mensajes Mixtos

El enfoque predeterminado para la asociación de mensajes funciona de la siguiente manera:

  • Si configura únicamente la identificación, los mensajes se identifican como nuevos mensajes de inicio.

  • Si configura tanto la correlación como la identificación, los mensajes son primero identificados y luego correlacionados con los mensajes enviados previamente.

Esto cubre la mayoría de los casos de uso, pero algunos escenarios requieren utilizar el mismo transporte para recibir tanto mensajes correlacionados (respuestas a solicitudes anteriores) como mensajes no correlacionados (solicitudes iniciales).

El comportamiento estándar no puede manejar este escenario mixto porque, cuando se configura la correlación, generará un CorrelationNotFoundException si no existe correlación para un mensaje, lo que provoca que el receptor falle. Esto significa que no puede recibir nuevos mensajes de inicio en un transporte que también está configurado para la correlación.

Para habilitar el manejo de mensajes mixtos en un único transporte, puede configurar el conector para intentar la correlación primero y retroceder de manera elegante a la identificación cuando no se encuentra correlación:

receiveConnector = new ReceiveConnector. Builder<MyMessageType>("my-connector")
        .withConnectorTransport(transport)
        .withReceiveTransportMessageConverter(converter)
        .withReceiveHandler(handler)
        .withCorrelationIdExtractor(message -> CorrelationId.of(message.getCorrelationId()))
        .withCorrelationService(correlationService)
        .withProcessingContextExtractor(processingContextExtractor)
        .supportBothResponseAndInitiatingMessages(true) (1)
        .build();
1 Trata CorrelationNotFoundException como se esperaba y vuelve a la identificación

Con esta configuración, el conector puede manejar ambos:

  • Mensajes de respuesta que correlacionan con solicitudes enviadas previamente

  • Nuevos mensajes de inicio que no tienen correlación (sin fallar)

Esto permite casos de uso como una única cola o HTTP punto final que recibe tanto respuestas de pago como nuevas solicitudes de pago.

Fusión de Contexto

Cuando se aplican tanto la identificación como la correlación a un receive connector donde el mensaje recibido es una respuesta a una solicitud anterior, el contexto almacenado en la correlación de la solicitud original y el contexto del mensaje recibido se fusionan.

Ejemplo:

  1. El Sistema A envía una solicitud al Sistema B, y se almacena una correlación que contiene el contexto de procesamiento actual.

  2. El Sistema B envía una respuesta al Sistema A, con un contexto de procesamiento que contiene diferentes valores (mismo uowId).

  3. El contexto que se recibe se fusiona con el contexto almacenado en la correlación.

@startuml

database "Correlation Store"  as Database
participant "System A" as SystemA
participant "System B" as SystemB

SystemA -> SystemB: Request
rnote over SystemA, SystemB
  context: (checkpoint = 1, associationId = abc, clientRequestId = UNKNOWN)
endrnote

SystemA -> Database: Correlation

rnote over Database
  context: (checkpoint = 1, associationId = abc, clientRequestId = UNKNOWN)
endrnote

SystemB -> SystemB: Some Work

rnote over SystemB
  context: (checkpoint = 6, associationId = def, clientRequestId = xyz)
endrnote

SystemB -> SystemA: Response {context = (checkpoint = 6)}

rnote over SystemA, SystemB
  context: (checkpoint = 6, associationId = def, clientRequestId = xyz)
endrnote

SystemA -> Database: Fetch Correlation
SystemA <- Database: Found Correlation

rnote over SystemA
  context: (checkpoint = 1, associationId = abc, clientRequestId = UNKNOWN)
endrnote

SystemA -> SystemA: Merge Contexts

rnote over SystemA
  context: (checkpoint = 6, associationId = abc, clientRequestId = xyz)
endrnote

SystemA -> SystemA: Handle Response

@enduml

El contexto de correlación se utiliza como base, y el contexto del mensaje recibido se fusiona en él. Ambos contextos deben tener el mismo unitOfWorkId o se lanza una excepción. Se utilizan las siguientes reglas al fusionar los campos del contexto de procesamiento.

  1. El checkpoint se fusiona cuando no lo está null

  2. El clientRequestId se fusiona cuando no lo está UNKNOWN

  3. El processingEntity se fusiona cuando no lo está UNKNOWN

  4. El associationId no está NO fusionado, incluso si el valor de correlación es UNKNOWN

¿Cuándo no ocurre la fusión?

Si la etapa de correlación está deshabilitada o se omite para el mensaje recibido, el contexto de procesamiento del mensaje recibido se utiliza en su totalidad. No se realiza ninguna fusión.

Si la etapa de identificación está deshabilitada, se utiliza el contexto de procesamiento de la correlación en su totalidad. No se realiza ninguna fusión.