Documentation for a newer release is available. View Latest

Mantener la consistencia de configuraciones entre colecciones

En el proyecto Dynamic Processing Settings (DPS), hay situaciones en las que los cambios en las configuraciones deben actualizarse de forma transaccional o coordinada a través de más de una colección de base de datos. Por ejemplo:

  • Aprobaciones: Cuando se concede una aprobación para una operación pendiente sobre una configuración (create, update o delete) y se actualiza la colección approvals, el cambio correspondiente debe aplicarse a la colección de configuraciones asociada.

  • Historial de configuraciones: Cuando se crea o modifica una configuración, el cambio debe registrarse tanto en la colección de configuraciones como en la colección history.

Dado que algunas bases de datos tienen soporte limitado para transacciones entre colecciones, DPS implementa un comportamiento de estilo transaccional adoptando Change Data Capture (CDC) para capturar y propagar cambios entre colecciones. Los detalles de cómo se logra esto se proporcionan en la siguiente sección.

Componentes y configuración de CDC

DB Change Processor Stream

El DB Change Processor Stream sirve como el componente principal de la implementación de CDC. Sus responsabilidades incluyen:

  1. Suscribirse a cambios en la base de datos a través del componente DB Change Provider, usando uno de los mecanismos de suscripción soportados (actualmente, solo change-streams está disponible; polling está planificado para una versión futura).

  2. Reenviar los cambios de base de datos recuperados al procesador apropiado, que determina cómo manejar cada cambio. Dado que el mismo cambio de base de datos puede entregarse a un procesador múltiples veces, los procesadores deben ser idempotentes.

  3. Rastrear el offset actual de los cambios de base de datos procesados para minimizar el procesamiento duplicado.

La lista completa de propiedades de configuración disponibles para el componente DB Change Processor Stream, incluyendo valores predeterminados, se muestra a continuación:

ipf.dps.dbchange.processor.stream {
  # The number of DBChanges to demand from upstream and process in parallel
  # Suggested values are around 500 to 1000
  upstream-event-demand = 500

  restart-settings {
    # The starting backoff interval to use when restarting DBChange processor streams.
    min-backoff = 500 millis
    # The starting backoff interval to use when restarting DBChange processor streams.
    max-backoff = 20 seconds
    # The number of restarts is capped within a timeframe of max-restarts-within.
    max-restarts = 86400000
    # The number of restarts is capped to max-restarts within a timeframe of within.
    max-restarts-within = 1 days
    # The starting backoff interval to use when restarting DBChange processor streams.
    jitter = 0.1
  }

  # To improve throughput, offsets of successfully processed DBChanges are
  # not checkpointed for each DBChange but are instead grouped together in
  # size and time based windows, with the last DBChange offset in a particular
  # window used as a checkpoint.
  # The window completes when it reaches the specified `size` of offsets,
  # or when the `timeout` duration passes (whichever comes first).
  commit-offset-window {
    # The size of the window.
    size = 1000
    # The amount of time to wait for `size` DBChanges to complete.
    timeout = 1 minute
  }

  # Requests to DBChangeProcessor implementations will be retried on exception based on the below config.
  # Once all retries have been exhausted, the DBChange will get sent to a dead letter appender
  resiliency-settings {
    # Max number of attempts to retry DbChangeProcessor in case of failure
    max-attempts = 3
    # Retry wait period between retries
    initial-retry-wait-duration = 1s
    # Backoff multiplier between retires
    backoff-multiplier = 2
    # Thread pool size for retries executor service
    retry-scheduler-thread-pool-size = 1
    # In the case of the dead letter itself failing we have more recovery options to try:
    # * COMMIT - Commit the offset
    # * NO_COMMIT - Do not commit the offset and retry the DBChange
    deadletter-failure-strategy = NO_COMMIT
  }
}

DB Change Provider

El componente DB Change Provider entrega un flujo de cambios de base de datos al DB Change Processor Stream, usando mecanismos de change streams o polling.

Change Streams

Los change streams sirven como el método predeterminado para capturar y suscribirse a cambios de base de datos como parte de la implementación de CDC. Esto permite la monitorización en tiempo real de las colecciones, permitiendo que el DB Change Processor Stream consuma y procese los cambios tan pronto como ocurren.

Polling

El polling aún no está disponible; se planea el soporte para una versión futura.

La lista completa de propiedades de configuración disponibles para el componente DB Change Provider, incluyendo valores predeterminados, se muestra a continuación. Nótese que la selección de la implementación de suscripción se realiza usando la propiedad ipf.dps.dbchange.subscription:

ipf.dps.dbchange {
  # DbChangeProvider implementation
  # * change-streams - recomended option
  # * polling - TODO: not supported yet
  subscription = "change-streams"

  # Buffer period when resuming change streams to help prevent data loss
  # caused by timing delays or race conditions.
  # Note: The DbChangeProvider is likely to emit more
  # dbChange duplicates as the allowed-time-drift value increases.
  allowed-time-drift = 2s

  # Larger values reduce the number of getMore operations
  # to fetch additional batches but increase the duration of each operation.
  collection-batch-size = 100

  # Interval to wait between polls to check if
  # the specified collection is available
  # (typically after it has previously been dropped).
  collection-available-polling-interval = 10s

  # Maximum time to wait for the specified collection to become available
  # (typically after it has previously been dropped).
  collection-available-max-wait-time = 5m
}