Cómo consumir datos de procesamiento de IPF

Paso 1: Elija un medio de transporte

Primero debe elegir si utilizar kafka o http como su protocolo de transporte. Si Kafka entonces añada la dependencia:

<dependency>
    <groupId>com.iconsolutions.ipf.core.processingdata</groupId>
    <artifactId>ipf-processing-data-ingress-kafka</artifactId>
</dependency>

o si http añada esta dependencia:

<dependency>
    <groupId>com.iconsolutions.ipf.core.processingdata</groupId>
    <artifactId>ipf-processing-data-ingress-http</artifactId>
</dependency>

Si depende de ambos en su proyecto, deberá configurarlos explícitamente.ipf.processing-data.ingress.transport como ya sea kafka or http.

La entrada HTTP no está destinada a ser utilizada en producción.

Paso 2: Registre un controlador de Spring bean

Decida si manejar la entrada IPF Processing Data mensajes uno a la vez, o en un lote. Si tanto el controlador de no agrupamiento como el controlador de agrupamiento están presentes, se utiliza el controlador de agrupamiento.

Tipo de Manejador Bueno para

BatchedIpfProcessingDataHandler

Aplicaciones que están diseñadas para procesar de manera eficiente un gran lote de Data Envelopes simultáneamente. Por ejemplo, el IPF ODS Ingestion Service utiliza el controlador por lotes, lo que le permite aprovechar las mejoras de rendimiento ofrecidas por las inserciones masivas en la base de datos.

IpfProcessingDataHandler

Procesamiento de datos en tiempo real con retroalimentación inmediata.
Aislar el procesamiento de mensajes, asegurando que una falla solo afecte al actual Data Envelope.
Aplicaciones de Http Ingress.

Por lotes

El soporte completo por lotes solo funciona con el Kafka ingreso. Al utilizar HTTP el tamaño del lote siempre será 1.

Defina un BatchedIpfProcessingDataHandler Spring bean.

Este controlador recibirá una lista de sobres donde el tamaño de la lista es de hasta el tamaño de lote configurado con ipf.processing-data.ingress.batch-size.

@Bean
BatchedIpfProcessingDataHandler myBatchedIpfProcessingDataHandler() {
    new BatchedIpfProcessingDataHandler() {
        @Override
        //Optionally implement a handle method to handle V1 data
        public CompletionStage<Void> handle(final List<DataEnvelope> envelopes) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }

        @Override
        //Optionally implement a handleV2 method to handle V2 data
        public CompletionStage<Void> handleV2(final List<DataEnvelopeV2> envelopes) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }
    };
}

NOTA: Debe elegir explícitamente qué versiones del modelo de datos manejar, y puede soportar ambas. La handle y handleV2 los métodos son opcionales - Puede omitir uno, o incluso ambos. Si los métodos no están implementados, se ignora cualquier dato recibido.

Puede configurar el tamaño del lote y el tiempo de espera para la plenitud del lote con la siguiente configuración..

Propiedad Predeterminado Descripción

ipf.processing-data.ingress.batch-size

500

El número máximo de sobres a recibir antes de invocar el manejador de lotes

ipf.processing-data.ingress.batch-interval

10ms

El tiempo máximo de espera para que se alcance el tamaño del lote.

ipf.processing-data.ingress.parallelism

10

Limita el número de concurrentes mapping operaciones ejecutadas sobre lotes consumidos de DataEnvelopes. Consulte el Akka Streams documentación para obtener más información.

La propiedad del tamaño del lote también impulsa directamente las siguientes tres claves de configuración:

  • ipf.processing-data.ingress.connector.receiver-parallelism

  • ipf.processing-data.ingress.connector.mapping-parallelism

  • ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records

es decir, los valores predeterminados son..

ipf.processing-data.ingress.connector.receiver-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.connector.mapping-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records = ${ipf.processing-data.ingress.batch-size}

Configuración de lote de ajuste

Para maximizar el rendimiento de la consumición en lotes, puede ser necesario realizar ajustes de configuración durante las pruebas de rendimiento.

El ipf_processing_data_ingress_batch_receive_size métrica de resumen registra el tamaño de cada lote recibido por su BatchedIpfProcessingDataHandler Spring bean. Idealmente, su controlador debe recibir lotes que tengan un tamaño cercano al tamaño de lote configurado, minimizando así el tiempo dedicado a esperar a que el lote se complete. Algunos escenarios potenciales son:

  • Su controlador está recibiendo consistentemente lotes iguales al tamaño de lote configurado.

    • Esto podría indicar que el rendimiento de su aplicación está limitado por el paralelismo de Ingress configurado. Aumente ipf.processing-data.ingress.parallelism para permitir más concurrentes mapping operaciones en lotes de DataEnvelopes.

  • Su controlador está recibiendo constantemente lotes con tamaños mucho menores que el tamaño de lote configurado.

    • Esto podría indicar que el rendimiento de su aplicación está limitado por el paralelismo del conector configurado. Aumente ambos ipf.processing-data.ingress.connector.receiver-parallelism y ipf.processing-data.ingress.connector.mapping-parallelism para permitir más concurrentes mapping operaciones sobre los mensajes recibidos por su conector Ingress. Consulte Guía Rápida del Conector de Recepción para obtener detalles sobre la configuración del Conector.

Único

Defina un IpfProcessingDataHandler Spring bean.

Este controlador recibirá un solo sobre a la vez y funciona con ambos Kafka y HTTP ingreso.

Defina un resorte bean, p. ej.

@Bean
IpfProcessingDataHandler myIpfProcessingDataHandler() {
    new IpfProcessingDataHandler() {
        @Override
        //Optionally implement this method to handle V1 data
        public CompletionStage<Void> handle(final DataEnvelope envelope) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }


        @Override
        //Optionally implement this method to handle V2 data
        public CompletionStage<Void> handle(final DataEnvelopeV2 envelope) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }
    };
}
Debe elegir explícitamente qué versiones del modelo de datos manejar. El handle Los métodos son opcionales, y si no se implementan, se ignoran los datos recibidos.

Creando un custom consumidor para IPF Processing Data

Esta sección es de particular interés si su IPF Processing Data el consumidor utiliza custom lógica para manejar mensajes entrantes, sin utilizar ninguno de los preconfigurados IPF Processing Data Módulos de ingreso.

La carga útil creada por IPF Processing Data egress contiene el encabezado ipf_schema_version, el valor del cual definirá qué versión de la IPF Processing Data el modelo está contenido dentro.

La carga útil también contendrá el encabezado.schema-version.
Este encabezado está obsoleto, la versión final de IPF que admitirá este encabezado es 2026.4, no será compatible en la versión de IPF 2027.1.
Mientras esté soportado, la carga útil incluirá tanto ipf_schema_version y schema-version encabezados y tendrán el mismo valor.
Encabezado del Mensaje Descripción

ipf_schema_version= 2
Y/O
schema_version= 2

El mensaje entrante utiliza la V2 IPF Processing Data modelo

ipf_schema_version= 1
Y/O
schema_version= 1

El mensaje entrante utiliza la V1 IPF Processing Data modelo

ipf_schema_version y schema-version no están configurados

El mensaje entrante utiliza la V1 IPF Processing Data modelo. Esto habrá originado de una versión anterior de IPF que solo contenía el modelo de datos V1.

Al actualizar un consumidor de IPF Processing Data mensajes, usted debe agregar una verificación para el ipf_schema_version or the schema-version encabezado para identificar qué versión del modelo de datos utilizar al manejar el mensaje entrante.

  1. Receive Connector ReceiveTransportMessageConverter ejemplo

ReceiveTransportMessageConverter<T> receiveTransportMessageConverter() {
    return transportMessage -> {
        final var version = (String) transportMessage.getMessageHeaders().getHeader(SchemaVersion.IPF_SCHEMA_VERSION).orElse(null);
        final var json = transportMessage.getPayload().toString();

        if (version == null || SchemaVersion.V1.equals(version)) {
            // Handle the V1 Data Model - DataEnvelope
            // ...
        }

        if (SchemaVersion.V2.equals(version)) {
            // Handle the V2 Data Model - DataEnvelopeV2
            // ...
        }

        throw new IllegalStateException("Unsupported IPF Processing Data version " + version);
    };
}
Ejemplo de Controlador Http
@RestController
final class IpfProcessingDataIngressController {

    @PostMapping("/ipf-processing-data")
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV1ByDefault(@RequestBody final DataEnvelope envelope) {
        // Handle the V1 Data Model - DataEnvelope
        // ...
    }

    @PostMapping(path = "/ipf-processing-data", headers = SchemaVersion.IPF_SCHEMA_VERSION + "=" + SchemaVersion.V1)
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV1(@RequestBody final DataEnvelope envelope) {
        // Handle the V1 Data Model - DataEnvelope
        // ...
    }

    @PostMapping(path = "/ipf-processing-data", headers = SchemaVersion.IPF_SCHEMA_VERSION + "=" + SchemaVersion.V2)
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV2(@RequestBody final DataEnvelopeV2 envelope) {
        // Handle the V2 Data Model - DataEnvelopeV2
        // ...
    }
}