Resequencer
The Notification Service processes domain events and generates notifications based on them. Because of how events are delivered, and the potential for delays in delivery, events can arrive out of order, which leads to problematic scenarios. To address this, we can reorder the events/messages before processing them. Reordering messages before processing is not a new concept; it was documented as the Resequencer pattern in the seminal EIP book almost 20 years ago.
The Resequencer pattern is used to reorder messages before they are processed.
The Resequencer is implemented as a local Akka actor created per unitOfWorkId. The actor delegates state management, and the decision of whether to handle an envelope, to a ResequencerStrategy.
Re-sequencing strategies are implemented via the ResequencerStrategy interface, which defines the operations common to all strategies.
Handling is in turn delegated to the injected handlers: the MDS Object Handler and the Process Object Handler.
The ResequencerGuardian is also an Akka actor; it spawns a Resequencer actor for each unitOfWorkId.
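As a rough illustration of how a strategy plugs into the actor, the contract might look like the following sketch; the method names and the generic envelope type are assumptions for illustration, not the real interface:

// Hypothetical sketch of the strategy contract; names and signatures are
// assumptions and may differ from the actual ResequencerStrategy interface.
public interface ResequencerStrategy<T> {

    // Decide whether the given envelope can be handled now or must be stashed.
    boolean canHandle(T envelope);

    // Record that an envelope has been handled, updating internal sequencing state.
    void onHandled(T envelope);
}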
The following re-sequencing strategies are supported:
Re-sequencing Until All pain.001 MDS Objects Have Been Received
The simplest kind of re-sequencing is to stash all incoming data envelopes, regardless of their sequence, until all pain.001 MDS objects (group header, instruction, transaction) have been encountered; from that point on, data envelopes are passed through as they arrive. While this kind of re-sequencing will not protect us from lost updates, it should exhibit lower end-to-end latency than full sequence-based ordering.
Handling of a processing data element roughly conforms to the following pseudocode:
encounteredMdsObjects.addAll(envelope.mdsObjects)
if (encounteredMdsObjects.containsAll(PAIN_001, PAIN_001_PAYMENT_INSTRUCTION, PAIN_001_PAYMENT_TRANSACTION)) {
    // do the message processing here and when done
    // unstash the next element if available and start from the top
} else {
    // stash the data element
}
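For illustration, a minimal, self-contained Java sketch of this stash-until-complete logic could look as follows; MdsObjectType, DataEnvelope and the class itself are simplified stand-ins for the real types, not the actual implementation:

import java.util.ArrayDeque;
import java.util.EnumSet;
import java.util.Queue;
import java.util.Set;

class FullPain001Sketch {

    enum MdsObjectType { PAIN_001, PAIN_001_PAYMENT_INSTRUCTION, PAIN_001_PAYMENT_TRANSACTION }

    record DataEnvelope(Set<MdsObjectType> mdsObjects, String payload) {}

    private static final Set<MdsObjectType> REQUIRED = EnumSet.allOf(MdsObjectType.class);

    private final Set<MdsObjectType> encountered = EnumSet.noneOf(MdsObjectType.class);
    private final Queue<DataEnvelope> stash = new ArrayDeque<>();

    void onEnvelope(DataEnvelope envelope) {
        encountered.addAll(envelope.mdsObjects());
        if (encountered.containsAll(REQUIRED)) {
            process(envelope);
            // Everything stashed so far can now be released, in arrival order.
            while (!stash.isEmpty()) {
                process(stash.poll());
            }
        } else {
            stash.add(envelope);
        }
    }

    private void process(DataEnvelope envelope) {
        System.out.println("processing " + envelope.payload());
    }
}

Once all three pain.001 objects have been seen, the stash is drained in arrival order and subsequent envelopes pass straight through.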
In order to enable this strategy, add the following property to the configuration:
payment-status-notification.resequencer.strategy = FULL_PAIN001
Re-sequencing Until All pain.001 MDS Objects and Custom Objects Have Been Received
This strategy behaves in the same way as the one above, with the addition of also waiting on custom data keys in CustomObjectContainer objects. See the configuration in Notification Sender for more information on how to configure this strategy.
In order to enable this strategy, add the following property to the configuration:
payment-status-notification.resequencer.strategy = FULL_PAIN001_AND_CUSTOM_DATA
Additionally, this strategy requires keys to be configured; these are the custom data keys to wait on before proceeding to send notifications:
payment-status-notification.resequencer.custom-data-keys = ['BusinessData']
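Conceptually, the additional check extends the pseudocode above roughly as follows; encounteredCustomDataKeys, configuredCustomDataKeys and the envelope accessor are illustrative names, not the actual fields:

// in addition to the pain.001 check, require every configured custom data key
// (e.g. 'BusinessData') to have been seen in a CustomObjectContainer
encounteredCustomDataKeys.addAll(envelope.customObjectContainers.keySet())
if (encounteredMdsObjects.containsAll(PAIN_001, PAIN_001_PAYMENT_INSTRUCTION, PAIN_001_PAYMENT_TRANSACTION)
        && encounteredCustomDataKeys.containsAll(configuredCustomDataKeys)) {
    // do the message processing here, then unstash as before
} else {
    // stash the data element
}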
Re-sequencing Until All pain.001 MDS Objects and PDS Objects Have Been Received
This strategy behaves in the same way as the one above, with the addition of also waiting on PDS data keys in PdsObjectContainer objects. See the configuration in Notification Sender for more information on how to configure this strategy.
In order to enable this strategy, add the following property to the configuration:
payment-status-notification.resequencer.strategy = FULL_PAIN001_AND_PDS_DATA
Additionally, this strategy requires keys to be configured; these are the PDS data keys to wait on before proceeding to send notifications:
payment-status-notification.resequencer.pds-data-keys = ['pdsBusinessData']
Re-sequencing Based on IPF Sequences
Since multiple flows can execute concurrently, we need to order events according to both a global sequence and a local (per-flow) one. Handling of a processing data element roughly conforms to the following pseudocode:
boolean isFirstEventGlobally = dataElement.sequence == 1
        && expectedNextGlobalSequence == 1
        && observedSequencesPerFlow.isEmpty()

// covers the condition above but being explicit about scenarios
// <= operator used here to allow concurrent flows
boolean isFirstEventLocally = dataElement.sequence <= expectedNextGlobalSequence
        && !observedSequencesPerFlow.containsKey(dataElement.flow)

// <= operator used here to allow concurrent flows
boolean isNextEvent = dataElement.sequence <= expectedNextGlobalSequence
        || dataElement.sequence <= (observedSequencesPerFlow.getOrDefault(dataElement.flow, 0) + 1)

if (isFirstEventGlobally || isFirstEventLocally || isNextEvent) {
    // do the message processing here and when done:
    expectedNextGlobalSequence++
    var currentLocalSequence = observedSequencesPerFlow.getOrDefault(dataElement.flow, 1)
    observedSequencesPerFlow.put(dataElement.flow, max(currentLocalSequence, dataElement.sequence))
    // unstash the next element if available and start from the top
} else {
    // stash the data element
}
The above snippet relies on messages being consumed one by one from Kafka, i.e. on a receiver parallelism of 1 (the current default).
If receive parallelism is enabled (and it may need to be for performance reasons, if increasing the sending parallelism is not enough), then we can no longer rely on events from a single flow arriving in sequence, and the correctness of the solution for multiple concurrent flows is weakened in certain scenarios. For example, one of the concurrent flows enriches an MDS object and its events arrive out of sequence with each other, but they are accepted because the other flow has already advanced the global sequence.
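To make the decision logic concrete, here is a minimal, runnable Java sketch that replays a small out-of-order stream; DataElement and the class layout are simplifications for illustration, not the actual implementation:

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

class SequenceResequencerDemo {

    record DataElement(String flow, int sequence) {}

    private int expectedNextGlobalSequence = 1;
    private final Map<String, Integer> observedSequencesPerFlow = new HashMap<>();
    private final Queue<DataElement> stash = new ArrayDeque<>();

    void onElement(DataElement e) {
        if (!canHandle(e)) {
            System.out.println("stashing   " + e);
            stash.add(e);
            return;
        }
        process(e);
        // Keep retrying stashed elements until no further progress is made.
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            int n = stash.size();
            for (int i = 0; i < n; i++) {
                DataElement s = stash.poll();
                if (canHandle(s)) {
                    process(s);
                    progressed = true;
                } else {
                    stash.add(s);
                }
            }
        }
    }

    private boolean canHandle(DataElement e) {
        boolean isFirstEventGlobally = e.sequence() == 1
                && expectedNextGlobalSequence == 1
                && observedSequencesPerFlow.isEmpty();
        boolean isFirstEventLocally = e.sequence() <= expectedNextGlobalSequence
                && !observedSequencesPerFlow.containsKey(e.flow());
        boolean isNextEvent = e.sequence() <= expectedNextGlobalSequence
                || e.sequence() <= observedSequencesPerFlow.getOrDefault(e.flow(), 0) + 1;
        return isFirstEventGlobally || isFirstEventLocally || isNextEvent;
    }

    private void process(DataElement e) {
        System.out.println("processing " + e);
        expectedNextGlobalSequence++;
        observedSequencesPerFlow.merge(e.flow(), e.sequence(), Math::max);
    }

    public static void main(String[] args) {
        SequenceResequencerDemo demo = new SequenceResequencerDemo();
        demo.onElement(new DataElement("A", 2)); // too early: stashed
        demo.onElement(new DataElement("A", 1)); // first globally: processed, then (A, 2) unstashed
        demo.onElement(new DataElement("B", 1)); // first locally for flow B: processed
    }
}

Run as-is, flow A's second event is stashed until its predecessor arrives, after which both are processed; flow B's first event is then accepted via the isFirstEventLocally condition.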
In order to enable this strategy, add the following property to the configuration:
payment-status-notification.resequencer.strategy = SEQUENCE
Supporting Durable Stashes
Since the Resequencer is a stateful component, the implementation needs to persist the stash state outside of the current JVM to be considered reliable; otherwise, all the data is lost on a JVM exit.
To allow us to choose between several degrees of reliability and performance, a configurable state storage strategy is used, with the following implementations:
- no-op storage, which loses the state on JVM shutdown; suitable for latency-critical workloads where a late notification is worthless
- cache storage, which uses our Infinispan cache adapters to store the data; depending on the cache mode used, offers a balance between performance and reliability: async caches may be lossy but will only slow us down during JVM start, while sync caches are reliable but may slow down the notification send rate; unless Infinispan is configured to store cache data on disk, a full cluster restart will result in complete state loss
The storage implementation is selected via the following property:
payment-status-notification.resequencer.storage-type = none
or
payment-status-notification.resequencer.storage-type = cache
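For illustration, the storage abstraction behind this property might look roughly like the following sketch; ResequencerStateStorage and both implementations are assumptions, and the real cache-backed version would delegate to the Infinispan cache adapters rather than an in-memory map:

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical storage abstraction; the real interface may differ.
interface ResequencerStateStorage<S> {
    void save(String unitOfWorkId, S state);
    Optional<S> load(String unitOfWorkId);
}

// No-op storage: nothing survives a JVM exit (storage-type = none).
class NoOpStateStorage<S> implements ResequencerStateStorage<S> {
    public void save(String unitOfWorkId, S state) { /* intentionally discarded */ }
    public Optional<S> load(String unitOfWorkId) { return Optional.empty(); }
}

// In-memory stand-in for the cache-backed storage (storage-type = cache);
// in the real service this would use the Infinispan cache adapters.
class CacheStateStorage<S> implements ResequencerStateStorage<S> {
    private final ConcurrentMap<String, S> cache = new ConcurrentHashMap<>();
    public void save(String unitOfWorkId, S state) { cache.put(unitOfWorkId, state); }
    public Optional<S> load(String unitOfWorkId) { return Optional.ofNullable(cache.get(unitOfWorkId)); }
}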