Usando Bulker

¿Qué es un Bulker?

A bulker es un módulo de aplicación responsable de reunir transacciones o componentes individuales, y actúa inicialmente como un área de almacenamiento temporal donde el principal IPF flow puede almacenar elementos que serán eventually terminará en un archivo estructurado. Cuando se le indique, el Bulker transmitirá cada elemento, en un orden preconfigurado, a un archivo en una ubicación predefinida.

Ejemplo Bulker Aplicación

Para explicar cómo implementar bulker funcionalidad en su aplicación, utilizaremos una aplicación de ejemplo que está disponible en la carpeta solutions de ipf-tutorial bajo add-bulker

Revisando el archivo pom de la add-bulker proyecto hay dos dependencias principales que añaden bulking capacidad

<!--Responsible for bulking components-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-aggregate-starter</artifactId>
</dependency>
<!--Responsible for creating output from the bulked components-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-bulk-producer-starter</artifactId>
</dependency>
  • ipf-bulker-xml-analizador-de-componentes-componente que facilita el procesamiento xml contenido para detectar la posición de bytes donde se pueden inyectar componentes secundarios

  • programador-core-proporciona scheduling funcionalidad que se utiliza para el cierre automático bulks(si corresponde). También se utiliza para retrasar el registro del niño bulks para asegurar que el padre bulk se ha creado previamente. Bulker utiliza un persistent scheduler así scheduled las solicitudes se recuperan en el event de una falla de nodo. Más detalles sobre el scheduler están disponibles

aquí * scheduler-domain-contiene objetos de dominio que apoyan el scheduler-core módulo

Añadiendo dependencias necesarias para los módulos de inicio

Además de los dos módulos anteriores, también deben añadirse los siguientes a su archivo pom de la aplicación para añadir bulker funcionalidad a una aplicación

<!-- ipf-component-store implementation which uses mongodb to store components. This store is read from when assembling and streaming output to a file -->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-mongo</artifactId>
</dependency>
<!-- module provides a http rest interface which allows querying of the component store-->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-service</artifactId>
</dependency>
<!-- Kafka receive connector implementation of BulkIngestionReceiveConnector, which is responsible for receiving various BulkIngestionMessages (commands) e.g. CreateBulkMessage, AddComponentMessage -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>
<!-- Kafka implementation of ipf-bulker-status-sender-connector which allows you to send status messages upon completion of execution of received commands to a kafka topic -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-status-sender-connector-kafka</artifactId>
</dependency>
<!-- Local implementation of ipf-bulker-output-stream which streams the bulker output to the local file system -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-output-stream-local</artifactId>
</dependency>
<!-- Kafka implementation of ipf-bulker-notifications-connector which is used by ProjectionHandlers in ipf-bulker-bulk-producer to send notifications on specific
events occurring e.g. BulkAutoClosed-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-notifications-connector-kafka</artifactId>
</dependency>
<!-- The bulker module is event sourced and this enables using mongo for journal storage -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.platform</groupId>
    <artifactId>ipf-write-starter-mongo</artifactId>
</dependency>
<!-- Allows for the processing of journal events to create read side projections/send notifications -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.platform</groupId>
    <artifactId>ipf-journal-processor-starter-mongo</artifactId>
</dependency>
<!-- Logs messages exchanges to/from the application to mongo -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.messagelogger</groupId>
    <artifactId>message-logger-mongo</artifactId>
</dependency>

Configurando Bulker en su aplicación

Con el fin de ensamblar los componentes almacenados en un bulked archivo, su aplicación debe proporcionar configuración como la estructura del archivo (jerarquía de componentes), marcadores en el documento que indiquen dónde se encuentran los componentes en el flujo y el nombre de la configuración dada. Esto puede hacerse a través de ipf.bulker.configurations propiedad. Se espera un arreglo de objetos de configuración, cada uno conteniendo:

  • nombre (cadena)- se utiliza para identificar de manera única la configuración. Las solicitudes para agregar componentes deben especificar un nombre coincidente para que el bulker sabe cómo manejar la solicitud.

  • prefijo-del-nombre-del-archivo (cadena)- prefijo para generado bulk archivos

  • jerarquía-de-componentes (objeto)- estructura de árbol que representa la jerarquía de los componentes que serán extraídos de la bulk archivo. Cada nodo puede tener nodos hijos configurados que serán extraídos como componentes separados. El contenido de los componentes hijos será omitido del componente padre.

  • before-elements-lista de elementos hermanos que el componente hijo debe preceder en la salida bulk

  • auto-close-triggers es una lista de cadenas, cada entrada debe coincidir con el resultado del método getName() de la AutoCloseTrigger que debe aplicarse a esta configuración

  • programar-cierre-automático-se pueden especificar dos opciones aquí (una o ambas) cierre automático por edad o programar en-cron. Al menos una de estas opciones debe ser especificada si la configuración de cierre automático del horario existe.

  • finalizar-en-cierre-automático-configurar esta opción en verdadero comenzará a transmitir a un archivo de salida tan pronto como el bulk ha sido cerrado

Ejemplo de configuración para bulking a pain. 001. 001. 09 XML file.

ipf.bulker {
  configurations = [
    {
      name = "pain. 001"
      file-name-prefix = "bulk-"
      file-path = "/tmp/bulk-output"
      component-hierarchy {
        component-parser-name = "xml"
        marker = "Document"
        children = [
          {
            marker = "CstmrCdtTrfInitn. PmtInf"
            before-elements = ["SplmtryData"]
            children = [
              {
                marker = "CdtTrfTxInf"
              }
            ]
          },
          {
            marker = "CstmrCdtTrfInitn. SplmtryData"
          }
        ]
      }
      finalise-on-auto-close = true
    },
  ]
}

La configuración anterior producirá un único bulk archivo a la ubicación "/tmp/bulk-salida" con un prefijo de " bulk-" que consiste en componentes en el siguiente orden jerárquico Documento → CstmrCdtTrfInitn → PmtInf → CdtTrfTxInf. Cuando el abierto bulk está cerrado, el bulk ya no aceptará comandos para agregar nuevos o eliminar componentes existentes, pero permitirá la actualización de componentes existentes. La Finalización Bulk el comando será un desencadenante para un bulk productor para comenzar la transmisión bulk componentes a un archivo. Desde finalise-on-auto-close es cierto, si el bulk se cierra automáticamente, también se finalizará (se transmitirá a un archivo) al mismo tiempo, en el /tmp/bulk_output directorio.

Definiendo Input Adapter

Como se puede ver en la sección de dependencias anterior, hay un ipf-bulker-ingestion-connector-kafka dependencia que permite a la aplicación aceptar BulkIngestionMessage solicitudes sobre el BULK_INGESTION_REQUEST tema. Esto luego invoca un BulkIngestionReceiveClientAdapter, que es la implementación de la interfaz definida en nuestra aplicación de ejemplo. Contiene dos métodos,determineProcessingContext y handle.

  • determineProcessingContext acepta el BulkIngestionMessage como entrada, y puede opcionalmente mapear campos del mensaje en el ProcessingContext (que es una clase IPF especial que contiene identificadores únicos relacionados con el procesamiento)

  • handle el método también acepta el BulkIngestionMessage, que es donde realizamos el procesamiento real del mensaje. En este caso, estamos manejando los diferentes tipos de mensajes de solicitud posibles, por ejemplo, AddComponentMessage, FinaliseBulkMessage, etc., y luego invocando el relevante Bulker métodos para realizar la acción solicitada.

Creando un generador de componentes raíz

`ComponentGenerator`se utiliza para generar contenido para el componente raíz de un bulk sobre la creación automática. En la aplicación de ejemplo, hemos utilizado un generador codificado que genera contenido a partir de archivos de plantilla contenidos en el directorio de recursos.

Padre Bulk Proveedor de Referencia

Aplicable para recurrente bulks, el ParentBulkReferenceProvider se llama por el bulker para que el niño bulks puede estar asociado con el padre relevante bulk. Toma el BulkIdentifier y BulkSpecification desde el actual state y devuelve el bulk identificador del padre. En la aplicación de ejemplo, este método asocia los mensajes PmtInf a pain. 001 padre. Pero esto también podría utilizarse para agrupar hijo bulks por InstructingAgentBic, por ejemplo.

Docker Configuración para ipf-bulker-tutorial-app

  ipf-bulker-tutorial-app:
    image:ipf-bulker-tutorial-app:latest
    container_name: ipf-bulker-tutorial-app
    ports:
      - 8080:8080
      - 8559:8558
      - 5006:5005
      - 55002:55001
      - 9002:9001
    volumes:
      -./config/ipf-bulker-tutorial-app:/ipf-bulker-tutorial-app/conf
      -./bulk_files:/tmp/bulk_files
      -./bulk_output:/tmp/bulk_output
      -./logs:/ipf/logs
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false -Dconfig.override_with_env_vars=true
    depends_on:
      - ipf-mongo-kafka
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:8080/actuator/health" ]

Como se muestra en el Docker setup arriba, el ipf-bulker-tutorial-app requiere MongoDB y Kafka para ejecutar.

Ejecutando la aplicación

Puede iniciar application.yml utilizando el siguiente comando desde el docker directorio:

docker-compose -f application.yml up -d

Probando la aplicación

Run the FullComposeLauncher(que utiliza el application.yml) bajo src/test/java para iniciar la aplicación. Una vez que la aplicación esté en funcionamiento, puede entonces utilizar el InProgressRunner para ejecutar el Bulking pain. 001 XML file Prueba BDD.

Además de la Bulking pain. 001 XML file prueba, también se han incluido otros escenarios para demostrar diferentes situaciones. Todas las pruebas pueden ser ejecutadas mediante la ejecución del FeatureTestRunner. El MavenRunner ejecuta el conjunto completo de pruebas durante el maven construir

Notará que el código/recursos bajo el directorio de pruebas son similares a lo que estaba presente en el Tutorial TEST1. Eso es porque estamos probando la aplicación utilizando the Test Framework from Icon

Para interactuar con la aplicación de ejemplo, simplemente envíe mensajes a la BULK_INGESTION_REQUEST tema. Esto puede ser una solicitud para crear un bulk por ejemplo.

Por conveniencia, podemos ejecutar el InProgressRunner y enviará mensajes a la BULK_INGESTION_REQUEST tema para nosotros, así como afirmar que la aplicación de ejemplo envía las respuestas como se espera.

El BDD que es ejecutado por el InProgressRunner se muestra a continuación

Meta:


Narrative:
As an IPF tester I want to test bulking of a pain. 001 XML file

Scenario: Bulking pain. 001 XML file

When the 'Client' sends a 'Create Bulk Message' with values: (1)
| configName | pain. 001 |
Then the 'Client' receives a 'Bulk Status Message' with values: (2)
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values: (3)
| name | BulkConfiguredNotification |
When the 'Client' sends a 'document' 'Add Component Message' (4)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'pmtInf' 'Add Component Message' with values: (5)
| parentId | #BULK_STATUS_RECEIVED_STACK[0].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'cdtTrfTxInf' 'Add Component Message' with values: (6)
| parentId | #BULK_STATUS_RECEIVED_STACK[0].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'cdtTrfTxInf' 'Add Component Message' with values: (7)
| parentId | #BULK_STATUS_RECEIVED_STACK[1].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'Close Bulk Message'   (8)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values:
| name               | BulkClosedNotification |
| payload.autoClosed | false                  |
Then there are '4' components in the component store
When the 'Client' sends a 'Finalise Bulk Message' (9)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values:
| name | BulkFinalisedNotification |
Then the bulk file is produced (10)
1 Se envía un mensaje a la BULK_INGESTION_REQUEST kafka tema para Crear un Bulk
2 Se envía un mensaje de estado al BULK_OPERATION_RESPONSE tema. Esto se utiliza para comunicar el resultado de la operación.
3 Se envía una notificación al BULK_NOTIFICATION tema. Si necesita devolver información adicional al estado, se debe utilizar una notificación.- en este caso esto devolvió bulkId es requerido para los siguientes comandos de Agregar Componente
4 Se añade un componente al bulk-todos los componentes deben corresponder a un marker en la configuración y este componente corresponde al "Documento" marker
5 Se añade un componente al bulk correspondiente a la "CstmrCdtTrfInitn. PmtInf" marker
6 Se añade un componente al bulk correspondiente al "CdtTrfTxInf" marker
7 Se añade un componente al bulk correspondiente a la "CdtTrfTxInf" marker
8 Se envía una solicitud para cerrar el bulk, es decir, el bulk no aceptará nuevos componentes
9 Se envía una solicitud para finalizar el bulk es decir, los componentes están ensamblados y transmitidos a una salida-en este caso el sistema de archivos local
10 Debe haber un archivo recién creado en el /tmp/bulk_output directorio en el ipf-tutorial-bulker-contenedor de aplicación. El archivo también es visible bajo el docker/bulk_archive directorio del proyecto de solución, según lo anterior docker configuración.

Los mensajes que se envían a la kafka tema esencialmente envuelve el bulk interfaz que proporciona funcionalidad para gestionar bulk/s.

Se encuentran disponibles detalles adicionales sobre esta interfaz.aquí

Muestra pain. 001 producido por la prueba BDD anterior

<Document xmlns="urn:iso:std:iso:20022:tech:xsd:pain. 001. 001. 09" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <CstmrCdtTrfInitn>
        <GrpHdr>
            <MsgId>LgALKHUdpnSgYlJtYJrM</MsgId>
            <CreDtTm>2023-10-20T12:31:32. 605801Z</CreDtTm>
            <NbOfTxs>5</NbOfTxs>
            <CtrlSum>5</CtrlSum>
            <InitgPty>
                <Id>
                    <PrvtId>
                        <Othr>
                            <Id>200001</Id>
                        </Othr>
                    </PrvtId>
                </Id>
            </InitgPty>
        </GrpHdr>        <PmtInf>
        <PmtInfId>STAFF EXPENSES 001</PmtInfId>
        <PmtMtd>TRF</PmtMtd>
        <NbOfTxs>3</NbOfTxs>
        <CtrlSum>5</CtrlSum>
        <ReqdExctnDt>2013-09-03</ReqdExctnDt>
        <Dbtr>
            <Nm>MR BLOGGS</Nm>
        </Dbtr>
        <DbtrAcct>
            <Id>
                <IBAN>IE75BOFI90377959996017</IBAN>
            </Id>
        </DbtrAcct>
        <DbtrAgt>
            <FinInstnId>
                <BIC>BOFIIE2DXXX</BIC>
            </FinInstnId>
        </DbtrAgt>
        <CdtTrfTxInf>
            <PmtId>
                <EndToEndId>5678090300031</EndToEndId>
            </PmtId>
            <Amt>
                <InstdAmt Ccy="EUR">16. 44</InstdAmt>
            </Amt>
            <CdtrAgt>
                <FinInstnId>
                    <BIC>BOFIIE2DXXX</BIC>
                </FinInstnId>
            </CdtrAgt>
            <Cdtr>
                <Nm>Scott, Tiger</Nm>
            </Cdtr>
            <CdtrAcct>
                <Id>
                    <IBAN>IE82BOFI90393929352659</IBAN>
                </Id>
            </CdtrAcct>
            <RmtInf>
                <Ustrd>POSTAGE EXPENSE</Ustrd>
            </RmtInf>
        </CdtTrfTxInf>            <CdtTrfTxInf>
        <PmtId>
            <EndToEndId>5678090300031</EndToEndId>
        </PmtId>
        <Amt>
            <InstdAmt Ccy="EUR">16. 44</InstdAmt>
        </Amt>
        <CdtrAgt>
            <FinInstnId>
                <BIC>BOFIIE2DXXX</BIC>
            </FinInstnId>
        </CdtrAgt>
        <Cdtr>
            <Nm>Scott, Tiger</Nm>
        </Cdtr>
        <CdtrAcct>
            <Id>
                <IBAN>IE82BOFI90393929352659</IBAN>
            </Id>
        </CdtrAcct>
        <RmtInf>
            <Ustrd>POSTAGE EXPENSE</Ustrd>
        </RmtInf>
    </CdtTrfTxInf>       </PmtInf>

    </CstmrCdtTrfInitn>
</Document>

Conclusiones

En esta sección nosotros:

  • Se explicaron los componentes clave del Tutorial de Ejemplo. Bulk Aplicación y mostró cómo el bulker se pueden ensamblar componentes para añadir bulker funcionalidad a cualquier aplicación

  • Partes de un archivo enviadas individualmente a la bulker para agregar a un único bulk

  • Cerrado el bulk y ensambló todas las partes del bulk

  • Finalizado el bulk, que transmite los contenidos a un único archivo de salida