Kafka Quickstart

Often you might want to configure a pair of connectors: one to send and one to receive messages to/from a destination. This guide explains how to do this using Kafka.

For more complete examples of using connectors to consume and produce messages in general, see Asynchronous Request-Reply.

Step 1: Add connector-kafka dependency

The dependency to add to your pom.xml is:

<dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-kafka</artifactId>
</dependency>

If importing the Icon BOM, or using the Icon BOM as a parent, there’s no need to supply a separate version.

Step 2: Config

Connector configuration - in general - is heavily config-driven.The configuration allows us to specify:

  • Topic names

  • Kafka consumer/producer settings

  • Restart settings (on failure)

Here’s an example of a configuration block for a pair of sending and receiving connectors which we will wire into our ConnectorTransport s, for a fictional bank’s booking system which features a request and response topic:

mybank.booking { (1)
  kafka.producer {
    topics {
      request-topic = "mybank.booking.request" (2)
    }
    kafka-clients {
      compression.type = lz4 (4)
    }
  }
  kafka.consumer {
    topics {
      response-topic = "mybank.booking.response" (3)
    }
    kafka-clients {
      group.id = ipf (4)
    }
  }
}
1 This is known as the config root path and will be referenced in the code. It indicates where in the application’s configuration to look for this connector transport’s settings
2 The request-topic key will also be referenced in the code. This capability exists if you wish to refer to multiple topics within a common Kafka configuration block. Alternatively, you can just use topic = "mybank.booking.request" instead of a discrete topics configuration block.
3 The response-topic, same as <2>
4 kafka-clients allows you to specify any standard Kafka producer or consumer settings as documented here (producer) and here (consumer)

Step 2.1: Common Config

You may have noticed that we did not specify the bootstrap.servers property. This is because - by default - the Kafka configuration you specify here will fall back to the akka.kafka.consumer and akka.kafka.producer settings.

What this means is that you can simply specify two settings to globally configure all Kafka Bootstrap Servers for all Kafka consumers and producers. For example:

akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"

This is the same for all other consumer and producer settings, e.g. TLS, compression, buffer size.

If - for example - one specific Kafka producer or consumer consumes/produces to/from a different topic, it can be configured as such:

akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"
mybank.booking.kafka.producer.kafka-clients.bootstrap.servers="someotherkafka:9094"

The above configuration means that all Kafka consumers and producers - apart from the booking producer - will go to kafka:9092, and the booking producer will go to someotherkafka:9094.

Step 3: Create Send Connector Transport

The best way to create a SendingConnectorTransport for Kafka that uses strings as key and value types is via the stringBuilder:

var kafkaConnectorTransport = KafkaConnectorTransport.stringBuilder(
        "accounts-booking-send-transport", (1)
        actorSystem, (2)
        "mybank.booking" (3)
).build();

As you can see, this builder requires only three things:

1 The ConnectorTransport name; using meaningful names for connectors will make troubleshooting easier
2 The ActorSystem
3 The Config Root Path; using the mybank.booking configuration key defined in Step 2

You can also create the transport using the normal builder, which is the recommended way to go in case you want to use non-string types.

var producerConfig = AlpakkaConfigProvider.getProducerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringSerializerProducerConfig(producerConfig);

var kafkaConnectorTransport = KafkaConnectorTransport.<String, String>builder()
        .withName("accounts-booking-send-transport") (2)
        .withTopicName(config.topic())
        .withProducerSettings(config.producerSettings()) (3)
        .withRestartSettings(config.restartSettings()) (4)
        .withHealthCheckSettings(config.healthCheckSettings()) (4)
        .withProducerRecordKeyValueProvider(new StringProducerRecordKeyValueProvider()) (5)
        .withActorSystem(actorSystem)
        .build();
1 Using the mybank.booking configuration key defined in Step 2
2 Give the transport a meaningful name
3 These are the ProducerSettings from the kafka-clients block
4 Prefer health check and restart settings from config unless there’s a very compelling reason to specify your own
5 It is through the ProducerRecordKeyValueProvider instance that you will be able to choose key-value types for your Kafka messages

Step 4: Create Receive Connector Transport

The best way to create a ReceivingConnectorTransport for Kafka that uses strings as key and value types is via the stringBuilder:

var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.stringBuilder(
        "accounts-booking-receive-transport", (1)
        actorSystem, (2)
        "mybank.booking" (3)
).build();

Like the builder found in KafkaConnectorTransport, this builder accepts the same three parameters:

1 The ReceivingConnectorTransport name; using meaningful names for connectors will make troubleshooting easier
2 The ActorSystem
3 The Config Root Path; using the mybank.booking configuration key defined in Step 2

You can also create the transport using the normal builder, which is the recommended way to go in case you want to use non-string types.

var consumerConfig = AlpakkaConfigProvider.getConsumerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringDeserializerConsumerConfig(consumerConfig);

var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.<String, String>builder()
                        .withName("accounts-booking-receive-transport") (2)
                        .withActorSystem(classicActorSystemProvider)
                        .withConsumerSettings(config.consumerSettings()) (3)
                        .withTopics(config.topic("response-topic")) (4)
                        .withRestartSettings(config.restartSettings()) (5)
                        .withMetadataClientSettings(config.healthCheckSettings()) (5)
                        .build();
1 Using the mybank.booking configuration key defined in Step 2
2 Give the transport a meaningful name
3 These are the ConsumerSettings from the kafka-clients block
4 Grabbing response-topic from the topics block. If only using one topic, you can replace this with just a call to topic()
5 Prefer health check and restart settings from config unless there’s a very compelling reason to specify your own