Connector Transport s

El propósito de los conectores es abstraer los detalles del transporte subyacente al colocar la implementación detrás de un conjunto común de interfaces. Esto nos permite conectar cualquier mecanismo de transporte que implemente las interfaces requeridas para enviar y recibir mensajes, y la biblioteca central del conector se encarga de todos los demás aspectos, tales como mapping, validación, registro, etc.

Al crear un nuevo transporte de conector, hay dos interfaces que deben ser implementadas. Estos son SendingConnectorTransport y ReceivingConnectorTransport.

La biblioteca de conectores proporciona algunos de los mecanismos de transporte más comúnmente utilizados "listos para usar". Estos son para Kafka,JMS, HTTP y el sistema de archivos, aunque custom Los transportes pueden desarrollarse con relativa facilidad, y deben "simplemente funcionar" con la biblioteca de conectores.

Recepción de Transportes

Los conectores de recepción implementan el ReceivingConnectorTransport interfaz, que tiene la siguiente firma.

public interface ReceivingConnectorTransport extends OperableConnectorTransport {

    void startProcessingMessagesVia(Flow<ReceivedMessage<TransportMessage>, ReceivedMessage<Void>, ? extends KillSwitch> receiverFlow, Criteria filterCriteria);

    default void acknowledge(ReceivedMessage<?> receivedMessage) {
    }

}

Una implementación bien realizada startProcessingMessagesVia el método toma un flujo no materializado y criterios de filtrado. Si los criterios de filtrado son nulos, se pasarán todos los mensajes recibidos al flujo. Si se especifican criterios de filtrado, solo se procesarán los mensajes que cumplan los criterios se transmitirán a lo largo del flujo.

El método de reconocimiento puede implementarse opcionalmente si el mecanismo de transporte necesita confirmar la recepción de un mensaje.

Envío de Transportes

El envío de transportes de conector implementa el SendingConnectorTransport interfaz, que tiene la siguiente firma.

public interface SendingConnectorTransport<T> extends OperableConnectorTransport {

    void startProcessingMessages(Integer maxConcurrentOffers);

    CompletionStage<DeliveryOutcome> offer(MessageDelivery<T> messageDelivery);
}

El tipo genérico T es "Tipo de Destino", es decir, el tipo de mensaje que se enviará al sistema de destino, antes de la serialización.

El offer método toma un MessageDelivery<T> y devuelve un futuro que contiene el resultado del envío del mensaje. Típicamente, el método de oferta toma el mensaje a enviar y lo añade a una cola interna de mensajes que deben ser enviados al mecanismo de transporte.

El startProcessingMessage el método debe permitir que el mensaje sea enviado a través del transporte; en la mayoría de los casos, aquí es donde se materializa el flujo de transporte.

Personalizado Connector Transport s

Para desarrollar un custom transporte, algunos conocimientos básicos sobre Akka Streams es necesario ya que esto es, en última instancia, lo que se utiliza para impulsar el flujo de mensajes a través del sistema. Para una introducción a Akka Streams, considere el Lightbend Akka Streams para Java curso que es proporcionado por la Academia Lightbend.

Excepciones de Transporte

A NoAvailableTransportException se lanzará si:

  • el estado del interruptor automático es OPEN para todos los transportes disponibles, O bien

  • todos los transportes disponibles se han agotado al intentar enviar un mensaje.