CON2 - Escribir tu propio conector (Kafka)
|
Comenzando
Este paso del tutorial utiliza como punto de partida la solución "add_payment_init" del proyecto. Si en cualquier momento quieres ver la solución de este paso, la encontrarás en la solución "add_kafka". |
En CON1 - Añadir iniciación de pagos, conectamos nuestra aplicación de ejemplo con un iniciador de pagos para permitirnos recibir instrucciones de pago desde un servicio remoto. Para ello, usamos un conector preempaquetado, de modo que lo único que tuvimos que implementar fue la lógica de procesamiento al recibir el mensaje. Esta vez vamos a ir un paso más atrás y a escribir el conector nosotros mismos.
Lo haremos integrándonos con un sistema de sanciones de prueba. Este sistema:
-
Espera recibir un objeto "SanctionsRequest" personalizado.
-
Retornará un objeto "SampleEvent" personalizado.
El sistema puede funcionar sobre Kafka o JMS. En este ejemplo usaremos Kafka.
Algunos conceptos básicos
Empecemos con algunos conceptos básicos del framework de conectores.
| Concepto | Descripción |
|---|---|
Usa streams para procesar mensajes. Procesar mensajes puede implicar:
|
|
Indica al conector cómo comunicarse con el protocolo subyacente |
En este tutorial consideraremos dos tipos de conectores: conectores de "envío" (Sending) y conectores de "recepción" (Receiving). Los usaremos para poner un mensaje en un topic de Kafka (envío) y para procesar la respuesta (recepción).
Como la petición/respuesta será asíncrona, necesitaremos alguna forma de determinar qué respuesta corresponde a qué petición. Esto lo hacemos usando la "correlación", que se utiliza para relacionar una respuesta con un mensaje. Puedes leer más sobre correlación aquí.
Es posible usar el mismo conector y suministrarle transportes distintos para poder comunicarse sobre diferentes protocolos. Esto mantiene separada nuestra lógica de procesamiento de mensajes del protocolo subyacente.
Las librerías de transporte de conectores se nombran con la convención connector-{NOMBRE_TRANSPORTE}, p. ej., connector-http o connector-kafka.
Clases de soporte
Lo primero que haremos será importar la definición de dominio del sistema de sanciones. Para ello, debemos añadir una dependencia en el pom.xml de nuestro módulo "ipf-tutorial-app":
<dependency>
<artifactId>sanctions-domain</artifactId>
<groupId>com.iconsolutions.ipf.sample.samplesystems</groupId>
</dependency>
Veamos las clases clave que recibimos de este módulo. La primera es el objeto de petición que enviamos al sistema de sanciones.
@Data
public class SanctionsRequest extends SampleEvent<SanctionsRequestPayload> {
private Map<String, String> headers;
}
Aquí tenemos un objeto que contiene un conjunto de cabeceras y un payload de petición.
Para la respuesta, podemos ver el objeto SanctionsResponse. Revisando su jerarquía, el núcleo de la respuesta es:
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class FilteringResponse {
private String status;
private String additionalInf;
}
Así que lo clave es que recibiremos un status.
A continuación incorporaremos otra dependencia:
<dependency>
<artifactId>sanctions-mapping</artifactId>
<groupId>com.iconsolutions.ipf.sample.samplesystems</groupId>
</dependency>
Esta dependencia proporciona un conjunto preempaquetado de mapeos (usando el framework de mapeo de Icon) que transforma un pacs008 en un objeto SanctionsRequest. La clase clave a mirar aquí es SanctionsMapper, que expone este método:
public SanctionsRequest map(FIToFICustomerCreditTransferV08 fiToFICustomerCreditTransfer) {
var filteringRequest = transformationService.mapThenEnrichWithDefault(fiToFICustomerCreditTransfer, FilteringRequest.class);
SanctionsRequest sanctionsRequest = new SanctionsRequest();
sanctionsRequest.setHeader(HeaderUtils.makeHeader("Sanctions", fiToFICustomerCreditTransfer.getCdtTrfTxInf().get(0).getPmtId().getTxId()));
sanctionsRequest.setPayload(new SanctionsRequestPayload(filteringRequest));
return sanctionsRequest;
}