Flo Starter Projects
Modules
ipf-common-starter
This is the base module that provides the following fundamental capabilities. It is used by the other starter projects.
- Akka actor system and other subsystems (e.g. cluster sharding, persistence)
- HOCON and Spring configuration initialiser. See IPF Application Configuration.
- Spring info contributors to expose application properties at /actuator/info
ipf-write-starter
Provides write-side capabilities for persisting events to the selected persistence type. Additionally, it provides:
- BehaviourRetriesSupport for reliably sending commands to an Akka Event Sourced Behaviour
- Metric registrars that provide metrics for Prometheus
- TransactionCacheService for functional and technical duplicate checking. See Transaction Caching
ipf-journal-processor-starter
Provides capabilities for building your own journal processors, which are components that continuously read persisted events and delegate the processing of those events to a configured EventProcessor.
Currently, only two persistence technologies are supported by the journal processor starters:
- Mongo (ipf-journal-processor-starter-mongo)
- Cassandra (ipf-journal-processor-starter-cassandra)
This is how they work:
Partitioning Events
- By default, domain events produced by Flo Lang generated code are tagged with tags tag-i and flow-name-i, where i represents the logical partition the flow has been assigned to (see the sketch below). The number of partitions is defined by ipf.behaviour.event-processor.parallelism on the write side, with a corresponding event-processor.number-of-partitions configuration property on the journal processor side.
| The number of partitions on the write side and on the journal processor side must always match. The partition value should also never be decreased, as this can lead to data loss and out-of-order events. If you intend to increase it from its default value, be conservative in your changes and make sure your database servers can handle the additional query load. |
- You can choose how your events are streamed:
  - by a global tag (e.g. tag-i), which includes events from all flows; enabled by setting event-streaming-type to EVENT_STREAM_PER_TAG
  - by a flow-based tag (e.g. flow-name-i), which includes only events belonging to a specific flow; enabled by setting event-streaming-type to EVENT_STREAM_PER_FLOW
| Streaming by a global tag (the default) uses fewer resources but does not support rolling upgrade deployments. |
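The exact tagging logic lives in the Flo Lang generated write side, but the following minimal Java sketch illustrates the idea behind the tag-i / flow-name-i scheme. The hash-modulo partition assignment and the class and method names are assumptions made for illustration only; the generated code may derive the partition differently.
// Illustrative only: how a partitioned event tag of the form "tag-i" or
// "flow-name-i" could be derived. The hash-modulo assignment below is an
// assumption; the Flo Lang generated code may use a different strategy.
public final class PartitionTagging {

    // Must match ipf.behaviour.event-processor.parallelism on the write side
    // and event-processor.number-of-partitions on the journal processor side.
    private static final int NUMBER_OF_PARTITIONS = 4;

    // Tag used when event-streaming-type = EVENT_STREAM_PER_TAG
    static String globalTag(String entityId) {
        return "tag-" + partitionFor(entityId);
    }

    // Tag used when event-streaming-type = EVENT_STREAM_PER_FLOW
    static String perFlowTag(String flowName, String entityId) {
        return flowName + "-" + partitionFor(entityId);
    }

    private static int partitionFor(String entityId) {
        return Math.abs(entityId.hashCode() % NUMBER_OF_PARTITIONS);
    }
}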
Streaming Events
- A journal processor provides an EventProcessorStreamInitialiser, a sharded Akka actor used to start an EventProcessorStream instance for each known logical partition, represented by an event tag.
- An EventProcessorStream is a component that uses an Akka persistence read journal to subscribe to and stream domain events belonging to a specific tag (see the sketch below).
- The events are passed to an EventProcessor for processing.
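Conceptually, each EventProcessorStream builds on the standard Akka Persistence Query API. The sketch below shows that underlying mechanism using the Cassandra read journal; it is not the IPF implementation itself, and the tag name and the processing lambda are placeholders.
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Offset;
import akka.persistence.query.PersistenceQuery;
import akka.stream.javadsl.Source;

// Conceptual sketch of what an EventProcessorStream does under the hood:
// subscribe to all events carrying a given tag and hand them to a processor.
public class TaggedEventStreamSketch {

    public static void run(ActorSystem system) {
        CassandraReadJournal readJournal = PersistenceQuery.get(system)
                .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

        // "tag-0" is a placeholder for one logical partition tag.
        Source<EventEnvelope, NotUsed> events =
                readJournal.eventsByTag("tag-0", Offset.noOffset());

        // In the starter, each envelope would be delegated to the configured EventProcessor.
        events.runForeach(envelope -> System.out.println(envelope.event()), system);
    }
}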
Offset Management
- Before starting, an EventProcessorStream uses an OffsetService to determine the position, called the offset, from which to start streaming events.
- The start-stream-from configuration property lets you choose where to start when the OffsetService finds no offsets:
  - EARLIEST, the default, starts streaming events from the beginning of the journal, picking up the oldest event for each tag.
  - LATEST, available only for persistence plugins that support TimestampOffset, starts streaming events created after the journal processor has started, ignoring historical events entirely.
- Once processed, event offsets are buffered and periodically persisted to the OffsetService so that the stream can resume processing from the last successfully processed event in case of failure (see the sketch below).
- See the Mongo Offset Data Model for details on how offsets are stored.
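The buffered checkpointing described above maps naturally onto the groupedWithin operator of Akka Streams, which the commit-offset-window settings (size and timeout) in the default configuration mirror. The sketch below is conceptual only; the OffsetStore interface and its save method are hypothetical stand-ins, not the actual OffsetService API.
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.NotUsed;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Offset;
import akka.stream.javadsl.Flow;

// Conceptual sketch of size/time based offset checkpointing. The OffsetStore
// interface below is hypothetical; the starter's OffsetService may differ.
public class OffsetCheckpointingSketch {

    interface OffsetStore {
        CompletionStage<Done> save(Offset offset);
    }

    static Flow<EventEnvelope, Done, NotUsed> checkpointing(OffsetStore store) {
        return Flow.<EventEnvelope>create()
                // Collect up to 1000 offsets or wait at most 1 minute,
                // mirroring commit-offset-window.size and commit-offset-window.timeout.
                .groupedWithin(1000, Duration.ofMinutes(1))
                // Persist only the last offset of each window as the checkpoint.
                .mapAsync(1, (List<EventEnvelope> window) ->
                        store.save(window.get(window.size() - 1).offset()));
    }
}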
Event Processors
- EventProcessors are components created by users of the ipf-journal-processor-starter module that provide the business logic for event handling.
| To allow for high-throughput event processing, offsets of successfully processed events are not stored per event but are buffered and persisted periodically; therefore, in case of a node crash, events may be reprocessed. Consequently, the EventProcessor implementation must be able to handle duplicate events. Additionally, when events are streamed per flow, events belonging to different flows but to the same entity may be processed out of order. |
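The EventProcessor contract itself is defined by the starter; the single asynchronous process method shown below is an assumption made for illustration (see ReadSideEventProcessor in ipf-read-starter for a real implementation). The sketch focuses on the point made in the note above: handlers must tolerate redelivered events.
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

import akka.Done;
import akka.persistence.query.EventEnvelope;

// Hypothetical event processor sketch. The real EventProcessor interface is
// provided by ipf-journal-processor-starter and may have a different signature;
// this only illustrates why handlers need to tolerate redelivered events.
public class IdempotentEventProcessorSketch {

    // Track already-applied events by persistence id and sequence number.
    // A production read side would typically rely on an upsert or a version
    // check in its datastore rather than an in-memory set.
    private final Set<String> applied = ConcurrentHashMap.newKeySet();

    public CompletionStage<Done> process(EventEnvelope envelope) {
        String key = envelope.persistenceId() + ":" + envelope.sequenceNr();
        if (!applied.add(key)) {
            // Offset buffering means the same event can be redelivered after
            // a node crash; simply acknowledge it again.
            return CompletableFuture.completedFuture(Done.getInstance());
        }
        // Apply the business logic / update the read model here.
        return CompletableFuture.completedFuture(Done.getInstance());
    }
}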
Error Handling
If the EventProcessor implementation fails, the Resiliency Settings are in place to manage recovery of messages from the event stream. Optionally, a dead letter strategy can be implemented for the case where retries are exhausted.
Deadletter Appender
The DeadletterAppender is a functional interface that is called whenever a message fails during event stream processing after all retries have been exhausted.
@FunctionalInterface
public interface DeadletterAppender {
/**
* Appends the {@link EventProcessorException} to the deadletter queue.
*
* @param eventProcessorException contains the failed {@link EventEnvelope message}, {@link PersistenceId id} and the cause of failure
* @return a {@link CompletableFuture}
*/
CompletableFuture<Void> append(EventProcessorException eventProcessorException);
}
Providing a DeadletterAppender implementation is optional; if none is provided, the journal processor uses the default implementation, whose behaviour is simply to log both the failed message and the exception that caused the failure.
All failed messages are provided as an exception extending EventProcessorException. EventProcessorException wraps the original exception as the cause, along with the received event.
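For example, a minimal custom appender could forward failures to your own log or store. The implementation below relies only on the interface shown above; the imports for DeadletterAppender and EventProcessorException are omitted because their packages are defined by the starter, and the chosen action (logging) is just one possible strategy. How the appender is wired in (for example, as a Spring bean) depends on your application's configuration.
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Minimal custom DeadletterAppender sketch: log the failure and complete.
// A real implementation could instead publish the failed event to a queue
// or persist it for later inspection and replay.
public class LoggingDeadletterAppender implements DeadletterAppender {

    private static final Logger log = LoggerFactory.getLogger(LoggingDeadletterAppender.class);

    @Override
    public CompletableFuture<Void> append(EventProcessorException eventProcessorException) {
        // EventProcessorException carries the failed event and wraps the original cause.
        log.error("Event processing failed, routing to dead letters", eventProcessorException);
        return CompletableFuture.completedFuture(null);
    }
}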
ipf-read-starter
Provides capabilities for processing the events read by ipf-journal-processor-starter in order to build a domain aggregate.
See ReadSideEventProcessor for an example of an EventProcessor implementation.
A static /index.html is provided out of the box for a simple view of the read aggregates that have been processed successfully.
Default Configuration
ipf-common-starter
# Default IPF configuration to allow bootstrapped execution of applications that depend on ipf-common-starter and provide
# core functionalities such as metrics, health, ports and cluster set-up.
# Name of the actor system that is required by Akka. There will only be a single actor system per application
actor-system-name = ipf-flow
# The name to be used as a unique identifier of the source of IPF system events
ipf.system-events.source = 0.0.0.0
ipf.system-events.source = ${?akka.remote.artery.canonical.hostname}
ipf.application.name = ${actor-system-name}
spring.application.name = ${ipf.application.name}
# The duration after which the connector event processor will
# check connector health and perform the startup logic
connector.event-processor.keep-alive-interval = 5s
# Default cinnamon to allow for monitoring
cinnamon {
prometheus.exporters += "http-server"
}
# Exposing Spring management endpoints for further metrics
management {
endpoint.metrics.enabled = true
endpoints.web.exposure.include = "health,info,metrics,prometheus"
endpoint.health.probes.enabled=true
health.livenessState.enabled=true
health.readinessState.enabled=true
endpoint.prometheus.enabled = true
endpoint.health.show-details = always
metrics.export.prometheus.enabled = true
}
ipf-journal-processor-starter
# Default IPF journal processor configuration
event-processor {
# The ID used to create the key for the Entity Type of events.
# The KeepAlive cluster singleton is also created with the name keepAlive-[id]
id = EventProcessor
# Whether to start the processor or not
enabled = true
# Cluster role to use to instantiate this event processor
cluster-role = read-model
# Maintain backward compatibility with the old configuration
cluster-role = ${?stream.processor.cluster-role}
# The interval at which the KeepAlive actor probes itself with a Probe.INSTANCE
keep-alive-interval = 2 seconds
# Determines which offset to use if no offsets are found in the offset store:
# * `EARLIEST` will start from the earliest offset found in the journal
# * `LATEST` will start from the latest
start-stream-from = EARLIEST
# Determines how events are streamed:
# * `EVENT_STREAM_PER_TAG` will create an event stream per each tag found in the `tag-prefix` list;
# domain events belonging to multiple flows can be streamed per tag
# * `EVENT_STREAM_PER_FLOW` will create an event stream per each flow found in `flows` list
# (populated from `ipf.behaviour.event-processor.flows` by default);
# only domain events belonging to a single flow will be streamed
event-streaming-type = EVENT_STREAM_PER_TAG
# The tag prefix for events generated by the write side.
# This value must match the `ipf.behaviour.event-processor.tag-prefix` setting
# used by the write side or else the processor will not be consuming all events.
tag-prefix = ["tag"]
# Ensure configurations match if journal processor is running on the write side
tag-prefix = ${?ipf.behaviour.event-processor.tag-prefix}
# The flows to use when `event-streaming-type` is set to `EVENT_STREAM_PER_FLOW`.
# Should not be overridden by users; a warning will be printed by `akka-diagnostics`
# if users try to provide their own list.
flows = []
flows = ${?ipf.behaviour.event-processor.flows}
# The number of partitions configured for our event journal.
# Each partition will be processed in parallel by a dedicated Akka stream.
# This value must match the `ipf.behaviour.event-processor.parallelism` setting
# used by the write side - in case the value is lower than `parallelism` the processor
# will not be consuming all events, and if the value is higher, the processor will
# not balance the work between the nodes equally.
number-of-partitions = 4
# Maintaining backward compatibility with previous configuration setting.
number-of-partitions = ${?event-processor.parallelism}
# Ensure configurations match if journal processor is running on the write side
number-of-partitions = ${?ipf.behaviour.event-processor.parallelism}
# The number of events to demand per partition from upstream, and process in parallel
processing-parallelism = 1
# Backward compatibility for the deprecated upstream-event-demand config.
processing-parallelism = ${?event-processor.upstream-event-demand}
# How many events belonging to a particular entity to process in parallel.
# Should only be set to a value higher than 1 if the configured EventProcessor
# can safely handle out of order events.
processing-parallelism-per-entity = 1
restart-settings {
# The starting backoff interval to use when restarting event processor streams.
min-backoff = 500 millis
# The maximum backoff interval to use when restarting event processor streams.
max-backoff = 20 seconds
# Maintaining backward compatibility with previous configuration.
max-backoff = ${?event-processor.backoff}
# The amount of restarts is capped within a timeframe of max-restarts-within.
max-restarts = 86400000
# The timeframe within which the number of restarts is capped to max-restarts.
max-restarts-within = 1 days
# The random jitter factor applied to the computed backoff intervals when restarting event processor streams.
jitter = 0.1
}
# To improve throughput, offsets of successfully processed events are
# not checkpointed for each event but are grouped together in
# size and time based windows and the last event offset in a window
# is used as a checkpoint.
# The window is considered complete when either it is filled by `size`
# offsets or the `timeout` interval has elapsed.
commit-offset-window {
# The size of the window.
size = 1000
# The amount of time to wait for `size` events to complete.
timeout = 1 minute
}
# Requests to EventProcessor implementations will be retried on exception based on the below config
# Once retries have been exhausted the event will get sent to a dead letter appender
resiliency-settings {
# Max number of attempts to retry EventProcessor in case of failure
max-attempts = 3
# Retry wait period between retries
initial-retry-wait-duration = 1s
# Backoff multiplier between retries
backoff-multiplier = 2
# Thread pool size for retries executor service
retry-scheduler-thread-pool-size = 1
# In the case of the dead letter itself failing we have more recovery options to try:
# * COMMIT - Commit the offset
# * NO_COMMIT - Do not commit the offset and retry the event
deadletter-failure-strategy = NO_COMMIT
}
}
ipf-write-starter
# Default IPF write side configuration
# Toggles model validation of generated ModelOperations
ipf.validators.model.enabled = true
# Declaring ipf_version_selection context key as propagated to ensure it is added
# to the current propagated context if encountered as a transport header
ipf.context-propagation.propagated-supporting-context-keys += ipf_version_selection
# The configuration block related to version selection in IPF flows
ipf.behaviour.version-selection {
# Toggles the IPF-default version selection support
enabled = true
# Determines which of the two IPF-default types of route store is to be used for version selection:
# * hocon, which is easy to set up but cannot be changed through the API
# * ddata, which requires a persistent volume to be mounted in order not to lose data between full cluster restarts
# Defaults to hocon store type
route-store-type = hocon
# The global flow routing configuration to use when DData store is disabled or empty
global-rules = [{
# The defaults use the propagated `ipf_version_selection` key to drive version selection
key-name = ipf_version_selection
version-rules = [{
active-from = "1970-01-01T00:00:00.00Z"
# The default rules only allow you to pick the oldest or latest versions of flows
values-to-versions {
OLDEST = OLDEST
LATEST = LATEST
STABLE = OLDEST
CANARY = LATEST
}
}]
}]
# The per-flow routing configuration to use when DData store is disabled or empty
per-flow-rules {
# Example of the structure your per-flow rule configuration should take:
#"Flow" = {
# key-name = ipf_version_selection
# version-rules = [{
# active-from = "1970-01-01T00:00:00.00Z"
# # The default rules only allow you to pick the oldest or latest versions of flows
# values-to-versions {
# VALUE1 = V1
# VALUE2 = V2
# }
# }]
#}
}
# Configures DData-specific settings
ddata {
# How long to wait for DData read and write operations to propagate to other nodes
replicator-timeout = 3s
# Retry settings used by the rules store when communicating with the replicator actor
retry-settings {
initial-timeout = 5s
backoff-factor = 2
jitter-factor = 0.2
max-attempts = 3
}
}
}
# cluster settings
akka.cluster {
# Routing rules used by DDataFlowRouteReplicator will be durable by default
#
distributed-data.durable.keys += "flow-routing-rules"
# Identifies that this node is on the write side when the cluster endpoint is invoked.
roles += write-model
seed-nodes = ["akka://"${actor-system-name}"@0.0.0.0:"${akka.remote.artery.canonical.port}]
# sharding settings
sharding {
remember-entities = on
remember-entities-store = eventsourced
handoff-timeout = 8s
least-shard-allocation-strategy.rebalance-absolute-limit = 20
rebalance-interval = 2s
}
}
# Providing cinnamon metrics by default for all persistent entities
cinnamon {
akka.persistence.entities {
"sharded:?" {
report-by = group
command-type = on
event-type = on
}
}
}