Usando Debulker

¿Qué es un Debulker?

A Debulker es un módulo de aplicación responsable de dividir archivos grandes en componentes que serán utilizados para el procesamiento. Debulker espera recibir una notificación sobre el archivo que necesita ser debulked, junto con el nombre de configuración que se va a utilizar para dividir el archivo en componentes.

Debulker en su aplicación

El primer paso es agregar el debulker iniciador maven módulos a su IPF application pom:

<!--Responsible for debulking-->
<dependency>
  <groupId>com.iconsolutions.ipf.debulk</groupId>
  <artifactId>ipf-debulker-starter</artifactId>
</dependency>
<!--Responsible for cleaning up after bulk components are processed by interested party-->
<dependency>
  <groupId>com.iconsolutions.ipf.debulk</groupId>
  <artifactId>ipf-debulker-housekeeping-starter</artifactId>
</dependency>

Este módulo depende de algunos módulos adicionales:

  • almacen-de-componentes-responsable de almacenar los componentes producidos por debulker.

  • ipf-debulker-nueva-notificación-de-archivo-responsable de consumir la notificación del archivo que debería activar debulking del archivo.

  • ipf-debulker-archiver-responsable de archivar el bulk archivo después de que fue procesado con éxito.

  • ipf-debulker-procesamiento-de-clientes-notifica a un sistema externo que debulking está terminado y los componentes producidos pueden ser procesados. Esto también consume la notificación de procesamiento exitoso de esos componentes por el sistema externo, para que pueda comenzar a realizar tareas de mantenimiento.

Añadiendo dependencias necesarias para los módulos de inicio

<!-- ipf-component-store implementation which uses mongodb to store and read components -->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-mongo</artifactId>
</dependency>
<!-- Kafka receive connector implementation of ipf-debulker-new-file-notification which consumes FileNotification message which tells debulker to process bulk file -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-new-file-notification-connector-kafka</artifactId>
</dependency>
<!-- ipf-debulker-archiver implementation which archives processed bulk file to local file system -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-archiver-local</artifactId>
</dependency>
<!-- Kafka connector implementation of ipf-debulker-client-processing -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-client-processing-connector-kafka</artifactId>
</dependency>

Configurando Debulker en su aplicación

Para dividir el archivo de tipo específico (xml,json..) y estructura, se debe proporcionar la configuración para las necesidades de división. Esto puede hacerse a través de ipf.debulker.configurations propiedad. Se espera un arreglo de objetos de configuración, cada uno conteniendo:

  • nombre (cadena)- utilizado para identificar de manera única la configuración. La notificación del archivo contendrá el nombre de la configuración que se utilizará para debulk el archivo.

  • divisor (cadena)- splitter tipo que se utilizará para extraer los componentes. Actualmente, hay xml y json.

  • jerarquía-de-componentes (objeto)- estructura de árbol que representa la jerarquía de los componentes que serán extraídos de la bulk archivo. Cada nodo puede tener nodos hijos configurados que serán extraídos como componentes separados. El contenido de los componentes hijos será omitido del componente padre.

Ejemplo de configuración para debulking pain. 001. 001. 09 XML file.

ipf.debulker {
  configurations = [
    {
      name = "pain. 001. 001. 09"
      splitter = "xml"
      archive-path = "/tmp/bulk_archive"
      processing-entity = "TEST-ENTITY"
      component-hierarchy {
        marker = "Document"
        children = [
          {
            marker = "CstmrCdtTrfInitn. PmtInf"
            children = [
              {
                marker = "CdtTrfTxInf"
              }
            ]
          }
        ]
      }
    }
  ]
}

Esta configuración nos indica que el pain. 001 XML file será debulked en uno solo Document componente, que contiene todos los elementos hijos excepto CstmrCdtTrfInitn. PmtInf elementos, que serán extraídos como componentes separados. Cada hijo PmtInf el componente contendrá todos los elementos hijos excepto CdtTrfTxInf elementos, que serán extraídos como componentes secundarios separados de cada PmtInf. En caso de un pain. 001 XML file que tiene 3 PmtInf elementos, cada uno conteniendo 3 CdtTrfTxInf elementos,debulker producirá 12 componentes:

  • 1 Componente del documento

  • 3 componentes PmtInf

  • 9 componentes CdtTrfTxInf

Configurando Archiver

Dado que estamos utilizando ipf-debulker-archiver-local Para archivar, los archivos se copiarán a la ubicación "/tmp/bulk_archive " como se configuró arriba.

Docker Configuración para ipf-debulker-tutorial-app

  ipf-debulker-tutorial-app:
    image:ipf-debulker-tutorial-app:latest
    container_name: ipf-debulker-tutorial-app
    ports:
      - 8080:8080
      - 8559:8558
      - 5006:5005
      - 55002:55001
      - 9002:9001
    volumes:
      -./config/ipf-debulker-tutorial-app:/ipf-debulker-tutorial-app/conf
      -./logs:/ipf/logs
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false -Dconfig.override_with_env_vars=true
    depends_on:
      - ipf-mongo-kafka
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:8080/actuator/health" ]

Para poder trabajar ipf-debulker-tutorial-app requiere MongoDB y Kafka.

Ejecutando la aplicación

Puede iniciar application.yml utilizando el siguiente comando:

docker-compose -f application.yml up -d

Probando la aplicación

Ahora que la aplicación está iniciada, podemos probarla, esto se realiza de la siguiente manera:

  1. Proporcionando un archivo de datos fuente en la ubicación esperada.

  2. Enviando una Notificación de Archivo al ipf-debulk, a través de Kafka, para notificar a la aplicación que un archivo está listo para ser procesado.

  3. Validando que el archivo es debulked.

  4. Enviando un ComponentProcessingCompleteCommand, a través de Kafka, para activar el mantenimiento que eliminará componentes de component store y elimine el bulk archivo.

  5. Validando que se realice el servicio de limpieza.

Paso 1 - Creación de pain. 001 archivo que será debulked

Dado que tenemos configuración para debulking pain. 001 XML file, utilizaremos ese para las pruebas.

Pain. 001 archivo de muestra.
<Document xmlns="urn:iso:std:iso:20022:tech:xsd:pain. 001. 001. 09">
    <CstmrCdtTrfInitn>
        <GrpHdr>
            <MsgId>abc</MsgId>
        </GrpHdr>
        <PmtInf>
            <PmtInfId>1</PmtInfId>
            <NbOfTxs>2</NbOfTxs>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>1</EndToEndId>
                </PmtId>
                <Amt>
                    <InstdAmt Ccy="GBP">500. 00</InstdAmt>
                </Amt>
            </CdtTrfTxInf>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>2</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
        </PmtInf>
        <PmtInf>
            <PmtInfId>2</PmtInfId>
            <NbOfTxs>2</NbOfTxs>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>3</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>4</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
        </PmtInf>
        <SplmtryData>
            <Envlp/>
        </SplmtryData>
    </CstmrCdtTrfInitn>
</Document>

El archivo como este ya ha sido creado y se encuentra en solutions/add-debulker/docker/bulk_files/ directorio.

Paso 2 - Enviando FileNotification a Kafka

Hemos configurado la aplicación para recibir FileNotifications de Kafka, esa notificación tiene varias propiedades que deben ser proporcionadas:

  • configName-nombre de la configuración que será utilizada por debulker to debulk el bulk archivo en componentes.

  • bulkId-para correlacionar los componentes producidos por debulker.

  • fileProvider-nombre del proveedor que se utilizará para recuperar el bulk archivo para procesamiento.

  • filePath-ruta al archivo.

Mensaje de notificación de archivo a enviar:

{
  "configName": "pain. 001. 001. 09",
  "bulkId": "pain. 001. 12345",
  "fileProvider": "local",
  "filePath": "/tmp/bulk_files",
  "fileName": "pain_001_test.xml"
}

podemos enviar el archivoNotification a Kafka usando Productor de consola Kafka:

./kafka-console-producer.sh --topic FILE_NOTIFICATION_REQUEST --bootstrap-server localhost:9092

El mensaje que estamos enviando debe estar en una sola línea:

{"configName": "pain. 001. 001. 09", "bulkId": "pain. 001. 12345", "fileProvider": "local", "filePath": "/tmp/bulk_files/", "fileName": "pain_001_test.xml"}

Paso 3 Validando que el archivo es debulked

En este momento, el debulker debe haber recibido la notificación, accedido al archivo y debulked Hay un par de cosas que debemos verificar:

  • pain_001_test.xml file debe existir en bulk_archive directorio

  • los componentes deben estar presentes en el component store

  • InitiarComandoDeProcesamientoDeComponentes (notifica a la parte interesada que bulk los componentes están listos para su procesamiento) se envía a kafka. El tema predeterminado es CLIENT_PROCESSING_REQUEST.

La forma más sencilla de verificar los componentes es a través de component store REST API. Deberá añadir ipf-component-store-service maven dependency:

<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-service</artifactId>
</dependency>

Debemos ejecutar el siguiente comando desde la línea de comandos para obtener todos los componentes relacionados con nuestro bulk:

curl http://localhost:8080/v1/components/pain. 001. 12345 | json_pp

Esto debería ser la salida esperada para debulked*pain_001_test.xml* archivo (el número del componente y su contenido deben ser los mismos):

[
   {
      "bulkId": {
         "value": "pain. 001. 12345"
      },
      "content": "<CdtTrfTxInf><PmtId><EndToEndId>1</EndToEndId></PmtId><Amt><InstdAmt Ccy=\"GBP\">500. 00</InstdAmt></Amt></CdtTrfTxInf>",
      "creationTime": "2023-02-27T14:15:52. 788Z",
      "custom": null,
      "id": {
         "value": "a78d25e5-3625-4fe6-86dd-b220b92d9e14"
      },
      "index": 2,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf. CdtTrfTxInf",
      "parentId": {
         "value": "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      }
   },
   {
      "bulkId": {
         "value": "pain. 001. 12345"
      },
      "content": "<CdtTrfTxInf><PmtId><EndToEndId>2</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime": "2023-02-27T14:15:52. 845Z",
      "custom": null,
      "id": {
         "value": "cc1a2d7c-4f94-4c94-a755-242c99881162"
      },
      "index": 3,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf. CdtTrfTxInf",
      "parentId": {
         "value": "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      }
   },
   {
      "bulkId": {
         "value": "pain. 001. 12345"
      },
      "content": "<PmtInf><PmtInfId>1</PmtInfId><NbOfTxs>2</NbOfTxs></PmtInf>",
      "creationTime": "2023-02-27T14:15:52. 851Z",
      "custom": null,
      "id": {
         "value": "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      },
      "index": 1,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf",
      "parentId": {
         "value": "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      }
   },
   {
      "bulkId": {
         "value": "pain. 001. 12345"
      },
      "content": "<CdtTrfTxInf><PmtId><EndToEndId>3</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime": "2023-02-27T14:15:52. 857Z",
      "custom": null,
      "id": {
         "value": "ab312e01-9385-4846-93c4-75bdd0d94c66"
      },
      "index": 5,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf. CdtTrfTxInf",
      "parentId": {
         "value": "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      }
   },
   {
      "bulkId": {
         "value": "pain. 001. 12345"
      },
      "content": "<CdtTrfTxInf><PmtId><EndToEndId>4</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime": "2023-02-27T14:15:52. 863Z",
      "custom": null,
      "id": {
         "value": "fd9a777d-e012-44a5-8560-903cdafe65f6"
      },
      "index": 6,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf. CdtTrfTxInf",
      "parentId": {
         "value": "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      }
   },
   {
      "bulkId": {
         "value": "pain. 001. 12345"
      },
      "content": "<PmtInf><PmtInfId>2</PmtInfId><NbOfTxs>2</NbOfTxs></PmtInf>",
      "creationTime": "2023-02-27T14:15:52. 868Z",
      "custom": null,
      "id": {
         "value": "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      },
      "index": 4,
      "marker": "Document. CstmrCdtTrfInitn. PmtInf",
      "parentId": {
         "value": "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      }
   },
   {
      "bulkId": {
         "value": "pain. 001. 12345"
      },
      "content": "<Document xmlns=\"urn:iso:std:iso:20022:tech:xsd:pain. 001. 001. 09\"><CstmrCdtTrfInitn><GrpHdr><MsgId>abc</MsgId></GrpHdr><SplmtryData><Envlp></Envlp></SplmtryData></CstmrCdtTrfInitn></Document>",
      "creationTime": "2023-02-27T14:15:52. 873Z",
      "custom": null,
      "id": {
         "value": "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      },
      "index": 0,
      "marker": "Document",
      "parentId": null
   }
]

Verificando si se envía InitiateComponentProcessingCommand a kafka al iniciar el consumidor de consola (este es el comando que se envía a la aplicación/flujo del cliente para informarle de un bulk ha sido recibido,debulked y los componentes están listos para el procesamiento):

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CLIENT_PROCESSING_REQUEST --from-beginning

Debería haber un único mensaje que debería verse así:

{"bulkId":"pain. 001. 12345"}

Paso 4: Activación debulker mantenimiento del hogar

El siguiente paso sería enviar el ComponentProcessingCompleteCommand para notificar debulker que los componentes son procesados y debulker puede realizar tareas de mantenimiento (eliminar el archivo, quitar componentes de component store). El tema predeterminado para enviar este mensaje es CLIENT_PROCESSING_RESPONSE

Comenzando kafka productor de consola:

./kafka-console-producer.sh --topic CLIENT_PROCESSING_RESPONSE --bootstrap-server localhost:9092

Enviando el mensaje:

{"bulkId": "pain. 001. 12345"}

Paso 5: Validando que se realice la limpieza

Hay un par de cosas que debemos verificar:

  • pain_001_test.xml file debe ser eliminado de bulk_files directorio

  • todos los componentes relacionados con bulkId= " pain. 001. 12345" debe ser eliminado de component store

Conclusiones

En esta sección nosotros:

  • Configurado con éxito debulker en nuestro IPF application

  • Procesado un pain. 001 XML file y se ha validado que los componentes son producidos y el archivo está archivado.

  • Activó el mantenimiento de bulk eliminando un bulk archivo y eliminando componentes de component store.