Conector de Recepción
Los conectores de recepción son responsables de procesar los mensajes recibidos en un transporte configurado y transformar el mensaje en un tipo conocido, antes de delegar el manejo del mensaje al cliente.
Todos los conectores de recepción implementan el ReceivingConnector interfaz, que a su vez extiende OperableConnector.
public interface ReceivingConnector extends OperableConnector {
ReceivingConnectorConfig getConfig();
}
public interface OperableConnector {
/**
* Retrieve name of the connector.
*/
String getName();
/**
* Starts the connector.
*/
void start();
/**
* Starts the connector's health check procedure.
*/
void startHealthCheck();
ConnectorHealth getHealth();
/**
* Shuts down the connector.
*
* @param reason the reason for shutdown
*/
CompletionStage<Void> shutdown(ShutdownReason reason);
/**
* Returns the connector's running status
*/
boolean isRunning();
/**
* Returns the connector's configuration.
*/
ConnectorConfig getConfig();
/**
* Returns all the connector's transports.
*/
List<? extends OperableConnectorTransport> getTransports();
/**
* Abstraction of a connector's configuration.
*/
interface ConnectorConfig {
String getConfigRoot();
}
}
Tipo Genérico T
El ReceivingConnector la interfaz no se ocupa de los tipos, aunque la implementación predeterminada ReceiveConnector<T> está tipado genéricamente con tipo T, donde T es la abreviatura de 'tipo de objetivo'.
Los mensajes recibidos en la capa de transporte se envían al conector envueltos en un TransportMessage el objeto y se convierten al tipo de destino tan pronto como sea posible.
Esto permite que las etapas futuras trabajen con un tipo conocido que puede validarse opcionalmente para garantizar la integridad de los datos.
Etapas
La implementación del conector utiliza Akka Streams. Los conectores están compuestos por una serie de etapas configurables que se ejecutan de manera asíncrona, donde cada etapa realiza alguna transformación o verificación para asegurar que los mensajes sean válidos antes de delegar el control de nuevo al cliente para manejar el mensaje recibido.
La imagen a continuación describe de manera aproximada el enfoque basado en etapas al recibir mensajes. Tenga en cuenta que algunas etapas son opcionales y se omitirán si no están configuradas cuando se construya el conector.
Las siguientes secciones cubrirán brevemente cada etapa.
Filtrado
Algunos connector transport s como JMS ya tiene la funcionalidad para filtrar un mensaje (usando JMS Selectores), pero otros como Kafka or HTTP no lo haga. Por esta razón, el marco de Conector ofrece una funcionalidad de filtrado.
Para aprender a filtrar mensajes, consulte el Filtrado documentación.
Desencriptación de la carga útil
A veces, los protocolos de cifrado de transporte, como TLS, no son suficientes. En estos casos, se puede aplicar cifrado a nivel de aplicación a los mensajes mismos. Al recibir un mensaje cifrado, su carga útil debe ser descifrada antes de que podamos transformarla al tipo objetivo.
Para aprender a configurar la descifrado, consulte el Cifrado documentación.
Convertir a Tipo de Destino
El payload encontrado en el TransportMessage el objeto pasado de la capa de transporte al conector no tiene tipo.
El conector es responsable de mapping el tipo de carga en un tipo conocido.
Esto se puede lograr proporcionando una implementación del ReceiveTransportMessageConverter al construir un conector de recepción.
@FunctionalInterface
public interface ReceiveTransportMessageConverter<T> {
T convert(TransportMessage transportMessage);
}
Asociación de Mensajes
Los mensajes recibidos son bien una respuesta a una solicitud que hicimos anteriormente o una nueva solicitud. Al configurar un conector para recibir mensajes, típicamente sabemos cuál de estos escenarios se espera. Si un mensaje es una respuesta a una solicitud enviada previamente, entonces debe correlacionarse con el mensaje original utilizando el servicio de correlación. De lo contrario, si el mensaje es una solicitud, debemos generar (o extraer) un identificador para el mensaje de modo que podamos correlacionarlo más tarde.
Para obtener más información sobre la asociación de mensajes, consulte el Asociación de Mensajes documentación.
Message Log ging
Message log El registro es un requisito común al construir integraciones entre múltiples sistemas, ya que puede ayudar en la depuración o proporcionar una auditoría completa de toda la mensajería. Los conectores pueden configurarse opcionalmente para registrar mensajes proporcionando una implementación de la `MessageLogger` interfaz.
Message log ging opera de tres maneras:
-
Con presión de retroceso, ignora fallos (predeterminado)
-
Con presión de retroceso, falla la etapa si el registro falla.
-
Fuego y olvido (no respeta la presión de retroceso)
Puede cambiar el comportamiento predeterminado al crear un ReceiveConnector utilizando withMessageLoggingBehaviour.
Para aprender más sobre message log ging, vea el Message Loggingdocumentación.
Validación de Mensajes
Al recibir mensajes, a menudo tiene sentido asegurarse de que el mensaje sea válido antes de continuar con el procesamiento.
Esto puede ser en términos de validación de esquema o validación de lógica de negocio.
Los conectores pueden configurarse opcionalmente para validar el mensaje proporcionando una implementación de la BeanValidator interfaz.
Para obtener más información sobre la validación de mensajes, consulte el Validación de Mensajes documentación.
Manejo de Mensajes
La etapa final y, posiblemente, la más importante en el flujo del receptor es la etapa de manejo de mensajes. Este es el lugar donde el mensaje, después de pasar por la descifrado, validación, asociación y registro, puede ser finalmente manejado por el código del cliente.
A medida que los mensajes fluyen hacia la etapa de manejo de mensajes, se particionan de acuerdo a su UnitOfWorkId. Hasta receiver-parallelism se permite que las particiones sean procesadas en paralelo, con cada partición permitiendo hasta receiver-parallelism-per-partition mensajes que deben ser manejados en paralelo. Por defecto, no se permite el paralelismo por partición, asegurando así el orden adecuado de los mensajes por UnitOfWorkId. Si el controlador de mensajes asignado no necesita preservar el orden de los mensajes pero requiere un rendimiento adicional,receiver-parallelism-per-partition puede aumentarse de manera segura a cualquier valor inferior a receiver-parallelism.
El manejo del mensaje se realiza proporcionando una implementación de la ReceiveHandler interfaz funcional.
@FunctionalInterface
public interface ReceiveHandler<T> {
CompletionStage<Void> handle(ReceivingContext receivingContext, T payload);
}
La función handle toma un ReceivingContext y la carga útil de tipo genérico y devuelve un futuro vacío.
Resiliencia
La resiliencia de un determinado receive connector es bastante configurable y se basa en dos conceptos: reintentos automáticos y un canal de cartas muertas.
La imagen a continuación muestra algunos escenarios potenciales que pueden encontrarse al procesar un mensaje y los mecanismos utilizados para manejarlos.
Reintentos
Es posible configurar una política de reintentos para los conectores de recepción proporcionando una instancia de ResiliencySettings.
Para más detalles, consulte el Resiliencia documentación.
Los ajustes de resiliencia se utilizan para distinguir entre excepciones recuperables (reintentables) y sus contrapartes. Si se considera que una excepción es recuperable, se intentarán reintentos de acuerdo con la configuración.
Una vez que se han agotado los reintentos y no fue posible la recuperación, el error y el mensaje del conector que lo causó se envían al canal de cartas muertas para su posterior procesamiento y el mensaje se NACKea (se reconoce negativamente).
Carta Muerta
El componente del canal de cartas muertas está representado por el DeadletterAppender interfaz.
@FunctionalInterface
public interface DeadletterAppender {
/**
* Appends the {@link ReceiveConnectorException} to the deadletter queue.
*
* @param receiveConnectorException contains the failed {@link ReceivedMessage message} and the cause of failure
* @return a {@link CompletionStage}
*/
CompletionStage<Void> append(ReceiveConnectorException receiveConnectorException);
}
La idea general detrás del apéndice es permitir a los clientes soportar un procesamiento potencialmente complejo que puede necesitar ocurrir cuando los mensajes no son recibidos. Como persistirlos en un almacenamiento externo,scheduling su reprocesamiento con un distribuido scheduler, etc.