Procesamiento de Cliente de Debulker
Una instancia de Debulker puede dividir archivos grandes en componentes según su configuración (consulta la sección anterior Uso de Debulker). Esos componentes pueden procesarse individualmente a través de aplicaciones o flujos. Esta sección del tutorial muestra cómo procesar componentes después de que un archivo haya sido “debulkeado”.
| Se asume que estás siguiendo la solución add-debulker o has completado la sección anterior Uso de Debulker |
Procesamiento de componentes “debulkeados”
Cuando se debulkea un archivo, sus componentes se añaden al almacén de componentes (actualmente implementado como un almacén MongoDB; puedes ver la colección "componentEntity" si completaste la sección anterior). Esto significa que esos componentes están disponibles para que aplicaciones fuera del Debulker los accedan. Para saber cuándo los componentes están disponibles para procesamiento, el Debulker emitirá un InitiateComponentProcessingCommand. Este comando puede ser consumido por la aplicación/flujo cliente e informa de que se ha recibido un bulk, se ha debulkeado y los componentes están listos para ser procesados.
El mensaje enviado proporciona el bulkId:
{"bulkId":"pain.001.12345"}
Usando este bulkId, la aplicación cliente puede consultar el Component Store y utilizar la siguiente operación para acceder a los componentes relacionados con ese bulk:
Flow.Publisher<Component<T>> findAllByBulkId(BulkId bulkId);
Alternativamente, es posible consultar el component store especificando el bulkId y el marker del/los componente(s) a recuperar. Este es el método utilizado por Pain001Processor para recuperar el componente raíz a procesar.
Flux<Component<T>> findAllByBulkIdAndMarkerFlux(BulkId bulkId, String marker);
Codificando el procesamiento de componentes
Para procesar la notificación InitiateComponentProcessingCommand y acceder a los componentes, como mínimo debemos implementar:
-
Conector de recepción
-
Llamada al Component Store para recuperar componentes
-
Procesamiento para cada componente recuperado
Cada uno de estos puede codificarse como parte de la aplicación cliente o de un flujo IPF. La elección depende de la orquestación que necesites para procesar los componentes, pero es bastante probable que el procesamiento de cada componente se haga a través de un flujo IPF.
A efectos de este ejemplo, el conector receptor y la llamada al component store están implementados solo en Java (dentro de la misma aplicación de debulker) y se inicia un flujo IPF para cada componente de nivel de transacción del pain.001 que está siendo debulkeado.
| Nada impide que tanto el procesamiento de InitiateComponentProcessingCommand como la recuperación de componentes también se hagan mediante un flujo IPF; el código Java subyacente sería en gran medida el mismo. |
Conector de recepción
La implementación del conector para este ejemplo de tutorial se encuentra en ipf-debulker-tutorial-app config/connector/DebulkerNotificationConnector.java; es un simple ReceiveConnector<InitiateComponentProcessingCommand>. La parte más interesante es el procesamiento de cada notificación recibida e implementado en el receive handler de la misma clase:
// ReceiveHandler
private CompletionStage<Void> componentProcessingReceiveConnector(ReceivingContext receivingContext, InitiateComponentProcessingCommand componentProcessingCommand) {
return CompletableFuture.supplyAsync(() -> {
log.info("InitiateComponentProcessingCommand received for bulkId {} ", componentProcessingCommand.getProcessingContext().getClientRequestId().getValue());
Pain001Context pain001Context = Pain001Context.builder()
.marker("Document")
.processingContext(componentProcessingCommand.getProcessingContext())
.build();
componentProcessor.process(componentProcessingCommand.getProcessingContext(), pain001Context); (1)
return null;
});
}
| 1 | El component processor se inyecta vía Spring Llama a process en el ComponentProcessor pasando el processingContext completo, que contiene el bulkId (processingContext.clientRequestId.value). El segundo parámetro, un objeto de contexto, puede usarse para pasar parámetros adicionales, p. ej., parentId desde procesadores de componentes hijos. |
Configuración
El conector receptor también necesitará configuración en la aplicación para saber a qué tópico acceder. En el tutorial está configurado para acceder al tópico por defecto de esta notificación (y se coloca en el application.conf de ipf-debulker-tutorial-app):
client-processing.connector {
kafka {
consumer {
topic = CLIENT_PROCESSING_REQUEST
}
}
}
Component Processor - Java
Se deben definir Component Processors para cada nivel definido en la jerarquía de componentes.
En el ejemplo proporcionado, hay tres niveles definidos en la configuración:
-
Document
-
CstmrCdtTrfInitn.PmtInf
-
CdtTrfTxInf
El módulo ipf-debulker-starter incluye un ComponentProcessor; para definir un procesador específico, simplemente extiende esta clase.
Document Component Processor
public class Pain001Processor extends ComponentProcessor<Document, Pain001Context> { (1)
private final Pain001PaymentInstructionProcessor nextProcessor;
public Pain001Processor(ComponentStore<NoCustomData> componentStore, Pain001PaymentInstructionProcessor nextProcessor) {
super(componentStore, Pain001Processor::map, true); (2)
this.nextProcessor = nextProcessor;
}
// Used to unmarshall the XML to an iso20022 model object
|
Conserva sin traducir nombres de clases, paquetes, APIs, IDs de anclaje, propiedades de configuración, rutas, URLs y bloques de código. Traduce solo la prosa, títulos, encabezados, notas y texto alternativo. |