Sending Connector
Los sending connectors son responsables de tomar mensajes suministrados por un cliente y enviarlos sobre el mecanismo de transport configurado.
Todos los sending connectors implementan la interfaz SendingConnector, que a su vez extiende OperableConnector.
public interface SendingConnector<D, R> extends OperableConnector {
/**
* Send a request via the connector without a supporting context
*/
default CompletionStage<R> send(@NotNull ProcessingContext processingContext,
D domainMessage) {
return send(processingContext, domainMessage, SupportingContext.empty());
}
/**
* Send a request via the connector with a full sending context
* that allows users to supply a supporting context as well
*/
CompletionStage<R> send(@NotNull ProcessingContext processingContext,
D domainMessage,
@NotNull SupportingContext supportingContext);
/**
* Allows setting of the meterRegistry implementation for a connector
* after connector build
* This allows implementations to set the registry for all SendingConnector instances
* at once instead of having to set them at a per connector bases at build time
*/
void setMeterRegistry(MeterRegistry meterRegistry);
SendingConnectorConfig getConfig();
}
public interface OperableConnector {
/**
* Retrieve name of the connector.
*/
String getName();
/**
* Starts the connector.
*/
void start();
/**
* Starts the connector's health check procedure.
*/
void startHealthCheck();
ConnectorHealth getHealth();
/**
* Shuts down the connector.
*
* @param reason the reason for shutdown
*/
CompletionStage<Void> shutdown(ShutdownReason reason);
/**
* Returns the connector's running status
*/
boolean isRunning();
/**
* Returns the connector's configuration.
*/
ConnectorConfig getConfig();
/**
* Returns all the connector's transports.
*/
List<? extends OperableConnectorTransport> getTransports();
/**
* Abstraction of a connector's configuration.
*/
interface ConnectorConfig {
String getConfigRoot();
}
}
Tipos genéricos D y T
Comencemos describiendo qué significan los genéricos D y T.
-
Des el tipo de Dominio. Este es un mensaje en nuestro modelo de datos canónico "interno" -
Tes el tipo Target (objetivo). Este es un mensaje convertido al modelo de datos "externo" con el que nos estamos integrando.
Etapas
La implementación del connector utiliza Akka Streams. Los connectors se componen de varias etapas configurables que se ejecutan de forma asíncrona, donde cada etapa realiza alguna transformación o comprobación para asegurar que los mensajes sean válidos antes de enviarlos al mecanismo de transport. La imagen siguiente describe de forma aproximada el enfoque basado en etapas al enviar mensajes. Tenga en cuenta que algunas etapas son opcionales y se omitirán si no se configuran cuando se construye el connector.
Las siguientes secciones cubren brevemente cada etapa.
Validación de Mensajes
Al enviar mensajes, puede que necesitemos comprobar la validez del payload, especialmente si parte de él es generado por usuarios, ya que podría contener errores.
Los connectors pueden configurarse opcionalmente para validar mensajes proporcionando una implementación de la interfaz Validator.
Para saber más sobre la validación de mensajes, vea la Message Validation.
Correlación de Mensajes
Una de las limitaciones de enviar y recibir mensajes de manera asíncrona es que las respuestas deben correlacionarse con los mensajes originales.
Esto puede lograrse proporcionando una implementación de la interfaz CorrelationService al construir un sending connector.
Los detalles de correlación se persistirán antes de enviar el mensaje y luego podrán recuperarse cuando la respuesta sea recibida en un receiving connector separado.
Para saber más sobre la correlación de mensajes, vea la Message Association.
Registro de Mensajes
El registro de mensajes es un requerimiento común al construir integraciones entre múltiples sistemas, ya que puede ayudar en la depuración o proporcionar una auditoría completa de toda la mensajería.
Los connectors pueden configurarse opcionalmente para registrar mensajes proporcionando una implementación de la interfaz MessageLogger.
El registro de mensajes funciona de tres formas:
-
Con backpressure, ignora fallos (predeterminado)
-
Con backpressure, falla la etapa si el logging falla
-
Fire-and-forget (no respeta backpressure)
Puede cambiar el comportamiento predeterminado al crear un SendConnector usando withMessageLoggingBehaviour.
Para saber más sobre el registro de mensajes, vea la Message Logging.
Cifrado del payload
A veces los protocolos de cifrado a nivel de transport, como TLS, no son suficientes. En estos casos se puede aplicar cifrado a nivel de aplicación a los propios mensajes. Al enviar un mensaje, su payload puede cifrarse opcionalmente antes de enviarlo por el transport.
Para saber cómo configurar el cifrado, vea la Encryption.
Envío resiliente
La etapa final y única obligatoria en el flujo de un sending connector es la etapa de envío resiliente. Aquí es donde podemos configurar cómo manejar escenarios de fallo típicos al enviar mensajes sobre redes, utilizando estrategias de resiliencia, como reintentos, circuit breaking y re-enrutamiento.
Para saber cómo configurar los ajustes de resiliencia, vea la Resilience.
A medida que los mensajes fluyen a la etapa de envío resiliente, se particionan según su UnitOfWorkId.
Se permite procesar en paralelo hasta parallelism particiones, con cada partición permitiendo hasta parallelism-per-partition mensajes en paralelo.
De forma predeterminada, no se permite paralelismo por partición, asegurando así el orden adecuado de los mensajes por UnitOfWorkId.
Si el destino no necesita recibir mensajes en el orden original pero el emisor necesita más throughput, parallelism-per-partition puede aumentarse con seguridad a cualquier valor menor que parallelism.
Múltiples Connector Transports
Algunos sistemas representados por un connector pueden alcanzarse a través de múltiples transports. Ejemplos de esto podrían ser una serie de gestores de colas MQ no clusterizados o brokers de Kafka. Para atender este caso de uso, es posible pasar una lista de connector transports al construir un sending connector.
SendConnector<ExampleType, ExampleType> connector = SendConnector
.<ExampleType, ExampleType>builder("ExampleSystem")
.withConnectorTransports(List.of(transport1, transport2)) (1)
.withRoutingLogic(routingLogic) (2)
.withQueueSize(0)
.withMaxConcurrentOffers(1)
.withSendTransportMessageConverter(converter)
.withCorrelationIdExtractor(correlationIdExtractor)
.withActorSystem(actorSystem)
.build();
| 1 | Pasa transport1 y transport2 al connector. |
| 2 | Configura la estrategia de enrutamiento. |
La combinación de withRoutingLogic y withConnectorTransports permite el balanceo de carga de solicitudes sobre un connector a través de múltiples transports, usando diversas implementaciones de lógica de enrutamiento.
La lógica de enrutamiento predeterminada es round-robin.
Para más detalles sobre este tema, vea la sección Routing Logic en la documentación de Resilience.
Exactamente uno de los métodos del builder withConnectorTransports o withConnectorTransport debe llamarse.
Si se usan ambos, se lanzará una IllegalArgumentException al llamar al método build.
Backpressure
| El backpressure se trata en la página de documentación Backpressure; consúltela para más detalles sobre este tema. |
Los streams se inician (materializan) con una cola. Esto significa que es posible configurar la cantidad de trabajo que un connector, y por asociación el sistema con el que se comunica, puede manejar a la vez. Esto es útil en situaciones de backpressure donde hay un productor rápido que produce a una tasa mucho más alta de la que el sistema aguas abajo (representado por este connector) puede manejar.
El tamaño de la cola es configurable usando el método del builder withQueueSize.
SendConnector<ExampleType, ExampleType> sendConnector;
sendConnector = new SendConnector
.Builder<ExampleType, ExampleType>(connectorName)
.withQueueSize(1) (1)
.withConnectorTransport(transport)
.withSendTransportMessageConverter(messageConverter)
.withCorrelationIdExtractor(correlationIdExtractor)
.withActorSystem(actorSystem)
.build();
-
El argumento de tamaño de la cola.
Es posible obtener este valor desde un placeholder, por ejemplo, para permitir reemplazarlo desde un archivo de configuración.
Si no se proporciona un tamaño de cola, entonces el tamaño predeterminado está definido por DEFAULT_BUFFER_SIZE, que puede encontrarse en la interfaz SendingConnector.
int DEFAULT_BUFFER_SIZE = 50;
int DEFAULT_MAX_CONCURRENT_OFFERS = 500;