Using Avro with Kafka Connectors

How do I send and receive Avro messages with Kafka connectors?

Apache Avro is a data serialization framework that provides a compact, fast binary data format and rich data structures. This guide explains how to configure Kafka connectors to work with Avro-encoded messages, with or without a Schema Registry.

Overview

The connector library supports two approaches for Avro serialization:

  • Self-describing format - the schema is embedded in each message using the Avro Object Container File format. Use for simple setups, testing, or when a Schema Registry is not available.

  • Schema Registry - the schema is stored centrally and messages carry only a schema ID reference. Use for production environments that require schema evolution, validation, and governance.

Dependencies

Add the connector-kafka dependency to your pom.xml:

<dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-kafka</artifactId>
</dependency>

The connector-kafka module includes the necessary Avro and Confluent Schema Registry client dependencies.

Option 1: Self-Describing Format (No Schema Registry)

This approach embeds the Avro schema in each message, making messages self-describing. This is useful for development, testing, or scenarios where a Schema Registry is not available.

The self-describing format supports:

  • GenericRecord - dynamic records with schema

  • SpecificRecord - generated Avro classes

  • Plain POJOs - using Avro reflection to derive schema at runtime
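The self-describing encoding is Avro's standard Object Container File format, which can be demonstrated with plain Avro APIs alone. Below is a minimal round-trip sketch, assuming only the org.apache.avro:avro dependency; the class, schema, and field names are illustrative, not part of the connector API:

```java
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class SelfDescribingDemo {

    static String roundTrip(String id) throws Exception {
        Schema schema = new Schema.Parser().parse("""
                {"type": "record", "name": "Demo",
                 "fields": [{"name": "id", "type": "string"}]}""");

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", id);

        // Write: the schema travels in the container header,
        // so the resulting bytes are self-describing
        var bytes = new ByteArrayOutputStream();
        try (var writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<>())) {
            writer.create(schema, bytes);
            writer.append(record);
        }

        // Read: no schema is supplied up front - the reader
        // recovers it from the container header
        try (var reader = new DataFileReader<GenericRecord>(
                new SeekableByteArrayInput(bytes.toByteArray()),
                new GenericDatumReader<>())) {
            return reader.next().get("id").toString();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("abc-123"));
    }
}
```

Because the schema rides along with every message, readers need no external coordination, at the cost of a larger payload per message.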

Configuration

mybank.payments {
  kafka.producer {
    topic = "mybank.payments.request"
    kafka-clients {
      # No schema.registry.url means self-describing format will be used
    }
  }
  kafka.consumer {
    topics = ["mybank.payments.response"]
    kafka-clients {
      group.id = "mybank-payments-consumer"
      # No schema.registry.url means self-describing format will be used
    }
  }
}

Creating the Send Transport

Use the avroBuilder to create a sending transport:

// Using GenericRecord
var kafkaSendTransport = KafkaConnectorTransport
        .<GenericRecord>avroBuilder(
                "payments-send-transport",
                actorSystem,
                "mybank.payments"
        )
        .build();

// Or using a POJO directly (schema derived via reflection)
var pojoSendTransport = KafkaConnectorTransport
        .<PaymentMessage>avroBuilder( (1)
                "payments-send-transport",
                actorSystem,
                "mybank.payments"
        )
        .build();
1 POJOs require a no-argument constructor for deserialization

Creating the Receive Transport

Use the avroBuilder to create a receiving transport:

var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
        .<GenericRecord>avroBuilder( (1)
                "payments-receive-transport",
                actorSystem,
                "mybank.payments"
        )
        .build();
1 KafkaAckReceiveConnectorTransport commits the offset for each message after successful processing. Use KafkaReceiveConnectorTransport to enable Kafka’s auto-commit.

Option 2: Using Confluent Schema Registry

For production environments, using a Confluent Schema Registry is recommended. The Schema Registry provides:

  • Centralized schema storage and versioning

  • Schema evolution with compatibility checking

  • Reduced message size (schema ID instead of full schema)

For detailed information about Schema Registry concepts and configuration, see the Schema Registry Fundamentals documentation.
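The size reduction comes from Confluent's wire format: instead of a full embedded schema, each message starts with a 5-byte prefix (a zero magic byte followed by the 4-byte big-endian schema ID), with the Avro binary body after it. A sketch of decoding that prefix, using only the JDK; the byte values are made up for illustration:

```java
import java.nio.ByteBuffer;

public class WireFormatDemo {
    public static void main(String[] args) {
        // A hypothetical Registry-framed message: magic byte 0x0,
        // a 4-byte big-endian schema ID (here 42), then the Avro binary body
        byte[] framed = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x10, 0x68, 0x69};

        ByteBuffer buffer = ByteBuffer.wrap(framed);
        byte magicByte = buffer.get();   // always 0x0 in the Confluent wire format
        int schemaId = buffer.getInt();  // ID of the schema registered centrally

        System.out.println("magic=" + magicByte + " schemaId=" + schemaId);
        // -> magic=0 schemaId=42
    }
}
```

The consumer's deserializer looks the schema ID up in the Registry (and caches it), so the full schema crosses the wire only once per consumer, not once per message.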

Configuration

Add the schema.registry.url to enable Schema Registry integration:

mybank.payments {
  kafka.producer {
    topic = "mybank.payments.request"
    kafka-clients {
      schema.registry.url = "http://schema-registry:8081" (1)
      # Optional: configure serializer behavior
      # auto.register.schemas = true (2)
    }
  }
  kafka.consumer {
    topics = ["mybank.payments.response"]
    kafka-clients {
      group.id = "mybank-payments-consumer"
      schema.registry.url = "http://schema-registry:8081" (1)
      # Optional: configure deserializer behavior
      # specific.avro.reader = false (3)
    }
  }
}
1 When schema.registry.url is present, the connector automatically uses Confluent Avro Serializers/Deserializers
2 Additional configuration options can be specified; see KafkaAvroSerializerConfig
3 Additional configuration options can be specified; see KafkaAvroDeserializerConfig

Creating the Transport

The transport creation code is identical to the self-describing format. The avroBuilder automatically detects the presence of schema.registry.url and configures the appropriate serializer/deserializer:

// Send transport - automatically uses KafkaAvroSerializer when schema.registry.url is configured
var kafkaSendTransport = KafkaConnectorTransport
        .<GenericRecord>avroBuilder(
                "payments-send-transport",
                actorSystem,
                "mybank.payments"
        )
        .build();

// Receive transport - automatically uses KafkaAvroDeserializer when schema.registry.url is configured
var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
        .<GenericRecord>avroBuilder(
                "payments-receive-transport",
                actorSystem,
                "mybank.payments"
        )
        .build();

Option 3: POJOs with Schema Registry

When using Schema Registry with POJOs (instead of GenericRecord or generated SpecificRecord classes), enable reflection mode in the Confluent serializers. This approach uses Avro’s ReflectData to automatically derive schemas from POJO classes at runtime.

This approach is particularly useful when:

  • Migrating existing systems to use Schema Registry without rewriting domain models

  • You want to avoid the complexity of Avro code generation in your build process

  • Your team prefers working with familiar Java classes over Avro-specific types

Configuration

Enable reflection mode by setting schema.reflection = true:

mybank.payments {
  kafka.producer {
    topic = "mybank.payments.request"
    kafka-clients {
      schema.registry.url = "http://schema-registry:8081"
      schema.reflection = true (1)
      auto.register.schemas = true (2)
    }
  }
  kafka.consumer {
    topics = ["mybank.payments.response"]
    kafka-clients {
      group.id = "mybank-payments-consumer"
      schema.registry.url = "http://schema-registry:8081"
      schema.reflection = true (1)
    }
  }
}
1 Enables reflection-based schema derivation from POJO classes
2 Recommended when using reflection mode to auto-register derived schemas

Define Your POJO

Create a standard Java class with a no-argument constructor (required for Avro reflection):

@Data
@NoArgsConstructor (1)
@AllArgsConstructor
public class PaymentMessage {
    private String id;
    private String description;
    private int amount;
    private String timestamp;
    private Map<String, String> metadata; (2)
    private List<String> tags;
}
1 A no-argument constructor is required for Avro reflection deserialization
2 Complex types like Map and List are supported

Creating the Transport

When using reflection mode, type the transport with your POJO class instead of GenericRecord:

// Send transport typed with your POJO
var kafkaSendTransport = KafkaConnectorTransport
        .<PaymentMessage>avroBuilder( (1)
                "payments-send-transport",
                actorSystem,
                "mybank.payments"
        )
        .build();

// Receive transport typed with your POJO
var kafkaReceiveTransport = KafkaAckReceiveConnectorTransport
        .<PaymentMessage>avroBuilder( (1)
                "payments-receive-transport",
                actorSystem,
                "mybank.payments"
        )
        .build();
1 Use your POJO type instead of GenericRecord

Avro reflection uses org.apache.avro.reflect.ReflectData to derive schemas from Java classes at runtime. Fields that are static or transient are ignored; all other fields, including inherited ones, are included in the schema.
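These field rules can be checked directly with ReflectData. A small standalone sketch (the class and field names are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class ReflectSchemaDemo {

    // A minimal POJO standing in for a domain class
    public static class PaymentMessage {
        private String id;
        private int amount;
        private static int instanceCount;   // static: excluded from the schema
        private transient String scratch;   // transient: excluded from the schema
    }

    public static void main(String[] args) {
        // Derive the Avro schema from the class at runtime
        Schema schema = ReflectData.get().getSchema(PaymentMessage.class);

        // Only id and amount appear as schema fields
        schema.getFields().forEach(f -> System.out.println(f.name()));
    }
}
```

Running this prints only the id and amount field names; the static and transient fields never reach the derived schema.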

Building Connectors with Avro Transports

Once you have configured the Avro transport, wire it into your connectors using the standard builder pattern.

Send Connector

SendConnector<MyDomainRequest, GenericRecord> sendConnector = SendConnector
        .<MyDomainRequest, GenericRecord>builder(
                "payments-send-connector",
                "mybank.payments",
                actorSystem
        )
        .withConnectorTransport(kafkaSendTransport) (1)
        .withSendTransportMessageConverter(this::toTransportMessage) (2)
        .withCorrelationIdExtractor(record -> record.get("correlationId").toString()) (3)
        .withCorrelationService(correlationService)
        .build();
1 The Avro-configured transport created earlier
2 Converter that creates a TransportMessage with the Avro GenericRecord as payload
3 Extract correlation ID from the Avro record

Receive Connector

ReceiveConnector<MyDomainResponse, GenericRecord> receiveConnector = ReceiveConnector
        .<MyDomainResponse, GenericRecord>builder(
                "payments-receive-connector",
                "mybank.payments",
                actorSystem
        )
        .withReceivingConnectorTransport(kafkaReceiveTransport) (1)
        .withReceiveTransportMessageConverter(this::fromTransportMessage) (2)
        .withCorrelationIdExtractor(record -> record.get("correlationId").toString()) (3)
        .withCorrelationService(correlationService)
        .withReceiveHandler(this::handleResponse)
        .build();
1 The Avro-configured transport created earlier
2 Converter that extracts the Avro GenericRecord from the TransportMessage payload
3 Extract correlation ID from the Avro record for response correlation

Working with Avro Records

Creating GenericRecords

// Parse the schema once and reuse it; re-parsing on every message is wasteful
private static final Schema PAYMENT_REQUEST_SCHEMA = new Schema.Parser().parse("""
        {
          "type": "record",
          "name": "PaymentRequest",
          "namespace": "com.mybank.payments",
          "fields": [
            {"name": "correlationId", "type": "string"},
            {"name": "amount", "type": "double"},
            {"name": "currency", "type": "string"}
          ]
        }
        """);

private TransportMessage toTransportMessage(MyDomainRequest request) {
    GenericRecord record = new GenericData.Record(PAYMENT_REQUEST_SCHEMA);
    record.put("correlationId", request.getCorrelationId());
    record.put("amount", request.getAmount());
    record.put("currency", request.getCurrency());

    return new TransportMessage(new MessageHeaders(), record);
}

Reading GenericRecords

private MyDomainResponse fromTransportMessage(TransportMessage transportMessage) {
    GenericRecord record = (GenericRecord) transportMessage.getPayload();

    // Avro returns string fields as org.apache.avro.util.Utf8, so call toString()
    return new MyDomainResponse(
            record.get("correlationId").toString(),
            (Double) record.get("amount"),
            record.get("status").toString()
    );
}

Schema Registry Authentication

For secured Schema Registry deployments, configure authentication in kafka-clients:

mybank.payments.kafka.producer.kafka-clients {
  schema.registry.url = "https://schema-registry:8081"
  basic.auth.credentials.source = "USER_INFO"
  basic.auth.user.info = "username:password"
}

See Schema Registry Security for more authentication options.