Error Handling

Errors are inevitable when processing messages from external systems over unreliable networks. The connector library mitigates them with error handling strategies that differ depending on the nature and context of the error.

The main factor that determines how an error is handled is whether it occurs while sending or while receiving.

Error Handling in Send Connectors

Error handling in the send connectors focuses mainly on messages that fail to be delivered over the transport. This class of error is transient in nature. To overcome it, the send connector uses the resilience4j library to wrap calls with circuit breaking, fallback/routing and retries, all of which can be configured. Refer to the Resilient Message Sending documentation for a more detailed explanation of how this works.

If a message still fails to send despite retries and the other resilience strategies, the message delivery completes exceptionally. The sender must then handle the exception, since the connector can do nothing more without knowledge of the client application.
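As a minimal sketch of what handling the exception on the caller side might look like, the example below uses a plain CompletionStage. The send method here is a hypothetical stand-in that always fails, not the connector's real API, and the fallback value is purely illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class SendFailureSketch {

    // Hypothetical stand-in for a send call whose retries have been exhausted.
    static CompletionStage<String> send(String payload) {
        return CompletableFuture.failedFuture(
                new IllegalStateException("transport unavailable for: " + payload));
    }

    // The caller decides what a failed delivery means for the application.
    static CompletionStage<String> sendWithFallback(String payload) {
        return send(payload).exceptionally(error -> {
            // A real application might persist the message for replay or raise an alert.
            return "FAILED: " + error.getMessage();
        });
    }

    public static void main(String[] args) {
        String outcome = sendWithFallback("payment-1").toCompletableFuture().join();
        System.out.println(outcome);
    }
}
```

Whether to recover with a fallback value, as here, or to let the failure propagate is an application-level decision.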

Error Handling in Receive Connectors

Exceptions can occur during receipt of a message, for example when a TransportMessage fails to deserialize to the target type.

All failed messages are appended to a configurable deadletter. This allows the failed message to be acknowledged so that processing can continue.

Deadletter Appender

The DeadletterAppender is a functional interface which is called whenever a message fails during processing.

@FunctionalInterface
public interface DeadletterAppender {
    CompletionStage<Void> append(ReceiveConnectorException receiveConnectorException);
}

Providing a DeadletterAppender implementation is optional. If one is not provided, the connector uses the default implementation, which simply logs both the failed message and the exception that caused the error.

Every failed message is passed to the appender as a ReceiveConnectorException or an exception that extends it. ReceiveConnectorException wraps the original exception as its cause, alongside the received message.

public class ReceiveConnectorException extends ResolvableRootCauseException {
    private final ReceivedMessage<? extends ConnectorMessage<?>> receivedMessage;

    public ReceiveConnectorException(ReceivedMessage<? extends ConnectorMessage<?>> receivedMessage, Throwable e) {
        super(e);
        this.receivedMessage = receivedMessage;
    }

    public ReceiveConnectorException(ReceivedMessage<? extends ConnectorMessage<?>> receivedMessage, String message) {
        super(message);
        this.receivedMessage = receivedMessage;
    }

}
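A custom appender could look roughly like the sketch below, which parks failures in memory instead of logging them. To keep the example self-contained it declares simplified stand-ins: its ReceiveConnectorException carries a String rather than a ReceivedMessage, and InMemoryDeadletterAppender is a hypothetical name, not a class shipped with the library.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;

final class DeadletterSketch {

    // Simplified stand-in (assumption): the real class carries a ReceivedMessage.
    static class ReceiveConnectorException extends RuntimeException {
        final String receivedMessage;

        ReceiveConnectorException(String receivedMessage, Throwable cause) {
            super(cause);
            this.receivedMessage = receivedMessage;
        }
    }

    // Same shape as the DeadletterAppender interface shown earlier.
    @FunctionalInterface
    interface DeadletterAppender {
        CompletionStage<Void> append(ReceiveConnectorException receiveConnectorException);
    }

    // Hypothetical appender that keeps failures in memory for later inspection.
    static final class InMemoryDeadletterAppender implements DeadletterAppender {
        final Queue<ReceiveConnectorException> deadletters = new ConcurrentLinkedQueue<>();

        @Override
        public CompletionStage<Void> append(ReceiveConnectorException e) {
            deadletters.add(e);
            return CompletableFuture.completedFuture(null);
        }
    }

    public static void main(String[] args) {
        InMemoryDeadletterAppender appender = new InMemoryDeadletterAppender();
        appender.append(new ReceiveConnectorException("{bad json",
                        new IllegalArgumentException("parse error")))
                .toCompletableFuture().join();
        System.out.println("Deadletters stored: " + appender.deadletters.size());
    }
}
```

A production appender would more likely write to a durable store or a dedicated queue, so that failed messages survive a restart.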

Exception Classification

Some exceptions are caused by transient failures, and for these it may make sense to attempt recovery. Most connector functionality, however, is built on passing immutable objects through functions, so retrying a failed operation would produce the same result. In those cases we can skip retries and move on as quickly as possible.

To account for both of these situations, we classify exceptions as recoverable or unrecoverable. Exceptions can be classified as unrecoverable by extending UnrecoverableReceiveConnectorException, a marker class which itself extends the base exception class ReceiveConnectorException. Otherwise, for exceptions which are recoverable, extend ReceiveConnectorException and add the exception to the recoverable-exceptions key in the configuration.
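The two approaches might be sketched as follows. The base classes are simplified stand-ins declared only so the example compiles on its own, and MalformedPayloadException and LookupTimeoutException are hypothetical names chosen for illustration.

```java
final class ClassificationMarkerSketch {

    // Simplified stand-ins for the library's exception hierarchy (assumption).
    static class ReceiveConnectorException extends RuntimeException {
        ReceiveConnectorException(String message) { super(message); }
    }

    static class UnrecoverableReceiveConnectorException extends ReceiveConnectorException {
        UnrecoverableReceiveConnectorException(String message) { super(message); }
    }

    // Hypothetical: a malformed payload will never parse, so retrying is pointless.
    static class MalformedPayloadException extends UnrecoverableReceiveConnectorException {
        MalformedPayloadException(String message) { super(message); }
    }

    // Hypothetical: a timed-out lookup may succeed on a later attempt.
    // It would also need a matching entry under recoverable-exceptions.
    static class LookupTimeoutException extends ReceiveConnectorException {
        LookupTimeoutException(String message) { super(message); }
    }

    // True when the exception carries the unrecoverable marker class.
    static boolean isMarkedUnrecoverable(Throwable t) {
        return t instanceof UnrecoverableReceiveConnectorException;
    }
}
```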

recoverable-exceptions is a list of RecoverableExceptionFilter objects, each consisting of the following keys:

Key: filter
Definition: Regular expression to match against the fully qualified exception name, e.g. HttpErrors$HttpServerErrorException
Example: ".*\\$HttpServerErrorException"

Key: properties
Definition: List of key-value pairs, all of which must be matched in addition to the filter for the exception to be classified as a recoverable exception.
Example: [{"statusCode": "503 Service Unavailable"}]

When an exception is thrown, the entire stack trace is scanned for recoverable exceptions; if one is found, the exception is classified as recoverable.

The default configuration is included below:

recoverable-exceptions = [
  {
    filter = ".*\\$HttpServerErrorException"
    properties = [{"statusCode": "503 Service Unavailable"}]
  },
  {
    filter = ".*\\.ReceiveConnectorException"
  }
]
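To illustrate how a filter expression could be applied, the sketch below matches each filter against the class name of every exception in the cause chain. This is an assumption about the matching behaviour rather than the library's actual implementation; matching of the properties key is left out, and the nested HttpErrors class is a hypothetical stand-in.

```java
import java.util.List;
import java.util.regex.Pattern;

final class RecoverableFilterSketch {

    // Hypothetical stand-in mirroring the HttpErrors$HttpServerErrorException example.
    static final class HttpErrors {
        static class HttpServerErrorException extends RuntimeException {
            HttpServerErrorException(String message) { super(message); }
        }
    }

    // Returns true if any exception in the cause chain has a class name
    // that fully matches one of the filter expressions.
    static boolean isRecoverable(Throwable thrown, List<Pattern> filters) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            String className = t.getClass().getName();
            for (Pattern filter : filters) {
                if (filter.matcher(className).matches()) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Same expression as the first default filter; "\\$" is a literal dollar sign.
        List<Pattern> filters = List.of(Pattern.compile(".*\\$HttpServerErrorException"));
        Throwable wrapped = new RuntimeException("receive failed",
                new HttpErrors.HttpServerErrorException("503 Service Unavailable"));
        System.out.println("recoverable: " + isRecoverable(wrapped, filters));
    }
}
```

Note that the dollar sign has to be escaped in the expression because Java separates nested class names with $, which is otherwise a regular expression anchor.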

Acknowledgements

Depending on the transport implementation, the receiving connector may need to make acknowledgements. After handling an error, the receiving connector calls the acknowledge method on the transport so that processing can continue.

Error Handlers

For each stage we can optionally provide an error handler: a function which takes a ReceiveConnectorException and returns a CompletionStage<Void>. In the constructor below it is passed as the doOnError parameter.

    public DefaultMapToConnectorMessageStage(ReceiveTransportMessageConverter<T> transportMessageConverter,
                                             Integer parallelism,
                                             ExecutionContextExecutor dispatcher,
                                             ReceiveErrorHandler doOnError) {
        if (transportMessageConverter == null) {
            throw new IllegalArgumentException("'transportMessageConverter' must not be null");
        }
        if (dispatcher == null) {
            throw new IllegalArgumentException("'dispatcher' must not be null");
        }
        this.transportMessageConverter = transportMessageConverter;
        this.parallelism = Objects.requireNonNullElseGet(parallelism, () -> {
            final int processors = Runtime.getRuntime().availableProcessors();
            log.debug("Using default parallelism of {}", processors);
            return processors;
        });
        this.dispatcher = dispatcher;
        this.doOnError = doOnError;
    }

This makes it easy to add error handling specific to each stage. We can also provide a doOnError handler to the ReceiveConnector itself, which is called for every ReceiveConnectorException in addition to the stage error handler.
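The combination of a stage handler and a connector-wide handler can be pictured with the sketch below. The types are simplified stand-ins: the method name handle on ReceiveErrorHandler is hypothetical, and running the stage handler before the connector handler is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class ErrorHandlerSketch {

    // Simplified stand-in for the library's exception type (assumption).
    static class ReceiveConnectorException extends RuntimeException {
        ReceiveConnectorException(String message) { super(message); }
    }

    // Stand-in for ReceiveErrorHandler; the method name "handle" is hypothetical.
    @FunctionalInterface
    interface ReceiveErrorHandler {
        CompletionStage<Void> handle(ReceiveConnectorException e);
    }

    // Runs the stage handler first, then the connector-wide handler (order assumed).
    static ReceiveErrorHandler both(ReceiveErrorHandler stageHandler,
                                    ReceiveErrorHandler connectorHandler) {
        return e -> stageHandler.handle(e).thenCompose(ignored -> connectorHandler.handle(e));
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ReceiveErrorHandler stage = e -> {
            calls.add("stage: " + e.getMessage());
            return CompletableFuture.completedFuture(null);
        };
        ReceiveErrorHandler connector = e -> {
            calls.add("connector: " + e.getMessage());
            return CompletableFuture.completedFuture(null);
        };
        both(stage, connector)
                .handle(new ReceiveConnectorException("conversion failed"))
                .toCompletableFuture().join();
        System.out.println(calls);
    }
}
```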