Kafka Quickstart
Often you will want to configure a pair of connectors: one to send messages to a destination and one to receive messages from it. This guide explains how to do this using Kafka.
For more complete examples of using connectors to consume and produce messages in general, see Asynchronous Request-Reply.
Step 1: Add connector-kafka dependency
The dependency to add to your pom.xml is:
<dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-kafka</artifactId>
</dependency>
If importing the Icon BOM, or using the Icon BOM as a parent, there’s no need to supply a separate version.
Step 2: Config
Connectors are, in general, heavily config-driven. The configuration allows us to specify:
- Topic names
- Kafka consumer/producer settings
- Restart settings (on failure)
Here’s an example of a configuration block for a pair of sending and receiving connectors, which we will wire into our
ConnectorTransports, for a fictional bank’s booking system which features a request topic and a response topic:
mybank.booking { (1)
  kafka.producer {
    topics {
      request-topic = "mybank.booking.request" (2)
    }
    kafka-clients {
      compression.type = lz4 (4)
    }
  }
  kafka.consumer {
    topics {
      response-topic = "mybank.booking.response" (3)
    }
    kafka-clients {
      group.id = ipf (4)
    }
  }
}
| 1 | This is known as the config root path and will be referenced in the code. It indicates where in the application’s configuration to look for this connector transport’s settings |
| 2 | The request-topic key will also be referenced in the code. This capability exists if you wish to refer to multiple
topics within a common Kafka configuration block. Alternatively, you can just use topic = "mybank.booking.request" instead of a discrete topics configuration block. |
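| 3 | The response-topic key, which is referenced in the code in the same way as the request-topic key in (2) |
| 4 | kafka-clients allows you to specify any standard Kafka producer or consumer settings, as documented in the Apache Kafka producer and consumer configuration references |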
Step 2.1: Common Config
You may have noticed that we did not specify the bootstrap.servers property. This is because - by default - the Kafka
configuration you specify here will fall back to the akka.kafka.consumer and akka.kafka.producer settings.
What this means is that you can simply specify two settings to globally configure all Kafka Bootstrap Servers for all Kafka consumers and producers. For example:
akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"
This is the same for all other consumer and producer settings, e.g. TLS, compression, buffer size.
If, for example, one specific Kafka producer or consumer needs to connect to different bootstrap servers, the global setting can be overridden for that connector alone:
akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"
mybank.booking.kafka.producer.kafka-clients.bootstrap.servers="someotherkafka:9094"
The above configuration means that all Kafka consumers and producers - apart from the booking producer - will go to
kafka:9092, and the booking producer will go to someotherkafka:9094.
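The precedence described above can be pictured with a small, self-contained sketch. It uses plain Java maps rather than the real configuration machinery, so it only illustrates the idea that connector-specific kafka-clients values win over the global akka.kafka defaults. The class and method names (KafkaFallbackSketch, resolve) are hypothetical and are not part of the connector library.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaFallbackSketch {

    // Hypothetical helper, for illustration only: start from the global
    // defaults, then let connector-specific settings overwrite them.
    static Map<String, String> resolve(Map<String, String> connectorSpecific,
                                       Map<String, String> globalDefaults) {
        Map<String, String> effective = new HashMap<>(globalDefaults);
        effective.putAll(connectorSpecific);
        return effective;
    }

    public static void main(String[] args) {
        // Stand-ins for akka.kafka.producer.kafka-clients and
        // akka.kafka.consumer.kafka-clients
        Map<String, String> globalProducer = Map.of("bootstrap.servers", "kafka:9092");
        Map<String, String> globalConsumer = Map.of("bootstrap.servers", "kafka:9092");

        // Stand-ins for mybank.booking.kafka.producer.kafka-clients and
        // mybank.booking.kafka.consumer.kafka-clients
        Map<String, String> bookingProducer = Map.of(
                "bootstrap.servers", "someotherkafka:9094",
                "compression.type", "lz4");
        Map<String, String> bookingConsumer = Map.of("group.id", "ipf");

        // The booking producer overrides the global bootstrap servers
        System.out.println(resolve(bookingProducer, globalProducer).get("bootstrap.servers"));
        // The booking consumer falls back to the global value
        System.out.println(resolve(bookingConsumer, globalConsumer).get("bootstrap.servers"));
    }
}
```

Running the sketch prints someotherkafka:9094 for the booking producer and kafka:9092 for the booking consumer, matching the behaviour described for the configuration above.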
Step 3: Create Send Connector Transport
The best way to create a SendingConnectorTransport for Kafka that uses strings as key and value types is via the stringBuilder:
var kafkaConnectorTransport = KafkaConnectorTransport.stringBuilder(
        "accounts-booking-send-transport", (1)
        actorSystem, (2)
        "mybank.booking" (3)
).build();
As you can see, this builder requires only three things:
| 1 | The ConnectorTransport name; using meaningful names for connectors will make troubleshooting easier |
| 2 | The ActorSystem |
| 3 | The Config Root Path; using the mybank.booking configuration key defined in Step 2 |
You can also create the transport using the normal builder, which is the recommended approach if you want to use non-string types.
Step 4: Create Receive Connector Transport
The best way to create a ReceivingConnectorTransport for Kafka that uses strings as key and value types is via the stringBuilder:
var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.stringBuilder(
        "accounts-booking-receive-transport", (1)
        actorSystem, (2)
        "mybank.booking" (3)
).build();
Like the KafkaConnectorTransport builder, this builder takes three parameters:
| 1 | The ReceivingConnectorTransport name; using meaningful names for connectors will make troubleshooting easier |
| 2 | The ActorSystem |
| 3 | The Config Root Path; using the mybank.booking configuration key defined in Step 2 |
You can also create the transport using the normal builder, which is the recommended approach if you want to use non-string types.