How to Consume IPF Processing Data
Step 1: Choose a transport
First, choose whether to use Kafka or HTTP as your transport protocol. For Kafka, add this dependency:
<dependency>
<groupId>com.iconsolutions.ipf.core.processingdata</groupId>
<artifactId>ipf-processing-data-ingress-kafka</artifactId>
</dependency>
or, for HTTP, add this dependency:
<dependency>
<groupId>com.iconsolutions.ipf.core.processingdata</groupId>
<artifactId>ipf-processing-data-ingress-http</artifactId>
</dependency>
If you depend on both in your project, you will need to explicitly configure ipf.processing-data.ingress.transport as either kafka or http.
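For example, with both modules on the classpath, the transport can be pinned in your application configuration (a sketch; the property name and values come from the text above):

```properties
# Required when both the kafka and http ingress modules are present
ipf.processing-data.ingress.transport = kafka
```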
| HTTP ingress is not intended to be used in production. |
Step 2: Register a handler spring bean
Decide whether to handle incoming IPF Processing Data messages one at a time, or in a batch. If both the non-batching handler and the batching handler are present, the batching handler is used.
| Handler Type | Good for |
|---|---|
| BatchedIpfProcessingDataHandler | Applications that are designed to efficiently process a large batch of Data Envelopes simultaneously. For example, the IPF ODS Ingestion Service uses the batched handler, enabling it to leverage the performance improvements offered by bulk database inserts. |
| IpfProcessingDataHandler | Real-time data processing with immediate feedback. |
Batched
| Full batch support only works with the Kafka ingress. When using HTTP the batch size will always be 1. |
Define a BatchedIpfProcessingDataHandler spring bean.
This handler will receive a list of envelopes where the size of the list is up to the batch size configured with ipf.processing-data.ingress.batch-size.
@Bean
BatchedIpfProcessingDataHandler myBatchedIpfProcessingDataHandler() {
    return new BatchedIpfProcessingDataHandler() {
        // Optionally implement a handle method to handle V1 data
        @Override
        public CompletionStage<Void> handle(final List<DataEnvelope> envelopes) {
            // Do something
            return CompletableFuture.completedFuture(null);
        }

        // Optionally implement a handleV2 method to handle V2 data
        @Override
        public CompletionStage<Void> handleV2(final List<DataEnvelopeV2> envelopes) {
            // Do something
            return CompletableFuture.completedFuture(null);
        }
    };
}
You must explicitly choose which versions of the data model to handle, and you can support both. The handle and handleV2 methods are optional: you can omit either one, or even both. Any data received for an unimplemented method is ignored.
You can configure the batch size, and the time to wait for the batch to fill, with the following configuration:
| Property | Default | Description |
|---|---|---|
| ipf.processing-data.ingress.batch-size | | The maximum number of envelopes to receive before invoking the batch handler |
| | | The maximum time to wait for the batch size to be reached |
| ipf.processing-data.ingress.parallelism | | Limits the number of concurrent mapping operations executed on consumed batches of DataEnvelopes. See the Akka Streams documentation for further information. |
The batch size property also directly drives the following three configuration keys:
- ipf.processing-data.ingress.connector.receiver-parallelism
- ipf.processing-data.ingress.connector.mapping-parallelism
- ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records
i.e. the defaults are…
ipf.processing-data.ingress.connector.receiver-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.connector.mapping-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records = ${ipf.processing-data.ingress.batch-size}
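Setting the batch size therefore cascades into all three derived keys, each of which can still be overridden individually. A sketch in properties form (the keys come from the list above; the values are illustrative):

```properties
# The three derived keys now default to 500 as well
ipf.processing-data.ingress.batch-size = 500

# Any derived key can still be set independently if needed
ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records = 250
```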
Tuning batch configuration
To maximise the performance of consuming in batches, some configuration tuning may be necessary during performance testing.
The ipf_processing_data_ingress_batch_receive_size summary metric records the size of each batch received by your BatchedIpfProcessingDataHandler spring bean. Ideally, your handler should receive batches that have a size close to the configured batch size, therefore minimising time spent waiting for the batch to be filled. Some potential scenarios are:
- Your handler is consistently receiving batches equal to the configured batch size.
  - This might indicate that your application's performance is limited by the configured Ingress parallelism. Increase ipf.processing-data.ingress.parallelism to allow for more concurrent mapping operations on batches of DataEnvelopes.
- Your handler is consistently receiving batches with sizes much less than the configured batch size.
  - This might indicate that your application's performance is limited by the configured connector parallelism. Increase both ipf.processing-data.ingress.connector.receiver-parallelism and ipf.processing-data.ingress.connector.mapping-parallelism to allow for more concurrent mapping operations on messages received by your Ingress connector. See Receiving Connector Quickstart for details about Connector configuration.
Single
Define an IpfProcessingDataHandler spring bean. This handler receives a single envelope at a time, and works with both the Kafka and HTTP ingress. For example:
@Bean
IpfProcessingDataHandler myIpfProcessingDataHandler() {
    return new IpfProcessingDataHandler() {
        // Optionally implement this method to handle V1 data
        @Override
        public CompletionStage<Void> handle(final DataEnvelope envelope) {
            // Do something
            return CompletableFuture.completedFuture(null);
        }

        // Optionally implement this method to handle V2 data
        @Override
        public CompletionStage<Void> handle(final DataEnvelopeV2 envelope) {
            // Do something
            return CompletableFuture.completedFuture(null);
        }
    };
}
You must explicitly choose which versions of the data model to handle. The handle methods are optional; any data received for an unimplemented method is ignored.
Creating a custom consumer for IPF Processing Data
| This section is of particular interest if your IPF Processing Data consumer utilises custom logic to handle incoming messages, without using any of the pre-configured IPF Processing Data Ingress modules. |
The payload created by IPF Processing Data egress contains the header ipf_schema_version, whose value identifies which version of the IPF Processing Data model the payload contains.
The payload will also contain the header schema-version. This header is deprecated: the final IPF release to support it is 2026.4, and it will not be supported in IPF release 2027.1. While supported, the payload includes both the ipf_schema_version and schema-version headers, and they will have the same value.
| Message Header | Description |
|---|---|
| ipf_schema_version = V2 | The inbound message utilises the V2 IPF Processing Data model |
| ipf_schema_version = V1 | The inbound message utilises the V1 IPF Processing Data model |
| Header not present | The inbound message utilises the V1 IPF Processing Data model. This will have originated from a previous version of IPF that only contained the V1 data model. |
When updating a consumer of IPF Processing Data messages, you should add a check for the ipf_schema_version or the schema-version header to identify which data model version to utilise when handling the inbound message.
Receive Connector ReceiveTransportMessageConverter example
ReceiveTransportMessageConverter<T> receiveTransportMessageConverter() {
    return transportMessage -> {
        final var version = (String) transportMessage.getMessageHeaders().getHeader(SchemaVersion.IPF_SCHEMA_VERSION).orElse(null);
        final var json = transportMessage.getPayload().toString();
        if (version == null || SchemaVersion.V1.equals(version)) {
            // Handle the V1 Data Model - DataEnvelope, and return the converted result
            // ...
        }
        if (SchemaVersion.V2.equals(version)) {
            // Handle the V2 Data Model - DataEnvelopeV2, and return the converted result
            // ...
        }
        // Neither branch returned a result, so the version is unsupported
        throw new IllegalStateException("Unsupported IPF Processing Data version " + version);
    };
}
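The version dispatch above can be sketched stand-alone (a minimal illustration with plain strings: the real SchemaVersion constants and envelope types come from the IPF libraries, and the header values used here are assumptions):

```java
public class VersionDispatchSketch {

    // Hypothetical stand-ins for SchemaVersion.V1 / SchemaVersion.V2
    static final String V1 = "1";
    static final String V2 = "2";

    // A missing header (null) falls back to the V1 data model,
    // matching the converter logic above
    static String modelFor(String version) {
        if (version == null || V1.equals(version)) {
            return "DataEnvelope";   // V1 data model
        }
        if (V2.equals(version)) {
            return "DataEnvelopeV2"; // V2 data model
        }
        throw new IllegalStateException(
                "Unsupported IPF Processing Data version " + version);
    }

    public static void main(String[] args) {
        System.out.println(modelFor(null)); // DataEnvelope
        System.out.println(modelFor(V1));   // DataEnvelope
        System.out.println(modelFor(V2));   // DataEnvelopeV2
    }
}
```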
Http Controller example
@RestController
final class IpfProcessingDataIngressController {

    // With no version header present, default to the V1 data model
    @PostMapping("/ipf-processing-data")
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV1ByDefault(@RequestBody final DataEnvelope envelope) {
        // Handle the V1 Data Model - DataEnvelope
        // ...
        return Mono.empty();
    }

    @PostMapping(path = "/ipf-processing-data", headers = SchemaVersion.IPF_SCHEMA_VERSION + "=" + SchemaVersion.V1)
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV1(@RequestBody final DataEnvelope envelope) {
        // Handle the V1 Data Model - DataEnvelope
        // ...
        return Mono.empty();
    }

    @PostMapping(path = "/ipf-processing-data", headers = SchemaVersion.IPF_SCHEMA_VERSION + "=" + SchemaVersion.V2)
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV2(@RequestBody final DataEnvelopeV2 envelope) {
        // Handle the V2 Data Model - DataEnvelopeV2
        // ...
        return Mono.empty();
    }
}