Documentation for a newer release is available. View Latest

Cómo consumir IPF processing data

Paso 1: Elegir un transporte

Primero, debe elegir si usará Kafka o HTTP como protocolo de transporte. Si usa Kafka, agregue la dependencia:

<dependency>
    <groupId>com.iconsolutions.ipf.core.processingdata</groupId>
    <artifactId>ipf-processing-data-ingress-kafka</artifactId>
</dependency>

o si http agrega esta dependencia:

<dependency>
    <groupId>com.iconsolutions.ipf.core.processingdata</groupId>
    <artifactId>ipf-processing-data-ingress-http</artifactId>
</dependency>

Si depende de ambos en su proyecto, deberá configurar explícitamente ipf.processing-data.ingress.transport como kafka o http.

NOTA: El ingress HTTP no está pensado para producción.

Paso 2: Registrar un bean handler de spring

Decida si manejar los mensajes entrantes de IPF Processing Data de uno en uno o por lotes. Si están presentes tanto el handler sin lotes como el handler con lotes, se utilizará el handler con lotes.

Tipo de handler Adecuado para

BatchedIpfProcessingDataHandler

Aplicaciones diseñadas para procesar eficientemente un lote grande de Data Envelopes simultáneamente. Por ejemplo, el IPF ODS Ingestion Service utiliza el handler por lotes, lo que le permite aprovechar las mejoras de rendimiento que ofrecen las inserciones en bloque a la base de datos.

IpfProcessingDataHandler

Procesamiento de datos en tiempo real con feedback inmediato.
Aislar el procesamiento de mensajes, asegurando que un fallo solo afecta al Data Envelope actual.
Aplicaciones de Http Ingress.

Por lotes

NOTA: El soporte completo de lotes solo funciona con el ingress Kafka. Cuando se utiliza HTTP, el tamaño del lote siempre será 1.

Defina un bean de BatchedIpfProcessingDataHandler.

Este handler recibirá una lista de sobres cuyo tamaño será de hasta el tamaño de lote configurado con ipf.processing-data.ingress.batch-size.

@Bean
BatchedIpfProcessingDataHandler myBatchedIpfProcessingDataHandler() {
    new BatchedIpfProcessingDataHandler() {
        @Override
        //Optionally implement a handle method to handle V1 data
        public CompletionStage<Void> handle(final List<DataEnvelope> envelopes) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }

        @Override
        //Optionally implement a handleV2 method to handle V2 data
        public CompletionStage<Void> handleV2(final List<DataEnvelopeV2> envelopes) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }
    };
}

NOTA: Debe elegir explícitamente qué versiones del modelo de datos manejar, y puede soportar ambas. Los métodos handle y handleV2 son opcionales: puede omitir cualquiera de ellos, o incluso ambos. Si no se implementan, cualquier dato recibido se ignora.

Puede configurar el tamaño del lote y el tiempo de espera para llenarlo con la siguiente configuración…​

Propiedad Por defecto Descripción

ipf.processing-data.ingress.batch-size

500

Número máximo de sobres a recibir antes de invocar el handler por lotes

ipf.processing-data.ingress.batch-interval

10ms

Tiempo máximo de espera para alcanzar el tamaño de lote.

ipf.processing-data.ingress.parallelism

10

Limita el número de operaciones de mapeo concurrentes ejecutadas sobre los lotes de DataEnvelopes consumidos. Consulte la documentación de Akka Streams para más información.

La propiedad del tamaño de lote también influye directamente en las siguientes tres claves de configuración:

  • ipf.processing-data.ingress.connector.receiver-parallelism

  • ipf.processing-data.ingress.connector.mapping-parallelism

  • ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records

Es decir, los valores por defecto son…​

ipf.processing-data.ingress.connector.receiver-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.connector.mapping-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records = ${ipf.processing-data.ingress.batch-size}

Ajuste de la configuración por lotes

Para maximizar el rendimiento del consumo por lotes, puede ser necesario ajustar la configuración durante las pruebas de rendimiento.

La métrica de resumen ipf_processing_data_ingress_batch_receive_size registra el tamaño de cada lote recibido por su bean de spring BatchedIpfProcessingDataHandler. Idealmente, su handler debería recibir lotes cuyo tamaño esté cercano al tamaño de lote configurado, minimizando así el tiempo de espera para que el lote se llene. Algunos escenarios posibles:

  • Su handler recibe constantemente lotes iguales al tamaño de lote configurado.

    • Esto podría indicar que el rendimiento de su aplicación está limitado por el paralelismo de Ingress configurado. Aumente ipf.processing-data.ingress.parallelism para permitir más operaciones de mapeo concurrentes sobre lotes de DataEnvelopes.

  • Su handler recibe constantemente lotes con tamaños muy inferiores al tamaño configurado.

    • Esto podría indicar que el rendimiento de su aplicación está limitado por el paralelismo del conector configurado. Aumente ipf.processing-data.ingress.connector.receiver-parallelism y ipf.processing-data.ingress.connector.mapping-parallelism para permitir más operaciones de mapeo concurrentes sobre los mensajes recibidos por su conector de Ingress. Vea Receiving Connector Quickstart para detalles sobre la configuración del conector.

Individual

Defina un bean IpfProcessingDataHandler de spring.

Este handler recibirá un solo sobre a la vez y funciona tanto con ingress Kafka como HTTP.

Defina un bean de spring, por ejemplo:

@Bean
IpfProcessingDataHandler myIpfProcessingDataHandler() {
    new IpfProcessingDataHandler() {
        @Override
        //Optionally implement this method to handle V1 data
        public CompletionStage<Void> handle(final DataEnvelope envelope) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }


        @Override
        //Optionally implement this method to handle V2 data
        public CompletionStage<Void> handle(final DataEnvelopeV2 envelope) {
            //Do something
            return CompletableFuture.completedFuture(null);
        }
    };
}

NOTA: Debe elegir explícitamente qué versiones del modelo de datos manejar. Los métodos handle son opcionales y, si no se implementan, cualquier dato recibido se ignora.

Migrar para consumir el modelo de datos V2 de Processing Data

Esta sección es de especial interés si su consumidor de IPF Processing Data utiliza lógica personalizada para manejar los mensajes entrantes, sin utilizar ninguno de los módulos de Ingress de IPF Processing Data preconfigurados.

El payload creado por IPF Processing Data egress contiene la cabecera schema-version, cuyo valor definirá qué versión del modelo de IPF Processing Data se contiene.

Cabecera del mensaje Descripción

schema-version = 2

El mensaje entrante utiliza el modelo de IPF Processing Data V2

schema-version = 1

El mensaje entrante utiliza el modelo de IPF Processing Data V1

schema-version no está establecido

El mensaje entrante utiliza el modelo de IPF Processing Data V1. Esto habrá originado de una versión anterior de IPF que solo contenía el modelo de datos V1.

Al actualizar un consumidor de mensajes de IPF Processing Data, debe añadir una comprobación de la cabecera schema-value para identificar qué versión del modelo de datos utilizar al manejar el mensaje entrante.

  1. Ejemplo de ReceiveTransportMessageConverter del conector receptor

ReceiveTransportMessageConverter<T> receiveTransportMessageConverter() {
    return transportMessage -> {
        final var version = (String) transportMessage.getMessageHeaders().getHeader(SchemaVersion.HEADER_KEY).orElse(null);
        final var json = transportMessage.getPayload().toString();

        if (version == null || SchemaVersion.V1.equals(version)) {
            // Handle the V1 Data Model - DataEnvelope
            // ...
        }

        if (SchemaVersion.V2.equals(version)) {
            // Handle the V2 Data Model - DataEnvelopeV2
            // ...
        }

        throw new IllegalStateException("Unsupported IPF Processing Data version " + version);
    };
}
  1. Ejemplo de controlador Http

@RestController
final class IpfProcessingDataIngressController {

    @PostMapping("/ipf-processing-data")
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV1ByDefault(@RequestBody final DataEnvelope envelope) {
        // Handle the V1 Data Model - DataEnvelope
        // ...
    }

    @PostMapping(path = "/ipf-processing-data", headers = SchemaVersion.HEADER_KEY + "=" + SchemaVersion.V1)
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV1(@RequestBody final DataEnvelope envelope) {
        // Handle the V1 Data Model - DataEnvelope
        // ...
    }

    @PostMapping(path = "/ipf-processing-data", headers = SchemaVersion.HEADER_KEY + "=" + SchemaVersion.V2)
    @ResponseStatus(HttpStatus.ACCEPTED)
    Mono<Void> ingestV2(@RequestBody final DataEnvelopeV2 envelope) {
        // Handle the V2 Data Model - DataEnvelopeV2
        // ...
    }
}

Diferencias de esquema

Los dos esquemas de IPF Processing Data son estructuralmente similares. En la mayoría de los casos, la migración es sencilla y simplemente consiste en manejar los datos en los nuevos POJOs. Hay una pequeña excepción: los Custom Objects se han simplificado para el modelo V2 de IPF Processing Data. Los POJOs CustomObjectWrapper y CustomObjectType de V1 se han eliminado. El nuevo CustomObjectContainer2 utiliza campos String key y value como reemplazo.

NOTA: Actualmente, los custom objects no son exportados por IPF Processing Data Egress.

  1. Ejemplo de diferencias entre esquemas de objetos Custom

CustomObjectContainer v1Container = CustomObjectContainer.builder()
        .object(CustomObjectWrapper.builder()
                .name("ClientCustomObject")
                .content("Custom object value")
                .build())
        .objectType(CustomObjectType.KEY_VALUE)
// Remaining fields are functionally identical
//        .primaryAssociation(...)
//        .uniqueId(...)
//        .createdAt(...)
//        .processObjectReference(..)
//        .processingContext(...)
        .build();

CustomObjectContainer2 v2Container = new CustomObjectContainer2()
        .key("ClientCustomObject")
        .value("Custom object value");
// Remaining fields are functionally identical
//        .primaryAssociation(...)
//        .uniqueId(...)
//        .createdAt(...)
//        .processObjectReference(..)
//        .processingContext(...);