
Using Debulker

What is a Debulker?

A Debulker is an application module responsible for splitting large files into components which will be used for processing. The Debulker expects to receive a notification about the file that needs to be debulked, together with the name of the configuration that will be used to split the file into components.

Debulker in your application

The first step is to add the debulker starter Maven modules to your IPF application pom:

<!--Responsible for debulking-->
<dependency>
  <groupId>com.iconsolutions.ipf.debulk</groupId>
  <artifactId>ipf-debulker-starter</artifactId>
</dependency>
<!--Responsible for cleaning up after bulk components are processed by interested party-->
<dependency>
  <groupId>com.iconsolutions.ipf.debulk</groupId>
  <artifactId>ipf-debulker-housekeeping-starter</artifactId>
</dependency>

These modules depend on some additional modules:

  • component-store - responsible for storing components produced by debulker.

  • ipf-debulker-new-file-notification - responsible for consuming the file notification which should trigger debulking of the file.

  • ipf-debulker-archiver - responsible for archiving the bulk file after it has been successfully processed.

  • ipf-debulker-client-processing - notifies an external system that debulking is finished and the produced components can be processed. It also consumes the notification of successful processing of those components by the external system, so that housekeeping can start.

Adding dependencies needed for the starter modules

<!-- ipf-component-store implementation which uses mongodb to store and read components -->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-mongo</artifactId>
</dependency>
<!-- Kafka receive connector implementation of ipf-debulker-new-file-notification which consumes FileNotification message which tells debulker to process bulk file -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-new-file-notification-connector-kafka</artifactId>
</dependency>
<!-- ipf-debulker-archiver implementation which archives processed bulk file to local file system -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-archiver-local</artifactId>
</dependency>
<!-- Kafka connector implementation of ipf-debulker-client-processing -->
<dependency>
    <groupId>com.iconsolutions.ipf.debulk</groupId>
    <artifactId>ipf-debulker-client-processing-connector-kafka</artifactId>
</dependency>

Configuring Debulker in your application

In order to split a file of a specific type (XML, JSON, …) and structure, a splitting configuration needs to be provided. This is done via the ipf.debulker.configurations property, which expects an array of configuration objects, each containing:

  • name (string) - used to uniquely identify the configuration. The file notification will contain the configuration name which will be used to debulk the file.

  • splitter (string) - the splitter type which will be used to extract the components. Currently xml and json are supported (a JSON sketch is shown after the XML example below).

  • component-hierarchy (object) - a tree structure representing the hierarchy of the components which will be extracted from the bulk file. Each node can have configured child nodes, which will be extracted as separate components. The content of the child components will be omitted from the parent component.

Configuration example for debulking a pain.001.001.09 XML file:

ipf.debulker {
  configurations = [
    {
      name = "pain.001.001.09"
      splitter = "xml"
      archive-path = "/tmp/bulk_archive"
      processing-entity = "TEST-ENTITY"
      component-hierarchy {
        marker = "Document"
        children = [
          {
            marker = "CstmrCdtTrfInitn.PmtInf"
            children = [
              {
                marker = "CdtTrfTxInf"
              }
            ]
          }
        ]
      }
    }
  ]
}

This configuration tells us that the pain.001 XML file will be debulked into a single Document component containing all child elements except the CstmrCdtTrfInitn.PmtInf elements, which will be extracted as separate components. Each child PmtInf component will in turn contain all of its child elements except the CdtTrfTxInf elements, which will be extracted as separate child components of each PmtInf. So for a pain.001 XML file which has 3 PmtInf elements, each containing 3 CdtTrfTxInf elements, the debulker will produce 13 components (1 + 3 + 3×3):

  • 1 Document component

  • 3 PmtInf components

  • 9 CdtTrfTxInf components
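
For JSON bulk files the same configuration shape applies with splitter = "json". The sketch below is purely illustrative — the configuration name and marker values describe an imaginary JSON payload and are not part of this tutorial:

ipf.debulker {
  configurations = [
    {
      name = "bulk-payments-json"
      splitter = "json"
      archive-path = "/tmp/bulk_archive"
      processing-entity = "TEST-ENTITY"
      component-hierarchy {
        marker = "batch"
        children = [
          {
            marker = "payments"
          }
        ]
      }
    }
  ]
}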

Configuring Archiver

Since we are using ipf-debulker-archiver-local for archiving, the processed files will be copied to the location "/tmp/bulk_archive", as configured above via the archive-path property.

Docker Setup for ipf-debulker-tutorial-app

  ipf-debulker-tutorial-app:
    image: ipf-debulker-tutorial-app:latest
    container_name: ipf-debulker-tutorial-app
    ports:
      - 8080:8080
      - 8559:8558
      - 5006:5005
      - 55002:55001
      - 9002:9001
    volumes:
      - ./config/ipf-debulker-tutorial-app:/ipf-debulker-tutorial-app/conf
      - ./logs:/ipf/logs
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false -Dconfig.override_with_env_vars=true
    depends_on:
      - ipf-mongo
      - kafka
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:8080/actuator/health" ]

In order to work, ipf-debulker-tutorial-app requires MongoDB and Kafka.
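
If they are not already part of your compose file, the two services can be added alongside the app. The snippet below is a minimal sketch only — the images, versions and listener settings are assumptions and will likely need adjusting for your environment; the service names ipf-mongo and kafka match the depends_on entries above.

  ipf-mongo:
    image: mongo:5.0
    container_name: ipf-mongo
    ports:
      - 27017:27017

  zookeeper:
    image: bitnami/zookeeper:3.8
    container_name: zookeeper
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: bitnami/kafka:3.4
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

With a setup like this, the Kafka console commands used later can be run inside the Kafka container, e.g. via docker exec -it kafka kafka-console-producer.sh --topic FILE_NOTIFICATION_REQUEST --bootstrap-server localhost:9092.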

Running the application

You can start the application using the following command:

docker-compose -f application.yml up -d
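
Once the containers are up, you can confirm the application is healthy by hitting the same endpoint the Docker healthcheck uses; it should report "status":"UP":

curl http://localhost:8080/actuator/health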

Testing the application

Now that the application is started, we can test it. This is done by:

  1. Providing a source data file in the expected location.

  2. Sending a FileNotification to the debulker, via Kafka, to notify the application that a file is ready for processing.

  3. Validating that the file is debulked.

  4. Sending a ComponentProcessingCompleteCommand, via Kafka, to trigger housekeeping, which will delete the components from the component store and remove the bulk file.

  5. Validating that the housekeeping is performed.

Step 1 - Creating the pain.001 file which will be debulked

Since we have a configuration for debulking pain.001 XML files, we will use it for testing.

A sample pain.001 file:

<Document xmlns="urn:iso:std:iso:20022:tech:xsd:pain.001.001.09">
    <CstmrCdtTrfInitn>
        <GrpHdr>
            <MsgId>abc</MsgId>
        </GrpHdr>
        <PmtInf>
            <PmtInfId>1</PmtInfId>
            <NbOfTxs>2</NbOfTxs>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>1</EndToEndId>
                </PmtId>
                <Amt>
                    <InstdAmt Ccy="GBP">500.00</InstdAmt>
                </Amt>
            </CdtTrfTxInf>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>2</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
        </PmtInf>
        <PmtInf>
            <PmtInfId>2</PmtInfId>
            <NbOfTxs>2</NbOfTxs>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>3</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
            <CdtTrfTxInf>
                <PmtId>
                    <EndToEndId>4</EndToEndId>
                </PmtId>
            </CdtTrfTxInf>
        </PmtInf>
        <SplmtryData>
            <Envlp/>
        </SplmtryData>
    </CstmrCdtTrfInitn>
</Document>

A file like this has already been created; it is located in the solutions/add-debulker/docker/bulk_files/ directory.
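
The notification in Step 2 points at /tmp/bulk_files inside the application container, so the file must be readable there. If no volume is mounted for that path, one way to get it in place (a sketch, assuming the sample keeps the pain_001_test.xml name used below) is:

# create the input directory inside the container and copy the sample file into it
docker exec ipf-debulker-tutorial-app mkdir -p /tmp/bulk_files
docker cp solutions/add-debulker/docker/bulk_files/pain_001_test.xml ipf-debulker-tutorial-app:/tmp/bulk_files/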

Step 2 - Sending FileNotification to Kafka

We have configured the application to consume FileNotifications from Kafka. The notification has several properties which must be provided:

  • configName - the name of the configuration which will be used by the debulker to split the bulk file into components.

  • bulkId - used to correlate the components produced by the debulker.

  • fileProvider - the name of the provider which will be used to retrieve the bulk file for processing.

  • filePath - path to the directory containing the file.

  • fileName - name of the bulk file within that directory.

FileNotification message to send:

{
  "configName": "pain.001.001.09",
  "bulkId": "pain.001.12345",
  "fileProvider": "local",
  "filePath": "/tmp/bulk_files",
  "fileName": "pain_001_test.xml"
}

We can push the FileNotification to Kafka using the Kafka console producer:

./kafka-console-producer.sh --topic FILE_NOTIFICATION_REQUEST --bootstrap-server localhost:9092

The message we are sending should be on a single line:

{"configName": "pain.001.001.09", "bulkId": "pain.001.12345", "fileProvider": "local", "filePath": "/tmp/bulk_files/", "fileName": "pain_001_test.xml"}

Step 3 - Validating that the file is debulked

At this point the debulker should have received the notification, accessed the file and debulked it. There are a couple of things we should check (the first can be verified with the command shown after this list):

  • the pain_001_test.xml file should exist in the bulk_archive directory

  • components should be present in the component store

  • an InitiateComponentProcessingCommand (which notifies the interested party that the bulk components are ready for processing) is sent to Kafka. The default topic is CLIENT_PROCESSING_REQUEST.
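
The first check can be done directly against the container (the container name comes from the Docker setup above):

# should list pain_001_test.xml once debulking has finished
docker exec ipf-debulker-tutorial-app ls /tmp/bulk_archive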

The easiest way to check the components is via the component store REST API. You will need to add the ipf-component-store-service Maven dependency:

<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-service</artifactId>
</dependency>

Run the following command to get all components related to our bulk:

curl http://localhost:8080/v1/components/pain.001.12345 | json_pp

This should be the expected output for the debulked pain_001_test.xml file (the number of components and their content should match; the IDs and timestamps will of course differ):

[
   {
      "bulkId" : {
         "value" : "pain.001.12345"
      },
      "content" : "<CdtTrfTxInf><PmtId><EndToEndId>1</EndToEndId></PmtId><Amt><InstdAmt Ccy=\"GBP\">500.00</InstdAmt></Amt></CdtTrfTxInf>",
      "creationTime" : "2023-02-27T14:15:52.788Z",
      "custom" : null,
      "id" : {
         "value" : "a78d25e5-3625-4fe6-86dd-b220b92d9e14"
      },
      "index" : 2,
      "marker" : "Document.CstmrCdtTrfInitn.PmtInf.CdtTrfTxInf",
      "parentId" : {
         "value" : "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      }
   },
   {
      "bulkId" : {
         "value" : "pain.001.12345"
      },
      "content" : "<CdtTrfTxInf><PmtId><EndToEndId>2</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime" : "2023-02-27T14:15:52.845Z",
      "custom" : null,
      "id" : {
         "value" : "cc1a2d7c-4f94-4c94-a755-242c99881162"
      },
      "index" : 3,
      "marker" : "Document.CstmrCdtTrfInitn.PmtInf.CdtTrfTxInf",
      "parentId" : {
         "value" : "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      }
   },
   {
      "bulkId" : {
         "value" : "pain.001.12345"
      },
      "content" : "<PmtInf><PmtInfId>1</PmtInfId><NbOfTxs>2</NbOfTxs></PmtInf>",
      "creationTime" : "2023-02-27T14:15:52.851Z",
      "custom" : null,
      "id" : {
         "value" : "bd8f8a58-e9d8-4e16-805e-5b7b1579584c"
      },
      "index" : 1,
      "marker" : "Document.CstmrCdtTrfInitn.PmtInf",
      "parentId" : {
         "value" : "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      }
   },
   {
      "bulkId" : {
         "value" : "pain.001.12345"
      },
      "content" : "<CdtTrfTxInf><PmtId><EndToEndId>3</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime" : "2023-02-27T14:15:52.857Z",
      "custom" : null,
      "id" : {
         "value" : "ab312e01-9385-4846-93c4-75bdd0d94c66"
      },
      "index" : 5,
      "marker" : "Document.CstmrCdtTrfInitn.PmtInf.CdtTrfTxInf",
      "parentId" : {
         "value" : "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      }
   },
   {
      "bulkId" : {
         "value" : "pain.001.12345"
      },
      "content" : "<CdtTrfTxInf><PmtId><EndToEndId>4</EndToEndId></PmtId></CdtTrfTxInf>",
      "creationTime" : "2023-02-27T14:15:52.863Z",
      "custom" : null,
      "id" : {
         "value" : "fd9a777d-e012-44a5-8560-903cdafe65f6"
      },
      "index" : 6,
      "marker" : "Document.CstmrCdtTrfInitn.PmtInf.CdtTrfTxInf",
      "parentId" : {
         "value" : "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      }
   },
   {
      "bulkId" : {
         "value" : "pain.001.12345"
      },
      "content" : "<PmtInf><PmtInfId>2</PmtInfId><NbOfTxs>2</NbOfTxs></PmtInf>",
      "creationTime" : "2023-02-27T14:15:52.868Z",
      "custom" : null,
      "id" : {
         "value" : "f9086dbb-5f8e-43ee-bf83-5ee92f0255bb"
      },
      "index" : 4,
      "marker" : "Document.CstmrCdtTrfInitn.PmtInf",
      "parentId" : {
         "value" : "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      }
   },
   {
      "bulkId" : {
         "value" : "pain.001.12345"
      },
      "content" : "<Document xmlns=\"urn:iso:std:iso:20022:tech:xsd:pain.001.001.09\"><CstmrCdtTrfInitn><GrpHdr><MsgId>abc</MsgId></GrpHdr><SplmtryData><Envlp></Envlp></SplmtryData></CstmrCdtTrfInitn></Document>",
      "creationTime" : "2023-02-27T14:15:52.873Z",
      "custom" : null,
      "id" : {
         "value" : "c35a269e-4e58-4b16-83b5-32c2bb5000f4"
      },
      "index" : 0,
      "marker" : "Document",
      "parentId" : null
   }
]

Check whether the InitiateComponentProcessingCommand has been sent to Kafka by starting a console consumer (this is the command which is sent to the client application/flow to inform it that a bulk has been received and debulked, and that the components are ready for processing):

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CLIENT_PROCESSING_REQUEST --from-beginning

There should be a single message which looks like this:

{"bulkId":"pain.001.12345"}

Step 4 - Triggering debulker housekeeping

The next step is to send the ComponentProcessingCompleteCommand to notify the debulker that the components have been processed and that it can perform housekeeping (delete the file, remove the components from the component store). The default topic for sending this message is CLIENT_PROCESSING_RESPONSE.

Start the Kafka console producer:

./kafka-console-producer.sh --topic CLIENT_PROCESSING_RESPONSE --bootstrap-server localhost:9092

Then send the message:

{"bulkId": "pain.001.12345"}

Step 5 - Validating that housekeeping is performed

There are a couple of things we should check (see the commands after this list):

  • the pain_001_test.xml file should be removed from the bulk_files directory

  • all components related to bulkId="pain.001.12345" should be removed from the component store
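
Both can be verified with the tools used earlier:

# the bulk file should no longer be listed in the input directory
docker exec ipf-debulker-tutorial-app ls /tmp/bulk_files
# the component query should no longer return any components
curl http://localhost:8080/v1/components/pain.001.12345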

Conclusions

In this section we:

  • Successfully configured the debulker in our IPF application.

  • Processed a pain.001 XML file and validated that components were produced and the file was archived.

  • Triggered housekeeping of the bulk, which deleted the bulk file and removed the components from the component store.