Usando Bulker

¿Qué es un Bulker?

Un bulker es un módulo de aplicación responsable de reunir transacciones o componentes individuales, y actúa inicialmente como un área de almacenamiento temporal donde el principal IPF flow puede almacenar elementos que eventualmente terminarán en un archivo estructurado. Cuando se le indique, el Bulker transmitirá cada elemento, en un orden preconfigurado, a un archivo en una ubicación predefinida.

Ejemplo Bulker Aplicación

Para explicar cómo implementar la funcionalidad de bulker en su aplicación, utilizaremos una aplicación de ejemplo que está disponible en la carpeta solutions de ipf-tutorial bajo add-bulker.

Revisando el archivo pom del proyecto add-bulker, hay dos dependencias principales que añaden bulking capacidad

<!--Responsible for bulking components-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-aggregate-starter</artifactId>
</dependency>
<!--Responsible for creating output from the bulked components-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-bulk-producer-starter</artifactId>
</dependency>
  • ipf-bulker-xml-component-parser-componente que facilita el procesamiento de contenido xml para detectar la posición en bytes donde se pueden inyectar componentes secundarios

  • scheduler-core-proporciona scheduling funcionalidad que se utiliza para el cierre automático de lotes (si corresponde). También se utiliza para retrasar el registro de lotes secundarios para asegurar que el lote principal haya sido creado previamente. Bulker utiliza un persistent scheduler las solicitudes programadas se recuperan en caso de una falla de nodo. Más detalles sobre el scheduler están disponibles

aquí * scheduler-domain-contiene objetos de dominio que apoyan el scheduler-core módulo

Añadiendo dependencias necesarias para los módulos iniciales

Además de los dos módulos anteriores, también deben añadirse los siguientes a su archivo pom para agregar funcionalidad de carga masiva a una aplicación.

<!-- ipf-component-store implementation which uses mongodb to store components. This store is read from when assembling and streaming output to a file -->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-mongo</artifactId>
</dependency>
<!-- module provides a http rest interface which allows querying of the component store-->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-service</artifactId>
</dependency>
<!-- Kafka receive connector implementation of BulkIngestionReceiveConnector, which is responsible for receiving various BulkIngestionMessages (commands) e.g. CreateBulkMessage, AddComponentMessage -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>
<!-- Kafka implementation of ipf-bulker-status-sender-connector which allows you to send status messages upon completion of execution of received commands to a kafka topic -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-status-sender-connector-kafka</artifactId>
</dependency>
<!-- Local implementation of ipf-bulker-output-stream which streams the bulker output to the local file system -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-output-stream-local</artifactId>
</dependency>
<!-- Kafka implementation of ipf-bulker-notifications-connector which is used by ProjectionHandlers in ipf-bulker-bulk-producer to send notifications on specific
events occurring e.g. BulkAutoClosed-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-notifications-connector-kafka</artifactId>
</dependency>
<!-- The bulker module is event sourced and this enables using mongo for journal storage -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.platform</groupId>
    <artifactId>ipf-write-starter-mongo</artifactId>
</dependency>
<!-- Allows for the processing of journal events to create read side projections/send notifications -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.platform</groupId>
    <artifactId>ipf-journal-processor-starter-mongo</artifactId>
</dependency>
<!-- Logs messages exchanges to/from the application to mongo -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.messagelogger</groupId>
    <artifactId>message-logger-mongo</artifactId>
</dependency>

Configurando Bulker en su aplicación

Con el fin de ensamblar los componentes almacenados en un bulked archivo, su aplicación debe proporcionar configuración como la estructura del archivo (jerarquía de componentes), marcadores en el documento que indiquen dónde se encuentran los componentes en el flujo y el nombre de la configuración dada. Esto puede hacerse a través de ipf.bulker.configurations propiedad. Se espera un arreglo de objetos de configuración, cada uno conteniendo:

  • nombre (cadena)- se utiliza para identificar de manera única la configuración. Las solicitudes para agregar componentes deben especificar un nombre coincidente para que el sistema de carga masiva sepa cómo manejar la solicitud.

  • prefijo-del-nombre-de-archivo (cadena)- prefijo para archivos en masa generados

  • jerarquía-de-componentes (objeto)- estructura de árbol que representa la jerarquía de los componentes que serán extraídos del archivo masivo. Cada nodo puede tener nodos hijos configurados que serán extraídos como componentes separados. El contenido de los componentes hijos será omitido del componente padre.

  • before-elements-lista de elementos hermanos que el componente hijo debe preceder en la salida masiva

  • auto-close-triggers es una lista de cadenas, cada entrada debe coincidir con el resultado de la getName() método del AutoCloseTrigger que debe aplicarse a esta configuración

  • programar-cierre-automático-se pueden especificar dos opciones aquí (una o ambas) auto-cerrar-por-edad o programar-en-cron. Al menos una de estas opciones debe ser especificada si la configuración de cierre automático del horario existe.

  • finalizar-al-cerrar-automáticamente - Establecer esta opción en verdadero comenzará a transmitir a un archivo de salida tan pronto como se haya cerrado el lote.

Ejemplo de configuración para bulking a pain.001.001.09 XML file.

ipf.bulker {
  configurations = [
    {
      name = "pain.001"
      file-name-prefix = "bulk-"
      file-path = "/tmp/bulk-output"
      component-hierarchy {
        insertion-point-finder = "xml"
        marker = "Document"
        children = [
          {
            marker = "CstmrCdtTrfInitn.PmtInf"
            before-elements = ["SplmtryData"]
            children = [
              {
                marker = "CdtTrfTxInf"
              }
            ]
          },
          {
            marker = "CstmrCdtTrfInitn.SplmtryData"
          }
        ]
      }
      finalise-on-auto-close = true
    },
  ]
}

La configuración anterior producirá un único archivo masivo en la ubicación "/tmp/bulk-output" con un prefijo de "bulk-" que consiste en componentes en el siguiente orden jerárquico Documento →CstmrCdtTrfInitn→PmtInf→CdtTrfTxInf. Cuando el volumen abierto esté cerrado, el volumen ya no aceptará comandos para agregar nuevos o eliminar componentes existentes, pero permitirá la actualización de los componentes existentes. La Finalización Bulk el comando será un disparador para que un productor a granel comience a transmitir componentes a granel a un archivo. Dado que finalise-on-auto-close es cierto, si el volumen se cierra automáticamente, también se finalizará (transmitido a un archivo) al mismo tiempo, en el directorio /tmp/bulk_output.

Definiendo Input Adapter

Como se puede ver en la sección de dependencias anterior, hay un ipf-bulker-ingestion-connector-kafka dependencia que permite a la aplicación aceptar BulkIngestionMessage solicitudes sobre el tema BULK_INGESTION_REQUEST. Esto luego invoca un BulkIngestionReceiveClientAdapter, que es la implementación de la interfaz definida en nuestra aplicación de ejemplo. Contiene dos métodos,determineProcessingContext y handle.

  • determineProcessingContext acepta el BulkIngestionMessage como entrada, y puede opcionalmente mapear campos del mensaje en el ProcessingContext(que es una clase IPF especial que contiene identificadores únicos relacionados con el procesamiento)

  • handle el método también acepta el BulkIngestionMessage, que es donde realizamos el procesamiento real del mensaje. En este caso, estamos manejando los diferentes tipos de mensajes de solicitud posibles, por ejemplo. AddComponentMessage,FinaliseBulkMessage etc. y luego invocando el relevante Bulker métodos para realizar la acción solicitada.

Creando un generador de componentes raíz

`ComponentGenerator`se utiliza para generar contenido para el componente raíz de una creación masiva automática. En la aplicación de ejemplo, hemos utilizado un generador codificado que genera contenido a partir de archivos de plantilla contenidos en el directorio de recursos.

Padre Bulk Proveedor de Referencia

Aplicable para volúmenes recurrentes, el ParentBulkReferenceProvider se llama por el buque granelero para que los graneles hijos puedan asociarse con el granel padre relevante. Toma el BulkIdentifier y BulkSpecification desde el estado actual y devuelve el identificador de lote del padre. En la aplicación de ejemplo, este método asocia el PmtInf mensajes a pain.001 padre. Pero esto también podría utilizarse para agrupar cargas secundarias por InstructingAgentBic por ejemplo.

Docker Configuración para ipf-bulker-tutorial-app

  ipf-bulker-tutorial-app:
    image:ipf-bulker-tutorial-app:latest
    container_name: ipf-bulker-tutorial-app
    ports:
      - 8080:8080
      - 8559:8558
      - 5006:5005
      - 55002:55001
      - 9002:9001
    volumes:
      - ./config/ipf-bulker-tutorial-app:/ipf-bulker-tutorial-app/conf
      - ./bulk_files:/tmp/bulk_files
      - ./bulk_output:/tmp/bulk_output
      - ./logs:/ipf/logs
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false -Dconfig.override_with_env_vars=true
    depends_on:
      - ipf-mongo
      - kafka
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:8080/actuator/health" ]

Como se muestra en el Docker La configuración anterior, la aplicación ipf-bulker-tutorial-app requiere MongoDB y Kafka para ejecutar.

Ejecutando la aplicación

Puede iniciar application.yml utilizando el siguiente comando desde el directorio de docker:

docker-compose -f application.yml up -d

Probando la aplicación

Run the FullComposeLauncher(que utiliza el application.yml) bajo src/test/java para iniciar la aplicación. Una vez que la aplicación esté en funcionamiento, puede utilizar la InProgressRunner para ejecutar el Bulking pain.001 XML file Prueba BDD.

Además de la Bulking pain.001 XML file prueba, también se han incluido otros escenarios para demostrar diferentes situaciones. Todas las pruebas se pueden ejecutar ejecutando el FeatureTestRunner. El MavenRunner ejecuta la suite completa de pruebas durante la construcción de maven

Notará que el código/recursos bajo el directorio de pruebas son similares a lo que estaba presente en el Tutorial TEST1. Eso es porque estamos probando la aplicación utilizando the Test Framework from Icon

Para interactuar con la aplicación de ejemplo, simplemente envíe mensajes a la BULK_INGESTION_REQUEST tema. Esto puede ser una solicitud para crear un lote, por ejemplo.

Por conveniencia, podemos ejecutar el InProgressRunner y enviará mensajes a la BULK_INGESTION_REQUEST tema para nosotros, así como afirmar que la aplicación de ejemplo envía las respuestas como se espera.

El BDD que es ejecutado por el InProgressRunner se muestra a continuación

Meta:


Narrative:
As an IPF tester I want to test bulking of a pain.001 XML file

Scenario: Bulking pain.001 XML file

When the 'Client' sends a 'Create Bulk Message' with values: (1)
| configName | pain.001 |
Then the 'Client' receives a 'Bulk Status Message' with values: (2)
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values: (3)
| name | BulkConfiguredNotification |
When the 'Client' sends a 'document' 'Add Component Message' (4)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'pmtInf' 'Add Component Message' with values: (5)
| parentId | #BULK_STATUS_RECEIVED_STACK[0].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'cdtTrfTxInf' 'Add Component Message' with values: (6)
| parentId | #BULK_STATUS_RECEIVED_STACK[0].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'cdtTrfTxInf' 'Add Component Message' with values: (7)
| parentId | #BULK_STATUS_RECEIVED_STACK[1].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'Close Bulk Message'   (8)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values:
| name               | BulkClosedNotification |
| payload.autoClosed | false                  |
Then there are '4' components in the component store
When the 'Client' sends a 'Finalise Bulk Message' (9)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values:
| name | BulkFinalisedNotification |
Then the bulk file is produced (10)
1 Se envía un mensaje al tema kafka BULK_INGESTION_REQUEST para crear un Bulk
2 Se envía un mensaje de estado al tema BULK_OPERATION_RESPONSE. Esto se utiliza para comunicar el resultado de la operación.
3 Se envía una notificación al tema BULK_NOTIFICATION. Si necesita devolver información adicional al estado, se debe utilizar una notificación.- en este caso esto devolvió bulkId es requerido para los siguientes comandos de Agregar Componente
4 Se añade un componente a la masa-todos los componentes deben corresponder a un marcador en la configuración y este componente corresponde al marcador "Documento"
5 Se añade un componente a la masa correspondiente a la " CstmrCdtTrfInitn. PmtInf " marcador
6 Se añade un componente a la masa correspondiente a la " CdtTrfTxInf " marcador
7 Se añade un componente a la masa correspondiente a la " CdtTrfTxInf " marcador
8 Se envía una solicitud para cerrar el lote, es decir, el lote ya no aceptará nuevos componentes.
9 Se envía una solicitud para finalizar el lote, es decir, los componentes se ensamblan y se transmiten a una salida.- en este caso el sistema de archivos local
10 Debe haber un archivo recién creado en el directorio /tmp/bulk_output en el contenedor de la aplicación ipf-tutorial-bulker. El archivo también es visible en el directorio docker/bulk_archive del proyecto de solución, de acuerdo con la configuración de docker anterior.

Los mensajes que se envían al tema de kafka esencialmente envuelven la interfaz de carga masiva que proporciona funcionalidad para gestionar cargas masivas.

Se encuentran disponibles detalles adicionales sobre esta interfaz.aquí

Muestra pain.001 producido por la prueba BDD anterior

<Document xmlns="urn:iso:std:iso:20022:tech:xsd:pain.001.001.09" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <CstmrCdtTrfInitn>
        <GrpHdr>
            <MsgId>LgALKHUdpnSgYlJtYJrM</MsgId>
            <CreDtTm>2023-10-20T12:31:32.605801Z</CreDtTm>
            <NbOfTxs>5</NbOfTxs>
            <CtrlSum>5</CtrlSum>
            <InitgPty>
                <Id>
                    <PrvtId>
                        <Othr>
                            <Id>200001</Id>
                        </Othr>
                    </PrvtId>
                </Id>
            </InitgPty>
        </GrpHdr>        <PmtInf>
        <PmtInfId>STAFF EXPENSES 001</PmtInfId>
        <PmtMtd>TRF</PmtMtd>
        <NbOfTxs>3</NbOfTxs>
        <CtrlSum>5</CtrlSum>
        <ReqdExctnDt>2013-09-03</ReqdExctnDt>
        <Dbtr>
            <Nm>MR BLOGGS</Nm>
        </Dbtr>
        <DbtrAcct>
            <Id>
                <IBAN>IE75BOFI90377959996017</IBAN>
            </Id>
        </DbtrAcct>
        <DbtrAgt>
            <FinInstnId>
                <BIC>BOFIIE2DXXX</BIC>
            </FinInstnId>
        </DbtrAgt>
        <CdtTrfTxInf>
            <PmtId>
                <EndToEndId>5678090300031</EndToEndId>
            </PmtId>
            <Amt>
                <InstdAmt Ccy="EUR">16.44</InstdAmt>
            </Amt>
            <CdtrAgt>
                <FinInstnId>
                    <BIC>BOFIIE2DXXX</BIC>
                </FinInstnId>
            </CdtrAgt>
            <Cdtr>
                <Nm>Scott, Tiger</Nm>
            </Cdtr>
            <CdtrAcct>
                <Id>
                    <IBAN>IE82BOFI90393929352659</IBAN>
                </Id>
            </CdtrAcct>
            <RmtInf>
                <Ustrd>POSTAGE EXPENSE</Ustrd>
            </RmtInf>
        </CdtTrfTxInf>            <CdtTrfTxInf>
        <PmtId>
            <EndToEndId>5678090300031</EndToEndId>
        </PmtId>
        <Amt>
            <InstdAmt Ccy="EUR">16.44</InstdAmt>
        </Amt>
        <CdtrAgt>
            <FinInstnId>
                <BIC>BOFIIE2DXXX</BIC>
            </FinInstnId>
        </CdtrAgt>
        <Cdtr>
            <Nm>Scott, Tiger</Nm>
        </Cdtr>
        <CdtrAcct>
            <Id>
                <IBAN>IE82BOFI90393929352659</IBAN>
            </Id>
        </CdtrAcct>
        <RmtInf>
            <Ustrd>POSTAGE EXPENSE</Ustrd>
        </RmtInf>
    </CdtTrfTxInf>       </PmtInf>

    </CstmrCdtTrfInitn>
</Document>

Conclusiones

En esta sección nosotros:

  • Se explicaron los componentes clave del Tutorial de Ejemplo. Bulk Aplicación y mostró cómo los componentes de bulker pueden ser ensamblados para agregar funcionalidad de bulker a cualquier aplicación.

  • Partes de un archivo enviadas individualmente al sistema de carga para ser añadidas a un único conjunto.

  • Cerró el volumen y ensambló todas las partes del volumen.

  • Finalizó el procesamiento por lotes, que transmite los contenidos a un único archivo de salida.