Sending Connector Inicio rápido
Esta página explica los detalles sobre cómo comenzar a enviar mensajes a sistemas externos, utilizando sending connector se proporciona al crear la biblioteca de conectores.
Dependencias
Antes de construir un sending connector, el connector-core la biblioteca debe ser incluida como una dependencia.
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-core</artifactId>
<version>${connector.version}</version>
</dependency>
La última versión de la biblioteca de conectores se puede encontrar utilizando este Búsqueda de Nexus.
A menos que proporcione su propia implementación, al menos una biblioteca de transporte debe ser declarada.
El esquema de nomenclatura para todos los transportes incluidos en la biblioteca de conectores es connector-[transport], donde [transport] coincide con el esquema de transporte que este conector debe utilizar.
Para más detalles sobre connector transport s consulte el Connector Transports documentación.
Aquí tiene un ejemplo de declarar la dependencia para usar JMS.
<dependency>
<groupId>com.iconsolutions.ipf.core</groupId>
<artifactId>connector-jms</artifactId>
<version>${connector.version}</version>
</dependency>
Introducción: Sending Connector
Sending connector Los s se utilizan para enviar mensajes a algún destino, ya sea como respuesta a un mensaje previamente recibido o para hacer una solicitud.
Patrón de Constructor
Sending connector s se instancian utilizando el patrón de constructor. Esto se debe a que los conectores tienen muchos parámetros que configurar y la mayoría son opcionales o tienen valores predeterminados.
Veamos cómo utilizamos el patrón de constructor para instanciar un sending connector.
SendConnector<ExampleType, ExampleType> connector = SendConnector
.<ExampleType, ExampleType>builder("ExampleSystem") (1)
.withConnectorTransport(transport) (2)
.withSendTransportMessageConverter(converter)(3)
.withCorrelationIdExtractor(correlationIdExtractor) (4)
.withCorrelationService(correlationService) (5)
.withActorSystem(actorSystem) (6)
.build();
| 1 | Establece el nombre del conector. El nombre debe representar a qué se está conectando el conector. |
| 2 | Proporciona una implementación del SendingConnectorTransport interfaz. |
| 3 | Proporciona una implementación del SendTransportMessageConverter interfaz.
Toma la carga útil del mensaje de tipo T(ExampleType en este caso) y lo convierte a un TransportMessage objeto. |
| 4 | Proporciona una implementación del CorrelationIdExtractor interfaz.
Toma la carga del mensaje y extrae (o genera) un identificador de correlación para utilizar al persistir el ProcessingContext a través del servicio de correlación. |
| 5 | Proporciona una implementación del CorrelationService.
El servicio de correlación toma un ProcessingContext y un CorrelationId y los persiste utilizando el repositorio configurado del servicio.
Esto se utiliza para correlacionar una solicitud con su respuesta, la cual será gestionada por un proceso separado. |
| 6 | Establece el sistema de actores utilizado en toda la aplicación. |
Comience a enviar mensajes
En este punto, deberíamos haber instanciado con éxito un SendConnector<ExampleType>, que puede ser utilizado para enviar mensajes a través del transporte configurado.
El SendingConnector la interfaz define dos métodos para enviar mensajes, las firmas para estos son.
public interface SendingConnector<D, R> extends OperableConnector {
int DEFAULT_BUFFER_SIZE = 50;
int DEFAULT_MAX_CONCURRENT_OFFERS = 500;
/**
* Send a request via the connector without a supporting context
*/
default CompletionStage<R> send(@NotNull ProcessingContext processingContext,
D domainMessage) {
return send(processingContext, domainMessage, SupportingContext.empty());
}
/**
* Send a request via the connector with a full sending context
* that allows users to supply a supporting context as well
*/
CompletionStage<R> send(@NotNull ProcessingContext processingContext,
D domainMessage,
@NotNull SupportingContext supportingContext);
/**
* Allows setting of the meterRegistry implementation for a connector
* after connector build
* This allows implementations to set the registry for all SendingConnector instances
* at once instead of having to set them at a per connector bases at build time
*/
void setMeterRegistry(MeterRegistry meterRegistry);
SendingConnectorConfig getConfig();
}
El dominio genérico message type D en este caso es nuestro ExampleType.
`ExampleType`tiene la siguiente definición.
@Data
@AllArgsConstructor
@EqualsAndHashCode
public static class ExampleType implements UpdatebleProcessingContextHolder {
private final String name;
private final LocalDate dob;
private final int shoeSize;
private ProcessingContext processingContext;
public ExampleType(String name, LocalDate dob, int shoeSize) {
this(name, dob, shoeSize, null);
}
}
Ahora podemos crear una instancia del mensaje de dominio y enviarlo al transporte a través del conector.
ExampleType message = new ExampleType(
"Isaac Newton",
LocalDate.of(1642, 12, 25),
8
);
CompletionStage<DeliveryOutcome> future = connector.send(processingContext, message);
Esto devuelve un CompletionStage<DeliveryOutcome>
que podemos bloquear para esperar el DeliveryOutcome(no ideal), o utilice varios métodos en CompletionStage
para realizar otras tareas de manera asíncrona cuando se complete el futuro.
Introducción: Solicitud-Respuesta Sending Connector
El predeterminado sending connector funciona de manera unidireccional donde la respuesta de la solicitud es el resultado de la entrega. Sin embargo, para algunos protocolos (a saber HTTP) es deseable y se espera que se devuelva una respuesta bien formada de una solicitud.
A RequestReplySendConnector se construye de manera similar a otros conectores, utilizando el patrón de constructor.
RequestReplySendConnector<ExampleRequest, ExampleRequest, ExampleResponse, ExampleResponse> connector;
connector = RequestReplySendConnector
.<ExampleRequest, ExampleRequest, ExampleResponse, ExampleResponse>builder("ExampleSystem")
.withConnectorTransport(transport)
.withCorrelationIdExtractor(correlationIdExtractor)
.withSendTransportMessageConverter(sendMessageConverter)
.withReceiveTransportMessageConverter(receiveMessageConverter)
.withActorSystem(actorSystem)
.build();
Este conector es ligeramente más complejo, ya que requiere convertir a y desde el TransportMessage. Sin embargo, se reduce cierta complejidad porque ya no es necesario contar con parámetros relacionados con la correlación.
La firma del método send ahora devuelve un futuro que contiene la respuesta.
Dado que el transporte es poco fiable, existe un sistema de seguridad para garantizar que la solicitud eventually completa, aunque con una excepción después de un período de tiempo de espera.
Se proporciona un valor predeterminado, pero puede ser configurado con el método builder.
.withCallTimeoutSeconds.
Cuando un mensaje se agota el tiempo de envío, el futuro de la respuesta se completará de manera excepcional con un TimeoutException como la causa.
Para referencia, la configuración predeterminada RequestReplySendConnector se hereda de la SendingConnector ajustes predeterminados pero con algunas anulaciónes de resiliencia necesarias pertinentes a las interacciones de solicitud y respuesta únicamente.
# Inherit everything from default-send-connector and then override some of the resiliency settings
default-rr-send-connector = ${ipf.connector.default-send-connector} {
resiliency-settings {
reroute-messages-on-failure = false
minimum-number-of-calls = 10
}
}
Los valores que pueden ser configurados a través de las propiedades de configuración se muestran en la siguiente tabla.
| Propiedad | Descripción | Ejemplo |
|---|---|---|
|
Cuando se establece en |
falso |
|
Tamaño de una cola de origen que puede utilizarse para manejar la presión de retroceso, por ejemplo, en situaciones de productores rápidos. |
50 |
|
Duración máxima para esperar un acuse de recibo antes de completar la futura devuelta excepcionalmente con un |
|
|
Número máximo de ofertas pendientes cuando el búfer está lleno. |
500 |
|
Número máximo de llamadas paralelas para send connector |
500 |
|
Número máximo de envíos paralelos por |
1 |
|
Limita el rendimiento a un número específico de mensajes consumidos por unidad de tiempo. Cuando este valor está establecido, también debe proporcionarse la duración del estrangulador. |
10 |
|
Se utiliza con 'throttle-count' para establecer la tasa máxima de consumo de mensajes. Para más detalles, consulte el Limitación de Mensajes documentación. |
1s |
|
Los ajustes de resiliencia que se utilizarán al enviar. Para más detalles, consulte el Resiliencia documentación. |