Inicio rápido del Sending Connector
Esta página explica detalles sobre cómo comenzar a enviar mensajes a sistemas externos mediante sending connectors proporcionados por la biblioteca de connector.
Dependencias
Antes de construir un sending connector, la biblioteca connector-core debe incluirse como dependencia.
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-core</artifactId>
<version>${connector.version}</version>
</dependency>
La última versión de la biblioteca del connector puede encontrarse usando esta búsqueda en Nexus.
A menos que proporcione su propia implementación, al menos una biblioteca de transport debe declararse.
El esquema de nombres para todos los transports incluidos en la biblioteca del connector es connector-[transport], donde [transport] coincide con el esquema de transport que este connector debe usar.
Para más detalles sobre connector transports consulte la documentación de Connector Transports.
Aquí hay un ejemplo de declaración de dependencia para usar JMS.
<dependency>
<groupId>com.iconsolutions.ipf.core</groupId>
<artifactId>connector-jms</artifactId>
<version>${connector.version}</version>
</dependency>
Primeros pasos: Sending Connector
Los sending connectors se utilizan para enviar mensajes a algún destino, ya sea como respuesta a un mensaje recibido previamente o para realizar una solicitud.
Patrón Builder
Los sending connectors se instancian utilizando el patrón builder. Esto se debe a que los connectors tienen muchos parámetros que configurar y la mayoría son opcionales o tienen valores predeterminados.
Veamos cómo usamos el patrón builder para instanciar un sending connector.
SendConnector<ExampleType, ExampleType> connector = SendConnector
.<ExampleType, ExampleType>builder("ExampleSystem") (1)
.withConnectorTransport(transport) (2)
.withSendTransportMessageConverter(converter)(3)
.withCorrelationIdExtractor(correlationIdExtractor) (4)
.withCorrelationService(correlationService) (5)
.withActorSystem(actorSystem) (6)
.build();
| 1 | Establece el nombre del connector. El nombre debe representar con qué se conecta el connector. |
| 2 | Proporciona una implementación de la interfaz SendingConnectorTransport. |
| 3 | Proporciona una implementación de la interfaz SendTransportMessageConverter.
Toma el payload del mensaje de tipo T (ExampleType en este caso) y lo convierte a un objeto TransportMessage. |
| 4 | Proporciona una implementación de la interfaz CorrelationIdExtractor.
Toma el payload del mensaje y extrae (o genera) un identificador de correlación para usar al persistir el ProcessingContext mediante el correlation service. |
| 5 | Proporciona una implementación del CorrelationService.
El correlation service toma un ProcessingContext y un CorrelationId y los persiste usando el repositorio configurado del servicio.
Esto se utiliza para correlacionar una solicitud con su respuesta que será manejada por un proceso separado. |
| 6 | Establece el actor system utilizado en toda la aplicación. |
Comenzar a enviar mensajes
En este punto deberíamos haber instanciado correctamente un SendConnector<ExampleType>, que puede usarse para enviar mensajes sobre el transport configurado.
La interfaz SendingConnector define dos métodos para enviar mensajes, cuyas firmas son:
public interface SendingConnector<D, R> extends OperableConnector {
/**
* Send a request via the connector without a supporting context
*/
default CompletionStage<R> send(@NotNull ProcessingContext processingContext,
D domainMessage) {
return send(processingContext, domainMessage, SupportingContext.empty());
}
/**
* Send a request via the connector with a full sending context
* that allows users to supply a supporting context as well
*/
CompletionStage<R> send(@NotNull ProcessingContext processingContext,
D domainMessage,
@NotNull SupportingContext supportingContext);
/**
* Allows setting of the meterRegistry implementation for a connector
* after connector build
* This allows implementations to set the registry for all SendingConnector instances
* at once instead of having to set them at a per connector bases at build time
*/
void setMeterRegistry(MeterRegistry meterRegistry);
SendingConnectorConfig getConfig();
}
El tipo de mensaje de dominio genérico D en este caso es nuestro ExampleType.
ExampleType tiene la siguiente definición.
@Data
@AllArgsConstructor
@EqualsAndHashCode
public static class ExampleType implements UpdatebleProcessingContextHolder {
private final String name;
private final LocalDate dob;
private final int shoeSize;
private ProcessingContext processingContext;
public ExampleType(String name, LocalDate dob, int shoeSize) {
this(name, dob, shoeSize, null);
}
}
Ahora podemos crear una instancia del mensaje de dominio y enviarla al transport a través del connector.
ExampleType message = new ExampleType(
"Isaac Newton",
LocalDate.of(1642, 12, 25),
8
);
CompletionStage<DeliveryOutcome> future = connector.send(processingContext, message);
Esto devuelve un CompletionStage<DeliveryOutcome>
que podemos bloquear para esperar el DeliveryOutcome (no ideal), o usar varios métodos en CompletionStage
para realizar otras tareas de forma asíncrona cuando el future se completa.
Primeros pasos: Request-Reply Sending Connector
El sending connector predeterminado funciona de forma unidireccional donde la respuesta de la solicitud es el resultado de entrega. Sin embargo, para algunos protocolos (específicamente HTTP) es deseable y esperado que se devuelva una respuesta bien formada de una solicitud.
Un RequestReplySendConnector se construye de manera similar a otros connectors, usando el patrón builder.
RequestReplySendConnector<ExampleRequest, ExampleRequest, ExampleResponse, ExampleResponse> connector;
connector = RequestReplySendConnector
.<ExampleRequest, ExampleRequest, ExampleResponse, ExampleResponse>builder("ExampleSystem")
.withConnectorTransport(transport)
.withCorrelationIdExtractor(correlationIdExtractor)
.withSendTransportMessageConverter(sendMessageConverter)
.withReceiveTransportMessageConverter(receiveMessageConverter)
.withActorSystem(actorSystem)
.build();
Este connector es ligeramente más complejo ya que requiere convertir hacia y desde el TransportMessage. Sin embargo, se reduce cierta complejidad porque ya no es necesario ningún parámetro relacionado con la correlación.
La firma del método send ahora devuelve un future que contiene la respuesta.
Dado que el transport no es confiable, existe un mecanismo a prueba de fallos para garantizar que la solicitud eventualmente se complete, aunque con una excepción después de cierta duración de timeout.
Se proporciona un valor predeterminado, pero puede configurarse con el método del builder,
.withCallTimeoutSeconds.
Cuando el envío de un mensaje excede el tiempo de espera, el future de respuesta se completará excepcionalmente con una TimeoutException como causa.
Como referencia, la configuración predeterminada de RequestReplySendConnector se hereda de la configuración predeterminada de SendingConnector pero con algunas anulaciones de resiliencia necesarias pertinentes solo a interacciones de request/response.
# Inherit everything from default-send-connector and then override some of the resiliency settings
default-rr-send-connector = ${ipf.connector.default-send-connector} {
resiliency-settings {
reroute-messages-on-failure = false
minimum-number-of-calls = 10
}
}
Los valores que pueden configurarse mediante propiedades de configuración se muestran en la siguiente tabla.
| Property | Description | Example |
|---|---|---|
|
Cuando se establece en |
false |
|
Tamaño de una cola de origen que puede utilizarse para manejar backpressure, por ejemplo, situaciones de productor rápido. |
50 |
|
Duración máxima de espera para un acuse de recibo antes de completar el future devuelto excepcionalmente con una |
|
|
Número máximo de ofertas pendientes cuando el buffer está lleno. |
500 |
|
Número máximo de llamadas paralelas para el send connector |
500 |
|
Número máximo de envíos paralelos por |
1 |
|
Limita el throughput a un número especificado de mensajes consumidos por unidad de tiempo. Cuando se establece este valor, también debe proporcionarse throttle-duration. |
10 |
|
Se usa con 'throttle-count' para establecer la velocidad máxima de consumo de mensajes. Para más detalles, consulte la documentación de Message Throttling. |
1s |
|
La configuración de resiliencia que se utilizará al enviar. Para más detalles, consulte la documentación de Resilience. |