Documentation for a newer release is available. View Latest

Inicio rápido de Kafka

A menudo es posible que desee configurar un par de conectores: uno para enviar y otro para recibir mensajes hacia/desde un destino. Esta guía explica cómo hacerlo usando Kafka.

Para ejemplos más completos de uso de connectors para consumir y producir mensajes en general, vea Request-Reply Asíncrono.

Paso 1: Agregar la dependencia connector-kafka

La dependencia que debe agregar a su pom.xml es:

<dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-kafka</artifactId>
</dependency>

Si importa el Icon BOM, o usa el Icon BOM como padre, no es necesario proporcionar una versión por separado.

Paso 2: Config

La configuración del connector, en general, está fuertemente impulsada por configuración. La configuración nos permite especificar:

  • Nombres de topics

  • Configuración de consumidor/productor de Kafka

  • Parámetros de reinicio (en caso de fallo)

Aquí hay un ejemplo de un bloque de configuración para un par de sending y receiving connectors que conectaremos con nuestros `ConnectorTransport`s, para el sistema de reservas de un banco ficticio que presenta un topic de solicitud y otro de respuesta:

mybank.booking { (1)
  kafka.producer {
    topics {
      request-topic = "mybank.booking.request" (2)
    }
    kafka-clients {
      compression.type = lz4 (4)
    }
  }
  kafka.consumer {
    topics {
      response-topic = "mybank.booking.response" (3)
    }
    kafka-clients {
      group.id = ipf (4)
    }
  }
}
1 Esto se conoce como la ruta raíz de configuración y se referenciará en el código. Indica dónde, en la configuración de la aplicación, buscar la configuración de este connector transport
2 La clave request-topic también se referenciará en el código. Esta capacidad existe si desea referirse a múltiples topics dentro de un bloque de configuración común de Kafka. Alternativamente, puede usar topic = "mybank.booking.request" en lugar de un bloque topics independiente.
3 El response-topic, igual que <2>
4 kafka-clients le permite especificar cualquier configuración estándar de productor o consumidor de Kafka como se documenta aquí (productor) y aquí (consumidor)

Paso 2.1: Configuración común

Puede haber notado que no especificamos la propiedad bootstrap.servers. Esto se debe a que, de forma predeterminada, la configuración de Kafka que especifique aquí heredará de los ajustes akka.kafka.consumer y akka.kafka.producer.

Esto significa que puede simplemente especificar dos ajustes para configurar globalmente todos los Kafka Bootstrap Servers para todos los consumidores y productores de Kafka. Por ejemplo:

akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"

Esto es igual para todos los demás ajustes de consumidor y productor, por ejemplo TLS, compresión, tamaño de buffer.

Si, por ejemplo, un productor o consumidor específico consume/produce hacia/desde un topic diferente, puede configurarse así:

akka.kafka.consumer.kafka-clients.bootstrap.servers="kafka:9092"
akka.kafka.producer.kafka-clients.bootstrap.servers="kafka:9092"
mybank.booking.kafka.producer.kafka-clients.bootstrap.servers="someotherkafka:9094"

La configuración anterior significa que todos los consumidores y productores de Kafka, excepto el productor de booking, irán a kafka:9092, y el productor de booking irá a someotherkafka:9094.

Paso 3: Crear el Send Connector Transport

La mejor manera de crear un SendingConnectorTransport para Kafka que use strings como tipos de clave y valor es mediante el stringBuilder:

var kafkaConnectorTransport = KafkaConnectorTransport.stringBuilder(
        "accounts-booking-send-transport", (1)
        actorSystem, (2)
        "mybank.booking" (3)
).build();

Como puede ver, este builder solo requiere tres cosas:

1 El nombre del ConnectorTransport; usar nombres significativos para los connectors facilitará la resolución de problemas
2 El ActorSystem
3 La ruta raíz de configuración; usando la clave de configuración mybank.booking definida en Paso 2

También puede crear el transport usando el builder normal, que es la forma recomendada en caso de que quiera usar tipos que no sean string.

var producerConfig = AlpakkaConfigProvider.getProducerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringSerializerProducerConfig(producerConfig);

var kafkaConnectorTransport = KafkaConnectorTransport.<String, String>builder()
        .withName("accounts-booking-send-transport") (2)
        .withTopicName(config.topic())
        .withProducerSettings(config.producerSettings()) (3)
        .withRestartSettings(config.restartSettings()) (4)
        .withHealthCheckSettings(config.healthCheckSettings()) (4)
        .withProducerRecordKeyValueProvider(new StringProducerRecordKeyValueProvider()) (5)
        .withActorSystem(actorSystem)
        .build();
1 Usando la clave de configuración mybank.booking definida en Paso 2
2 Dé al transport un nombre significativo
3 Estos son los ProducerSettings del bloque kafka-clients
4 Prefiera health check y restart settings desde configuración salvo que haya una razón de peso para especificar los suyos
5 Es a través de la instancia ProducerRecordKeyValueProvider que podrá elegir los tipos key-value de sus mensajes de Kafka

Paso 4: Crear el Receive Connector Transport

La mejor manera de crear un ReceivingConnectorTransport para Kafka que use strings como tipos de clave y valor es mediante el stringBuilder:

var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.stringBuilder(
        "accounts-booking-receive-transport", (1)
        actorSystem, (2)
        "mybank.booking" (3)
).build();

Como el builder que se encuentra en KafkaConnectorTransport, este builder acepta los mismos tres parámetros:

1 El nombre del ReceivingConnectorTransport; usar nombres significativos para los connectors facilitará la resolución de problemas
2 El ActorSystem
3 La ruta raíz de configuración; usando la clave de configuración mybank.booking definida en Paso 2

También puede crear el transport usando el builder normal, que es la forma recomendada en caso de que quiera usar tipos que no sean string.

var consumerConfig = AlpakkaConfigProvider.getConsumerConfig(classicActorSystemProvider.classicSystem(), "mybank.booking");(1)
var config = new StringDeserializerConsumerConfig(consumerConfig);

var kafkaAckReceiveConnectorTransport = KafkaAckReceiveConnectorTransport.<String, String>builder()
                        .withName("accounts-booking-receive-transport") (2)
                        .withActorSystem(classicActorSystemProvider)
                        .withConsumerSettings(config.consumerSettings()) (3)
                        .withTopics(config.topic("response-topic")) (4)
                        .withRestartSettings(config.restartSettings()) (5)
                        .withMetadataClientSettings(config.healthCheckSettings()) (5)
                        .build();
1 Usando la clave de configuración mybank.booking definida en Paso 2
2 Dé al transport un nombre significativo
3 Estos son los ConsumerSettings del bloque kafka-clients
4 Obteniendo response-topic del bloque topics. Si solo usa un topic, puede reemplazar esto por una llamada a topic()
5 Prefiera health check y restart settings desde configuración salvo que haya una razón de peso para especificar los suyos