
Resequencer

The Notification Service processes domain events and generates notifications from them. Because events can be delayed, they may arrive out of order, which can lead to problematic scenarios. To avoid this, events (messages) can be reordered before they are processed. Reordering messages before processing is not a new concept; it was documented as the Resequencer pattern in the seminal Enterprise Integration Patterns (EIP) book almost 20 years ago.

The Resequencer pattern reorders messages before they are processed. The Resequencer is implemented as a local Akka actor, created per unitOfWorkId. The actor delegates state management, and the decision on whether an envelope can be handled, to a ResequencerStrategy. Re-sequencing strategies are implemented via the ResequencerStrategy interface, which contains the common resequencer strategy operations. Handling is in turn delegated to the injected handlers: the MDS Object Handler and the Process Object Handler. The ResequencerGuardian, also an Akka actor, spawns a Resequencer actor for each unitOfWorkId.
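The division of responsibilities described above can be sketched in plain Java, without Akka. This is a minimal illustration only: `Envelope`, `SketchStrategy`, `SketchResequencer` and `SketchGuardian` are hypothetical names invented for the sketch, not the real actor or ResequencerStrategy APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a data envelope; the real type carries more data.
record Envelope(String unitOfWorkId, String payload) {}

// Hypothetical, simplified strategy contract (not the real ResequencerStrategy API).
interface SketchStrategy {
    // Updates the strategy's state and reports whether the envelope may be handled now.
    boolean shouldHandle(Envelope envelope);
}

// Plays the role of the per-unitOfWorkId Resequencer actor.
class SketchResequencer {
    private final SketchStrategy strategy;
    private final Consumer<Envelope> handler;
    private final List<Envelope> stash = new ArrayList<>();

    SketchResequencer(SketchStrategy strategy, Consumer<Envelope> handler) {
        this.strategy = strategy;
        this.handler = handler;
    }

    void onEnvelope(Envelope envelope) {
        if (!strategy.shouldHandle(envelope)) {
            stash.add(envelope);
            return;
        }
        handler.accept(envelope);
        drainStash();
    }

    // Re-offers stashed envelopes until a full pass accepts none of them.
    private void drainStash() {
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Iterator<Envelope> it = stash.iterator(); it.hasNext();) {
                Envelope stashed = it.next();
                if (strategy.shouldHandle(stashed)) {
                    it.remove();
                    handler.accept(stashed);
                    progressed = true;
                }
            }
        }
    }
}

// Plays the role of the ResequencerGuardian: one child per unitOfWorkId.
class SketchGuardian {
    private final Map<String, SketchResequencer> children = new HashMap<>();
    private final Supplier<SketchStrategy> strategyFactory;
    private final Consumer<Envelope> handler;

    SketchGuardian(Supplier<SketchStrategy> strategyFactory, Consumer<Envelope> handler) {
        this.strategyFactory = strategyFactory;
        this.handler = handler;
    }

    void route(Envelope envelope) {
        children.computeIfAbsent(envelope.unitOfWorkId(),
                id -> new SketchResequencer(strategyFactory.get(), handler))
            .onEnvelope(envelope);
    }

    int childCount() {
        return children.size();
    }
}
```

Each unit of work gets its own strategy instance, so the stash and sequencing state of one payment never influence another.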

The Resequencer roughly performs the following:

[Figure: notification-service-resequencer-state]

Re-sequencing Until all pain.001 MDS Objects Have Been Received

The simplest kind of re-sequencing is to stash all data envelopes, regardless of their sequence, until all pain.001 MDS objects (group header, instruction, transaction) have been encountered; from that point on, data envelopes are passed through as they arrive. While this kind of re-sequencing does not protect against lost updates, it should exhibit lower end-to-end latency than full sequence-based ordering.

Handling of a processing data element roughly conforms to the following pseudocode:

encounteredMdsObjects.addAll(envelope.mdsObjects)

if (encounteredMdsObjects.containsAll(PAIN_001, PAIN_001_PAYMENT_INSTRUCTION, PAIN_001_PAYMENT_TRANSACTION)) {
    // do the message processing here and when done
    // unstash the next element if available and start from the top
} else {
    // stash the data element
}
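As a rough, runnable counterpart to the pseudocode above, the following sketch stashes envelopes until all three pain.001 object types have been seen. `MdsObjectType`, `SketchEnvelope` and `FullPain001Sketch` are hypothetical types made up for illustration; the real envelope and strategy classes will differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical enumeration of MDS object types; OTHER stands for anything else.
enum MdsObjectType { PAIN_001, PAIN_001_PAYMENT_INSTRUCTION, PAIN_001_PAYMENT_TRANSACTION, OTHER }

// Hypothetical, simplified envelope: a label plus the MDS object types it carries.
record SketchEnvelope(String name, Set<MdsObjectType> mdsObjects) {}

class FullPain001Sketch {
    private static final Set<MdsObjectType> REQUIRED = EnumSet.of(
        MdsObjectType.PAIN_001,
        MdsObjectType.PAIN_001_PAYMENT_INSTRUCTION,
        MdsObjectType.PAIN_001_PAYMENT_TRANSACTION);

    private final Set<MdsObjectType> encountered = EnumSet.noneOf(MdsObjectType.class);
    private final Queue<SketchEnvelope> stash = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();

    void onEnvelope(SketchEnvelope envelope) {
        encountered.addAll(envelope.mdsObjects());
        if (encountered.containsAll(REQUIRED)) {
            // "Processing" is reduced to recording the envelope name.
            processed.add(envelope.name());
            // The condition can never become false again, so every stashed
            // envelope can now follow in its original arrival order.
            while (!stash.isEmpty()) {
                processed.add(stash.poll().name());
            }
        } else {
            stash.add(envelope);
        }
    }

    List<String> processed() {
        return processed;
    }

    int stashed() {
        return stash.size();
    }
}
```

Note that the envelope completing the set is processed before the stashed ones, mirroring the order in the pseudocode (process first, then unstash).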

In order to enable this strategy, add the following property to configuration:

payment-status-notification.resequencer.strategy = FULL_PAIN001

Re-sequencing Until all pain.001 MDS Objects and Custom Objects Have Been Received

This behaves in the same way as the FULL_PAIN001 strategy above, and additionally waits for the configured custom data keys in CustomObjectContainer objects. See the configuration in Notification Sender for more information on how to configure this strategy.

In order to enable this strategy, add the following property to configuration:

payment-status-notification.resequencer.strategy = FULL_PAIN001_AND_CUSTOM_DATA

Additionally, this strategy requires keys to be configured, which will be used to look up the custom data keys to wait upon before proceeding to send notifications:

payment-status-notification.resequencer.custom-data-keys = ["BusinessData"]

Re-sequencing Until all pain.001 MDS Objects and PDS Objects Have Been Received

This behaves in the same way as the FULL_PAIN001 strategy, and additionally waits for the configured PDS data keys in PdsObjectContainer objects. See the configuration in Notification Sender for more information on how to configure this strategy.

In order to enable this strategy, add the following property to configuration:

payment-status-notification.resequencer.strategy = FULL_PAIN001_AND_PDS_DATA

Additionally, this strategy requires keys to be configured, which will be used to look up the PDS data keys to wait upon before proceeding to send notifications:

payment-status-notification.resequencer.pds-data-keys = ["pdsBusinessData"]
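Both key-based strategies add the same kind of condition on top of the pain.001 check: every configured key must have been observed before notifications are sent. A minimal sketch of such a gate follows; `RequiredKeysGate` is a hypothetical helper, and how keys are actually extracted from CustomObjectContainer or PdsObjectContainer objects is not shown here.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: tracks which of the configured keys have been seen so far.
class RequiredKeysGate {
    private final Set<String> requiredKeys;
    private final Set<String> seenKeys = new HashSet<>();

    // configuredKeys corresponds to custom-data-keys or pds-data-keys.
    RequiredKeysGate(List<String> configuredKeys) {
        this.requiredKeys = Set.copyOf(configuredKeys);
    }

    // Call with the keys found in each incoming envelope.
    void observe(Set<String> keysInEnvelope) {
        seenKeys.addAll(keysInEnvelope);
    }

    // True once every configured key has been observed at least once.
    boolean isSatisfied() {
        return seenKeys.containsAll(requiredKeys);
    }

    // The combined condition: pain.001 objects complete AND all keys present.
    boolean mayProceed(boolean allPain001ObjectsSeen) {
        return allPain001ObjectsSeen && isSatisfied();
    }
}
```

In this sketch an empty key list is satisfied immediately, so the gate then adds nothing to the plain pain.001 check.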

Re-sequencing Based on IPF Sequences

Since multiple flows can execute concurrently, events need to be ordered according to both a global sequence and a local (per-flow) one. Handling of a processing data element roughly conforms to the following pseudocode:

boolean isFirstEventGlobally = dataElement.sequence == 1
    && expectedNextGlobalSequence == 1
    && observedSequencesPerFlow.isEmpty()
// covers the condition above but being explicit about scenarios
// <= operator used here to allow concurrent flows
boolean isFirstEventLocally = dataElement.sequence <= expectedNextGlobalSequence
    && !observedSequencesPerFlow.containsKey(dataElement.flow)
// <= operator used here to allow concurrent flows
boolean isNextEvent = dataElement.sequence <= expectedNextGlobalSequence
    || dataElement.sequence <= (observedSequencesPerFlow.getOrDefault(dataElement.flow, 0) + 1)
if (isFirstEventGlobally || isFirstEventLocally || isNextEvent) {
    // do the message processing here and when done:
    expectedNextGlobalSequence++
    var currentLocalSequence = observedSequencesPerFlow.getOrDefault(dataElement.flow, 1)
    observedSequencesPerFlow.put(dataElement.flow, max(currentLocalSequence, dataElement.sequence))
    // unstash the next element if available and start from the top
} else {
    // stash the data element
}

The above snippet relies on messages being consumed one by one from Kafka, i.e. on receiver parallelism of 1 (current default).

If receive parallelism is increased (which may be necessary for performance reasons when increasing the sending parallelism is not enough), events from a single flow can no longer be relied upon to arrive in sequence, and the solution becomes less correct for multiple concurrent flows in certain scenarios. For example, if one of the concurrent flows enriches an MDS object and its events arrive out of sequence with each other, they may still be accepted because the other flow has increased the global sequence.
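The sequence-based pseudocode can be exercised with a small runnable sketch. The acceptance conditions are copied from the pseudocode; `SequencedEvent` and `SequenceSketch` are hypothetical names, "processing" is reduced to appending to a list, and events are assumed to be fed in one at a time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified event: the flow it belongs to and its sequence number.
record SequencedEvent(String flow, int sequence) {}

class SequenceSketch {
    private int expectedNextGlobalSequence = 1;
    private final Map<String, Integer> observedSequencesPerFlow = new HashMap<>();
    private final List<SequencedEvent> stash = new ArrayList<>();
    private final List<SequencedEvent> processed = new ArrayList<>();

    private boolean isAcceptable(SequencedEvent e) {
        boolean isFirstEventGlobally = e.sequence() == 1
            && expectedNextGlobalSequence == 1
            && observedSequencesPerFlow.isEmpty();
        // <= is used to allow concurrent flows
        boolean isFirstEventLocally = e.sequence() <= expectedNextGlobalSequence
            && !observedSequencesPerFlow.containsKey(e.flow());
        boolean isNextEvent = e.sequence() <= expectedNextGlobalSequence
            || e.sequence() <= observedSequencesPerFlow.getOrDefault(e.flow(), 0) + 1;
        return isFirstEventGlobally || isFirstEventLocally || isNextEvent;
    }

    private void process(SequencedEvent e) {
        processed.add(e);
        expectedNextGlobalSequence++;
        // Keeps the highest sequence seen per flow; equivalent to the
        // getOrDefault/max pair in the pseudocode for sequences >= 1.
        observedSequencesPerFlow.merge(e.flow(), e.sequence(), Math::max);
    }

    void onEvent(SequencedEvent event) {
        if (!isAcceptable(event)) {
            stash.add(event);
            return;
        }
        process(event);
        // Re-offer stashed events until a full pass accepts none of them.
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Iterator<SequencedEvent> it = stash.iterator(); it.hasNext();) {
                SequencedEvent stashed = it.next();
                if (isAcceptable(stashed)) {
                    it.remove();
                    process(stashed);
                    progressed = true;
                }
            }
        }
    }

    List<SequencedEvent> processed() {
        return processed;
    }

    int stashed() {
        return stash.size();
    }
}
```

For a single flow whose events arrive as 3, 1, 2, the sketch stashes event 3 and processes the events in the order 1, 2, 3.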

In order to enable this strategy, add the following property to configuration:

payment-status-notification.resequencer.strategy = SEQUENCE

Supporting Durable Stashes

Since the Resequencer is a stateful component, the implementation needs to persist the stash state outside of the current JVM to be considered reliable - otherwise all the data is lost on a JVM exit.

To allow us to choose between several degrees of reliability and performance, a configurable state storage strategy is used, with the following implementations:

  • no-op storage, which loses the state on JVM shutdown; suitable for latency-critical workloads where a late notification is worthless

  • cache storage, which uses our Infinispan cache adapters to store the data; depending on the cache mode used, offers a balance between performance and reliability - async caches may be lossy but will only slow us down during JVM start; sync caches are reliable but may slow down the notification send rate; unless Infinispan is configured to store cache data on disk, a full cluster restart will result in complete state loss

The storage implementation is selected with the following property:

payment-status-notification.resequencer.storage-type = none

or

payment-status-notification.resequencer.storage-type = cache
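To illustrate what a configurable state storage strategy might look like, the sketch below selects between a no-op store and a store backed by an external map. All names (`StashStorage`, `NoOpStashStorage`, `MapBackedStashStorage`, `StashStorageFactory`) are hypothetical, and the plain `Map` merely stands in for a cache that lives outside the JVM; it is not the Infinispan adapter API.

```java
import java.util.List;
import java.util.Map;

// Hypothetical storage contract: persist and restore the stash of one unit of work.
interface StashStorage {
    void save(String unitOfWorkId, List<String> stash);

    List<String> load(String unitOfWorkId);
}

// storage-type = none: nothing is kept, so state is lost on JVM shutdown.
class NoOpStashStorage implements StashStorage {
    @Override
    public void save(String unitOfWorkId, List<String> stash) {
        // intentionally does nothing
    }

    @Override
    public List<String> load(String unitOfWorkId) {
        return List.of();
    }
}

// storage-type = cache: state is written to a store that outlives this object.
class MapBackedStashStorage implements StashStorage {
    private final Map<String, List<String>> cache; // stand-in for an external cache

    MapBackedStashStorage(Map<String, List<String>> cache) {
        this.cache = cache;
    }

    @Override
    public void save(String unitOfWorkId, List<String> stash) {
        cache.put(unitOfWorkId, List.copyOf(stash));
    }

    @Override
    public List<String> load(String unitOfWorkId) {
        return cache.getOrDefault(unitOfWorkId, List.of());
    }
}

class StashStorageFactory {
    // storageType mirrors the value of the storage-type property.
    static StashStorage forType(String storageType, Map<String, List<String>> cache) {
        return switch (storageType) {
            case "none" -> new NoOpStashStorage();
            case "cache" -> new MapBackedStashStorage(cache);
            default -> throw new IllegalArgumentException("Unknown storage-type: " + storageType);
        };
    }
}
```

With the map-backed variant, a freshly created storage instance pointing at the same external store can reload a stash saved by an earlier instance, which is the property a restarted JVM depends on.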

Resequencer default configuration

If the processing timeout limit is hit on the resequencer before it finishes, it will unstash all messages on the actor. Any messages received after the timeout will cause a new actor to be created. When this new actor has finished collecting all the messages, it will emit them once again. This creates a possibility of duplicate notifications being sent out.

payment-status-notification {
  resequencer {
    # The type of storage to use with our resequencer. Available options are:
    # * `none` - a no-op storage, which loses the state on JVM shutdown;
    # suitable for latency-critical workloads where a late notification is worthless
    # * `cache` - which uses our cache adapters to store the data; depending on the cache type and mode used,
    # offers a balance between performance and reliability - async caches can be lossy but will only slow the service down during JVM start;
    # sync caches are more reliable but may slow down the notification send rate;
    # unless Infinispan is configured to store cache data on disk, a full cluster restart will result in complete state loss
    storage-type = cache

    # The type of strategy used by resequencer. Available options are:
    # * `FULL_PAIN001` - stash all the data envelopes, regardless of their sequence until all pain.001 MDS objects (group header, instruction, transaction) have been encountered,
    # at which point we continue to pass the data envelopes through as they are incoming; While this kind of re-sequencing will not protect us from lost updates,
    # it should exhibit lower end to end latency than full sequence-based ordering
    # * `SEQUENCE` - full sequence-based ordering; with multiple flows executing concurrently,
    # we are ordering events both according to a global sequence and according to a local (internal) one
    # * `FULL_PAIN001_AND_CUSTOM_DATA` - Same as FULL_PAIN001 strategy with the addition of waiting for specific configurable custom objects (see payment-status-notification.resequencer.custom-data-keys)
    # * `FULL_PAIN001_AND_PDS_DATA` - Same as FULL_PAIN001 strategy with the addition of waiting for specific configurable pds objects (see payment-status-notification.resequencer.pds-data-keys)
    strategy = FULL_PAIN001

    # Only used for strategy FULL_PAIN001_AND_CUSTOM_DATA.
    # A list of keys used to extract data from CustomObjectContainers, which will be available in the produced report
    # for use in the target type mapper
    custom-data-keys = []

    # Only used for strategy FULL_PAIN001_AND_PDS_DATA.
    # A list of keys used to extract data from PdsObjectContainer, which will be available in the produced report
    # for use in the target type mapper
    pds-data-keys = []

    # The amount of time to wait for the Resequencer to
    # process a single DataEnvelope message
    processing-timeout = 3s

    # The number of times to attempt sending a single DataEnvelope to the Resequencer
    max-attempts = 3

    # The delay multiplier to use on subsequent send attempts
    backoff-factor = 2

    # The max amount of time the Resequencer can spend
    # idling before it terminates itself
    idle-timeout = 30s

    # The percentage of randomness to use when retrying domain event handling.
    jitter-factor = 0.2
  }
}
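How the service combines max-attempts, backoff-factor and jitter-factor internally is not described here. As a rough intuition only, the sketch below shows one common way such settings interact: exponential backoff with symmetric jitter. `RetryDelaySketch`, the `initialDelay` parameter and the formula itself are assumptions made for illustration, not the documented behaviour.

```java
import java.time.Duration;

class RetryDelaySketch {
    /**
     * Assumed formula: initialDelay * backoffFactor^(attempt - 1), then shifted
     * by up to +/- jitterFactor of that value.
     *
     * @param initialDelay  hypothetical delay before the first retry (not a documented setting)
     * @param backoffFactor multiplier applied on each subsequent attempt
     * @param jitterFactor  fraction of randomness, e.g. 0.2 for +/- 20%
     * @param attempt       1-based retry number
     * @param random01      a random value in [0, 1); passed in to keep the sketch deterministic
     */
    static Duration delayForAttempt(Duration initialDelay, double backoffFactor,
                                    double jitterFactor, int attempt, double random01) {
        double base = initialDelay.toMillis() * Math.pow(backoffFactor, attempt - 1);
        // Maps random01 from [0, 1) to [-1, 1) and scales it by the jitter fraction.
        double jitter = base * jitterFactor * (2 * random01 - 1);
        return Duration.ofMillis(Math.round(base + jitter));
    }
}
```

Under these assumptions, with backoff-factor = 2 and jitter-factor = 0.2, a one-second initial delay would give a second retry delay somewhere between 1.6 and 2.4 seconds.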