Documentation for a newer release is available. View Latest

Connector Transports

El propósito de los connectors es abstraer los detalles del transport subyacente colocando la implementación detrás de un conjunto común de interfaces. Esto nos permite enchufar cualquier mecanismo de transport que implemente las interfaces requeridas para enviar y recibir mensajes y la biblioteca núcleo del connector se encarga del resto de aspectos, como mapeo, validación, logging, etc.

Al crear un nuevo connector transport hay dos interfaces que deben implementarse. Estas son SendingConnectorTransport y ReceivingConnectorTransport.

La biblioteca del connector proporciona algunos de los mecanismos de transport más utilizados "listos para usar". Estos son para Kafka, JMS, HTTP y sistema de archivos, aunque se pueden desarrollar transports personalizados con relativa facilidad, y deberían funcionar "tal cual" con la biblioteca del connector.

Receiving Transports

Los receiving connector transports implementan la interfaz ReceivingConnectorTransport, que tiene la siguiente firma.

public interface ReceivingConnectorTransport extends OperableConnectorTransport {

    void startProcessingMessagesVia(Flow<ReceivedMessage<TransportMessage>, ReceivedMessage<Void>, ? extends KillSwitch> receiverFlow, Criteria filterCriteria);

    default void acknowledge(ReceivedMessage<?> receivedMessage) {
    }

}

Un método startProcessingMessagesVia bien implementado recibe un flow no materializado y unos criterios de filtrado. Si los criterios de filtrado son null, pasará cualquier mensaje recibido al flow. Si se especifican criterios, solo los mensajes que cumplan los criterios se pasarán por el flow.

El método acknowledge puede implementarse opcionalmente si el mecanismo de transport necesita reconocer (ack) la recepción de un mensaje.

Sending Transports

Los sending connector transports implementan la interfaz SendingConnectorTransport, que tiene la siguiente firma.

public interface SendingConnectorTransport<T> extends OperableConnectorTransport {

    void startProcessingMessages(Integer maxConcurrentOffers);

    CompletionStage<DeliveryOutcome> offer(MessageDelivery<T> messageDelivery);
}

El tipo genérico T es "Target Type", es decir, el tipo de mensaje que se enviará al sistema destino, antes de la serialización.

El método offer recibe un MessageDelivery<T> y devuelve un future que contiene el resultado del envío del mensaje. Normalmente, el método offer toma el mensaje a enviar y lo añade a una cola interna de mensajes que van a enviarse al mecanismo de transport.

El método startProcessingMessage debe permitir que los mensajes se envíen vía el transport; en la mayoría de los casos aquí es donde se materializa el flow del transport.

Custom Connector Transports

Para desarrollar un transport personalizado se requiere conocimiento básico de Akka Streams, ya que esto es, en última instancia, lo que se usa para impulsar el flujo de mensajes a través del sistema. Para una introducción a Akka Streams, considere el curso Lightbend Akka Streams for Java que ofrece Lightbend Academy.

Transport Exceptions

Se lanzará una NoAvailableTransportException si:

  • el estado del circuit breaker es OPEN para todos los transports disponibles, O

  • todos los transports disponibles se han agotado al intentar enviar un mensaje.