Connector Transports

The purpose of connectors is to abstract away the details of the underlying transport by placing the implementation behind a common set of interfaces. This enables us to plug in any transport mechanism that implements the required interfaces for sending and receiving messages and the connector core library takes care of all other aspects, such as mapping, validation, logging, etc.

When creating a new connector transport there are two interfaces that need to be implemented. These are SendingConnectorTransport and ReceivingConnectorTransport.

The connector library provides some of the most commonly used transport mechanisms "out of the box". These are for Kafka, JMS, HTTP and Filesystem, though custom transports can be developed with relative ease, and they should "just work" with the connector library.

Receiving Transports

Receiving connector transports implement the ReceivingConnectorTransport interface, that has the following signature.

public interface ReceivingConnectorTransport extends OperableConnectorTransport {

    void startProcessingMessagesVia(Flow<ReceivedMessage<TransportMessage>, ReceivedMessage<Void>, ? extends KillSwitch> receiverFlow, Criteria filterCriteria);

    default void acknowledge(ReceivedMessage<?> receivedMessage) {
    }

}

A well implemented startProcessingMessagesVia method takes an unmaterialised flow and filter criteria. If filter criteria is null it will pass any received messages to the flow. If filter criteria is specified, only messages which fulfil the criteria will be passed down the flow.

The acknowledge method can optionally be implemented if the transport mechanism needs to acknowledge the receipt of a message.

Sending Transports

Sending connector transports implement the SendingConnectorTransport interface, that has the following signature.

public interface SendingConnectorTransport<T> extends OperableConnectorTransport {

    void startProcessingMessages(Integer maxConcurrentOffers);

    CompletionStage<DeliveryOutcome> offer(MessageDelivery<T> messageDelivery);
}

The generic type T is "Target Type", i.e. the type of message to be sent to the target system, before serialisation.

The offer method takes a MessageDelivery<T> and returns a future containing the outcome of the message send. Typically, the offer method takes the message to send and adds it to an internal queue of messages that are to be sent to the transport mechanism.

The startProcessingMessage method should enable message to be sent via the transport, in most cases this is where the transport flow is materialised.

Custom Connector Transports

To develop a custom transport, some basic knowledge about Akka Streams is required since this is ultimately what is used to drive the flow of messages through the system. For an introduction to Akka Streams, consider the Lightbend Akka Streams for Java course that is provided by the Lightbend Academy.

Transport Exceptions

A NoAvailableTransportException will be thrown if:

  • the circuit breaker state is OPEN for all available transports, OR

  • all available transports have been exhausted when attempting to send a message.