Kafka Inicio rápido
A menudo puede desear configurar un par de conectores: uno para enviar y otro para recibir mensajes de/a un destino. Esta guía explica cómo hacer esto utilizando Kafka.
Para ejemplos más completos sobre el uso de conectores para consumir y producir mensajes en general, consulte Solicitud-Respuesta Asincrónica.
Paso 1: Agregue connector-kafka dependencia
La dependencia que debe agregar a su pom.xml is:
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-kafka</artifactId>
</dependency>
Si está importando el Icon BOM, o utilizando el Icon BOM como padre, no es necesario proporcionar una versión separada.
Paso 2: Config
Configuración del conector-en general-es altamente impulsado por la configuración. La configuración nos permite especificar:
-
Nombres de temas
-
Configuración del consumidor/productor de Kafka
-
Reiniciar configuraciones (en caso de fallo)
Aquí tiene un ejemplo de un bloque de configuración para un par de conectores de envío y recepción que conectaremos a nuestro `ConnectorTransport`s, para el sistema de reservas de un banco ficticio que presenta un tema de solicitud y respuesta:
mybank.booking { (1)
kafka.producer {
topics {
request-topic = "mybank.booking.request" (2)
}
kafka-clients {
compression.type = lz4 (4)
}
}
kafka.consumer {
topics {
response-topic = "mybank.booking.response" (3)
}
kafka-clients {
group.id = ipf (4)
}
}
}
| 1 | Esto se conoce como la ruta raíz de configuración y se hará referencia en el código. Indica dónde en la aplicación configuración para buscar esto connector transport la configuración de 's |
| 2 | El request-topic la clave también será referenciada en el código. Esta capacidad existe si desea referirse a múltiples
temas dentro de un común Kafka bloque de configuración. Alternativamente, puede utilizar simplemente topic = "mybank.booking.request" en lugar de un discreto topics bloque de configuración. |
| 3 | El response-topic, igual que <2>
<4>`kafka-clients` le permite especificar cualquier estándar Kafka configuraciones de productor o consumidor según lo documentado aquí(productor) y aquí(consumidor) |
Paso 2. 1: Configuración Común
Puede haber notado que no especificamos el bootstrap.servers propiedad. Esto se debe a que-por defecto-el Kafka
la configuración que especifique aquí se revertirá a la akka.kafka.consumer y akka.kafka.producer ajustes.
Lo que esto significa es que usted puede simplemente especificar dos configuraciones para configurar globalmente todos Kafka Servidores Bootstrap para todos Kafka consumidores y productores. Por ejemplo:
akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"
Esto es lo mismo para todas las demás configuraciones de consumidor y productor, por ejemplo, TLS, compresión, tamaño del búfer.
Si-por ejemplo-uno específico Kafka el productor o consumidor consume/produce de/a un tema diferente, puede ser configurado como tal:
akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"
mybank.booking.kafka.producer.kafka-clients.bootstrap.servers="someotherkafka:9094"
La configuración anterior significa que todos Kafka consumidores y productores-aparte del productor de reservas-irá a
kafka:9092, y el productor de reservas irá a someotherkafka:9094.
Paso 3: Crear Send Connector Transporte
La mejor manera de crear un SendingConnectorTransport para Kafka que utiliza cadenas como tipos de clave y valor es a través del stringBuilder:
var kafkaConnectorTransport = KafkaConnectorTransport.stringBuilder(
"accounts-booking-send-transport", (1)
actorSystem, (2)
"mybank.booking" (3)
).build();
Como puede ver, este constructor requiere solo tres cosas:
| 1 | El ConnectorTransport nombre; utilizar nombres significativos para los conectores facilitará la resolución de problemas |
| 2 | El ActorSystem |
| 3 | La Ruta Raíz de Configuración; utilizando el mybank.booking clave de configuración definida en Step 2 |
También puede crear el transporte utilizando el constructor normal, que es la forma recomendada en caso de que desee utilizar tipos que no sean cadenas.
var producerConfig = AlpakkaConfigProvider.getProducerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringSerializerProducerConfig(producerConfig);
var kafkaConnectorTransport = KafkaConnectorTransport.<String, String>builder()
.withName("accounts-booking-send-transport") (2)
.withTopicName(config.topic())
.withProducerSettings(config.producerSettings()) (3)
.withRestartSettings(config.restartSettings()) (4)
.withHealthCheckSettings(config.healthCheckSettings()) (4)
.withProducerRecordKeyValueProvider(new StringProducerRecordKeyValueProvider()) (5)
.withActorSystem(actorSystem)
.build();
| 1 | Usando el mybank.booking clave de configuración definida en Step 2 |
| 2 | Asigne un nombre significativo al transporte |
| 3 | Estos son los ProducerSettings de la kafka-clients bloque |
| 4 | Prefiera la verificación de salud y restart configuración de los ajustes a menos que haya una razón muy convincente para especificar los suyos propios |
| 5 | Es a través de la ProducerRecordKeyValueProvider instancia en la que podrá elegir tipos de clave-valor para su Kafka mensajes |
Paso 4: Crear Receive Connector Transporte
La mejor manera de crear un ReceivingConnectorTransport para Kafka que utiliza cadenas como tipos de clave y valor es a través del stringBuilder:
var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.stringBuilder(
"accounts-booking-receive-transport", (1)
actorSystem, (2)
"mybank.booking" (3)
).build();
Como el constructor que se encuentra en KafkaConnectorTransport, este constructor acepta los mismos tres parámetros:
| 1 | El nombre del ReceivingConnectorTransport; utilizar nombres significativos para los conectores facilitará la resolución de problemas. |
| 2 | El ActorSystem |
| 3 | La Ruta Raíz de Configuración; utilizando el mybank.booking clave de configuración definida en Step 2 |
También puede crear el transporte utilizando el constructor normal, que es la forma recomendada en caso de que desee utilizar tipos que no sean cadenas.
var consumerConfig = AlpakkaConfigProvider.getConsumerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringDeserializerConsumerConfig(consumerConfig);
var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.<String, String>builder()
.withName("accounts-booking-receive-transport") (2)
.withActorSystem(classicActorSystemProvider)
.withConsumerSettings(config.consumerSettings()) (3)
.withTopics(config.topic("response-topic")) (4)
.withRestartSettings(config.restartSettings()) (5)
.withMetadataClientSettings(config.healthCheckSettings()) (5)
.build();
| 1 | Usando el mybank.booking clave de configuración definida en Step 2 |
| 2 | Asigne un nombre significativo al transporte |
| 3 | Estos son los ConsumerSettings de la kafka-clients bloque |
| 4 | Captura response-topic from the topics bloque. Si solo utiliza un tema, puede reemplazar esto con solo una llamada a
topic() |
| 5 | Prefiera la verificación de salud y restart configuración de los ajustes a menos que haya una razón muy convincente para especificar los suyos propios |