Usando Debulker

¿Qué es un Debulker?

A Debulker es un módulo de aplicación responsable de dividir archivos grandes en componentes que serán utilizados para el procesamiento. Debulker espera recibir una notificación sobre el archivo que necesita ser desagregado, junto con el nombre de la configuración que se va a utilizar para dividir el archivo en componentes.

Debulker en su aplicación

El primer paso es agregar los módulos de inicio de maven del debulker a su IPF application pom:

<!--Responsible for debulking-->
<dependency>
  <groupId>com.iconsolutions.ipf.debulk</groupId>
  <artifactId>ipf-debulker-starter</artifactId>
</dependency>
<!--Responsible for cleaning up after bulk components are processed by interested party-->
<dependency>
  <groupId>com.iconsolutions.ipf.debulk</groupId>
  <artifactId>ipf-debulker-housekeeping-starter</artifactId>
</dependency>

Este módulo depende de algunos módulos adicionales:

  • almacén-de-componentes-responsable de almacenar los componentes producidos por el debulker.

  • notificación-de-nuevo-archivo-ipf-debulker-responsable de consumir la notificación del archivo que debe activar el desagregado del archivo.

  • ipf-debulker-archiver-responsable de archivar el archivo masivo después de que haya sido procesado con éxito.

  • ipf-debulker-client-processing-notifica a un sistema externo que el desmoldeo ha finalizado y que los componentes producidos pueden ser procesados. Esto también consume la notificación de procesamiento exitoso de esos componentes por parte del sistema externo, para que pueda comenzar a realizar tareas de mantenimiento.

Añadiendo dependencias necesarias para los módulos iniciales

<!-- ipf-component-store implementation which uses mongodb to store and read components -->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-mongo</artifactId>
</dependency>
<!-- Kafka receive connector implementation of ipf-debulker-new-file-notification which consumes FileNotification message which tells debulker to process bulk file -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-new-file-notification-connector-kafka</artifactId>
</dependency>
<!-- ipf-debulker-archiver implementation which archives processed bulk file to local file system -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-archiver-local</artifactId>
</dependency>
<!-- Kafka connector implementation of ipf-debulker-client-processing -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-client-processing-connector-kafka</artifactId>
</dependency>

Configurando Debulker en su aplicación

Para dividir el archivo de un tipo específico (xml, json..) y estructura, se debe proporcionar la configuración para la división. Esto se puede hacer a través de ipf.debulker.configurations propiedad. Se espera un arreglo de objetos de configuración, cada uno conteniendo:

  • nombre (cadena)- se utiliza para identificar de manera única la configuración. La notificación del archivo contendrá el nombre de la configuración que se utilizará para desagrupar el archivo.

  • divisor (cadena)- tipo de separador que se utilizará para extraer los componentes. Actualmente, hay xml y json.

  • jerarquía-de-componentes (objeto)- estructura de árbol que representa la jerarquía de los componentes que serán extraídos del archivo masivo. Cada nodo puede tener nodos hijos configurados que serán extraídos como componentes separados. El contenido de los componentes hijos será omitido del componente padre.

Ejemplo de configuración para desagrupar pain.001.001.09 XML file.

ipf.debulker {
  configurations = [
    {
      name = "pain.001.001.09"
      splitter = "xml"
      archive-path = "/tmp/bulk_archive"
      processing-entity = "TEST-ENTITY"
      component-hierarchy {
        marker = "Document"
        children = [
          {
            marker = "CstmrCdtTrfInitn. PmtInf"
            children = [
              {
                marker = "CdtTrfTxInf"
              }
            ]
          }
        ]
      }
    }
  ]
}

Esta configuración nos indica que el pain.001 XML file se descompondrá en unidades individuales Document componente, que contiene todos los elementos hijos excepto CstmrCdtTrfInitn. PmtInf elementos, que serán extraídos como componentes separados. Cada hijo PmtInf el componente contendrá todos los elementos hijos excepto CdtTrfTxInf elementos, que serán extraídos como componentes secundarios separados de cada PmtInf. En caso de un pain.001 XML file que tiene 3 PmtInf elementos, cada uno conteniendo 3 CdtTrfTxInf elementos, el desaglomerador producirá 12 componentes:

  • 1 Componente del documento

  • 3 componentes PmtInf

  • 9 componentes CdtTrfTxInf

Configurando Archiver

Dado que estamos utilizando ipf-debulker-archiver-local Para archivar, los archivos se copiarán a la ubicación "/tmp/bulk_archive" según lo configurado anteriormente.

Docker Configuración para la aplicación ipf-debulker-tutorial-app

  ipf-debulker-tutorial-app:
    image:ipf-debulker-tutorial-app:latest
    container_name: ipf-debulker-tutorial-app
    ports:
      - 8080:8080
      - 8559:8558
      - 5006:5005
      - 55002:55001
      - 9002:9001
    volumes:
      -./config/ipf-debulker-tutorial-app:/ipf-debulker-tutorial-app/conf
      -./logs:/ipf/logs
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false -Dconfig.override_with_env_vars=true
    depends_on:
      - ipf-mongo-kafka
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:8080/actuator/health" ]

Para poder trabajar, ipf-debulker-tutorial-app requiere MongoDB y Kafka.

Ejecutando la aplicación

Puede iniciar application.yml utilizando el siguiente comando:

docker-compose -f application.yml up -d

Probando la aplicación

Ahora que la aplicación está iniciada, podemos probarla, esto se realiza de la siguiente manera:

  1. Proporcionando un archivo de datos fuente en la ubicación esperada.

  2. Enviando una Notificación de Archivo al ipf-debulk, a través de Kafka, para notificar a la aplicación que un archivo está listo para ser procesado.

  3. Validando que el archivo esté despojado.

  4. Enviando un ComponentProcessingCompleteCommand, a través de Kafka, para activar el mantenimiento que eliminará componentes del almacén de componentes y eliminará el archivo masivo.

  5. Validando que se realice la limpieza.

Paso 1 - Creación de pain.001 archivo que será despojado

Dado que tenemos configuración para desagrupar pain.001 XML file, utilizaremos ese para las pruebas.

Pain. 001archivo de muestra.

<Document xmlns="urn:iso:std:iso:20022:tech:xsd:pain.001.001.09">
    <CstmrCdtTrfInitn>
        <GrpHdr>
            <MsgId>abc</MsgId>
        </GrpHdr>
        <PmtInf>
            <PmtInfId>1</PmtInfId>
            <NbOfTxs>2</NbOfTxs>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>1</EndToEndId>
                </PmtId>
                <Amt>
                    <InstdAmt Ccy="GBP">500.00</InstdAmt>
                </Amt>
            </CdtTrfTxInf>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>2</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
        </PmtInf>
        <PmtInf>
            <PmtInfId>2</PmtInfId>
            <NbOfTxs>2</NbOfTxs>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>3</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>4</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
        </PmtInf>
        <SplmtryData>
            <Envlp/>
        </SplmtryData>
    </CstmrCdtTrfInitn>
</Document>

Un archivo como este ya ha sido creado y se encuentra en el directorio solutions/add-debulker/docker/bulk_files/.

Paso 2 - Enviando FileNotification a Kafka

Hemos configurado la aplicación para recibir FileNotifications de Kafka, esa notificación tiene varias propiedades que deben ser proporcionadas:

  • configName-nombre de la configuración que será utilizada por el debulker para descomponer el archivo en componentes.

  • bulkId-para correlacionar los componentes producidos por el desmenuzador.

  • fileProvider-nombre del proveedor que se utilizará para recuperar el archivo masivo para su procesamiento.

  • filePath-ruta al archivo.

Mensaje de notificación de archivo a enviar:

{
  "configName": "pain.001.001.09",
  "bulkId": "pain.001.12345",
  "fileProvider": "local",
  "filePath": "/tmp/bulk_files",
  "fileName": "pain_001_test.xml"
}

podemos enviar el archivoNotification a Kafka usando Productor de consola Kafka:

./kafka-console-producer.sh --topic FILE_NOTIFICATION_REQUEST --bootstrap-server localhost:9092

El mensaje que estamos enviando debe estar en una sola línea:

{"configName": "pain.001.001.09", "bulkId": "pain.001.12345", "fileProvider": "local", "filePath": "/tmp/bulk_files/", "fileName": "pain_001_test.xml"}

Paso 3 Validando que el archivo esté descomprimido

En este punto, el debulker debe haber recibido la notificación, accedido al archivo y realizado el debulking. Hay un par de cosas que debemos verificar:

  • El archivo pain_001_test.xml debe existir en el directorio bulk_archive.

  • los componentes deben estar presentes en el almacén de componentes

  • El comando InitiateComponentProcessingCommand (notifica a la parte interesada que los componentes en bloque están listos para su procesamiento) se envía a Kafka. El tema predeterminado es CLIENT_PROCESSING_REQUEST.

La forma más fácil de verificar los componentes es a través de la tienda de componentes. REST API. Deberá agregar la dependencia maven ipf-component-store-service:

<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-service</artifactId>
</dependency>

Debemos ejecutar el siguiente comando desde la línea de comandos para obtener todos los componentes relacionados con nuestro volumen:

curl http://localhost:8080/v1/components/pain.001.12345 | json_pp

Este debería ser el resultado esperado para el archivo pain_001_test.xml despojado (el número de componente y su contenido deben ser los mismos):

[
   {
      "bulkId": {
         "value": "pain.001.12345"
      },
      "content": "<CdtTrfTxInf><PmtId><EndToEndId>1</EndToEndId></PmtId><Amt><InstdAmt Ccy=\"GBP\">500.00</InstdAmt></Amt></CdtTrfTxInf>",
      "creationTime": "2023-02-27T14:15:52.788Z",
      "custom": null,
      "id": {
         "value": "a78d25e5-3625-4fe6-86dd-b220b92d9e14"
      },
      "index": 2,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf. CdtTrfTxInf",
      "parentId": {
         "value": "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      }
   },
   {
      "bulkId": {
         "value": "pain.001.12345"
      },
      "content": "<CdtTrfTxInf><PmtId><EndToEndId>2</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime": "2023-02-27T14:15:52.845Z",
      "custom": null,
      "id": {
         "value": "cc1a2d7c-4f94-4c94-a755-242c99881162"
      },
      "index": 3,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf. CdtTrfTxInf",
      "parentId": {
         "value": "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      }
   },
   {
      "bulkId": {
         "value": "pain.001.12345"
      },
      "content": "<PmtInf><PmtInfId>1</PmtInfId><NbOfTxs>2</NbOfTxs></PmtInf>",
      "creationTime": "2023-02-27T14:15:52.851Z",
      "custom": null,
      "id": {
         "value": "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      },
      "index": 1,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf",
      "parentId": {
         "value": "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      }
   },
   {
      "bulkId": {
         "value": "pain.001.12345"
      },
      "content": "<CdtTrfTxInf><PmtId><EndToEndId>3</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime": "2023-02-27T14:15:52.857Z",
      "custom": null,
      "id": {
         "value": "ab312e01-9385-4846-93c4-75bdd0d94c66"
      },
      "index": 5,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf. CdtTrfTxInf",
      "parentId": {
         "value": "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      }
   },
   {
      "bulkId": {
         "value": "pain.001.12345"
      },
      "content": "<CdtTrfTxInf><PmtId><EndToEndId>4</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime": "2023-02-27T14:15:52.863Z",
      "custom": null,
      "id": {
         "value": "fd9a777d-e012-44a5-8560-903cdafe65f6"
      },
      "index": 6,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf. CdtTrfTxInf",
      "parentId": {
         "value": "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      }
   },
   {
      "bulkId": {
         "value": "pain.001.12345"
      },
      "content": "<PmtInf><PmtInfId>2</PmtInfId><NbOfTxs>2</NbOfTxs></PmtInf>",
      "creationTime": "2023-02-27T14:15:52.868Z",
      "custom": null,
      "id": {
         "value": "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      },
      "index": 4,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf",
      "parentId": {
         "value": "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      }
   },
   {
      "bulkId": {
         "value": "pain.001.12345"
      },
      "content": "<Document xmlns=\"urn:iso:std:iso:20022:tech:xsd:pain.001.001.09\"><CstmrCdtTrfInitn><GrpHdr><MsgId>abc</MsgId></GrpHdr><SplmtryData><Envlp></Envlp></SplmtryData></CstmrCdtTrfInitn></Document>",
      "creationTime": "2023-02-27T14:15:52.873Z",
      "custom": null,
      "id": {
         "value": "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      },
      "index": 0,
      "marker": "Document",
      "parentId": null
   }
]

Verificando si se envía InitiateComponentProcessingCommand a kafka iniciando el consumidor de consola (este es el comando que se envía a la aplicación/flujo del cliente para informarle que se ha recibido un lote, se ha desagregado y los componentes están listos para su procesamiento):

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CLIENT_PROCESSING_REQUEST --from-beginning

Debería haber un único mensaje que debería verse así:

{"bulkId":"pain.001.12345"}

Paso 4: Activación del mantenimiento del desmenuzador

El siguiente paso sería enviar el ComponentProcessingCompleteCommand para notificar al debulker que los componentes han sido procesados y que el debulker puede realizar tareas de mantenimiento (eliminar el archivo, retirar componentes del almacén de componentes). El tema predeterminado para enviar este mensaje es CLIENT_PROCESSING_RESPONSE.

Iniciando el productor de consola de Kafka:

./kafka-console-producer.sh --topic CLIENT_PROCESSING_RESPONSE --bootstrap-server localhost:9092

Enviando el mensaje:

{"bulkId": "pain.001.12345"}

Paso 5: Validando que se realice la limpieza

Hay un par de cosas que debemos verificar:

  • El archivo pain_001_test.xml debe ser eliminado del directorio bulk_files.

  • todos los componentes relacionados con bulkId=" pain.001.12345" debe ser eliminado del almacén de componentes

Conclusiones

En esta sección nosotros:

  • Configuró con éxito el desagüe en nuestro IPF application

  • Procesado un pain.001 XML file y se ha validado que los componentes son producidos y el archivo está archivado.

  • Se activó el mantenimiento de la carga masiva eliminando un archivo de carga masiva y eliminando componentes del almacén de componentes.