Usando Avro con Kafka Conectores
¿Cómo puedo enviar y recibir Avro mensajes con Kafka¿conectores?
Apache Avro es un marco de serialización de datos que proporciona un formato de datos binario compacto y rápido, así como estructuras de datos ricas. Esta guía explica cómo configurar Kafka conectores para trabajar con Avro-mensajes codificados, con o sin un Registro de Esquemas.
Descripción general
La biblioteca de conectores admite dos enfoques para Avro serialización:
| Enfoque | Descripción | Cuándo Usar |
|---|---|---|
Formato autodocumentado |
El esquema está incrustado en cada mensaje utilizando Avro Formato de Contenedor de Objetos |
Configuraciones simples, pruebas o cuando un Registro de Esquemas no está disponible |
Registro de Esquemas |
El esquema se almacena de forma central; los mensajes contienen únicamente una referencia de ID de esquema. |
Entornos de producción que requieren evolución de esquemas, validación y gobernanza. |
Dependencias
Añada el connector-kafka dependencia a su pom.xml:
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-kafka</artifactId>
</dependency>
El connector-kafka el módulo incluye lo necesario Avro y las dependencias del cliente de Confluent Schema Registry.
Opción 1: Formato Autodescriptivo (Sin Registro de Esquema)
Este enfoque incrusta el Avro esquema en cada mensaje, haciendo que los mensajes sean autodescriptivos. Esto es útil para el desarrollo, las pruebas o escenarios donde no está disponible un Registro de Esquemas.
El formato autodescriptivo admite:
-
GenericRecord- registros dinámicos con esquema -
SpecificRecord- generado Avro clases -
POJOs simples-usando Avro reflexión para derivar el esquema en tiempo de ejecución
Configuración
mybank.payments {
kafka.producer {
topic = "mybank.payments.request"
kafka-clients {
# No schema.registry.url means self-describing format will be used
}
}
kafka.consumer {
topics = ["mybank.payments.response"]
kafka-clients {
group.id = "mybank-payments-consumer"
# No schema.registry.url means self-describing format will be used
}
}
}
Creando el Transporte de Envío
Utilice el avroBuilder para crear un transporte de envío:
// Using GenericRecord
var kafkaSendTransport = KafkaConnectorTransport
.<GenericRecord>avroBuilder(
"payments-send-transport",
actorSystem,
"mybank.payments"
)
.build();
// Or using a POJO directly (schema derived via reflection)
var pojoSendTransport = KafkaConnectorTransport
.<PaymentMessage>avroBuilder( (1)
"payments-send-transport",
actorSystem,
"mybank.payments"
)
.build();
| 1 | Los POJOs requieren un constructor sin argumentos para la deserialización. |
Creando el Transporte de Recepción
Utilice el avroBuilder para crear un transporte de recepción:
var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
.<GenericRecord>avroBuilder( (1)
"payments-receive-transport",
actorSystem,
"mybank.payments"
)
.build();
<1>`KafkaAckReceiveConnectorTransport` confirma el desplazamiento para cada mensaje después del procesamiento exitoso. Utilice KafkaReceiveConnectorTransport para habilitar Kafka’s auto-commit.
Opción 2: Uso de Confluent Schema Registry
Para entornos de producción, utilizar un Registro de Esquemas de Confluent se recomienda. El Registro de Esquemas proporciona:
-
Almacenamiento y versionado de esquemas centralizados
-
Evolución del esquema con verificación de compatibilidad
-
Tamaño de mensaje reducido (ID de esquema en lugar de esquema completo)
Para obtener información detallada sobre los conceptos y la configuración del Registro de Esquemas, consulte el Fundamentos del Registro de Esquemas documentación.
Configuración
Añada el schema.registry.url para habilitar la integración del Registro de Esquemas:
mybank.payments {
kafka.producer {
topic = "mybank.payments.request"
kafka-clients {
schema.registry.url = "http://schema-registry:8081" (1)
# Optional: configure serializer behavior
# auto.register.schemas = true (2)
}
}
kafka.consumer {
topics = ["mybank.payments.response"]
kafka-clients {
group.id = "mybank-payments-consumer"
schema.registry.url = "http://schema-registry:8081" (1)
# Optional: configure deserializer behavior
# specific.avro.reader = false (3)
}
}
}
| 1 | Cuando schema.registry.url está presente, el conector utiliza automáticamente Confluent Avro Serializadores/Deserializadores |
| 2 | Se pueden especificar opciones de configuración adicionales; consulte KafkaAvroSerializerConfig |
| 3 | Se pueden especificar opciones de configuración adicionales; consulte KafkaAvroDeserializerConfig |
Creando el Transporte
El código de creación de transporte es idéntico al formato autodescriptivo.
El avroBuilder detecta automáticamente la presencia de schema.registry.url y configura el serializador/deserializador apropiado:
// Send transport - automatically uses KafkaAvroSerializer when schema.registry.url is configured
var kafkaSendTransport = KafkaConnectorTransport
.<GenericRecord>avroBuilder(
"payments-send-transport",
actorSystem,
"mybank.payments"
)
.build();
// Receive transport - automatically uses KafkaAvroDeserializer when schema.registry.url is configured
var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
.<GenericRecord>avroBuilder(
"payments-receive-transport",
actorSystem,
"mybank.payments"
)
.build();
Opción 3: POJOs con Registro de Esquema
Al utilizar Schema Registry con POJOs (en lugar de GenericRecord o generado SpecificRecord clases), active el modo de reflexión en los serializadores de Confluent.
Este enfoque utiliza Avro’s ReflectData para derivar automáticamente esquemas de clases POJO en tiempo de ejecución.
Este enfoque es particularmente útil cuando:
|
Configuración
Active el modo de reflexión configurando schema.reflection = true:
mybank.payments {
kafka.producer {
topic = "mybank.payments.request"
kafka-clients {
schema.registry.url = "http://schema-registry:8081"
schema.reflection = true (1)
auto.register.schemas = true (2)
}
}
kafka.consumer {
topics = ["mybank.payments.response"]
kafka-clients {
group.id = "mybank-payments-consumer"
schema.registry.url = "http://schema-registry:8081"
schema.reflection = true (1)
}
}
}
| 1 | Permite la derivación de esquemas basada en reflexión a partir de clases POJO |
| 2 | Se recomienda al utilizar el modo de reflexión para registrar automáticamente esquemas derivados. |
Defina su POJO
Crear un estándar Java clase con un constructor sin argumentos (requerido para Avro reflection):
@Data
@NoArgsConstructor (1)
@AllArgsConstructor
public class PaymentMessage {
private String id;
private String description;
private int amount;
private String timestamp;
private Map<String, String> metadata; (2)
private List<String> tags;
}
| 1 | Se requiere un constructor sin argumentos para Avro deserialización de reflexión |
| 2 | Tipos complejos como Map y List son compatibles |
Creando el Transporte
Al utilizar el modo de reflexión, escriba el transporte con su clase POJO en lugar de GenericRecord:
// Send transport typed with your POJO
var kafkaSendTransport = KafkaConnectorTransport
.<PaymentMessage>avroBuilder( (1)
"payments-send-transport",
actorSystem,
"mybank.payments"
)
.build();
// Receive transport typed with your POJO
var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
.<PaymentMessage>avroBuilder( (1)
"payments-receive-transport",
actorSystem,
"mybank.payments"
)
.build();
| 1 | Utilice su tipo POJO en lugar de GenericRecord |
La reflexión de Avro utiliza `org.apache.avro.reflect. ReflectData` para derivar esquemas de Java clases en tiempo de ejecución. Campos que son `static` or `transient` se ignoran, y todos los demás campos heredados se incluyen en el esquema. |
Construyendo conectores con Avro Transportes
Una vez que haya configurado el Avro transporte, conéctelo a sus conectores utilizando el patrón de constructor estándar.
Send Connector
SendConnector<MyDomainRequest, GenericRecord> sendConnector = SendConnector
.<MyDomainRequest, GenericRecord>builder(
"payments-send-connector",
"mybank.payments",
actorSystem
)
.withConnectorTransport(kafkaSendTransport) (1)
.withSendTransportMessageConverter(this::toTransportMessage) (2)
.withCorrelationIdExtractor(record -> record.get("correlationId").toString()) (3)
.withCorrelationService(correlationService)
.build();
| 1 | El Avro-transporte configurado creado anteriormente |
| 2 | Convertidor que crea un TransportMessage con el Avro GenericRecord como carga útil |
| 3 | Extraiga el ID de correlación del Avro registro |
Receive Connector
ReceiveConnector<MyDomainResponse, GenericRecord> receiveConnector = ReceiveConnector
.<MyDomainResponse, GenericRecord>builder(
"payments-receive-connector",
"mybank.payments",
actorSystem
)
.withReceivingConnectorTransport(kafkaReceiveTransport) (1)
.withReceiveTransportMessageConverter(this::fromTransportMessage) (2)
.withCorrelationIdExtractor(record -> record.get("correlationId").toString()) (3)
.withCorrelationService(correlationService)
.withReceiveHandler(this::handleResponse)
.build();
| 1 | El Avro-transporte configurado creado anteriormente |
| 2 | Convertidor que extrae el Avro GenericRecord from the TransportMessage carga útil |
| 3 | Extraiga el ID de correlación del Avro registro para la correlación de respuestas |
Trabajando con Avro Registros
Creando GenericRecords
private TransportMessage toTransportMessage(MyDomainRequest request) {
Schema schema = new Schema.Parser().parse("""
{
"type": "record",
"name": "PaymentRequest",
"namespace": "com.mybank.payments",
"fields": [
{"name": "correlationId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"}
]
}
""");
GenericRecord record = new GenericData.Record(schema);
record.put("correlationId", request.getCorrelationId());
record.put("amount", request.getAmount());
record.put("currency", request.getCurrency());
return new TransportMessage(new MessageHeaders(), record);
}
Lectura GenericRecords
private MyDomainResponse fromTransportMessage(TransportMessage transportMessage) {
GenericRecord record = (GenericRecord) transportMessage.getPayload();
return new MyDomainResponse(
record.get("correlationId").toString(),
(Double) record.get("amount"),
record.get("status").toString()
);
}
Autenticación del Registro de Esquemas
Para implementaciones seguras de Schema Registry, configure la autenticación en kafka-clients:
mybank.payments.kafka.producer.kafka-clients {
schema.registry.url = "https://schema-registry:8081"
basic.auth.credentials.source = "USER_INFO"
basic.auth.user.info = "username:password"
}
Ver Seguridad del Registro de Esquemas para más opciones de autenticación.
Lectura adicional
-
docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html Avro Serializador y Deserializador
-
Kafka Introducción Rápida para básico Kafka configuración
-
Solicitud-Respuesta Asincrónica para patrones de correlación