Connector Transport s

El propósito de los conectores es abstraer los detalles del transporte subyacente al colocar la implementación detrás de un conjunto común de interfaces. Esto nos permite conectar cualquier mecanismo de transporte que implemente las interfaces requeridas para enviar y recibir mensajes y el conector.core la biblioteca se encarga de todos los demás aspectos, como mapping, validación, registro, etc.

Al crear un nuevo connector transport hay dos interfaces que deben ser implementadas. Estos son SendingConnectorTransport y ReceivingConnectorTransport.

La biblioteca de conectores proporciona algunos de los mecanismos de transporte más comúnmente utilizados "listos para usar". Estos son para Kafka,JMS, HTTP y el sistema de archivos, aunque custom Los transportes pueden desarrollarse con relativa facilidad, y deben "simplemente funcionar" con la biblioteca de conectores.

Recepción de Transportes

Recepción connector transport s implement the ReceivingConnectorTransport interfaz, que tiene la siguiente firma.

public interface ReceivingConnectorTransport extends OperableConnectorTransport {

    void startProcessingMessagesVia(Flow<ReceivedMessage<TransportMessage>, ReceivedMessage<Void>, ? extends KillSwitch> receiverFlow, Criteria filterCriteria);

    default void acknowledge(ReceivedMessage<?> receivedMessage) {
    }

}

Una implementación bien realizada startProcessingMessagesVia el método toma un flujo no materializado y criterios de filtrado. Si los criterios de filtrado son nulos, se pasarán todos los mensajes recibidos al flujo. Si se especifican criterios de filtrado, solo se procesarán los mensajes que cumplan los criterios se transmitirán a lo largo del flujo.

El método de reconocimiento puede implementarse opcionalmente si el mecanismo de transporte necesita confirmar la recepción de un mensaje.

Envío de Transportes

Sending connector los transportes implementan el `SendingConnectorTransport` interfaz, que tiene la siguiente firma.
public interface SendingConnectorTransport<T> extends OperableConnectorTransport {

    void startProcessingMessages(Integer maxConcurrentOffers);

    CompletionStage<DeliveryOutcome> offer(MessageDelivery<T> messageDelivery);
}

El tipo genérico T es "Tipo de Destino", es decir, el tipo de mensaje que se enviará al sistema de destino, antes de la serialización.

The offer método toma un MessageDelivery<T> y devuelve un futuro que contiene el resultado del envío del mensaje. Típicamente, el método de oferta toma el mensaje a enviar y lo añade a una cola interna de mensajes que deben ser enviados al mecanismo de transporte.

El startProcessingMessage El método debe permitir que el mensaje sea enviado a través del transporte; en la mayoría de los casos, aquí es donde se materializa el flujo de transporte.

Custom Connector Transport s

Para desarrollar un custom transporte, algunos conocimientos básicos sobre Akka Streams es necesario ya que esto es, en última instancia, lo que se utiliza para impulsar el flujo de mensajes a través del sistema. Para una introducción a Akka Streams, considere el Lightbend Akka Streams para Java curso que es proporcionado por la Academia Lightbend.

Excepciones de Transporte

A NoAvailableTransportException se lanzará si:

  • el interruptor automático state is OPEN para todos los transportes disponibles, O bien

  • todos los transportes disponibles se han agotado al intentar enviar un mensaje.