Cómo Consumir IPF processing data
Paso 1: Elija un medio de transporte
Primero debe elegir si desea utilizar kafka or http como su protocolo de transporte. Si Kafka entonces añada la dependencia:
<dependency>
<groupId>com.iconsolutions.ipf.core.processingdata</groupId>
<artifactId>ipf-processing-data-ingress-kafka</artifactId>
</dependency>
o si http agregue esta dependencia:
<dependency>
<groupId>com.iconsolutions.ipf.core.processingdata</groupId>
<artifactId>ipf-processing-data-ingress-http</artifactId>
</dependency>
Si usted depende de ambos en su proyecto, deberá configurarlos explícitamente.ipf.processing-data.ingress.transport como ya sea kafka or http.
| HTTP ingress no está destinado a ser utilizado en producción. |
Paso 2: Registre un controlador spring bean
Decida si manejar la entrada IPF Processing Data mensajes uno a la vez, o en un lote. Si tanto el controlador de no agrupamiento como el controlador de agrupamiento están presentes, se utiliza el controlador de agrupamiento.
| Tipo de Manejador | Bueno para |
|---|---|
BatchedIpfProcessingDataHandler |
Aplicaciones que están diseñadas para procesar de manera eficiente un gran lote de Data Envelopes simultáneamente. Por ejemplo, el IPF ODS Ingestion Service utiliza el controlador por lotes, lo que le permite aprovechar las mejoras de rendimiento ofrecidas por bulk inserciones de base de datos. |
IpfProcessingDataHandler |
Procesamiento de datos en tiempo real con retroalimentación inmediata. |
Por lotes
| El soporte completo por lotes solo funciona con el Kafka ingreso. Al utilizar HTTP el tamaño del lote siempre será 1. |
Defina un BatchedIpfProcessingDataHandler spring bean.
Este controlador recibirá una lista de sobres donde el tamaño de la lista es de hasta el tamaño de lote configurado con ipf.processing-data.ingress.batch-size.
@Bean
BatchedIpfProcessingDataHandler myBatchedIpfProcessingDataHandler() {
new BatchedIpfProcessingDataHandler() {
@Override
//Optionally implement a handle method to handle V1 data
public CompletionStage<Void> handle(final List<DataEnvelope> envelopes) {
//Do something
return CompletableFuture.completedFuture(null);
}
@Override
//Optionally implement a handleV2 method to handle V2 data
public CompletionStage<Void> handleV2(final List<DataEnvelopeV2> envelopes) {
//Do something
return CompletableFuture.completedFuture(null);
}
};
}
Debe elegir explícitamente qué versiones del modelo de datos manejar, y puede soportar ambas. El handle y handleV2 los métodos son opcionales-puede omitir uno, o incluso ambos. Si los métodos no están implementados, se ignora cualquier dato recibido.
|
Puede configurar el tamaño del lote y el tiempo de espera para la plenitud del lote con la siguiente configuración..
| Propiedad | Predeterminado | Descripción |
|---|---|---|
|
|
El número máximo de sobres a recibir antes de invocar el manejador de lotes |
|
|
El tiempo máximo de espera para que se alcance el tamaño del lote. |
|
|
Limita el número de concurrentes mapping operaciones ejecutadas en lotes consumidos de DataEnvelopes. Consulte el Akka Streams documentación para obtener más información. |
La propiedad del tamaño del lote también impulsa directamente las siguientes tres claves de configuración:
-
ipf.processing-data.ingress.connector.receiver-parallelism -
ipf.processing-data.ingress.connector.mapping-parallelism -
ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records
es decir, los valores predeterminados son..
ipf.processing-data.ingress.connector.receiver-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.connector.mapping-parallelism = ${ipf.processing-data.ingress.batch-size}
ipf.processing-data.ingress.kafka.consumer.kafka-clients.max.poll.records = ${ipf.processing-data.ingress.batch-size}
Configuración de ajuste por lotes
Para maximizar el rendimiento de la consumición en lotes, puede ser necesario realizar ajustes de configuración durante las pruebas de rendimiento.
El ipf_processing_data_ingress_batch_receive_size métrica de resumen registra el tamaño de cada lote recibido por su BatchedIpfProcessingDataHandler spring bean. Idealmente, su controlador debe recibir lotes que tengan un tamaño cercano al tamaño de lote configurado, minimizando así el tiempo dedicado a esperar a que el lote se complete. Algunos escenarios potenciales son:
-
Su controlador está recibiendo consistentemente lotes iguales al tamaño de lote configurado.
-
Esto podría indicar que el rendimiento de su aplicación está limitado por el paralelismo de Ingress configurado. Aumente
ipf.processing-data.ingress.parallelismpara permitir más concurrentes mapping operaciones en lotes de DataEnvelopes.
-
-
Su controlador está recibiendo constantemente lotes con tamaños muy inferiores al tamaño de lote configurado.
-
Esto podría indicar que el rendimiento de su aplicación está limitado por el paralelismo del conector configurado. Aumente ambos
ipf.processing-data.ingress.connector.receiver-parallelismyipf.processing-data.ingress.connector.mapping-parallelismpara permitir más concurrentes mapping operaciones sobre los mensajes recibidos por su conector Ingress. Consulte Guía Rápida del Conector de Recepción para obtener detalles sobre la configuración del Conector.
-
Único
Defina un IpfProcessingDataHandler spring bean.
Este controlador recibirá un solo sobre a la vez y funciona con ambos Kafka y HTTP ingreso.
Defina un spring bean, p. ej.
@Bean
IpfProcessingDataHandler myIpfProcessingDataHandler() {
new IpfProcessingDataHandler() {
@Override
//Optionally implement this method to handle V1 data
public CompletionStage<Void> handle(final DataEnvelope envelope) {
//Do something
return CompletableFuture.completedFuture(null);
}
@Override
//Optionally implement this method to handle V2 data
public CompletionStage<Void> handle(final DataEnvelopeV2 envelope) {
//Do something
return CompletableFuture.completedFuture(null);
}
};
}
Debe elegir explícitamente qué versiones del modelo de datos manejar. El handle Los métodos son opcionales, y si no se implementan, se ignora cualquier dato recibido.
|
Migre para consumir V2 Processing Data Modelo
| Esta sección es de particular interés si su IPF Processing Data el consumidor utiliza custom lógica para manejar mensajes entrantes, sin utilizar ninguno de los preconfigurados IPF Processing Data Módulos de ingreso. |
La carga útil creada por IPF Processing Data egress contiene el encabezado schema-version, cuyo valor definirá qué versión de la IPF Processing Data el modelo está contenido dentro.
| Encabezado del Mensaje | Descripción |
|---|---|
schema-version = 2 |
El mensaje entrante utiliza la V2 IPF Processing Data modelo |
schema-version = 1 |
El mensaje entrante utiliza la V1 IPF Processing Data modelo |
la versión del esquema no está establecida |
El mensaje entrante utiliza la V1 IPF Processing Data modelo. Esto habrá originado de una versión anterior de IPF que solo contenía el modelo de datos V1. |
Al actualizar un consumidor de IPF Processing Data mensajes, debe agregar una verificación para el schema-value encabezado para identificar qué versión del modelo de datos utilizar al manejar el mensaje entrante.
-
Receive Connector ReceiveTransportMessageConverter ejemplo
ReceiveTransportMessageConverter<T> receiveTransportMessageConverter() {
return transportMessage -> {
final var version = (String) transportMessage.getMessageHeaders().getHeader(SchemaVersion. HEADER_KEY).orElse(null);
final var json = transportMessage.getPayload().toString();
if (version == null || SchemaVersion. V1.equals(version)) {
// Handle the V1 Data Model - DataEnvelope
//..
}
if (SchemaVersion. V2.equals(version)) {
// Handle the V2 Data Model - DataEnvelopeV2
//..
}
throw new IllegalStateException("Unsupported IPF Processing Data version " + version);
};
}
Ejemplo de Controlador Http
@RestController
final class IpfProcessingDataIngressController {
@PostMapping("/ipf-processing-data")
@ResponseStatus(HttpStatus. ACCEPTED)
Mono<Void> ingestV1ByDefault(@RequestBody final DataEnvelope envelope) {
// Handle the V1 Data Model - DataEnvelope
//..
}
@PostMapping(path = "/ipf-processing-data", headers = SchemaVersion. HEADER_KEY + "=" + SchemaVersion. V1)
@ResponseStatus(HttpStatus. ACCEPTED)
Mono<Void> ingestV1(@RequestBody final DataEnvelope envelope) {
// Handle the V1 Data Model - DataEnvelope
//..
}
@PostMapping(path = "/ipf-processing-data", headers = SchemaVersion. HEADER_KEY + "=" + SchemaVersion. V2)
@ResponseStatus(HttpStatus. ACCEPTED)
Mono<Void> ingestV2(@RequestBody final DataEnvelopeV2 envelope) {
// Handle the V2 Data Model - DataEnvelopeV2
//..
}
}
Diferencias de esquema
Los dos IPF Processing Data los esquemas son estructuralmente similares. En la mayoría de los casos, la migración es sencilla y se trata simplemente de manejar los datos en los nuevos POJOs. Hay una pequeña excepción;Custom Los objetos han sido optimizados para la V2. IPF Processing Data modelo. El V1 CustomObjectWrapper y CustomObjectType Los POJOs han sido eliminados. El nuevo CustomObjectContainer2 utiliza campos de tipo String de key y value como un reemplazo.
| Los objetos personalizados actualmente no son exportados por IPF Processing Data Salida. |
Ejemplo de diferencias entre Custom esquemas de objeto
CustomObjectContainer v1Container = CustomObjectContainer.builder()
.object(CustomObjectWrapper.builder()
.name("ClientCustomObject")
.content("Custom object value")
.build())
.objectType(CustomObjectType. KEY_VALUE)
// Remaining fields are functionally identical
//.primaryAssociation(..)
//.uniqueId(..)
//.createdAt(..)
//.processObjectReference(.)
//.processingContext(..)
.build();
CustomObjectContainer2 v2Container = new CustomObjectContainer2()
.key("ClientCustomObject")
.value("Custom object value");
// Remaining fields are functionally identical
//.primaryAssociation(..)
//.uniqueId(..)
//.createdAt(..)
//.processObjectReference(.)
//.processingContext(..);