CON2 - Escribiendo su propio conector (Kafka)
Si en algún momento desea ver la solución a este paso, esta se puede encontrar en la solución "add_kafka". |
En CON1 - Añadiendo la iniciación de pagos, conectamos nuestra aplicación de muestra con un iniciador de pagos para permitirnos recibir el pago instructions de un servicio remoto. Para hacer esto, utilizamos un conector preempaquetado, de modo que todo lo que teníamos que hacer era implementar la lógica para el procesamiento al recibir el mensaje. Esta vez vamos a dar un paso más atrás y, de hecho, escribir el conector nosotros mismos.
Usted lo hará integrándose con un sistema de sanciones de prueba. Este sistema:
-
Espera recibir un custom " SanctionsRequest " objeto.
-
Devolverá un custom Objeto "SampleEvent".
El sistema puede funcionar sobre cualquiera de Kafka or JMS. Usted utilizará Kafka¡en este ejemplo!
Algunos conceptos básicos
Comencemos con algunos conceptos básicos sobre el marco de conectores.
| Concepto | Descripción |
|---|---|
Utiliza flujos para procesar mensajes. El procesamiento de mensajes puede involucrar:
|
|
Indica al conector cómo comunicarse con el protocolo subyacente. |
En este tutorial consideraremos dos tipos de conectores.- "Envío" de conectores y "Recepción" de conectores. Usted utilizará estos para colocar un mensaje en un Kafka tema (enviar) y luego también para procesar la respuesta (recibir).
Dado que la solicitud / respuesta será asíncrona, necesitaremos alguna forma de determinar qué respuesta corresponde a qué solicitud. Hacemos esto utilizando "correlation", que se utiliza para relacionar una respuesta con un mensaje. Puede leer más sobre la correlación.aquí.
Es posible utilizar el mismo conector y proporcionar diferentes transportes para poder comunicarse a través de diferentes protocolos. Esto mantiene nuestra lógica de procesamiento de mensajes separada del protocolo subyacente.
El connector transport las bibliotecas se nombran con la convención connector-{TRANSPORT_NAME} e.g.connector-http or connector-kafka.
Clases de Soporte
Lo primero que haremos es importar la definición del dominio para el sistema de sanciones. Para ello, necesitamos agregar una dependencia en nuestras aplicaciones "ipf-tutorial-app".pom.xml:
<dependency>
<artifactId>sanctions-domain</artifactId>
<groupId>com.iconsolutions.ipf.sample.samplesystems</groupId>
</dependency>
Veamos las clases clave que recibimos de este módulo. La primera es el objeto de solicitud que enviamos al sistema de Sanciones.
@Data
public class SanctionsRequest extends SampleEvent<SanctionsRequestPayload> {
private Map<String, String> headers;
}
Aquí tenemos un objeto que está tomando tanto un conjunto de encabezados como una carga útil de solicitud.
Para el lado de respuesta, podemos ver el objeto SanctionsResponse. Nuevamente, al verificar la jerarquía, veremos el core la respuesta es:
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@JsonInclude(JsonInclude. Include. NON_NULL)
public class FilteringResponse {
private String status;
private String additionalInf;
}
Entonces, el elemento clave aquí es que recibiremos de vuelta un estado.
A continuación, incorporaremos otra dependencia,
<dependency>
<artifactId>sanctions-mapping</artifactId>
<groupId>com.iconsolutions.ipf.sample.samplesystems</groupId>
</dependency>
Esta dependencia proporciona un conjunto preempaquetado de mappers(usando el Mapping Framework de Icon) que proporciona un mapping de un pacs008 a un SanctionsRequest objeto. La clase clave a considerar aquí es la clase SanctionsMapper que proporciona este método:
public SanctionsRequest map(FIToFICustomerCreditTransferV08 fiToFICustomerCreditTransfer) {
var filteringRequest = transformationService.mapThenEnrichWithDefault(fiToFICustomerCreditTransfer, FilteringRequest.class);
SanctionsRequest sanctionsRequest = new SanctionsRequest();
sanctionsRequest.setHeader(HeaderUtils.makeHeader("Sanctions", fiToFICustomerCreditTransfer.getCdtTrfTxInf().get(0).getPmtId().getTxId()));
sanctionsRequest.setPayload(new SanctionsRequestPayload(filteringRequest));
return sanctionsRequest;
}
Aquí podemos ver que estamos mapping desde el pacs. 008(FITo FICustomer Credit Transfer) para devolver el SanctionsRequest.
Ahora utilizaremos Spring para proporcionarnos una instancia de las sanciones.mapper que podemos utilizar. Para ello, crearemos un nuevo bean dentro de la clase IpfTutorialConfig como:
@Bean
public SanctionsMapper sanctionsMapper(ObjectMapper objectMapper) {
return new SanctionsMapper(objectMapper);
}
Esos son los bloques de construcción que vamos a utilizar, así que procedamos y comencemos a escribir nuestro conector.
El Send Connector
Comenzará por observar el send connector, este es el que publicará un SanctionsRequest mensaje sobre el tema apropiado para que el sistema externo lo consuma.
Crearemos una nueva clase de configuración, llamada "SanctionsConnectorConfiguration", y la colocaremos en un nuevo paquete "connector" dentro del paquete "config" existente.
(Recuerde marcar su clase con la primavera @Configuration¡anotación! También utilizará la anotación @Slf4j de lombok para proporcionar acceso a una implementación de registro.
Ahora escribamos nuestro send connector, necesitaremos hacer algunos decisions primero:
-
Tipos- La definición de un send connector is SendConnector<D, T>. En este caso, la D representa el tipo de objeto fuente (dominio) y la T el tipo de objeto destino. En nuestro conector, se tomará un FITo FICustomer Credit Transfer V08 y se enviará un SanctionsRequest objeto.
-
Registro- Podemos proporcionar una implementación de registro a un conector. Un message logger es cualquier clase que implemente esta interfaz funcional simple:
public interface MessageLogger {
void logMessage(MessageLogEntry var1);
}
En nuestra instancia, la aplicación viene con un message logger que es parte del procesamiento de datos. Esto tiene todo lo que necesitamos, así que simplemente lo reutilizaremos.
-
Correlación- Para la correlación, vamos a utilizar otra utilidad proporcionada por Icon. Usted utilizará el servicio de correlación de Icon mongo. Por lo tanto, también necesitaremos incluir esa dependencia:
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-correlation-starter-mongodb</artifactId>
</dependency>
-
Mapeo- Como se discutió anteriormente, para mapping utilizaremos el SampleMapper que hemos incorporado.
Comencemos pensando a nivel de clase. Usted proporcionará variables de clase para apoyar nuestros conectores:
private final SanctionsMapper sanctionsMapper;
private final ObjectMapper objectMapper;
private final ClassicActorSystemProvider actorSystem;
private final CorrelationService correlationService;
private final MessageLogger messageLogger;
Estos son, como se discutió anteriormente, junto con el sistema de actores en sí.
Utilizará el marco de trabajo de Spring.dependency injection para proporcionarlos utilizando la anotación @AllArgsConstructor.
Entonces, creemos un nuevo método para crear el send connector:
@Bean(name = "sanctionsSendConnector")
public SendConnector<FIToFICustomerCreditTransferV08, SanctionsRequest> sanctionsSendConnector(ConnectorTransport<SanctionsRequest> sanctionsSendConnectorTransport) {
return SendConnector.<FIToFICustomerCreditTransferV08, SanctionsRequest>builder("Sanctions", "sanctions.send-connector", actorSystem.classicSystem())
.withConnectorTransport(sanctionsSendConnectorTransport)
.withCorrelationIdExtractor(event -> CorrelationId.of(event.getHeader().getTechnical().getEventId()))
.withCorrelationService(correlationService)
.withSendTransportMessageConverter(this::convertToTransport)
.withDomainToTargetTypeConverter(sanctionsMapper::map)
.withMessageLogger(messageLogger)
.build();
}
Esto es lo suficientemente importante como para revisar cada parte por turno.
Primero pasamos al método un Connector Transport<SanctionsRequest> sanctions Send Connector Transport. Esta es una implementación de un connector transport, si recuerda nuestra discusión anterior, el connector transport se utiliza para proporcionar la información del protocolo de bajo nivel. Por ahora, no vamos a preocuparnos por eso, por lo tanto, simplemente lo pasaremos a nuestro método.
Ahora tomemos cada línea por turno y expliquemos qué está sucediendo.
-
La construcción del constructor- toma tres parámetros:
-
El nombre del conector-esto es particularmente útil más adelante cuando revisemos métricas.
-
La raíz de configuración para el conector. Esto nos permite pasar la ruta raíz para las propiedades del conector. Todas las propiedades del conector comenzarán con esta variable. Esto nos permite, a su vez, construir propiedades a nivel de conector.
-
El sistema de actores en sí.
-
-
el transporte- obviamente aquí simplemente utilizamos el connector transport hemos pasado al método.
-
la función de extracción de correlación- esta es una función que proporcionará al conector un id único que se utiliza para la correlación. El id debe ser único y también debe ser obtenible del mensaje de respuesta. Aquí utilizamos el event id en el encabezado.
-
el servicio de correlación- De acuerdo con lo anterior, utilizaremos la implementación respaldada por Icon mongo.
-
el transport message converter- esta es una función que toma la solicitud de sanciones y la convierte en una transport message para transmitir a través del cable. En nuestro caso, realizaremos una implementación simple que crea un nuevo TransportMessage con una representación de cadena de nuestra solicitud como la carga útil.
private TransportMessage convertToTransport(SanctionsRequest request) {
try {
return new TransportMessage(new MessageHeaders(CryptoHelper.messageHeaders()), objectMapper.writeValueAsString(request));
} catch (JsonProcessingException e) {
throw new IconRuntimeException(e);
}
}
-
el convertidor de tipo de dominio a objetivo- esta es la función que mapeará desde nuestro tipo de dominio (pacs. 008) a nuestro tipo objetivo (SanctionsRequest). De acuerdo con lo anterior, se utilizará el método map de SanctionsMapper para hacer esto.
-
el message logger- Como se discutió, simplemente utilizaremos el registrador existente aquí.
Send Connector Diagrama de Secuencia
El diagrama muestra cómo el conector utiliza los objetos y funciones que usted ha proporcionado para preparar un mensaje y lo envía a el destino. Tenga en cuenta que este es un diagrama simplificado y no representa todo lo que ocurre dentro de un conector. Una explicación completa de la send connector se puede encontrar aquí.
El Receive Connector
Ahora que hemos escrito el código para configurar nuestro conector para enviar un mensaje al sistema de sanciones, necesitaremos uno para hacer lo contrario y recibir la respuesta cuando esté lista. Así que, nuevamente, añadamos un nuevo método a nuestra configuración, esta vez para construir nuestro receive connector:
@Bean(name = "sanctionsReceiveConnector")
public ReceiveConnector<SampleEvent> sanctionsReceiveConnector(@Qualifier("sanctionsReceiveConnectorTransport") ReceiveConnectorTransport sanctionsReceiveConnectorTransport) {
return ReceiveConnector.<SampleEvent>builder("SanctionsReceive", "sanctions.receive-connector", actorSystem.classicSystem())
.withConnectorTransport(sanctionsReceiveConnectorTransport)
.withCorrelationIdExtractor(event -> CorrelationId.of(event.getHeader().getTechnical().getOriginalEventId()))
.withCorrelationService(correlationService)
.withReceiveTransportMessageConverter(message -> sanctionsMapper.convertResponse(message.getPayload().toString()))
.withReceiveHandler(this::sanctionsReceiveConnector)
.withMessageLogger(messageLogger)
.build();
}
Como hicimos la última vez, revisemos los puntos clave de esta configuración.
-
La construcción del constructor- justo como con send connector Pasaremos el nombre del conector, la ruta de configuración raíz y el sistema de actores.
-
el transporte- nuevamente necesitaremos la configuración del protocolo, esta vez para recibir un mensaje. Usted pasará esto a nuestro método para permitirnos manejar los detalles del protocolo en otro lugar.
-
la función extractora de correlación- al igual que con el envío, necesitamos una función que proporcione el mismo id que teníamos en la función de envío, pero esta vez extraído de la respuesta event. Esta vez tomaremos el "original event id" del encabezado.
-
el servicio de correlación- nuevamente utilizaremos el respaldado por Mongo de Icon.
-
el convertidor de transporte de recepción- aquí necesitamos una función que convierta el mensaje en bruto recibido en el tipo de respuesta esperado (un ejemplo event). Usted tomará eso de nuestras sanciones mapper funciones.
-
el manejador de recepción- esta es la función clave. Esta es una función que recibe el mensaje de respuesta junto con el contexto para este (tomado de la correlación en nuestro caso) y requiere que determinemos qué hacer con él. En nuestro caso, queremos construir una nueva entrada de sanciones de dominio y enviarla a nuestro dominio. Verifique si puede escribir esto y, cuando esté listo, compárelo con nuestra función a continuación:
private CompletionStage<Void> sanctionsReceiveConnector(ReceivingContext receivingContext, SampleEvent sampleEvent) {
return SanctionsDomain.sanctionsSystem().handle(new SanctionsNoHitInput. Builder(receivingContext.getProcessingContext().getAssociationId().getValue()).build())
.thenAccept(done -> log.info("Completed {}", done));
}
-
el message logger- y nuevamente utilizaremos el registrador existente.
Eso es todo, esa es nuestra totalidad receive connector escrito y listo para enviar.
El Connector Transport s
En las definiciones tanto para nuestra recepción y send connector s, hemos pasado una definición de transporte. Ahora necesitamos configurarlas. Por el momento, solo vamos a utilizar Kafka, por lo que necesitaremos esas versiones. Para ello, crearemos una nueva clase de configuración de Spring llamada "SanctionsTransportConfiguration" que colocaremos en un nuevo paquete "transports" bajo nuestro paquete de conectores.
El Sending Connector Transporte
Primero, añadamos la dependencia para el marco del conector. Kafka implementación.
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-kafka</artifactId>
</dependency>
Comenzará con el lado de envío:
@Bean
public ConnectorTransport<SanctionsRequest> sanctionSendConnectorTransport(ClassicActorSystemProvider actorSystem) {
return KafkaConnectorTransport.stringBuilder(
"SanctionsSendKAFKA", actorSystem, "sanctions")
.build();
}
En esta etapa, nos movemos a niveles específicos bajos. Kafka configuración, cuyos detalles están fuera de este tutorial. Sin embargo, las cosas clave a tener en cuenta son:
-
Damos el connector transport un nombre, nuevamente para su uso en métricas, un sistema actor y la ruta raíz de configuración que se utilizará
El Receive Connector Transporte
Esto es muy similar al envío:
@Bean
public ReceiveConnectorTransport sanctionsReceiveConnectorTransport(ClassicActorSystemProvider actorSystem) {
return KafkaAckReceiveConnectorTransport.stringBuilder(
"KafkaSanctionsReceiveConnectorTransport", actorSystem, "sanctions")
.build();
}
Las principales diferencias son que esta vez utilizamos el Kafka configuración para consumir mensajes de un tema (¡o potencialmente múltiples temas!). También debemos decidir cuántas particiones queremos aplicar a nuestro tema. Esto ayudará con el rendimiento, pero por ahora está fuera del alcance de este tutorial.
Eso es nuestro transporte definido, lo único que queda ahora es configurar la configuración real para utilizarlo.
Uso del Conector
Ahora necesitamos conectar nuestras conexiones a nuestro flujo para su uso. Lo primero que debe tener en cuenta aquí es que el receive connector La parte (método sanctionsReceiveConnector) ya está hecha para nosotros, ya que el controlador está enviando la respuesta de vuelta al dominio.
Aquí solo necesitamos preocuparnos por el lado de envío. Para esto, simplemente debemos reemplazar nuestro adaptador de sanciones de muestra (SampleSanctionsSystemActionAdapter) por uno real que llame a nuestro nuevo send connector El método send de 's. La acción que proporcionamos al adaptador tiene toda la información que necesitaremos para hacer eso también. Consulte si puede configurarlo ahora y, cuando esté listo, la solución se encuentra a continuación.
@Bean
public SanctionsDomain sanctionsDomain(ActorSystem actorSystem, SendConnector<FIToFICustomerCreditTransferV08, SanctionsRequest> sanctionsSendConnector) {
// All adapters should be added to the domain model
return new SanctionsDomain. Builder(actorSystem)
.withSanctionsSystemActionAdapter(checkSanctionsAction ->
sanctionsSendConnector.send(checkSanctionsAction.getProcessingContext(), checkSanctionsAction.getCustomerCreditTransfer())
.thenAccept(done -> log.debug("Result: {}", done)))
.build();
}
Así que aquí podemos ver que estamos inyectando en nuestro nuevo send connector y luego simplemente llamando al envío y registrando el resultado. Eso es todo lo que necesitamos hacer.
Configuración
Añadirá nuestra configuración en el archivo de configuración de nuestra aplicación (ipf-tutorial-app/application.conf).
Configuración del Conector
Para que nuestro conector funcione, necesitamos añadir una serie de elementos:
-
El restart ajustes
-
El número de particiones a utilizar en el envío
-
Los identificadores de grupo y cliente para Kafka para usar
Vamos a añadir estos en el común akka configuración que creamos en el tutorial anterior. Podría igualmente agregar todo esto en el bloque de sanciones personalizado, pero entonces no estarían disponibles para reutilizar en otros conectores.
Restart Configuraciones
Primero, necesitamos restart configuraciones, es decir, para indicarle al conector qué hacer en caso de conmutación por error. Aquí definiremos el conjunto estándar que vamos a utilizar para todos nuestros conectores.
default-restart-settings {
min-backoff = 1s
max-backoff = 5s
random-factor = 0. 25
max-restarts = 5
max-restarts-within = 10m
}
Estamos informando a nuestro conector que en el event de fallo de transporte, intentaremos restart hasta 5 veces con un tiempo creciente entre cada una restart.
Para utilizar esto, vamos a añadirlo a la configuración predeterminada.akka bloque. Puede hacer esto añadiendo una línea debajo de la configuración del consumidor y del productor:
producer {
restart-settings = ${default-restart-settings}
}
| Tenga en cuenta la sintaxis aquí, cómo podemos referirnos a un bloque complejo desde otro lugar en nuestro hocon estructura utilizando el ${..} configuración. |
Particiones
Las particiones nos permiten definir cuántas Kafka particiones que debemos configurar. En nuestro caso, solo vamos a configurar 10 particiones, lo hacemos añadiéndolo a la akka bloque necesitamos añadir una línea al consumidor con:
Identificadores de Cliente y Grupo
Finalmente, añadiremos los identificadores de cliente y de grupo.
-
client.id- establece el nombre de un individuo Kafka productor o cliente consumidor. -
group.id- establece el nombre del Kafka grupo de consumidores al que un individuo Kafka cliente consumidor al que pertenece.
Para el identificador del cliente, necesitamos agregar una entrada a la configuración del productor:
kafka-clients {
client.id = ipf-tutorial-client
}
Y para el id del grupo, necesitamos agregar una entrada a la configuración del consumidor:
kafka-clients {
group.id = ipf-tutorial-group
}
Connector Transport Configuración
Para nuestro connector transport Para funcionar, se añadió el parámetro "sanctions" al stringBuilder anteriormente.
El parámetro "sanctions" indica al conector que busque elementos bajo el prefijo "sanctions" en la configuración. Por lo tanto, todo lo que necesitamos hacer es agregar nuestros temas bajo esta raíz de la siguiente manera:
sanctions {
kafka {
producer {
topic = SANCTIONS_REQUEST
}
consumer {
topic = SANCTIONS_RESPONSE
}
}
}
Resumen
Eso es todo lo que hemos completado desde el lado de la configuración de nuestra aplicación, solo como un resumen, la configuración completa para el conector debería verse ahora así:
sanctions {
kafka {
producer {
topic = SANCTIONS_REQUEST
}
consumer {
topic = SANCTIONS_RESPONSE
}
}
}
// default settings for kafka
default-restart-settings {
min-backoff = 1s
max-backoff = 5s
random-factor = 0. 25
max-restarts = 5
max-restarts-within = 10m
}
common-kafka-client-bootstrap-servers = "localhost:9093"
akka {
kafka {
producer {
restart-settings = ${default-restart-settings}
kafka-clients {
bootstrap.servers = ${common-kafka-client-bootstrap-servers}
client.id = ipf-tutorial-client
}
}
consumer {
restart-settings = ${default-restart-settings}
kafka-clients {
bootstrap.servers = ${common-kafka-client-bootstrap-servers}
group.id = ipf-tutorial-group
}
}
}
}
// end default kafka settings
Ejecutando la Aplicación
Para ejecutar la aplicación, lo primero que debe hacer es configurar el servicio de sanciones real con el que estaremos interactuando.
Docker Configuración
Si utiliza docker, aquí está la nueva entrada para nuestro application.yml (docker/application.yml)
sanctions:
image:registry.ipf.iconsolutions.com/sample-systems-sanctions-simulator-kafka:2. 1. 47
container_name: sanctions-sim
ports:
- 5010:5005
- 8088:55555
environment:
- SANCTIONS_MODE=normal
- SANCTIONS_TRANSPORT=kafka
- SANCTIONS_SIM_ENCRYPTION_ENABLED=FALSE
- SANCTIONS_SIM_ENCRYPTION_KEYSTORE_PATH=file:///tmp/keys/connector/keystore-pkcs12-aes128.jks
volumes:
-./config/keys:/tmp/keys:ro
-./config/sanctions:/sanctions-simulator-kafka/conf
-./logs/sanctions:/ipf/logs
depends_on:
- kafka
Tenga en cuenta que la versión "2. 1. 47" proporcionada aquí es la última versión en el momento de la redacción de este documento.
También tenga en cuenta que no estamos estableciendo ningún Kafka configuración aquí, esto se debe a que el conjunto de muestras proporcionado arriba funciona con el docker entorno.
Para facilitar las cosas, también añadiremos un logback.xml file para sanciones:
<? xml version="1. 0" encoding="UTF-8"?>
<configuration>
<appender name="FILE"
class="ch.qos.logback.core.rolling. RollingFileAppender">
<file>/ipf/logs/sanctions-sim.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling. FixedWindowRollingPolicy">
<fileNamePattern>/ipf/logs/sanctions-sim.log.%i</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>20</maxIndex>
</rollingPolicy>
<triggeringPolicy
class="ch.qos.logback.core.rolling. SizeBasedTriggeringPolicy">
<maxFileSize>50MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%date{yyyy-MM-dd} %d{HH:mm:ss. SSS} %-5level %X{traceId} %logger{36} %X{sourceThread} %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="FILE" />
</root>
</configuration>
No Docker Configuración
Los detalles sobre cómo ejecutar el simulador de sanciones se pueden encontrar aquí:Uso del simulador de sanciones
Si es necesario, también asegúrese de que la configuración descrita anteriormente tenga la correcta Kafka configuración para su entorno.
Cómo el Simulador de Sanciones Interactúa con el Flujo
Antes de ejecutar la aplicación, revise este diagrama para entender cómo los conectores interactúan con el simulador de sanciones a través del Kafka colas.
Probando que todo funcione
Ahora es el momento de verificar que todo funcione, así que reconstruyamos nuestra aplicación:
mvn clean install -rf:ipf-tutorial-app
Y luego podríamos enviar un pago:
curl -X POST localhost:8080/submit | jq
Y si mencionamos el pago en el Developer GUI En ('IPF Transaction Explorer') podemos observar un par de cosas interesantes.
Primero, si vamos a la pestaña de mensajes (search from the main page by unit of work id (uowId), haga clic en ver en la transacción, haga clic en la pestaña de mensajes) veremos:
Aquí podemos ver que ahora estamos registrando los mensajes que van tanto a (ENVIADOS) como desde (RECIBIDOS) el sistema de sanciones. Si usted "Hace clic para ver el cuerpo", puede ver los detalles de los mensajes de sanciones transformados.
En segundo lugar, para confirmar que nada más ha cambiado, podemos observar el gráfico de nuestro flujo de tutorial (click flows, click Iptutorial Flow V2, click view graph) entonces vemos:
¡Y aquí podemos ver que nuestro flujo está funcionando correctamente!