
How to consume IPF Processing Data

Step 1: Choose a transport

<dependency>
    <groupId>com.iconsolutions.ipf.core.processingdata</groupId>
    <artifactId>ipf-processing-data-ingress-kafka</artifactId>
</dependency>

or

<dependency>
    <groupId>com.iconsolutions.ipf.core.processingdata</groupId>
    <artifactId>ipf-processing-data-ingress-http</artifactId>
</dependency>

If you depend on both in your project, you will need to explicitly configure ipf.processing-data.ingress.transport as either kafka or http.

HTTP ingress is not intended to be used in production.

Step 2: Register a handler spring bean

Decide whether to handle incoming IPF Processing Data messages one at a time, or in a batch. If both the non-batching handler and the batching handler are present, the batching handler is used.

Handler type and what it is good for:

BatchedIpfProcessingDataHandler

  • Applications that are designed to efficiently process a large batch of Data Envelopes simultaneously. For example, the IPF ODS Ingestion Service uses the batched handler, enabling it to leverage the performance improvements offered by bulk database inserts.

IpfProcessingDataHandler

  • Real-time data processing with immediate feedback.

  • Isolating message processing, ensuring that a failure only affects the current Data Envelope.

  • HTTP ingress applications.

Batched

Full batch support only works with the Kafka ingress. When using HTTP, the batch size is always 1.

Define a BatchedIpfProcessingDataHandler spring bean.

This handler will receive a list of envelopes where the size of the list is up to the batch size configured with ipf.processing-data.ingress.batch-size.

@Bean
BatchedIpfProcessingDataHandler myBatchedIpfProcessingDataHandler() {
    return new BatchedIpfProcessingDataHandler() {
        @Override
        //Optionally implement a handle method to handle V1 data
        public CompletionStage<Void> handle(final List<DataEnvelope> envelopes) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }

        @Override
        //Optionally implement a handleV2 method to handle V2 data
        public CompletionStage<Void> handleV2(final List<DataEnvelopeV2> envelopes) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }
    };
}
You must explicitly choose which versions of the data model to handle, and you can support both. The handle and handleV2 methods are optional - you can omit either one, or even both. If the methods are not implemented, any data received is ignored.
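
The optional-method behaviour described above can be pictured with a small self-contained sketch. It does not use the real IPF classes: SketchBatchedHandler, SketchEnvelopeV1 and SketchEnvelopeV2 are hypothetical stand-ins, and the use of no-op default methods is an assumption about how an unimplemented method ends up ignoring data, not a statement about the library's internals.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for DataEnvelope and DataEnvelopeV2 so the sketch compiles alone.
final class SketchEnvelopeV1 {
    final String id;
    SketchEnvelopeV1(String id) { this.id = id; }
}

final class SketchEnvelopeV2 {
    final String id;
    SketchEnvelopeV2(String id) { this.id = id; }
}

// Assumption: unimplemented methods behave like no-op defaults that complete immediately.
interface SketchBatchedHandler {
    default CompletionStage<Void> handle(List<SketchEnvelopeV1> envelopes) {
        return CompletableFuture.completedFuture(null); // V1 data is ignored
    }

    default CompletionStage<Void> handleV2(List<SketchEnvelopeV2> envelopes) {
        return CompletableFuture.completedFuture(null); // V2 data is ignored
    }
}

// A handler that opts in to V2 only; V1 batches fall through to the no-op default.
final class V2OnlyHandlerSketch implements SketchBatchedHandler {
    final AtomicInteger v2Count = new AtomicInteger();

    @Override
    public CompletionStage<Void> handleV2(List<SketchEnvelopeV2> envelopes) {
        v2Count.addAndGet(envelopes.size()); // stand-in for a bulk insert
        return CompletableFuture.completedFuture(null);
    }
}
```

In this sketch a V1 batch passed to V2OnlyHandlerSketch completes successfully without being counted, which mirrors the "any data received is ignored" rule for methods you leave out.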

You can configure the batch size and the time to wait for batch fullness with the following configuration:

Property: ipf.processing-data.ingress.batch-size
Default: 500
Description: The maximum number of envelopes to receive before invoking the batch handler.

Property: ipf.processing-data.ingress.batch-interval
Default: 10ms
Description: The maximum time to wait for the batch size to be reached.

Property: ipf.processing-data.ingress.parallelism
Default: 10
Description: Limits the number of concurrent mapping operations executed on consumed batches of DataEnvelopes. See the Akka Streams documentation for further information.
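
To make the interplay of batch size and batch interval concrete, here is a simplified, deterministic model in plain Java. It is only a sketch: BatchWindowSketch and Arrival are hypothetical names, arrival times are supplied as data rather than read from a clock, and the rule that the interval is measured from the first envelope of the open batch is an assumption made for illustration. The real ingress may time its windows differently.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchWindowSketch {

    /** An envelope id with a hypothetical arrival time in milliseconds. */
    static final class Arrival {
        final String id;
        final long atMillis;

        Arrival(String id, long atMillis) {
            this.id = id;
            this.atMillis = atMillis;
        }
    }

    /** Groups arrivals into batches closed by size or by elapsed interval, whichever comes first. */
    static List<List<String>> batch(List<Arrival> arrivals, int batchSize, long intervalMillis) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long windowStart = 0;
        for (Arrival arrival : arrivals) {
            // The open batch has waited long enough: emit it before accepting the new arrival.
            if (!current.isEmpty() && arrival.atMillis - windowStart >= intervalMillis) {
                batches.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                windowStart = arrival.atMillis; // assumption: window starts at the first element
            }
            current.add(arrival.id);
            // The batch is full: emit it immediately.
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // flush whatever is left at the end of the input
        }
        return batches;
    }
}
```

With a batch size of 3 and an interval of 10 ms, arrivals at 0, 1, 2, 3 and 20 ms are grouped as a full batch of three followed by two batches of one, because the fourth envelope's window expires before anything else joins it.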

The batch size property also directly drives the following three configuration keys:

  • ipf.processing-data.ingress.connector.receiver-parallelism

  • ipf.processing-data.ingress.connector.mapping-parallelism

  • ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records

That is, the defaults are:

ipf.processing-data.ingress.connector.receiver-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.connector.mapping-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records = ${ipf.processing-data.ingress.batch-size}

Tuning batch configuration

To maximise the performance of consuming in batches, some configuration tuning may be necessary during performance testing.

The ipf_processing_data_ingress_batch_receive_size summary metric records the size of each batch received by your BatchedIpfProcessingDataHandler spring bean. Ideally, your handler should receive batches that have a size close to the configured batch size, therefore minimising time spent waiting for the batch to be filled. Some potential scenarios are:

  • Your handler is consistently receiving batches equal to the configured batch size.

    • This might indicate that your application’s performance is limited by the configured Ingress parallelism. Increase ipf.processing-data.ingress.parallelism to allow for more concurrent mapping operations on batches of DataEnvelopes.

  • Your handler is consistently receiving batches with sizes much less than the configured batch-size.

    • This might indicate that your application’s performance is limited by the configured connector parallelism. Increase both ipf.processing-data.ingress.connector.receiver-parallelism and ipf.processing-data.ingress.connector.mapping-parallelism to allow for more concurrent mapping operations on messages received by your Ingress connector. See Receiving Connector Quickstart for details about Connector configuration.
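
The two scenarios above can be turned into a rough triage helper for use during performance testing. This is a sketch under stated assumptions: BatchTuningHintSketch, its Hint values and the lowFillThreshold parameter are hypothetical, the observed batch sizes are assumed to have been read from the ipf_processing_data_ingress_batch_receive_size metric by some other means, and the cut-off for "much less" is a value you choose rather than one the documentation defines.

```java
import java.util.List;

final class BatchTuningHintSketch {

    enum Hint {
        RAISE_INGRESS_PARALLELISM,   // batches are consistently full
        RAISE_CONNECTOR_PARALLELISM, // batches are consistently far below the configured size
        NO_CLEAR_SIGNAL
    }

    /** Mean observed batch size divided by the configured batch size. */
    static double meanFillRatio(List<Integer> observedBatchSizes, int configuredBatchSize) {
        if (observedBatchSizes.isEmpty()) {
            return 0.0;
        }
        double sum = 0;
        for (int size : observedBatchSizes) {
            sum += size;
        }
        return sum / observedBatchSizes.size() / configuredBatchSize;
    }

    /** Maps observed batch sizes to the scenario they most resemble. */
    static Hint hint(List<Integer> observedBatchSizes, int configuredBatchSize, double lowFillThreshold) {
        if (observedBatchSizes.isEmpty()) {
            return Hint.NO_CLEAR_SIGNAL;
        }
        boolean alwaysFull = observedBatchSizes.stream().allMatch(size -> size == configuredBatchSize);
        if (alwaysFull) {
            return Hint.RAISE_INGRESS_PARALLELISM;
        }
        if (meanFillRatio(observedBatchSizes, configuredBatchSize) < lowFillThreshold) {
            return Hint.RAISE_CONNECTOR_PARALLELISM;
        }
        return Hint.NO_CLEAR_SIGNAL;
    }
}
```

A hint is only a starting point: change one setting at a time and re-measure, since the source describes these causes as possibilities ("might indicate") rather than certainties.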

Single

Define an IpfProcessingDataHandler spring bean.

This handler will receive a single envelope at a time, and works with both Kafka and HTTP ingress.

For example:

@Bean
IpfProcessingDataHandler myIpfProcessingDataHandler() {
    return new IpfProcessingDataHandler() {
        @Override
        //Optionally implement this method to handle V1 data
        public CompletionStage<Void> handle(final DataEnvelope envelope) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }


        @Override
        //Optionally implement this method to handle V2 data
        public CompletionStage<Void> handle(final DataEnvelopeV2 envelope) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }
    };
}
You must explicitly choose which versions of the data model to handle. The handle methods are optional, and if they’re not implemented, any data received is ignored.

Migrate to consume V2 Processing Data Model

This section is of particular interest if your IPF Processing Data consumer utilises custom logic to handle incoming messages, without using any of the pre-configured IPF Processing Data Ingress modules.

The payload created by IPF Processing Data egress contains the header schema-version, the value of which will define what version of the IPF Processing Data model is contained within.

Message header: schema-version = 2
Description: The inbound message utilises the V2 IPF Processing Data model.

Message header: schema-version = 1
Description: The inbound message utilises the V1 IPF Processing Data model.

Message header: schema-version is not set
Description: The inbound message utilises the V1 IPF Processing Data model. This will have originated from a previous version of IPF that only contained the V1 data model.

When updating a consumer of IPF Processing Data messages, you should add a check for the schema-version header to identify which data model version to utilise when handling the inbound message.

Receive Connector ReceiveTransportMessageConverter example
ReceiveTransportMessageConverter<T> receiveTransportMessageConverter() {
    return transportMessage -> {
        final var version = (String) transportMessage.getMessageHeaders().getHeader(SchemaVersion.HEADER_KEY).orElse(null);
        final var json = transportMessage.getPayload().toString();

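        if (version == null || SchemaVersion.V1.equals(version)) {
            // Handle the V1 Data Model - DataEnvelope
            // ... (return the converted message here, otherwise control reaches the throw below)
        }

        if (SchemaVersion.V2.equals(version)) {
            // Handle the V2 Data Model - DataEnvelopeV2
            // ... (return the converted message here, otherwise control reaches the throw below)
        }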

        throw new IllegalStateException("Unsupported IPF Processing Data version " + version);
    };
}
HTTP controller example
@RestController
final class IpfProcessingDataIngressController {

    @PostMapping("/ipf-processing-data")
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV1ByDefault(@RequestBody final DataEnvelope envelope) {
        // Handle the V1 Data Model - DataEnvelope
        // ...
    }

    @PostMapping(path = "/ipf-processing-data", headers = SchemaVersion.HEADER_KEY + "=" + SchemaVersion.V1)
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV1(@RequestBody final DataEnvelope envelope) {
        // Handle the V1 Data Model - DataEnvelope
        // ...
    }

    @PostMapping(path = "/ipf-processing-data", headers = SchemaVersion.HEADER_KEY + "=" + SchemaVersion.V2)
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV2(@RequestBody final DataEnvelopeV2 envelope) {
        // Handle the V2 Data Model - DataEnvelopeV2
        // ...
    }
}

Schema differences

The two IPF Processing Data schemas are structurally similar. In most cases migration is straightforward: you handle the same data in the new POJOs. There is one small exception: custom objects have been streamlined for the V2 IPF Processing Data model. The V1 CustomObjectWrapper and CustomObjectType POJOs have been removed, and the new CustomObjectContainer2 replaces them with two String fields, key and value.

Custom objects are currently not exported by IPF Processing Data Egress.
Example of differences between Custom object schemas
CustomObjectContainer v1Container = CustomObjectContainer.builder()
        .object(CustomObjectWrapper.builder()
                .name("ClientCustomObject")
                .content("Custom object value")
                .build())
        .objectType(CustomObjectType.KEY_VALUE)
// Remaining fields are functionally identical
//        .primaryAssociation(...)
//        .uniqueId(...)
//        .createdAt(...)
//        .processObjectReference(..)
//        .processingContext(...)
        .build();

CustomObjectContainer2 v2Container = new CustomObjectContainer2()
        .key("ClientCustomObject")
        .value("Custom object value");
// Remaining fields are functionally identical
//        .primaryAssociation(...)
//        .uniqueId(...)
//        .createdAt(...)
//        .processObjectReference(..)
//        .processingContext(...);
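
If your own code builds or reads custom objects, the change amounts to a field-for-field mapping. The sketch below shows that mapping with hypothetical stand-in classes (V1CustomObject, V2CustomObject) rather than the real IPF POJOs. It assumes, based on the example above, that the V1 name corresponds to the V2 key and the V1 content to the V2 value, and that the content is already a String.

```java
import java.util.Objects;

final class CustomObjectMigrationSketch {

    /** Stand-in for the V1 shape: a wrapper carrying a name and its content. */
    static final class V1CustomObject {
        final String name;
        final String content;

        V1CustomObject(String name, String content) {
            this.name = name;
            this.content = content;
        }
    }

    /** Stand-in for the V2 shape: plain key and value strings. */
    static final class V2CustomObject {
        final String key;
        final String value;

        V2CustomObject(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Maps V1 name to V2 key and V1 content to V2 value (assumed correspondence). */
    static V2CustomObject toV2(V1CustomObject v1) {
        Objects.requireNonNull(v1, "v1");
        return new V2CustomObject(v1.name, v1.content);
    }
}
```

The remaining fields (primaryAssociation, uniqueId, createdAt, processObjectReference, processingContext) are described above as functionally identical, so they are left out of the sketch.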