Using Bulker

What is a Bulker?

A bulker is an application module responsible for bringing together individual transactions or components, and acts initially as a temporary storage area where the main IPF flow can store elements that will eventually end up in a structured file. When instructed, the Bulker will stream each item, in a preconfigured order, to a file at a predefined location.

Example Bulker Application

In order to explain how to implement bulker functionality in your application, we will be using an example application that is available in the solutions folder of ipf-tutorial under add-bulker

Reviewing the pom file of the add-bulker project there are two main dependencies that add bulking capability

<!--Responsible for bulking components-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-aggregate-starter</artifactId>
</dependency>
<!--Responsible for creating output from the bulked components-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-bulk-producer-starter</artifactId>
</dependency>
  • ipf-bulker-xml-component-parser - component which facilitates processing xml content to detect byte position where child components can be injected

  • scheduler-core - provides scheduling functionality which is used for auto closing bulks (if applicable). It’s also used to delay registration of child bulks to ensure that the parent bulk has been created prior. Bulker uses a persistent scheduler so scheduled requests are recovered in the event of a node failure. More details on the scheduler are available

here * scheduler-domain - contains domain objects supporting the scheduler-core module

Adding dependencies needed for the starter modules

In addition to the above two modules, the following also need to be added to your application pom in order to add bulker functionality to an application

<!-- ipf-component-store implementation which uses mongodb to store components. This store is read from when assembling and streaming output to a file -->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-mongo</artifactId>
</dependency>
<!-- module provides a http rest interface which allows querying of the component store-->
<dependency>
    <groupId>com.iconsolutions.ipf.componentstore</groupId>
    <artifactId>ipf-component-store-service</artifactId>
</dependency>
<!-- Kafka receive connector implementation of BulkIngestionReceiveConnector, which is responsible for receiving various BulkIngestionMessages (commands) e.g. CreateBulkMessage, AddComponentMessage -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>
<!-- Kafka implementation of ipf-bulker-status-sender-connector which allows you to send status messages upon completion of execution of received commands to a kafka topic -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-status-sender-connector-kafka</artifactId>
</dependency>
<!-- Local implementation of ipf-bulker-output-stream which streams the bulker output to the local file system -->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-output-stream-local</artifactId>
</dependency>
<!-- Kafka implementation of ipf-bulker-notifications-connector which is used by ProjectionHandlers in ipf-bulker-bulk-producer to send notifications on specific
events occurring e.g. BulkAutoClosed-->
<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-notifications-connector-kafka</artifactId>
</dependency>
<!-- The bulker module is event sourced and this enables using mongo for journal storage -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.platform</groupId>
    <artifactId>ipf-write-starter-mongo</artifactId>
</dependency>
<!-- Allows for the processing of journal events to create read side projections/send notifications -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.platform</groupId>
    <artifactId>ipf-journal-processor-starter-mongo</artifactId>
</dependency>
<!-- Logs messages exchanges to/from the application to mongo -->
<dependency>
    <groupId>com.iconsolutions.ipf.core.messagelogger</groupId>
    <artifactId>message-logger-mongo</artifactId>
</dependency>

Configuring Bulker in your application

In order to assemble the stored components into a bulked file, your application needs to provide configuration such as the structure of the file (component-hierarchy), markers in the document that indicate where components are in the stream and the name of the given configuration. This can be done by via ipf.bulker.configurations property. It expects an array of configuration objects, each one containing:

  • name (string) - used to uniquely identify the configuration. Requests made to add components need to specify a matching name so the bulker knows how to handle the request.

  • file-name-prefix (string) - prefix for generated bulk files

  • component-hierarchy (object) - tree structure representing the hierarchy of the components which will be extracted from the bulk file. Each node can have configured child nodes which will be extracted as separate components. The content of the child components will be omitted from the parent component.

  • before-elements - list of sibling elements which the child component must come before in the output bulk

  • auto-close-triggers is a list of strings, each entry should match the result of the getName() method of the AutoCloseTrigger which should be applied to this configuration

  • schedule-auto-close - two options can be specified here (one or both) auto-close-by-age or schedule-at-cron. At least one of these options should be specified if the schedule-auto-close configuration exists

  • finalise-on-auto-close - setting this option to true will begin streaming to an output file as soon as the bulk has been closed

Configuration example for bulking a pain.001.001.09 XML file.

ipf.bulker {
  configurations = [
    {
      name = "pain.001"
      file-name-prefix = "bulk-"
      file-path = "/tmp/bulk-output"
      component-hierarchy {
        component-parser-name = "xml"
        marker = "Document"
        children = [
          {
            marker = "CstmrCdtTrfInitn.PmtInf"
            before-elements = ["SplmtryData"]
            children = [
              {
                marker = "CdtTrfTxInf"
              }
            ]
          },
          {
            marker = "CstmrCdtTrfInitn.SplmtryData"
          }
        ]
      }
      finalise-on-auto-close = true
    },
  ]
}

The above configuration will produce a single bulk file to location "/tmp/bulk-output" with a prefix of "bulk-" that consists of components in the following hierarchical order Document → CstmrCdtTrfInitn → PmtInf → CdtTrfTxInf. When the open bulk is closed, the bulk will no longer accept commands to add new or remove existing components, but it will allow updating of existing components. The Finalise Bulk command will be a trigger for a bulk producer to start streaming bulk components to a file. Since finalise-on-auto-close is true, if the bulk is automatically closed, it will also be finalised (streamed to a file) at the same time, to the /tmp/bulk_output directory.

Defining Input Adapter

As can be seen in the previous dependencies section, there is an ipf-bulker-ingestion-connector-kafka dependency which allows the application to accept BulkIngestionMessage requests on the BULK_INGESTION_REQUEST topic. This then invokes a BulkIngestionReceiveClientAdapter, which is the implementation of the interface defined in our example application. It contains two methods, determineProcessingContext and handle.

  • determineProcessingContext accepts the BulkIngestionMessage as input, and you can optionally map fields from the message into the ProcessingContext (which is a special IPF class which contains unique identifiers relating to processing)

  • handle method also accepts the BulkIngestionMessage, which is where we perform the actual processing of the message. In this case, we are handling the various different types of possible request messages e.g. AddComponentMessage, FinaliseBulkMessage etc. and then invoking the relevant Bulker methods to perform the requested action.

Creating a root component generator

ComponentGenerator is used to generate content for the root component of a bulk on auto creation. In the example application we have used a hardcoded generator which generates content from template files contained in the resources directory.

Parent Bulk Reference Provider

Applicable for recurring bulks, the ParentBulkReferenceProvider is called by the bulker so that child bulks can be associated with the relevant parent bulk. It takes the BulkIdentifier and BulkSpecification from the current state and returns the bulk identifier of the parent. In the example application, this method associates the PmtInf messages to pain.001 parent. But this could also be used to group child bulks by InstructingAgentBic for example.

Docker Setup for ipf-bulker-tutorial-app

  ipf-bulker-tutorial-app:
    image: ipf-bulker-tutorial-app:latest
    container_name: ipf-bulker-tutorial-app
    ports:
      - 8080:8080
      - 8559:8558
      - 5006:5005
      - 55002:55001
      - 9002:9001
    volumes:
      - ./config/ipf-bulker-tutorial-app:/ipf-bulker-tutorial-app/conf
      - ./bulk_files:/tmp/bulk_files
      - ./bulk_output:/tmp/bulk_output
      - ./logs:/ipf/logs
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false -Dconfig.override_with_env_vars=true
    depends_on:
      - ipf-mongo
      - kafka
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:8080/actuator/health" ]

As shown in the Docker setup above, the ipf-bulker-tutorial-app requires MongoDB and Kafka to run.

Running the application

You can start application.yml using the below command from the docker directory:

docker-compose -f application.yml up -d

Testing the application

Run the FullComposeLauncher (which uses the application.yml) under src/test/java to start the application. Once the application is running you can then use the InProgressRunner to execute the Bulking pain.001 XML file BDD test.

In addition to the Bulking pain.001 XML file test, other scenarios have also been included to demonstrate different scenarios. All the tests can be executed by running the FeatureTestRunner. The MavenRunner runs the full suite of tests during the maven build

You’ll notice that the code/resources under the test directory are similar to what was present in the TEST1 Tutorial. That’s because we are testing the application using Icon’s Test Framework

In order to interact with the example application, you simply send messages to the BULK_INGESTION_REQUEST topic. This can be a request to create a bulk for example.

For convenience, we can run the InProgressRunner and it will send messages to the BULK_INGESTION_REQUEST topic for us, as well as assert that the example application sends the responses as expected.

The BDD that is executed by the InProgressRunner is shown below

Meta:


Narrative:
As an IPF tester I want to test bulking of a pain.001 XML file

Scenario: Bulking pain.001 XML file

When the 'Client' sends a 'Create Bulk Message' with values: (1)
| configName | pain.001 |
Then the 'Client' receives a 'Bulk Status Message' with values: (2)
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values: (3)
| name | BulkConfiguredNotification |
When the 'Client' sends a 'document' 'Add Component Message' (4)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'pmtInf' 'Add Component Message' with values: (5)
| parentId | #BULK_STATUS_RECEIVED_STACK[0].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'cdtTrfTxInf' 'Add Component Message' with values: (6)
| parentId | #BULK_STATUS_RECEIVED_STACK[0].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'cdtTrfTxInf' 'Add Component Message' with values: (7)
| parentId | #BULK_STATUS_RECEIVED_STACK[1].document.componentId |
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
When the 'Client' sends a 'Close Bulk Message'   (8)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values:
| name               | BulkClosedNotification |
| payload.autoClosed | false                  |
Then there are '4' components in the component store
When the 'Client' sends a 'Finalise Bulk Message' (9)
Then the 'Client' receives a 'Bulk Status Message' with values:
| status | SUCCESS |
Then the 'Client' receives a 'Bulk Notification' with values:
| name | BulkFinalisedNotification |
Then the bulk file is produced (10)
1 A message is sent to the BULK_INGESTION_REQUEST kafka topic to Create a Bulk
2 A status message is sent to the BULK_OPERATION_RESPONSE topic. This is used to communicate the result of the operation
3 A notification is sent to the BULK_NOTIFICATION topic. If you need to return additional information to the status, a notification should be used - in this case this returned bulkId is required for the subsequent Add Component commands
4 A component is added to the bulk - all components need to correspond to a marker in the configuration and this component corresponds to the "Document" marker
5 A component is added to the bulk corresponding to the "CstmrCdtTrfInitn.PmtInf" marker
6 A component is added to the bulk corresponding to the "CdtTrfTxInf" marker
7 A component is added to the bulk corresponding to the "CdtTrfTxInf" marker
8 A request is sent to close the bulk, i.e. the bulk will no longer accept new components
9 A request is sent to finalise the bulk, i.e. the components are assembled and streamed to an output - in this case the local file system
10 There should be a newly created file in the /tmp/bulk_output directory in the ipf-tutorial-bulker-application container. The file is also viewable under the docker/bulk_archive directory of the solution project, as per the previous docker configuration.

The messages being sent to the kafka topic essentially wrap the bulk interface which provides functionality for managing bulk/s.

Additional details on this interface are available here

Sample pain.001 produced by the above BDD test

<Document xmlns="urn:iso:std:iso:20022:tech:xsd:pain.001.001.09" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <CstmrCdtTrfInitn>
        <GrpHdr>
            <MsgId>LgALKHUdpnSgYlJtYJrM</MsgId>
            <CreDtTm>2023-10-20T12:31:32.605801Z</CreDtTm>
            <NbOfTxs>5</NbOfTxs>
            <CtrlSum>5</CtrlSum>
            <InitgPty>
                <Id>
                    <PrvtId>
                        <Othr>
                            <Id>200001</Id>
                        </Othr>
                    </PrvtId>
                </Id>
            </InitgPty>
        </GrpHdr>        <PmtInf>
        <PmtInfId>STAFF EXPENSES 001</PmtInfId>
        <PmtMtd>TRF</PmtMtd>
        <NbOfTxs>3</NbOfTxs>
        <CtrlSum>5</CtrlSum>
        <ReqdExctnDt>2013-09-03</ReqdExctnDt>
        <Dbtr>
            <Nm>MR BLOGGS</Nm>
        </Dbtr>
        <DbtrAcct>
            <Id>
                <IBAN>IE75BOFI90377959996017</IBAN>
            </Id>
        </DbtrAcct>
        <DbtrAgt>
            <FinInstnId>
                <BIC>BOFIIE2DXXX</BIC>
            </FinInstnId>
        </DbtrAgt>
        <CdtTrfTxInf>
            <PmtId>
                <EndToEndId>5678090300031</EndToEndId>
            </PmtId>
            <Amt>
                <InstdAmt Ccy="EUR">16.44</InstdAmt>
            </Amt>
            <CdtrAgt>
                <FinInstnId>
                    <BIC>BOFIIE2DXXX</BIC>
                </FinInstnId>
            </CdtrAgt>
            <Cdtr>
                <Nm>Scott, Tiger</Nm>
            </Cdtr>
            <CdtrAcct>
                <Id>
                    <IBAN>IE82BOFI90393929352659</IBAN>
                </Id>
            </CdtrAcct>
            <RmtInf>
                <Ustrd>POSTAGE EXPENSE</Ustrd>
            </RmtInf>
        </CdtTrfTxInf>            <CdtTrfTxInf>
        <PmtId>
            <EndToEndId>5678090300031</EndToEndId>
        </PmtId>
        <Amt>
            <InstdAmt Ccy="EUR">16.44</InstdAmt>
        </Amt>
        <CdtrAgt>
            <FinInstnId>
                <BIC>BOFIIE2DXXX</BIC>
            </FinInstnId>
        </CdtrAgt>
        <Cdtr>
            <Nm>Scott, Tiger</Nm>
        </Cdtr>
        <CdtrAcct>
            <Id>
                <IBAN>IE82BOFI90393929352659</IBAN>
            </Id>
        </CdtrAcct>
        <RmtInf>
            <Ustrd>POSTAGE EXPENSE</Ustrd>
        </RmtInf>
    </CdtTrfTxInf>       </PmtInf>

    </CstmrCdtTrfInitn>
</Document>

Conclusions

In this section we:

  • Explained the key components of the Example Tutorial Bulk Application and showed how the bulker components can be assembled to add bulker functionality to any application

  • Submitted parts of a file individually to the bulker for adding to a single bulk

  • Closed the bulk and assembled all the parts of the bulk

  • Finalised the bulk, which streams the contents to a single output file