Resequenciador

El Servicio de Notificación se encarga de procesar domain events y basado en esos generando notifications. Pero debido a cómo el events puede llegar y el potencial de retrasos en event a la llegada, existen escenarios problemáticos potenciales que pueden ocurrir cuando events llegar fuera de orden. Para hacer esto, podemos reorganizar esos events/mensaje antes del procesamiento. La reordenación de mensajes antes del procesamiento no es un concepto nuevo; de hecho, ha sido documentado como el Patrón de re-secuenciación en el libro seminal de EIP hace casi 20 años.

El Patrón Resequenciador se utiliza para reordenar los mensajes antes de su procesamiento. La implementación del resequenciador se realiza de manera local. Akka actor creado por unitOfWorkId. El actor delega state-gestión y decisions en relación con si se debe manejar un sobre a la ResequencerStrategy. Las estrategias de re-secuenciación pueden ser implementadas a través de ResequencerStrategy interfaz que contiene operaciones de estrategia de reordenamiento comunes. El manejo se delega a su vez a los controladores inyectados:MDS Manejador de Objetos y Process Object Manejador. ResequencerGuardian también es un Akka actor que genera actores Resequencer para un dado unitOfWorkId.

El Resequenciador realiza aproximadamente lo siguiente:

notification-service-resequencer-state

Re-secuenciación Hasta que todo pain. 001 MDS Los objetos han sido recibidos

El tipo más simple de re-secuenciación es almacenar todos los data envelopes, independientemente de su secuencia, hasta que todos pain. 001 MDS se han encontrado objetos (encabezado de grupo, instrucción, transacción), momento en el cual continuamos pasando el data envelopes a medida que van llegando. Si bien este tipo de re-secuenciación no nos protegerá de actualizaciones perdidas, debería exhibir una latencia de extremo a extremo más baja que el ordenamiento basado en secuencias completo.

Manejo de un processing data el elemento se ajusta aproximadamente al siguiente pseudocódigo:

encounteredMdsObjects.addAll(envelope.mdsObjects)

if (encounteredMdsObjects.containsAll(PAIN_001, PAIN_001_PAYMENT_INSTRUCTION, PAIN_001_PAYMENT_TRANSACTION)) {
    // do the message processing here and when done
    // unstash the next element if available and start from the top
} else {
    // stash the data element
}

Para habilitar esta estrategia, añada la siguiente propiedad a la configuración:

payment-status-notification.resequencer.strategy = FULL_PAIN001

Re-secuenciación Hasta que todo pain. 001 MDS Objetos y Custom Los objetos han sido recibidos

Esto se comporta de la misma manera que el resecuenciador anterior con la adición de esperar en custom claves de datos en CustomObjectContainer objetos. Consulte la configuración en Remitente de Notificaciones para obtener más información sobre cómo configurar esta estrategia.

Para habilitar esta estrategia, añada la siguiente propiedad a la configuración:

payment-status-notification.resequencer.strategy = FULL_PAIN001_AND_CUSTOM_DATA

Además, esta estrategia requiere que se configuren claves, las cuales se utilizarán para buscar el custom claves de datos a esperar antes de proceder a enviar notifications:

payment-status-notification.resequencer.custom-data-keys = ['BusinessData']

Re-secuenciación Hasta que todo pain. 001 MDS Objetos y Pds Los objetos han sido recibidos

Esto se comporta de la misma manera que el resecuenciador anterior con la adición de esperar en pds claves de datos en objetos PdsObjectContainer. Consulte la configuración en Remitente de Notificaciones para obtener más información sobre cómo configurar esta estrategia.

Para habilitar esta estrategia, añada la siguiente propiedad a la configuración:

payment-status-notification.resequencer.strategy = FULL_PAIN001_AND_PDS_DATA

Además, esta estrategia requiere que se configuren claves, las cuales se utilizarán para buscar el pds claves de datos a esperar antes de proceder a enviar notifications:

payment-status-notification.resequencer.pds-data-keys = ['pdsBusinessData']

Re-secuenciación Basada en Secuencias de IPF

Dado que podemos tener múltiples flujos ejecutándose de manera concurrente, necesitamos ordenar events de acuerdo con una secuencia global y una local. Manejo de un processing data el elemento se ajusta aproximadamente al siguiente pseudocódigo:

boolean isFirstEventGlobally = dataElement.sequence == 1
    && expectedNextGlobalSequence == 1
    && observedSequencesPerFlow.isEmpty()
// covers the condition above but being explicit about scenarios
// <= operator used here to allow concurrent flows
boolean isFirstEventLocally = dataElement.sequence <= expectedNextGlobalSequence
    &&! observedSequencesPerFlow.containsKey(dataElement.flow)
// <= operator used here to allow concurrent flows
boolean isNextEvent = dataElement.sequence <= expectedNextGlobalSequence
    || dataElement.sequence <= (observedSequencesPerFlow.getOrDefault(dataElement flow, 0) + 1)
if (isFirstEventGlobally || isFirstEventLocally || isNextEvent) {
    // do the message processing here and when done:
    expectedNextGlobalSequence++
    var currentLocalSequence = observedSequencesPerFlow.getOrDefault(dataElement flow, 1)
    observedSequencesPerFlow.put(dataElement flow, max(currentLocalSequence, dataElement.sequence))
    // unstash the next element if available and start from the top
} else {
    // stash the data element
}

El fragmento anterior se basa en que los mensajes sean consumidos uno por uno de Kafka, es decir, en el paralelismo del receptor de 1 (valor predeterminado actual).

Si se habilita la recepción de paralelismo (puede ser necesario por razones de rendimiento, si aumentar el paralelismo de envío no es suficiente), entonces ya no podemos confiar en events de un único flujo que llega en secuencia y la corrección de la solución para múltiples flujos concurrentes se reduce en cierta medida en ciertos escenarios (ej. uno de los flujos concurrentes enriquece un MDS objeto, su events llegan fuera de secuencia entre sí, pero son aceptados debido a que el otro flujo incrementa la secuencia global).

Para habilitar esta estrategia, añada la siguiente propiedad a la configuración:

payment-status-notification.resequencer.strategy = SEQUENCE

Soporte para Almacenes Duraderos

Dado que el Resequencer es un componente con estado, la implementación debe persistir el stash.state fuera de lo actual JVM ser considerado confiable-de lo contrario, todos los datos se pierden en un JVM salir.

Para permitirnos elegir entre varios grados de fiabilidad y rendimiento, un configurable state se utiliza una estrategia de almacenamiento, con las siguientes implementaciones:

  • almacenamiento no-op, que pierde el state on JVM apagado; adecuado para cargas de trabajo críticas en latencia donde una notificación tardía no tiene valor-almacenamiento en caché, que utiliza nuestro Infinispan cache adaptadores para almacenar los datos; dependiendo de la cache el modo utilizado, ofrece un equilibrio entre rendimiento y fiabilidad-asíncrono caches puede ser con pérdida, pero solo nos ralentizará durante JVM iniciar; sincronizar caches son fiables pero pueden ralentizar la tasa de envío de notificaciones; a menos que Infinispan está configurado para almacenar cache datos en disco, un clúster completo restart resultará en completo state pérdida

    Cache la implementación se utiliza en función de la siguiente propiedad:
    payment-status-notification.resequencer.storage-type = none

o

payment-status-notification.resequencer.storage-type = cache

Configuración predeterminada del resecador

Si se alcanza el límite de tiempo de procesamiento en el reordenador antes de que finalice, se desalmacenarán todos los mensajes en el actor. Cualquier mensaje recibido después del tiempo de espera provocará la creación de un nuevo actor. Cuando este nuevo actor haya terminado de recopilar todos los mensajes, los emitirá una vez más. Esto crea una posibilidad de duplicado notifications enviado.

payment-status-notification {
  resequencer {
    # The type of storage to use with our resequencer. Available options are:
    # * `none` - a no-op storage, which loses the state on JVM shutdown;
    # suitable for latency-critical workloads where a late notification is worthless
    # * `cache` - which uses our cache adapters to store the data; depending on the cache type and mode used,
    # offers a balance between performance and reliability-async caches can be lossy but will only slow the service down during JVM start;
    # sync caches are more reliable but may slow down the notification send rate;
    # unless Infinispan is configured to store cache data on disk, a full cluster restart will result in complete state loss
    storage-type = cache

    # The type of strategy used by resequencer. Available options are:
    # * `FULL_PAIN001` - stash all the data envelopes, regardless of their sequence until all pain. 001 MDS objects (group header, instruction, transaction) have been encountered,
    # at which point we continue to pass the data envelopes through as they are incoming; While this kind of re-sequencing will not protect us from lost updates,
    # it should exhibit lower end to end latency than full sequence-based ordering
    # * `SEQUENCE` - full sequence-based ordering; with multiple flows executing concurrently,
    # we are ordering events both according to a global sequence and according to a local (internal) one
    # * `FULL_PAIN001_AND_CUSTOM_DATA` - Same as FULL_PAIN001 strategy with the addition of waiting for specific configurable custom objects (see payment-status-notification.resequencer.custom-data-keys)
    # * `FULL_PAIN001_AND_PDS_DATA` - Same as FULL_PAIN001 strategy with the addition of waiting for specific configurable pds objects (see payment-status-notification.resequencer.pds-data-keys)
    strategy = FULL_PAIN001

    # Only used for strategy FULL_PAIN001_AND_CUSTOM_DATA.
    # A list of keys used to extract data from CustomObjectContainers, which will be available in the produced report
    # for use in the target type mapper
    custom-data-keys = []

    # Only used for strategy FULL_PAIN001_AND_PDS_DATA.
    # A list of keys used to extract data from PdsObjectContainer, which will be available in the produced report
    # for use in the target type mapper
    pds-data-keys = []

    # The amount of time to wait for the Resequencer to
    # process a single DataEnvelope message
    processing-timeout = 3s

    # The number of times to attempt sending a single DataEnvelope to the Resequencer
    max-attempts = 3

    # The delay multiplier to use on subsequent send attempts
    backoff-factor = 2

    # The max amount of time the Resequencer can spend
    # idling before it terminates itself
    idle-timeout = 30s

    # The percentage of randomness to use when retrying domain event handling.
    jitter-factor = 0. 2
  }
}