Asociación de Mensajes
En el contexto de los connectors, la asociación es un hiperónimo (término paraguas) para correlación e identificación. La correlación se usa para relacionar una respuesta con un mensaje enviado de forma asíncrona por otro proceso, mientras que la identificación se usa para crear una identidad para un mensaje nuevo que no tiene relación con ningún mensaje previo enviado.
Correlación
Extracción del identificador de correlación
Al enviar o recibir un mensaje que requiere correlación, debe proporcionarse una implementación de CorrelationIdExtractor o ConnectorMessageCorrelationIdExtractor.
Ambas son interfaces funcionales, donde CorrelationIdExtractor puede extraer desde el tipo de mensaje objetivo y ConnectorMessageCorrelationIdExtractor tiene acceso a cabeceras pasadas desde la capa de transport que podrían usarse para almacenar el identificador de correlación.
En ambos casos devolvemos una instancia de CorrelationId que se usa para persistir o recuperar el contexto.
@FunctionalInterface
public interface ConnectorMessageCorrelationIdExtractor<T> {
CorrelationId extract(ConnectorMessage<T> connectorMessage);
}
@FunctionalInterface
public interface CorrelationIdExtractor<T> {
CorrelationId extract(T payload);
static <T> ConnectorMessageCorrelationIdExtractor<T>
forConnectorMessage(CorrelationIdExtractor<T> correlationIdExtractor) {
return connectorMessage ->
correlationIdExtractor.extract(connectorMessage.getTargetMessage());
}
}
| Se recomienda encarecidamente utilizar extractores que proporcionen IDs de correlación únicos por par request-reply. La implementación predeterminada del correlation service relacionará una respuesta con la última solicitud realizada con el mismo ID de correlación, pero en raros escenarios esa relación podría no ser correcta debido a desfases de reloj entre servidores que crearon las entradas de correlación. |
Correlación al enviar
Los mensajes enviados usando un sending connector deben pasar un ProcessingContext y opcionalmente también un SupportingContext; vea la documentación de Send Connector para más detalles al respecto.
Ambos objetos de contexto se persistirán mediante una implementación de CorrelationService, cuya finalidad es almacenar el contexto en un datastore asociado a un identificador de correlación.
La implementación de datastore predeterminada usada por el correlation service es MongoDB, aunque puede reemplazarse por cualquier otro repositorio, p. ej. Redis, PostgreSQL.
Correlación al recibir
Cuando se reciben mensajes en respuesta a otro mensaje enviado en un proceso separado, siempre que el receiving connector se haya configurado con un extractor de correlación y un correlation service, se realizará la correlación.
El identificador de correlación se extraerá del mensaje recibido y se usará para recuperar el contexto que se persistió durante la solicitud de envío. Si se encuentra el contexto, se pasa a etapas de procesamiento posteriores; de lo contrario, se lanza una excepción y se pasa a los manejadores de errores.
Si el mensaje recibido contiene un processing context, p. ej. como resultado de la etapa de identificación, cualquier processing context contenido en la correlación se fusionará con el contexto del mensaje. Consulte la sección Fusión de contextos para más información.
Time To Live (TTL)
El correlation service de MongoDB, por defecto, persistirá entradas durante 3600 segundos (60 minutos) basándose en el campo creationDate. Esto lo hace creando un índice con TTL en la colección de MongoDB. Una vez expirado el tiempo, las entradas serán expulsadas de la colección y dejarán de estar disponibles. Si desea cambiar el campo o el período de expiración, puede establecer las siguientes propiedades en su archivo application.conf:
ipf.connector.correlation-expiry = 3600
ipf.connector.correlation-timestamp-field-name = "creationDate"
Identificación
La identificación se utiliza normalmente para solicitudes iniciadoras, es decir, no respuestas a mensajes enviados previamente. En este caso, necesitan generar un ProcessingContext que se usará en adelante, ya que no hay un contexto que recuperar del correlation service.
En algunos casos el mensaje podría estar correlacionado de alguna manera con mensajes previos y el ProcessingContext generado podría necesitar reflejar esto.
La identificación también se admite en escenarios de respuesta en los que probablemente también exista una correlación, p. ej. el mensaje de respuesta contiene un processing context. En este caso, la identificación construye un processing context a partir del mensaje de respuesta y este se fusiona con el contexto de correlación. Consulte la sección Fusión de contextos para más información.
Configurar la identificación para un receiving connector solo requiere una implementación de ProcessingContextExtractor. Esto hace que la identificación sea flexible ya que el contexto puede generarse aleatoriamente o derivarse del mensaje recibido.
@FunctionalInterface
public interface ProcessingContextExtractor<T> {
ProcessingContext extract(ConnectorMessage<T> connectorMessage);
}
Clase auxiliar
InitiatingProcessingContextExtractor es una implementación de ProcessingContextExtractor cuyo objetivo es reducir el boilerplate, especialmente en el caso en que desea generar total o parcialmente valores aleatorios para los campos del processing context.
Para el caso más simple puede usarse el método de fábrica estático generateRandom.
Independientemente de la entrada, generará un ProcessingContext con un UnitOfWorkId generado aleatoriamente; y ProcessingEntity y ClientRequestId desconocidos.
ProcessingContextExtractor<TestObject> extractor = InitiatingProcessingContextExtractor.generateRandom();
Alternativamente, el extractor puede construirse utilizando el patrón builder y se pueden suministrar extractores individuales para cada campo; de lo contrario, se usarán los valores predeterminados.
var extractor = InitiatingProcessingContextExtractor.<TestObject>builder()
.unitOfWorkIdExtractor(message -> UnitOfWorkId.of(message.getTargetMessage().unitOfWorkId))
.clientRequestIdExtractor(message -> ClientRequestId.of(message.getTargetMessage().clientRequestId))
.processingEntityExtractor(message -> ProcessingEntity.of(message.getTargetMessage().processingEntity))
.build();
Como a menudo el ProcessingEntity no cambia entre solicitudes y permanece estático, existe un método de fábrica estático para suministrar un ProcessingEntity fijo y así reducir el código boilerplate.
private static final ProcessingEntity PROCESSING_ENTITY = ProcessingEntity.of("processingEntity");
var extractor = InitiatingProcessingContextExtractor.<TestObject>builder()
.processingEntityExtractor(staticSupplier(PROCESSING_ENTITY))
.build();
Fusión de contextos
Cuando se aplican tanto identificación como correlación a un receive connector, donde el mensaje recibido es una respuesta a una solicitud previa, el contexto almacenado en la correlación de la solicitud original y el contexto del mensaje recibido se fusionan.
Ejemplo:
-
El Sistema A envía una solicitud al Sistema B, y se almacena una correlación que contiene el processing context actual
-
El Sistema B envía una respuesta al Sistema A, con un processing context que contiene valores diferentes (mismo uowId)
-
El contexto que se recibe se fusiona en el contexto almacenado en la correlación
El contexto de correlación se usa como base, y el contexto del mensaje recibido se fusiona en él.
Ambos contextos deben tener el mismo unitOfWorkId o se lanzará una excepción.
Se usan las siguientes reglas al fusionar campos del processing context.
-
El
checkpointse fusiona cuando no esnull -
El
clientRequestIdse fusiona cuando no esUNKNOWN -
El
processingEntityse fusiona cuando no esUNKNOWN -
El
associationIdNO se fusiona, incluso si el valor de la correlación esUNKNOWN
¿Cuándo no ocurre la fusión?
Si la etapa de correlación está deshabilitada, o se omite para el mensaje recibido, se usa en su totalidad el processing context del mensaje recibido. No se realiza ninguna fusión.
Si la etapa de identificación está deshabilitada, se usa en su totalidad el processing context de la correlación. No se realiza ninguna fusión.