Sending Connector
Sending connector s son responsables de tomar los mensajes proporcionados por un cliente y enviarlos a través del mecanismo de transporte configurado.
Todo sending connector s implement the SendingConnector interfaz, que a su vez extiende OperableConnector.
public interface SendingConnector<D, R> extends OperableConnector {
int DEFAULT_BUFFER_SIZE = 50;
int DEFAULT_MAX_CONCURRENT_OFFERS = 500;
/**
* Send a request via the connector without a supporting context
*/
default CompletionStage<R> send(@NotNull ProcessingContext processingContext,
D domainMessage) {
return send(processingContext, domainMessage, SupportingContext.empty());
}
/**
* Send a request via the connector with a full sending context
* that allows users to supply a supporting context as well
*/
CompletionStage<R> send(@NotNull ProcessingContext processingContext,
D domainMessage,
@NotNull SupportingContext supportingContext);
/**
* Allows setting of the meterRegistry implementation for a connector
* after connector build
* This allows implementations to set the registry for all SendingConnector instances
* at once instead of having to set them at a per connector bases at build time
*/
void setMeterRegistry(MeterRegistry meterRegistry);
SendingConnectorConfig getConfig();
}
public interface OperableConnector {
/**
* Retrieve name of the connector.
*/
String getName();
/**
* Starts the connector.
*/
void start();
/**
* Starts the connector's health check procedure.
*/
void startHealthCheck();
ConnectorHealth getHealth();
/**
* Shuts down the connector.
*
* @param reason the reason for shutdown
*/
CompletionStage<Void> shutdown(ShutdownReason reason);
/**
* Returns the connector's running status
*/
boolean isRunning();
/**
* Returns the connector's configuration.
*/
ConnectorConfig getConfig();
/**
* Returns all the connector's transports.
*/
List<? extends OperableConnectorTransport> getTransports();
/**
* Abstraction of a connector's configuration.
*/
interface ConnectorConfig {
String getConfigRoot();
}
}
Tipos genéricos D y T
Primero comencemos describiendo qué es el D y T media genérica.
-
Des el tipo de Dominio. Este es un mensaje en nuestro modelo de datos canónico "interno". -
Tes el tipo de Target. Este es un mensaje convertido al modelo de datos "externo" con el cual estamos integrando.== Etapas
La implementación del conector utiliza Akka Streams. Los conectores están compuestos por una serie de etapas configurables que se ejecutan de manera asíncrona, donde cada etapa realiza alguna transformación o verificación para asegurar que los mensajes sean válidos antes de enviarlos al mecanismo de transporte. La imagen a continuación describe de manera aproximada el enfoque basado en etapas al enviar mensajes. Tenga en cuenta que algunas etapas son opcionales y se omitirán si no están configuradas cuando se construya el conector.
Las siguientes secciones cubrirán brevemente cada etapa.
Validación de Mensajes
Al enviar mensajes, puede que necesitemos verificar la validez de la carga útil, especialmente si alguna de ella es generada por el usuario, ya que esto podría contener errores.
Los conectores pueden configurarse opcionalmente para validar el mensaje proporcionando una implementación de la Validator interfaz.
Para obtener más información sobre la validación de mensajes, consulte el Validación de Mensajes documentación.
Correlación de Mensajes
Una de las limitaciones en el envío y recepción de mensajes de manera asíncrona es que las respuestas a los mensajes deben correlacionarse entre sí.
Esto se puede lograr proporcionando una implementación del CorrelationService interfaz al construir un sending connector.
Los detalles de correlación se persistirán antes de enviar el mensaje y luego podrán ser recuperados más tarde cuando se reciba la respuesta en un conector de recepción separado.
Para obtener más información sobre la correlación de mensajes, consulte el Asociación de Mensajes documentación.
Message Log ging
Message log El registro es un requisito común al construir integraciones entre múltiples sistemas, ya que puede ayudar en la depuración o proporcionar una auditoría completa de toda la mensajería. Los conectores pueden configurarse opcionalmente para registrar mensajes proporcionando una implementación de la `MessageLogger` interfaz.
Message log ging opera de tres maneras:
-
Con presión de retroceso, ignora fallos (predeterminado)
-
Con presión de retroceso, falla la etapa si el registro falla.
-
Fuego y olvido (no respeta la presión de retroceso)
Puede cambiar el comportamiento predeterminado al crear un SendConnector utilizando withMessageLoggingBehaviour.
Para aprender más sobre message log ging, vea el Message Loggingdocumentación.
Cifrado de Carga Útil
A veces, los protocolos de cifrado de transporte, como TLS, no son suficientes. En estos casos, se puede aplicar cifrado a nivel de aplicación a los mensajes mismos. Al enviar un mensaje, su carga útil puede ser opcionalmente cifrada antes de ser enviada a través del transporte.
Para aprender a configurar la encriptación, consulte el Cifrado documentación.
Envío Resiliente
La etapa final y única obligatoria en el sending connector el flujo es la etapa de envío resiliente. Aquí es donde podemos configurar cómo manejar los escenarios de fallo que se encuentran típicamente al enviar mensajes a través de redes, utilizando estrategias de resiliencia, como reintentos, interrupción de circuitos y reenvío.
Para aprender a configurar los ajustes de resiliencia, consulte el Resiliencia documentación.
A medida que los mensajes fluyen hacia la etapa de envío resiliente, se particionan de acuerdo con su UnitOfWorkId. Hasta parallelism se permite que las particiones sean procesadas en paralelo, con cada partición permitiendo hasta parallelism-per-partition mensajes que deben ser manejados en paralelo. Por defecto, no se permite el paralelismo por partición, asegurando así el orden adecuado de los mensajes por UnitOfWorkId. Si el destino objetivo no necesita recibir mensajes en el orden original pero el remitente requiere un rendimiento adicional,parallelism-per-partition puede aumentarse de manera segura a cualquier valor inferior a parallelism.
Múltiple Connector Transport s
Algunos sistemas representados por un conector pueden ser alcanzados a través de múltiples transportes. Un ejemplo de esto podría ser una serie de gestores de colas MQ no agrupados o Kafka corredores. Para atender este caso de uso, es posible pasar una lista de connector transport s cuando se construye un sending connector.
SendConnector<ExampleType, ExampleType> connector = SendConnector
.<ExampleType, ExampleType>builder("ExampleSystem")
.withConnectorTransports(List.of(transport1, transport2)) (1)
.withRoutingLogic(routingLogic) (2)
.withQueueSize(0)
.withMaxConcurrentOffers(1)
.withSendTransportMessageConverter(converter)
.withCorrelationIdExtractor(correlationIdExtractor)
.withActorSystem(actorSystem)
.build();
| 1 | Pasa transport1 y transport2 al conector. |
| 2 | Configura la estrategia de enrutamiento. |
La combinación de withRoutingLogic y withConnectorTransports permite el balanceo de carga de solicitudes a través de un conector en múltiples transportes, utilizando diversas implementaciones de lógica de enrutamiento.
La lógica de enrutamiento predeterminada es round-robin.
Para obtener más detalles sobre este tema, consulte la sección de Lógica de Enrutamiento en el Resiliencia documentación.
Exactamente uno de los métodos de construcción withConnectorTransports or withConnectorTransport debe ser llamado.
Si ambos se utilizan, entonces un IllegalArgumentException se lanzará al llamar al método de construcción.
Presión de retroceso
| La contrapresión se discute en el Presión de retroceso página de documentación, consúltela para más detalles sobre este tema. |
Los flujos se inician (materializan) con una cola. Esto significa que es posible configurar la cantidad de trabajo que un conector, y por asociación el sistema con el que se está comunicando, puede manejar a la vez. Esto es útil para situaciones de contrapresión donde hay un productor rápido que está produciendo a una tasa mucho más rápida de lo que el sistema aguas abajo (representado por este conector) puede manejar.
El tamaño de la cola es configurable utilizando el withQueueSize método de constructor.
SendConnector<ExampleType, ExampleType> sendConnector;
sendConnector = new SendConnector
.Builder<ExampleType, ExampleType>(connectorName)
.withQueueSize(1) (1)
.withConnectorTransport(transport)
.withSendTransportMessageConverter(messageConverter)
.withCorrelationIdExtractor(correlationIdExtractor)
.withActorSystem(actorSystem)
.build();
-
El argumento del tamaño de la cola.
Es posible obtener este valor de un marcador de posición, por ejemplo, para permitir reemplazar este valor desde un archivo de configuración.
Si no se proporciona un tamaño de cola, entonces el tamaño de cola predeterminado es definido por el DEFAULT_BUFFER_SIZE que se puede encontrar en el SendingConnector interfaz.
int DEFAULT_BUFFER_SIZE = 50;
int DEFAULT_MAX_CONCURRENT_OFFERS = 500;