Documentation for a newer release is available. View Latest

Kafka Quickstart

Quite often you might want to configure a pair of connectors: one to send and one to receive messages to/from a destination. This guide explains how to do this using Kafka.

For more complete examples of using connectors to consume and produce messages in general, see Asynchronous Request-Reply.

Step 1: Add connector-kafka dependency

The dependency to add to your pom.xml is:

<dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-kafka</artifactId>
</dependency>

If importing the Icon BOM, or using the Icon BOM as a parent, there’s no need to supply a separate version.

Step 2: Config

Connector configuration - in general - is heavily config-driven.The configuration allows us to specify:

  • Topic names

  • Kafka consumer/producer settings

  • Restart settings (on failure)

Here’s an example of a configuration block for a pair of sending and receiving connectors which we will wire into our ConnectorTransport s, for a fictional bank’s booking system which features a request and response topic:

mybank.booking { (1)
  kafka.producer {
    topics {
      request-topic = "mybank.booking.request" (2)
    }
    restart-settings = ${akka.kafka.producer.health-check-settings.restart-settings} (3)
    kafka-clients {
      compression.type = lz4 (5)
    }
  }
  kafka.consumer {
    topics {
      response-topic = "mybank.booking.response" (4)
    }
    restart-settings = ${akka.kafka.producer.health-check-settings.restart-settings}
    kafka-clients {
      group.id = ipf (5)
    }
  }
}
1 This is known as the config root path and will be referenced in the code. It indicates where in the application’s configuration to look for this connector transport’s settings
2 The request-topic key will also be referenced in the code. This capability exists if you wish to refer to multiple topics within a common Kafka configuration block. Alternatively, you can just use topic = "mybank.booking.request" instead of a discrete topics configuration block.
3 The default Akka restart settings are imported here. You can modify this if necessary
4 The response-topic, same as <2>
5 kafka-clients allows you to specify any standard Kafka producer or consumer settings as documented here (producer) and here (consumer)

Step 2.1: Common Config

You may have noticed that we did not specify the bootstrap.servers property. This is because - by default - the Kafka configuration you specify here will fall back to the akka.kafka.consumer and akka.kafka.producer settings.

What this means is that you can simply specify two settings to globally configure all Kafka Bootstrap Servers for all Kafka consumers and producers. For example:

akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"

This is the same for all other consumer and producer settings, e.g. TLS, compression, buffer size.

If - for example - one specific Kafka producer or consumer consumes/produces to/from a different topic, it can be configured as such:

akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"
mybank.booking.kafka.producer.kafka-clients.bootstrap.servers="someotherkafka:9094"

The above configuration means that all Kafka consumers and producers - apart from the booking producer - will go to kafka:9092, and the booking producer will go to someotherkafka:9094.

Step 3: Create Send Connector Transport

Here’s an example of how a SendConnectorTransport can be created for Kafka:

var producerConfig = AlpakkaConfigProvider.getProducerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringSerializerProducerConfig(producerConfig);

var kafkaConnectorTransport = KafkaConnectorTransport.<String, String>builder()
        .withName("accounts-booking-send-transport") (2)
        .withTopicName(config.topic())
        .withProducerSettings(config.producerSettings()) (3)
        .withProducerRecordKeyValueProvider(new StringProducerRecordKeyValueProvider())
        .withRestartSettings(config.restartSettings()) (4)
        .withActorSystem(actorSystem)
        .build();
1 Using the mybank.booking configuration key defined in Step 2
2 Give the ConnectorTransport a meaningful name
3 These are the ProducerSettings from the kafka-clients block
4 Default restart settings, if no overrides are supplied

There is also a KafkaConnectorTransport.stringBuilder method, with less boilerplate code, which expects three arguments:

  • ConnectorTransport name

  • ActorSystem

  • Config Root Path

This example builds the same instance of SendConnectorTransport as in previous example:

var kafkaConnectorTransport = KafkaConnectorTransport.stringBuilder(
        "accounts-booking-send-transport",
        actorSystem,
        "mybank.booking"
).build();

Step 4: Create Receive Connector Transport

Here’s how to create a ConnectorTransport for receiving messages over JMS:

var consumerConfig = AlpakkaConfigProvider.getConsumerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringDeserializerConsumerConfig(consumerConfig);

var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.<String, String>builder()
                        .withName("accounts-booking-receive-transport") (2)
                        .withConsumerSettings(config.consumerSettings()) (3)
                        .withActorSystem(classicActorSystemProvider)
                        .withTopics(config.topic("response-topic")) (4)
                        .withRestartSettings(config.restartSettings()) (5)
                        .build();
1 Using the mybank.booking configuration key defined in Step 2
2 Give the ConnectorTransport a meaningful name
3 These are the ConsumerSettings from the kafka-clients block
4 Grabbing response-topic from the topics block. If only using one topic, you can replace this with just a call to topic()
5 Default restart settings, if no overrides are supplied

As in KafkaConnectorTransport there is the same KafkaReceiveConnectorTransport.stringBuilder method, which expects same arguments.

This example builds the same instance of KafkaAckReceiveConnectorTransport as in previous example:

var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.stringBuilder(
        "accounts-booking-receive-transport",
        actorSystem,
        "mybank.booking"
).build();