Using Avro with Kafka Connectors
How do I send and receive Avro messages with Kafka connectors?
Apache Avro is a data serialization framework that provides a compact, fast binary data format and rich data structures. This guide explains how to configure Kafka connectors to send and receive Avro-encoded messages, with or without a Schema Registry.
Overview
The connector library supports two approaches for Avro serialization:
| Approach | Description | When to Use |
|---|---|---|
| Self-describing format | Schema is embedded in each message using the Avro Object Container format | Simple setups, testing, or when a Schema Registry is not available |
| Schema Registry | Schema is stored centrally; messages carry only a schema ID reference | Production environments requiring schema evolution, validation, and governance |
Dependencies
Add the connector-kafka dependency to your pom.xml:
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-kafka</artifactId>
</dependency>
The connector-kafka module includes the necessary Avro and Confluent Schema Registry client dependencies.
Option 1: Self-Describing Format (No Schema Registry)
This approach embeds the Avro schema in each message, making messages self-describing. This is useful for development, testing, or scenarios where a Schema Registry is not available.
The self-describing format supports:
- GenericRecord: dynamic records with a schema
- SpecificRecord: generated Avro classes
- Plain POJOs: Avro reflection derives the schema at runtime
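To illustrate what "self-describing" means in practice, the following sketch (plain Avro, independent of the connector library, with a hypothetical one-field schema) writes a GenericRecord in the Avro Object Container format and reads it back without supplying a reader schema; the schema travels with the data:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class SelfDescribingDemo {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse("""
                {"type": "record", "name": "Ping", "fields": [
                  {"name": "id", "type": "string"}
                ]}
                """);

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "abc-123");

        // Write: the container format embeds the schema alongside the data
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            writer.create(schema, out);
            writer.append(record);
        }

        // Read: no reader schema is supplied; it is recovered from the container
        try (DataFileStream<GenericRecord> reader = new DataFileStream<>(
                new ByteArrayInputStream(out.toByteArray()),
                new GenericDatumReader<>())) {
            System.out.println(reader.getSchema().getName()); // Ping
            System.out.println(reader.next().get("id"));      // abc-123
        }
    }
}
```

The convenience of carrying the schema in every message is also the trade-off: each payload is larger than with a Schema Registry, which replaces the embedded schema with a small ID.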
Configuration
mybank.payments {
kafka.producer {
topic = "mybank.payments.request"
kafka-clients {
# No schema.registry.url means self-describing format will be used
}
}
kafka.consumer {
topics = ["mybank.payments.response"]
kafka-clients {
group.id = "mybank-payments-consumer"
# No schema.registry.url means self-describing format will be used
}
}
}
Creating the Send Transport
Use the avroBuilder to create a sending transport:
// Using GenericRecord
var kafkaSendTransport = KafkaConnectorTransport
.<GenericRecord>avroBuilder(
"payments-send-transport",
actorSystem,
"mybank.payments"
)
.build();
// Or using a POJO directly (schema derived via reflection)
var pojoSendTransport = KafkaConnectorTransport
.<PaymentMessage>avroBuilder( (1)
"payments-send-transport",
actorSystem,
"mybank.payments"
)
.build();
| 1 | POJOs require a no-argument constructor for deserialization |
Creating the Receive Transport
Use the avroBuilder to create a receiving transport:
var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
.<GenericRecord>avroBuilder( (1)
"payments-receive-transport",
actorSystem,
"mybank.payments"
)
.build();
| 1 | KafkaAckReceiveConnectorTransport commits the offset for each message after successful processing. Use KafkaReceiveConnectorTransport to enable Kafka’s auto-commit. |
Option 2: Using Confluent Schema Registry
For production environments, using a Confluent Schema Registry is recommended. The Schema Registry provides:
- Centralized schema storage and versioning
- Schema evolution with compatibility checking
- Reduced message size (a schema ID instead of the full schema)
For detailed information about Schema Registry concepts and configuration, see the Schema Registry Fundamentals documentation.
Configuration
Add the schema.registry.url to enable Schema Registry integration:
mybank.payments {
kafka.producer {
topic = "mybank.payments.request"
kafka-clients {
schema.registry.url = "http://schema-registry:8081" (1)
# Optional: configure serializer behavior
# auto.register.schemas = true (2)
}
}
kafka.consumer {
topics = ["mybank.payments.response"]
kafka-clients {
group.id = "mybank-payments-consumer"
schema.registry.url = "http://schema-registry:8081" (1)
# Optional: configure deserializer behavior
# specific.avro.reader = false (3)
}
}
}
| 1 | When schema.registry.url is present, the connector automatically uses Confluent Avro Serializers/Deserializers |
| 2 | Additional configuration options can be specified; see KafkaAvroSerializerConfig |
| 3 | Additional configuration options can be specified; see KafkaAvroDeserializerConfig |
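Beyond the URL, the Confluent serializer accepts further options through kafka-clients. The fragment below sketches two commonly used production settings (standard Confluent configuration keys, shown with illustrative values):

```hocon
mybank.payments.kafka.producer.kafka-clients {
  schema.registry.url = "http://schema-registry:8081"

  # Disable auto-registration so only pre-approved schemas are used
  auto.register.schemas = false

  # Serialize against the latest schema version registered for the subject
  use.latest.version = true
}
```

Disabling auto-registration in production keeps schema changes under governance: a producer with an unregistered schema fails fast instead of silently registering a new version.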
Creating the Transport
The transport creation code is identical to that used for the self-describing format. The avroBuilder detects the presence of schema.registry.url and automatically configures the appropriate serializer or deserializer:
// Send transport - automatically uses KafkaAvroSerializer when schema.registry.url is configured
var kafkaSendTransport = KafkaConnectorTransport
.<GenericRecord>avroBuilder(
"payments-send-transport",
actorSystem,
"mybank.payments"
)
.build();
// Receive transport - automatically uses KafkaAvroDeserializer when schema.registry.url is configured
var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
.<GenericRecord>avroBuilder(
"payments-receive-transport",
actorSystem,
"mybank.payments"
)
.build();
Option 3: POJOs with Schema Registry
When using Schema Registry with POJOs (instead of GenericRecord or generated SpecificRecord classes), enable reflection mode in the Confluent serializers.
This approach uses Avro’s ReflectData to automatically derive schemas from POJO classes at runtime.
This approach is particularly useful when you want to keep working with existing domain classes rather than generating SpecificRecord classes from Avro schemas.
Configuration
Enable reflection mode by setting schema.reflection = true:
mybank.payments {
kafka.producer {
topic = "mybank.payments.request"
kafka-clients {
schema.registry.url = "http://schema-registry:8081"
schema.reflection = true (1)
auto.register.schemas = true (2)
}
}
kafka.consumer {
topics = ["mybank.payments.response"]
kafka-clients {
group.id = "mybank-payments-consumer"
schema.registry.url = "http://schema-registry:8081"
schema.reflection = true (1)
}
}
}
| 1 | Enables reflection-based schema derivation from POJO classes |
| 2 | Recommended when using reflection mode to auto-register derived schemas |
Define Your POJO
Create a standard Java class with a no-argument constructor (required for Avro reflection):
@Data
@NoArgsConstructor (1)
@AllArgsConstructor
public class PaymentMessage {
private String id;
private String description;
private int amount;
private String timestamp;
private Map<String, String> metadata; (2)
private List<String> tags;
}
| 1 | A no-argument constructor is required for Avro reflection deserialization |
| 2 | Complex types like Map and List are supported |
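As a rough sketch of what reflection mode does under the hood, plain Avro can derive the same schema directly with ReflectData. The PaymentMessage class below mirrors the POJO above, written without Lombok so the example is self-contained:

```java
import java.util.List;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class ReflectionSchemaDemo {

    // Mirrors the PaymentMessage POJO above, without Lombok
    public static class PaymentMessage {
        private String id;
        private String description;
        private int amount;
        private String timestamp;
        private Map<String, String> metadata;
        private List<String> tags;

        public PaymentMessage() { } // no-arg constructor required by reflection
    }

    public static void main(String[] args) {
        // Derive the Avro schema from the class structure at runtime
        Schema schema = ReflectData.get().getSchema(PaymentMessage.class);
        System.out.println(schema.getName());          // PaymentMessage
        System.out.println(schema.getFields().size()); // 6
    }
}
```

Because the schema is derived from the class structure, renaming or retyping a field changes the schema; with auto.register.schemas enabled, such changes register new schema versions and are subject to the Registry's compatibility checks.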
Creating the Transport
When using reflection mode, type the transport with your POJO class instead of GenericRecord:
// Send transport typed with your POJO
var kafkaSendTransport = KafkaConnectorTransport
.<PaymentMessage>avroBuilder( (1)
"payments-send-transport",
actorSystem,
"mybank.payments"
)
.build();
// Receive transport typed with your POJO
var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
.<PaymentMessage>avroBuilder( (1)
"payments-receive-transport",
actorSystem,
"mybank.payments"
)
.build();
| 1 | Use your POJO type instead of GenericRecord |
Avro reflection uses ReflectData to derive the schema from the POJO class at runtime, so the class structure must match on both the producer and consumer sides.
Building Connectors with Avro Transports
Once you have configured the Avro transport, wire it into your connectors using the standard builder pattern.
Send Connector
SendConnector<MyDomainRequest, GenericRecord> sendConnector = SendConnector
.<MyDomainRequest, GenericRecord>builder(
"payments-send-connector",
"mybank.payments",
actorSystem
)
.withConnectorTransport(kafkaSendTransport) (1)
.withSendTransportMessageConverter(this::toTransportMessage) (2)
.withCorrelationIdExtractor(record -> record.get("correlationId").toString()) (3)
.withCorrelationService(correlationService)
.build();
| 1 | The Avro-configured transport created earlier |
| 2 | Converter that creates a TransportMessage with the Avro GenericRecord as payload |
| 3 | Extract correlation ID from the Avro record |
Receive Connector
ReceiveConnector<MyDomainResponse, GenericRecord> receiveConnector = ReceiveConnector
.<MyDomainResponse, GenericRecord>builder(
"payments-receive-connector",
"mybank.payments",
actorSystem
)
.withReceivingConnectorTransport(kafkaReceiveTransport) (1)
.withReceiveTransportMessageConverter(this::fromTransportMessage) (2)
.withCorrelationIdExtractor(record -> record.get("correlationId").toString()) (3)
.withCorrelationService(correlationService)
.withReceiveHandler(this::handleResponse)
.build();
| 1 | The Avro-configured transport created earlier |
| 2 | Converter that extracts the Avro GenericRecord from the TransportMessage payload |
| 3 | Extract correlation ID from the Avro record for response correlation |
Working with Avro Records
Creating GenericRecords
private TransportMessage toTransportMessage(MyDomainRequest request) {
Schema schema = new Schema.Parser().parse("""
{
"type": "record",
"name": "PaymentRequest",
"namespace": "com.mybank.payments",
"fields": [
{"name": "correlationId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"}
]
}
""");
GenericRecord record = new GenericData.Record(schema);
record.put("correlationId", request.getCorrelationId());
record.put("amount", request.getAmount());
record.put("currency", request.getCurrency());
return new TransportMessage(new MessageHeaders(), record);
}
Reading GenericRecords
private MyDomainResponse fromTransportMessage(TransportMessage transportMessage) {
GenericRecord record = (GenericRecord) transportMessage.getPayload();
return new MyDomainResponse(
record.get("correlationId").toString(),
(Double) record.get("amount"),
record.get("status").toString()
);
}
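One gotcha worth noting: after deserialization, Avro returns string fields as org.apache.avro.util.Utf8 rather than java.lang.String, which is why the converters above call toString(). A minimal sketch (plain Avro binary round trip, hypothetical one-field schema) demonstrates this:

```java
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;

public class Utf8Demo {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse("""
                {"type": "record", "name": "Resp", "fields": [
                  {"name": "status", "type": "string"}
                ]}
                """);

        GenericRecord record = new GenericData.Record(schema);
        record.put("status", "ACCEPTED");

        // Serialize to Avro binary
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // Deserialize: string fields come back as Utf8, not String
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema)
                .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));

        Object status = decoded.get("status");
        System.out.println(status instanceof Utf8);    // true
        System.out.println("ACCEPTED".equals(status)); // false: Utf8 is not a String
        System.out.println(status.toString());         // ACCEPTED
    }
}
```

Comparing a deserialized field against a String with equals() silently fails; always convert with toString() first, as the correlation ID extractors earlier in this guide do.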
Schema Registry Authentication
For secured Schema Registry deployments, configure authentication in kafka-clients:
mybank.payments.kafka.producer.kafka-clients {
schema.registry.url = "https://schema-registry:8081"
basic.auth.credentials.source = "USER_INFO"
basic.auth.user.info = "username:password"
}
See Schema Registry Security for more authentication options.
Further Reading
- Kafka Quickstart for basic Kafka setup
- Asynchronous Request-Reply for correlation patterns