Documentation for a newer release is available. View Latest

Message Logging

In many applications there are often requirements around keeping a record of every message, this could be for monitoring, auditing, or data warehousing purposes. Regardless of the use case, the connector library provides an optional message logging facility that will publish both sent and received messages.

Logged messages are published using the MessageLogEntry class defined in the message-logger-api.

Message Logger Interface

To add message logging, an implementation of the MessageLogger functional interface must be provided when building the connector. The provided implementation can be more complex than demonstrated here and may publish to a database or a queue.

MessageLogger messageLogger = messageLogEntry -> {
    log.info("logging message: {}", messageLogEntry);
};
initiatingReceiveConnectorBuilder()
        .withMessageLogger(messageLogger);
To have the emitted message log entries utilise the Checkpointing behaviour, you must supply an implementation of the CheckpointAwareMessageLogger interface to your connector.

Supporting context

The MessageLogEntryEnricher class contains an element called supportingData. This has a key of type String and a value of type Object, and gets populated with a combination of header elements that were added to the TransportMessage and then any passed supportingData, in that order.

Any key clashes between the two maps will be resolved by adding header details as they are, and clashing supportingData elements will get a supportingData- prefix, i.e. supportingData-kafka-key.

Message Log Entry Enrichment

MessagesLogEntry objects can optionally be enriched before they are published by the MessageLogger.

public interface MessageLogEntryEnricher<T> {
    void enrich(ConnectorMessage<T> connectorMessage, MessageLogEntry messageLogEntry);
}

To add message log entry enrichment, provide an implementation of the MessageLogEntryEnrichment functional interface when building the connector.

MessageLogEntryEnricher<ExampleType> messageLogEntryEnricher = (connectorMessage, messageLogEntry)
        -> messageLogEntry.setSupportingData(Map.of("enriched", "true"));
initiatingReceiveConnectorBuilder()
        .withMessageLogger(messageLogger)
        .withMessageLogEntryEnricher(messageLogEntryEnricher);