Maintaining Settings Consistency Across Collections

In the Dynamic Processing Settings (DPS) project, there are situations where setting changes must be applied transactionally, or at least in a coordinated way, across more than one database collection. For example:

  • Approvals: When an approval is granted for a pending setting operation (create, update, or delete) and the approvals collection is updated, the corresponding change must then be applied to the associated settings collection.

  • Setting History: When a setting is created or modified, the change must be recorded in both the settings and history collections.

Because some databases offer limited support for cross-collection transactions, DPS provides transaction-like behaviour by adopting Change Data Capture (CDC) to capture changes in one collection and propagate them to others. The following sections describe how this is achieved.
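As a rough illustration of the propagation flow described above, the sketch below reacts to a captured change on the approvals collection and applies it to the settings and history collections. All names (collections, fields, the change-event shape) are hypothetical; DPS's actual implementation is not shown here.

```python
# In-memory stand-ins for the settings and history collections (illustrative only).
settings = {}
history = []

def on_db_change(change):
    """Propagate a captured approvals change to the settings and history collections."""
    if change["collection"] == "approvals" and change["doc"]["status"] == "APPROVED":
        doc = change["doc"]
        # Apply the approved operation to the settings collection...
        if doc["operation"] in ("create", "update"):
            settings[doc["settingId"]] = doc["payload"]
        elif doc["operation"] == "delete":
            settings.pop(doc["settingId"], None)
        # ...and record the change in the history collection.
        history.append({"settingId": doc["settingId"], "operation": doc["operation"]})

# A captured change event, as a CDC provider might deliver it:
on_db_change({
    "collection": "approvals",
    "doc": {"status": "APPROVED", "operation": "create",
            "settingId": "s1", "payload": {"threshold": 10}},
})
```

Because the approvals write and the downstream writes are decoupled, each side can be retried independently; the cost is that downstream handlers must tolerate redelivery, as discussed below.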

CDC Components and Configuration

DB Change Processor Stream

The DB Change Processor Stream serves as the core component of the CDC implementation. Its responsibilities include:

  1. Subscribing to database changes through the DB Change Provider component, using one of the supported subscription mechanisms (currently, only change-streams is available; polling is planned for a future release).

  2. Forwarding the retrieved database changes to the appropriate processor, which determines how to handle each change. As the same database change might be delivered to a processor multiple times, processors must be idempotent.

  3. Tracking the current offset of processed database changes to minimise duplicate processing.
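The idempotency requirement in step 2 can be satisfied by keying writes on a unique change identifier, so that redelivery overwrites an existing record rather than duplicating it. A minimal sketch, with hypothetical names:

```python
# Hypothetical idempotent processor: processing the same DBChange twice
# leaves the target collection in the same state as processing it once.
class SettingsHistoryProcessor:
    def __init__(self):
        self.history = {}  # records keyed by change id

    def process(self, change):
        # Upsert keyed on the change's unique id: a redelivered change
        # overwrites its own earlier record instead of adding a duplicate.
        self.history[change["id"]] = {
            "settingId": change["settingId"],
            "operation": change["operation"],
        }

p = SettingsHistoryProcessor()
change = {"id": "c-42", "settingId": "s1", "operation": "update"}
p.process(change)
p.process(change)  # redelivered by the stream; no duplicate record results
```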

The complete list of available configuration properties for the DB Change Processor Stream component, including default values, is shown below:

ipf.dps.dbchange.processor.stream {
  # The number of DBChanges to demand from upstream and process in parallel
  # Suggested values are around 500 to 1000
  upstream-event-demand = 500

  restart-settings {
    # The starting backoff interval to use when restarting DBChange processor streams.
    min-backoff = 500 millis
    # The maximum backoff interval to use when restarting DBChange processor streams.
    max-backoff = 20 seconds
    # The maximum number of restarts allowed within the max-restarts-within timeframe.
    max-restarts = 86400000
    # The timeframe within which the number of restarts is capped to max-restarts.
    max-restarts-within = 1 days
    # Random jitter factor added to backoff intervals to avoid synchronised restarts.
    jitter = 0.1
  }

  # To improve throughput, offsets of successfully processed DBChanges are
  # not checkpointed for each DBChange but are instead grouped together in
  # size and time based windows, with the last DBChange offset in a particular
  # window used as a checkpoint.
  # The window completes when it reaches the specified `size` of offsets,
  # or when the `timeout` duration passes (whichever comes first).
  commit-offset-window {
    # The size of the window.
    size = 1000
    # The amount of time to wait for `size` DBChanges to complete.
    timeout = 1 minute
  }

  # Requests to DBChangeProcessor implementations are retried on exception according to the settings below.
  # Once all retries have been exhausted, the DBChange is sent to a dead letter appender.
  resiliency-settings {
    # Max number of attempts to retry DbChangeProcessor in case of failure
    max-attempts = 3
    # Retry wait period between retries
    initial-retry-wait-duration = 1s
    # Backoff multiplier between retries
    backoff-multiplier = 2
    # Thread pool size for retries executor service
    retry-scheduler-thread-pool-size = 1
    # If writing to the dead letter appender itself fails, one of the following recovery strategies is applied:
    # * COMMIT - Commit the offset
    # * NO_COMMIT - Do not commit the offset and retry the DBChange
    deadletter-failure-strategy = NO_COMMIT
  }
}
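The commit-offset window behaviour can be sketched as follows: offsets are buffered, and a checkpoint is written only when `size` offsets have accumulated or the `timeout` has elapsed, whichever comes first, using the last offset in the window. The class and callback names are illustrative, and for brevity the timeout is only checked when a new offset arrives (a real stream would also flush on a timer).

```python
import time

class CommitOffsetWindow:
    """Illustrative size/time window over processed-DBChange offsets."""

    def __init__(self, size, timeout, checkpoint):
        self.size = size            # flush after this many offsets
        self.timeout = timeout      # or after this many seconds
        self.checkpoint = checkpoint  # called with the last offset in a window
        self._buffer = []
        self._window_started = time.monotonic()

    def offer(self, offset):
        self._buffer.append(offset)
        if (len(self._buffer) >= self.size
                or time.monotonic() - self._window_started >= self.timeout):
            self._flush()

    def _flush(self):
        if self._buffer:
            # Only the last offset in the window is checkpointed.
            self.checkpoint(self._buffer[-1])
            self._buffer.clear()
        self._window_started = time.monotonic()

committed = []
window = CommitOffsetWindow(size=3, timeout=60.0, checkpoint=committed.append)
for offset in range(1, 8):
    window.offer(offset)
# Two full windows of 3 offsets have been checkpointed (at offsets 3 and 6);
# offset 7 remains buffered until its window completes.
```

This is why duplicate processing can occur after a restart: any offsets buffered but not yet checkpointed are redelivered, which is safe given idempotent processors.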

DB Change Provider

The DB Change Provider component delivers a stream of database changes to the DB Change Processor Stream, using either change streams or polling mechanisms.

Change Streams

Change streams serve as the default method for capturing and subscribing to database changes as part of the CDC implementation. This enables real-time monitoring of collections, allowing the DB Change Processor Stream to consume and process changes as soon as they occur.

Polling

Polling is not available yet; support is planned for a future release.

The complete list of available configuration properties for the DB Change Provider component, including default values, is shown below. The subscription implementation is selected via the ipf.dps.dbchange.subscription property:

ipf.dps.dbchange {
  # DbChangeProvider implementation
  # * change-streams - recommended option
  # * polling - not yet supported; planned for a future release
  subscription = "change-streams"

  # Buffer period when resuming change streams to help prevent data loss
  # caused by timing delays or race conditions.
  # Note: The DbChangeProvider is likely to emit more
  # dbChange duplicates as the allowed-time-drift value increases.
  allowed-time-drift = 2s

  # Batch size used when fetching changes from the collection.
  # Larger values reduce the number of getMore operations needed
  # to fetch additional batches but increase the duration of each operation.
  collection-batch-size = 100

  # Interval to wait between polls to check if
  # the specified collection is available
  # (typically after it has previously been dropped).
  collection-available-polling-interval = 10s

  # Maximum time to wait for the specified collection to become available
  # (typically after it has previously been dropped).
  collection-available-max-wait-time = 5m
}
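One way to picture the allowed-time-drift setting: rather than resuming exactly at the checkpointed position, the provider rewinds the resume point by the drift buffer, accepting some duplicate deliveries (which idempotent processors tolerate) in exchange for a lower risk of missing changes around the checkpoint boundary. The sketch below uses hypothetical names and a simplified timestamp-based resume; it is not DPS's actual resume logic.

```python
from datetime import datetime, timedelta

ALLOWED_TIME_DRIFT = timedelta(seconds=2)  # mirrors allowed-time-drift = 2s

def resume_position(checkpoint_time: datetime) -> datetime:
    # Rewind the resume point by the drift buffer.
    return checkpoint_time - ALLOWED_TIME_DRIFT

def changes_since(all_changes, resume_from: datetime):
    # Redeliver every change at or after the (rewound) resume point.
    return [c for c in all_changes if c["clusterTime"] >= resume_from]

checkpoint = datetime(2024, 1, 1, 12, 0, 10)
stream = [
    {"id": "a", "clusterTime": datetime(2024, 1, 1, 12, 0, 9)},   # inside the drift buffer
    {"id": "b", "clusterTime": datetime(2024, 1, 1, 12, 0, 11)},  # after the checkpoint
]
resumed = changes_since(stream, resume_position(checkpoint))
# "a" was already processed before the checkpoint but is redelivered;
# the idempotent processor must tolerate this duplicate.
```

This matches the config comment above: increasing allowed-time-drift widens the rewind window and therefore increases the number of duplicate dbChange deliveries.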