Kafka Inicio rápido

A menudo puede desear configurar un par de conectores: uno para enviar y otro para recibir mensajes de/a un destino. Esta guía explica cómo hacer esto utilizando Kafka.

Para ejemplos más completos sobre el uso de conectores para consumir y producir mensajes en general, consulte Solicitud-Respuesta Asincrónica.

Paso 1: Agregue connector-kafka dependencia

La dependencia que debe agregar a su pom.xml is:

<dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-kafka</artifactId>
</dependency>

Si está importando el Icon BOM, o utilizando el Icon BOM como padre, no es necesario proporcionar una versión separada.

Paso 2: Config

Configuración del conector-en general-es altamente impulsado por la configuración. La configuración nos permite especificar:

  • Nombres de temas

  • Configuración del consumidor/productor de Kafka

  • Reiniciar configuraciones (en caso de fallo)

Aquí tiene un ejemplo de un bloque de configuración para un par de conectores de envío y recepción que conectaremos a nuestro `ConnectorTransport`s, para el sistema de reservas de un banco ficticio que presenta un tema de solicitud y respuesta:

mybank.booking { (1)
  kafka.producer {
    topics {
      request-topic = "mybank.booking.request" (2)
    }
    kafka-clients {
      compression.type = lz4 (4)
    }
  }
  kafka.consumer {
    topics {
      response-topic = "mybank.booking.response" (3)
    }
    kafka-clients {
      group.id = ipf (4)
    }
  }
}
1 Esto se conoce como la ruta raíz de configuración y se hará referencia en el código. Indica dónde en la aplicación configuración para buscar esto connector transport la configuración de 's
2 El request-topic la clave también será referenciada en el código. Esta capacidad existe si desea referirse a múltiples temas dentro de un común Kafka bloque de configuración. Alternativamente, puede utilizar simplemente topic = "mybank.booking.request" en lugar de un discreto topics bloque de configuración.
3 El response-topic, igual que <2> <4>`kafka-clients` le permite especificar cualquier estándar Kafka configuraciones de productor o consumidor según lo documentado aquí(productor) y aquí(consumidor)

Paso 2. 1: Configuración Común

Puede haber notado que no especificamos el bootstrap.servers propiedad. Esto se debe a que-por defecto-el Kafka la configuración que especifique aquí se revertirá a la akka.kafka.consumer y akka.kafka.producer ajustes.

Lo que esto significa es que usted puede simplemente especificar dos configuraciones para configurar globalmente todos Kafka Servidores Bootstrap para todos Kafka consumidores y productores. Por ejemplo:

akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"

Esto es lo mismo para todas las demás configuraciones de consumidor y productor, por ejemplo, TLS, compresión, tamaño del búfer.

Si-por ejemplo-uno específico Kafka el productor o consumidor consume/produce de/a un tema diferente, puede ser configurado como tal:

akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"
mybank.booking.kafka.producer.kafka-clients.bootstrap.servers="someotherkafka:9094"

La configuración anterior significa que todos Kafka consumidores y productores-aparte del productor de reservas-irá a kafka:9092, y el productor de reservas irá a someotherkafka:9094.

Paso 3: Crear Send Connector Transporte

La mejor manera de crear un SendingConnectorTransport para Kafka que utiliza cadenas como tipos de clave y valor es a través del stringBuilder:

var kafkaConnectorTransport = KafkaConnectorTransport.stringBuilder(
        "accounts-booking-send-transport", (1)
        actorSystem, (2)
        "mybank.booking" (3)
).build();

Como puede ver, este constructor requiere solo tres cosas:

1 El ConnectorTransport nombre; utilizar nombres significativos para los conectores facilitará la resolución de problemas
2 El ActorSystem
3 La Ruta Raíz de Configuración; utilizando el mybank.booking clave de configuración definida en Step 2
También puede crear el transporte utilizando el constructor normal, que es la forma recomendada en caso de que desee utilizar tipos que no sean cadenas.
var producerConfig = AlpakkaConfigProvider.getProducerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringSerializerProducerConfig(producerConfig);

var kafkaConnectorTransport = KafkaConnectorTransport.<String, String>builder()
        .withName("accounts-booking-send-transport") (2)
        .withTopicName(config.topic())
        .withProducerSettings(config.producerSettings()) (3)
        .withRestartSettings(config.restartSettings()) (4)
        .withHealthCheckSettings(config.healthCheckSettings()) (4)
        .withProducerRecordKeyValueProvider(new StringProducerRecordKeyValueProvider()) (5)
        .withActorSystem(actorSystem)
        .build();
1 Usando el mybank.booking clave de configuración definida en Step 2
2 Asigne un nombre significativo al transporte
3 Estos son los ProducerSettings de la kafka-clients bloque
4 Prefiera la verificación de salud y restart configuración de los ajustes a menos que haya una razón muy convincente para especificar los suyos propios
5 Es a través de la ProducerRecordKeyValueProvider instancia en la que podrá elegir tipos de clave-valor para su Kafka mensajes

Paso 4: Crear Receive Connector Transporte

La mejor manera de crear un ReceivingConnectorTransport para Kafka que utiliza cadenas como tipos de clave y valor es a través del stringBuilder:

var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.stringBuilder(
        "accounts-booking-receive-transport", (1)
        actorSystem, (2)
        "mybank.booking" (3)
).build();

Como el constructor que se encuentra en KafkaConnectorTransport, este constructor acepta los mismos tres parámetros:

1 El nombre del ReceivingConnectorTransport; utilizar nombres significativos para los conectores facilitará la resolución de problemas.
2 El ActorSystem
3 La Ruta Raíz de Configuración; utilizando el mybank.booking clave de configuración definida en Step 2
También puede crear el transporte utilizando el constructor normal, que es la forma recomendada en caso de que desee utilizar tipos que no sean cadenas.
var consumerConfig = AlpakkaConfigProvider.getConsumerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringDeserializerConsumerConfig(consumerConfig);

var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.<String, String>builder()
                        .withName("accounts-booking-receive-transport") (2)
                        .withActorSystem(classicActorSystemProvider)
                        .withConsumerSettings(config.consumerSettings()) (3)
                        .withTopics(config.topic("response-topic")) (4)
                        .withRestartSettings(config.restartSettings()) (5)
                        .withMetadataClientSettings(config.healthCheckSettings()) (5)
                        .build();
1 Usando el mybank.booking clave de configuración definida en Step 2
2 Asigne un nombre significativo al transporte
3 Estos son los ConsumerSettings de la kafka-clients bloque
4 Captura response-topic from the topics bloque. Si solo utiliza un tema, puede reemplazar esto con solo una llamada a topic()
5 Prefiera la verificación de salud y restart configuración de los ajustes a menos que haya una razón muy convincente para especificar los suyos propios