Documentation for a newer release is available. View Latest

Receiving Connector

Receiving connectors are responsible for processing messages received on a configured transport and transforming the message into a known type, before delegating the handling of the message to the client.

All receiving connectors implement the ReceivingConnector interface, which itself extends OperableConnector.

public interface ReceivingConnector extends OperableConnector {

    ReceivingConnectorConfig getConfig();

}

public interface OperableConnector {

    /**
     * Retrieve name of the connector.
     */
    String getName();

    /**
     * Starts the connector.
     */
    void start();

    /**
     * Starts the connector's health check procedure.
     */
    void startHealthCheck();

    ConnectorHealth getHealth();

    /**
     * Shuts down the connector.
     *
     * @param reason the reason for shutdown
     */
    CompletionStage<Void> shutdown(ShutdownReason reason);

    /**
     * Returns the connector's running status
     */
    boolean isRunning();

    /**
     * Returns the connector's configuration.
     */
    ConnectorConfig getConfig();

    /**
     * Returns all the connector's transports.
     */
    List<? extends OperableConnectorTransport> getTransports();

    /**
     * Abstraction of a connector's configuration.
     */
    interface ConnectorConfig {

        String getConfigRoot();
    }
}

Generic Type T

The ReceivingConnector interface is not concerned with types, though the default implementation ReceiveConnector<T> is generically typed with type T, where T is short for 'target type'. Messages received in the transport layer are passed to the connector wrapped in a TransportMessage object and are converted to the target type as soon as possible. This allows future stages to work with a known type which can optionally be validated to ensure data integrity.

Stages

The connector implementation uses Akka Streams. Connectors are comprised of a number of configurable stages that are executed asynchronously, where each stage performs some transformation or checks to ensure the messages are valid before delegating control back to the client to handle the received message.

The image below roughly describes the stage-based approach when receiving messages. Note that some stages are optional and will be skipped if they are not configured when connector is built.

graphviz-diagram

The following sections will briefly cover each stage.

Filtering

Some connector transports such as JMS already have the functionality to filter a message (using JMS Selectors), but others such as Kafka or HTTP don’t. For this reason the Connector framework offers a filtering functionality.

To learn how to filter messages, see the Filtering documentation.

Payload Decryption

Sometimes transport encryption protocols, such as TLS are not sufficient. In these cases application level encryption can be applied to the messages themselves. When receiving an encrypted message, its payload must be decrypted before we can transform it to the target type.

To learn how to configure decryption, see the Encryption documentation.

Convert to Target Type

The payload found in the TransportMessage object passed from the transport layer to the connector is untyped. The connector is responsible for mapping the payload type into a known type. This can be achieved by providing an implementation of the ReceiveTransportMessageConverter when building a receiving connector.

@FunctionalInterface
public interface ReceiveTransportMessageConverter<T> {
    T convert(TransportMessage transportMessage);
}

Message Association

Received messages are either a response to a request we made earlier or a new request. When setting up a connector to receive messages we typically know which of these scenarios is expected. If a message is a response to a previously sent request, then it should be correlated back to the original message using the correlation service. Otherwise, if the message is a request we should generate (or extract) an identifier for the message so that we can correlate it later.

To learn more about message association, see the Message Association documentation.

Message Logging

Message logging is a common requirement when building integrations across multiple systems, as it can aid in debugging or provide a comprehensive audit of all messaging. Connectors can optionally be configured to log messages by providing an implementation of the MessageLogger interface.

Message logging operates in three ways:

  • Backpressured, ignores failures (default)

  • Backpressured, fails the stage if logging fails

  • Fire-and-forget (does not respect backpressure)

You can change the default behaviour when creating a ReceiveConnector by using withMessageLoggingBehaviour.

To learn more about message logging, see the Message Logging documentation.

Message Validation

When receiving messages, it often makes sense to make sure the message is valid before continuing processing. This can be in terms of schema validation or business logic validation. Connectors can optionally be configured to validate message by providing an implementation of the BeanValidator interface.

To learn more about message validation, see the Message Validation documentation.

Message Handling

The final and arguably most important stage in the receiver flow is the message handling stage. This is where the message, after passing through decryption, validation, association and logging can finally be handled by the client code.

As the messages flow into the message handling stage, they are partitioned according to their UnitOfWorkId. Up to receiver-parallelism partitions are allowed to be processed in parallel, with each partition allowing up to receiver-parallelism-per-partition messages to be handled in parallel. By default, no per-partition parallelism is allowed, thus ensuring the proper message order per UnitOfWorkId. If the assigned message handler doesn’t need to preserve message order but needs the additional throughput, receiver-parallelism-per-partition can be safely increased to any value less than receiver-parallelism.

Handling the message is done by providing an implementation of the ReceiveHandler functional interface.

@FunctionalInterface
public interface ReceiveHandler<T> {
    CompletionStage<Void> handle(ReceivingContext receivingContext, T payload);
}

The handle function takes a ReceivingContext and the generically typed payload and returns a void future.

Resilience

The resilience of a given receive connector is fairly configurable and relies on two concepts: automated retries and a dead letter channel.

The image below shows some potential scenarios that may be encountered when processing a message and the mechanisms used to handle them.

receive connector resiliency

Retries

It is possible to configure a retry policy for receiving connectors by supplying an instance of ResiliencySettings. For more details, see the Resilience documentation.

The resilience settings are used to distinguish between recoverable (retryable) exceptions and their counterparts. If an exception is deemed recoverable, retries will be attempted according to the configuration.

Once the retries have been exhausted and no recovery was possible, the error and the connector message that caused it are then passed to the dead letter channel for further processing and the message is NACKd (negatively acknowledged).

Dead Letter

The dead letter channel component is represented by the DeadletterAppender interface.

@FunctionalInterface
public interface DeadletterAppender {
    CompletionStage<Void> append(ReceiveConnectorException receiveConnectorException);
}

The general idea behind the appender is to allow clients to support potentially complex processing that may need to happen when messages fail to be received. Such as persisting them in an external storage, scheduling their reprocessing with a distributed scheduler, etc.