Documentation for a newer release is available. View Latest

Proyectos Flo Starter

Módulos

ipf-common-starter

Este es el módulo base que proporciona estas capacidades fundamentales. Es utilizado por los otros proyectos starter.

  1. Sistema de actores Akka y otros subsistemas (p. ej., cluster sharding, persistence)

  2. Inicializador de configuración HOCON y Spring. Consulta IPF Application Configuration. Contribuidores de información de Spring para mostrar propiedades de la aplicación en /actuator/info

ipf-write-starter

Proporciona capacidades de escritura para persistir events al tipo de persistencia seleccionado. Adicionalmente, proporciona:

  1. BehaviourRetriesSupport para enviar commands a Akka Event Sourced Behaviour de forma fiable

  2. Registradores de métricas que proporcionan métricas para Prometheus

  3. TransactionCacheService para detectar comprobaciones de duplicados funcionales y técnicos. Consulta Transaction Caching

ipf-journal-processor-starter

Proporciona capacidades para crear tus propios journal processors, que son componentes que leen continuamente events persistidos y delegan el procesamiento de esos events a un EventProcessor configurado.

Actualmente, solo se admiten dos tecnologías de persistencia por los journal processor starters:

  1. Mongo (ipf-journal-processor-starter-mongo)

  2. Cassandra (ipf-journal-processor-starter-cassandra)

Así es como funcionan:

bar

Partitioning Events

  • Por defecto, los domain events producidos por código generado por Flo Lang se etiquetarán con etiquetas tag-i y flow-name-i, donde i representa la partición lógica a la que se ha asignado el flujo. El número de particiones se define por ipf.behaviour.event-processor.parallelism en el lado de escritura, con una propiedad de configuración correspondiente event-processor.number-of-partitions en el lado del journal processor.

El número de particiones en el lado de escritura y en el lado del journal processor siempre debe coincidir.

  • La coincidencia se hace automáticamente si el journal processor se implementa como parte del lado de escritura. Sin embargo, si estás desplegando un journal processor como una aplicación separada, depende de ti asegurar la consistencia entre las configuraciones.

  • Un desajuste donde el lado del journal processor esté configurado a un valor mayor es funcionalmente correcto pero llevará a uso de recursos desperdiciado.

  • Un desajuste donde el lado del journal processor esté configurado a un valor menor causará pérdida de datos temporal ya que ciertas particiones de eventos no serán emitidas ni procesadas hasta que el número de particiones se incremente para coincidir con el lado de escritura.

El valor de particiones tampoco debería disminuir nunca ya que puede conducir a pérdida de datos y eventos fuera de orden. Si pretendes aumentarlo desde su valor por defecto, sé conservador en tus modificaciones y asegúrate de que tus servidores de base de datos puedan manejar la carga adicional de consultas.

  • Puedes elegir cómo emitir tus events:

    • por una etiqueta global (p. ej., tag-i), que incluye events de todos los flujos; habilitado estableciendo event-streaming-type a EVENT_STREAM_PER_TAG

    • por una etiqueta basada en el flujo (p. ej., flow-name-i), que incluye solo events pertenecientes a un flujo específico; habilitado estableciendo event-streaming-type a EVENT_STREAM_PER_FLOW

Emitir por una etiqueta global (el valor por defecto) usa menos recursos pero no soporta despliegues con rolling upgrades.

Streaming Events

  • Un journal processor proporciona un EventProcessorStreamInitialiser, que es un actor Akka shardeado utilizado para iniciar una instancia de EventProcessorStream por cada partición lógica conocida, representada por una etiqueta de evento.

  • Un EventProcessorStream es un componente que usa un Akka persistence read journal para suscribirse y emitir domain events pertenecientes a una etiqueta específica.

  • Los events se pasan a un EventProcessor para su procesamiento.

Gestión de offsets

  • Antes de iniciarse, un EventProcessorStream usa un OffsetService para determinar la posición —llamada offset— desde la que comenzar a emitir eventos.

  • La propiedad de configuración start-stream-from te permite elegir dónde empezar cuando el OffsetService no encuentra offsets:

    • EARLIEST, el valor por defecto, empezará a emitir eventos desde el principio del journal, recogiendo el evento más antiguo para cada etiqueta.

    • LATEST, disponible solo en plugins de persistencia que soportan TimestampOffset, empieza a emitir eventos creados después de que el journal processor se haya iniciado, ignorando completamente los eventos históricos.

  • Una vez procesados, los offsets de eventos se almacenan en buffer y se persisten periódicamente en el OffsetService de modo que el stream pueda reanudar el procesamiento desde el último evento procesado con éxito en caso de fallo.

  • Consulta Modelo de datos de Mongo Offset para detalles de cómo se almacenan los offsets.

Event Processors

  • Los EventProcessors son componentes creados por los usuarios del módulo ipf-journal-processor-starter que proporcionan lógica de negocio para el manejo de eventos.

Para permitir un alto rendimiento en el procesamiento de eventos, los offsets de eventos procesados con éxito no se almacenan para cada evento sino que se almacenan en buffer y se persisten periódicamente; por lo tanto, en caso de caída de un nodo los eventos pueden reprocesarse. En consecuencia, el EventProcessor que crees tiene que estar escrito de forma que pueda manejar eventos duplicados.

Además, cuando se emiten eventos por flujo, eventos pertenecientes a distintos flujos pero a un mismo UnitOfWorkId pueden procesarse fuera de orden; por lo tanto, si tienes múltiples flujos que cooperan dentro de un UnitOfWorkId, tu EventProcessor tendrá que ser capaz de soportar eventos fuera de orden.

Manejo de errores

En caso de que falle la implementación de EventProcessor, las Resiliency Settings están en su lugar para gestionar la recuperación de mensajes del flujo de eventos. Opcionalmente, puede implementarse una estrategia de "dead letter" en caso de fallos de reintento.

Deadletter Appender

El DeadletterAppender es una interfaz funcional que se llama siempre que un mensaje falla durante el procesamiento del flujo de eventos después de agotar todos los reintentos.

@FunctionalInterface
public interface DeadletterAppender {
    /**
     * Appends the {@link EventProcessorException} to the deadletter queue.
     *
     * @param eventProcessorException contains the failed {@link EventEnvelope message}, {@link PersistenceId id} and the cause of failure
     * @return a {@link CompletableFuture}
     */
    CompletableFuture<Void> append(EventProcessorException eventProcessorException);
}

Proporcionar una implementación de DeadletterAppender es opcional y, si no se proporciona, el journal processor usará la implementación por defecto. El comportamiento por defecto es simplemente registrar tanto el mensaje fallido como la excepción que causó el error.

Todos los mensajes fallidos se proporcionarán como una excepción que extienda EventProcessorException. EventProcessorException envuelve la excepción original como causa junto con el evento recibido.

ipf-read-starter

Proporciona capacidades para procesar los eventos que fueron leídos desde ipf-journal-processor-starter para construir un domain aggregate.

Consulta ReadSideEventProcessor como ejemplo de una implementación de EventProcessor.

Se proporciona un /index.html estático listo para usar para una vista simple de los read aggregates que se han procesado correctamente.

Configuración por defecto

ipf-common-starter

ipf.conf
# Default IPF configuration to allow bootstrapped execution of applications that depend on ipf-common-starter and provide
# core functionalities such metrics, health, ports and cluster set-up.

# Name of the actor system that is required by Akka. There will only be a single actor system per application
actor-system-name = ipf-flow

# The name to be used as a unique identifier of the source of IPF system events
ipf.system-events.source = 0.0.0.0
ipf.system-events.source = ${?akka.remote.artery.canonical.hostname}

ipf.application.name = ${actor-system-name}
spring.application.name = ${ipf.application.name}

# The duration after which the connector event processor will
# check connector health and perform the startup logic
connector.event-processor.keep-alive-interval = 5s

# Default cinnamon to allow for monitoring
cinnamon {
  prometheus.exporters += "http-server"
}

# Exposing Spring management endpoints for further metrics
management {
  endpoint.metrics.enabled = true
  endpoints.web.exposure.include = "health,info,metrics,prometheus"
  endpoint.health.probes.enabled=true
  health.livenessState.enabled=true
  health.readinessState.enabled=true
  endpoint.prometheus.enabled = true
  endpoint.health.show-details = always
  metrics.export.prometheus.enabled = true
}

ipf-journal-processor-starter

ipf.conf
# Default IPF journal processor configuration
event-processor {
  # The ID used to create the key for the Entity Type of events.
  # The KeepAlive cluster singleton is also created with the name keepAlive-[id]
  id = EventProcessor

  # Whether to start the processor or not
  enabled = true

  # Cluster role to use to instantiate this event processor
  cluster-role = read-model
  # Maintain backward compatibility with the old configuration
  cluster-role = ${?stream.processor.cluster-role}

  # The interval at which the KeepAlive actor probes itself with a Probe.INSTANCE
  keep-alive-interval = 2 seconds

  # Determines which offset to use if no offsets are found in the offset store:
  # * `EARLIEST` will start from the earliest offset found in the journal
  # * `LATEST` will start from the latest
  start-stream-from = EARLIEST

  # Determines how events are streamed:
  # * `EVENT_STREAM_PER_TAG` will create an event stream per each tag found in the `tag-prefix` list;
  #    domain events belonging to multiple flows can be streamed per tag
  # * `EVENT_STREAM_PER_FLOW` will create an event stream per each flow found in `flows` list
  #    (populated from `ipf.behaviour.event-processor.flows` by default);
  #    only domain events belonging to a single flow will be streamed
  event-streaming-type = EVENT_STREAM_PER_TAG

  # The tag prefix for events generated by the write side.
  # This value must match the `ipf.behaviour.event-processor.tag-prefix` setting
  # used by the write side or else the processor will not be consuming all events.
  tag-prefix = ["tag"]

  # Ensure configurations match if journal processor is running on the write side
  tag-prefix = ${?ipf.behaviour.event-processor.tag-prefix}

  # The flows to use when `event-streaming-type` is set to `EVENT_STREAM_PER_FLOW`.
  # Should not be overridden by users, a warning will be printed by `akka-diagnostics`
  # if the users try to provide their own lists.
  flows = []
  flows = ${?ipf.behaviour.event-processor.flows}

  # The number of partitions configured for our event journal.
  # Each partition will be processed in parallel by a dedicated Akka stream.
  # This value must match the `ipf.behaviour.event-processor.parallelism` setting
  # used by the write side - in case the value is lower than `parallelism` the processor
  # will not be consuming all events, and if the value is higher, the processor will
  # not balance the work between the nodes equally.
  number-of-partitions = 4

  # Maintaining backward compatibility with previous configuration setting.
  number-of-partitions = ${?event-processor.parallelism}

  # Ensure configurations match if journal processor is running on the write side
  number-of-partitions = ${?ipf.behaviour.event-processor.parallelism}

  # The number of events to demand per partition from upstream, and process in parallel
  processing-parallelism = 1

  # Backward compatibility for the deprecated upstream-event-demand config.
  processing-parallelism = ${?event-processor.upstream-event-demand}

  # How many events belonging to a particular entity to process in parallel.
  # Should only be set to a value higher than 1 if the configured EventProcessor
  # can safely handle out of order events.
  processing-parallelism-per-entity = 1

  restart-settings {
    # The starting backoff interval to use when restarting event processor streams.
    min-backoff = 500 millis

    # The starting backoff interval to use when restarting event processor streams.
    max-backoff = 20 seconds

    # Maintaining backward compatibility with previous configuration.
    max-backoff = ${?event-processor.backoff}

    # The amount of restarts is capped within a timeframe of max-restarts-within.
    max-restarts = 86400000

    # The amount of restarts is capped to max-restarts within a timeframe of within.
    max-restarts-within = 1 days

    # The starting backoff interval to use when restarting event processor streams.
    jitter = 0.1
  }

  # To improve throughput, offsets of successfully processed events are
  # not checkpointed for each event but are grouped together in
  # size and time based windows and the last event offset in a window
  # is used as a checkpoint.
  # The window is considered complete when either it is filled by `size`
  # offsets or the `timeout` interval has elapsed.
  commit-offset-window {
    # The size of the window.
    size = 1000

    # The amount of time to wait for `size` events to complete.
    timeout = 1 minute
  }

  # Requests to EventProcessor implementations will be retried on exception based on the below config
  # Once retries have been exhausted the event will get sent to a dead letter appender
  resiliency-settings {
    # Max number of attempts to retry EventProcessor in case of failure
    max-attempts = 3
    # Retry wait period between retries
    initial-retry-wait-duration = 1s
    # Backoff multiplier between retires
    backoff-multiplier = 2
    # Thread pool size for retries executor service
    retry-scheduler-thread-pool-size = 1
    # In the case of the dead letter itself failing we have more recovery options to try:
    # * COMMIT - Commit the offset
    # * NO_COMMIT - Do not commit the offset and retry the event
    deadletter-failure-strategy = NO_COMMIT
  }
}

ipf-write-starter

ipf.conf
# Default IPF write side configuration

# Toggles model validation of generated ModelOperations
ipf.validators.model.enabled = true

# Declaring ipf_version_selection context key as propagated to ensure it is added
# to the current propagated context if encountered as a transport header
ipf.context-propagation.propagated-supporting-context-keys += ipf_version_selection

# The configuration block related to version selection in IPF flows
ipf.behaviour.version-selection {

  # Toggles the IPF-default version selection support
  enabled = true

  # Determines which of the two IPF-default types of route store is to be used for version selection=
  # * hocon, which is easy to set up but cannot be changed through the API
  # * ddata, which requires a persistent volume to be mounted in order not to lose data between full cluster restarts
  # Defaults to hocon store type
  route-store-type = hocon

  # The global flow routing configuration to use when DData store is disabled or empty
  global-rules = [{
    # The defaults use the propagated `ipf_version_selection` key to drive version selection
    key-name = ipf_version_selection
    version-rules = [{
      active-from = "1970-01-01T00:00:00.00Z"
      # The default rules only allow you to pick the oldest or latest versions of flows
      values-to-versions {
        OLDEST = OLDEST
        LATEST = LATEST
        STABLE = OLDEST
        CANARY = LATEST
      }
    }]
  }]

  # The per-flow routing configuration to use when DData store is disabled or empty
  per-flow-rules {
    # Example structure your per-flow rule configuration should take:
    #"Flow" = {
    #  key-name = ipf_version_selection
    #  version-rules = [{
    #   active-from = "1970-01-01T00:00:00.00Z"
    #    # The default rules only allow you to pick the oldest or latest versions of flows
    #    values-to-versions {
    #      VALUE1 = V1
    #      VALUE2 = V2
    #    }
    #  }]
    #}
  }

  # Configures DData-specific settings
  ddata {
    # How long to wait for DData read and write operations to propagate to other nodes
    replicator-timeout = 3s

    # Retry settings used by the rules store when communicating with the replicator actor
    retry-settings {
      initial-timeout = 5s
      backoff-factor = 2
      jitter-factor = 0.2
      max-attempts = 3
    }
  }
}

# cluster settings
akka.cluster {
  # Routing rules used by DDataFlowRouteReplicator will be durable by default
  #
  distributed-data.durable.keys += "flow-routing-rules"

  # Identifies that this node is on the write side when the cluster endpoint is invoked.
  roles += write-model

  seed-nodes = ["akka://"${actor-system-name}"@0.0.0.0:"${akka.remote.artery.canonical.port}]

  # sharding settings
  sharding {
    remember-entities = on
    remember-entities-store = eventsourced

    handoff-timeout = 8s
    least-shard-allocation-strategy.rebalance-absolute-limit = 20
    rebalance-interval = 2s
  }
}

# Providing cinnamon metrics by default for all persistent entities
cinnamon {
  akka.persistence.entities {
    "sharded:?" {
      report-by = group
      command-type = on
      event-type = on
    }
  }
}