Programación de flujos
El módulo ipf-flo-scheduler proporciona la capacidad de interactuar con flujos de IPF y activar capacidades de scheduling.
Actualmente, hay tres tipos de funcionalidades de scheduling disponibles para usar dentro de un flujo:
-
Action Timeouts: proporciona la capacidad para que el flujo continúe procesando si no se recibe una respuesta de una action en un tiempo definido. -
Retries: proporciona un mecanismo para que el flujo pueda invocar reintentos de actions cuando no se devuelven respuestas en el tiempo definido. -
Processing Timeouts: proporciona un mecanismo para definir que una sección del flujo (entre dos estados cualesquiera) debe completarse dentro de un tiempo definido.
Existen dos tipos diferentes de scheduler para proporcionar las implementaciones según los requisitos:
Para usar un scheduler, simplemente es necesario importar el scheduler relevante como se define a continuación e inyectar el scheduler port al instanciar el dominio:
@Bean
public QuickStartModelDomain initialiseDomain(ActorSystem actorSystem, SchedulerPort schedulerAdapter) {
// All adapters should be added to the domain model
return new QuickStartModelDomain.Builder(actorSystem)
.withSchedulerAdapter(schedulerAdapter)
.build();
}
La implementación de scheduler relevante proporcionará la implementación requerida.
Esto asume que ya has cableado el bean ModelOperations del dominio para acceder a estas capacidades.
Si no, defínelo para tu dominio así:
|
@Bean
public QuickStartModelOperations quickStartModelOperations() {
return new QuickStartModelOperations();
}
Akka Scheduler
Toda la documentación sobre programación en Akka se puede encontrar aquí.
El scheduler en Akka está diseñado para un alto rendimiento de miles hasta millones de disparadores. El caso de uso principal es activar timeouts de recepción de Actors, timeouts de Futures, circuit breakers y otros eventos dependientes del tiempo que suceden continuamente y en muchas instancias al mismo tiempo.
El Akka scheduler no está diseñado para programación de largo plazo; para eso deberías usar Quartz scheduler. Tampoco debe usarse para disparos altamente precisos de eventos. La cantidad máxima de tiempo en el futuro que puedes programar un evento para que se dispare es alrededor de 8 meses, lo cual en la práctica es demasiado para ser útil ya que asumiría que el sistema no se cae durante ese período. Si necesitas programación a largo plazo recomendamos encarecidamente buscar schedulers alternativos, dado que este no es el caso de uso para el que se implementó el Akka scheduler.
Para usar la implementación de Akka scheduler, solo necesitas proporcionar la dependencia:
<dependency>
<groupId>com.iconsolutions.ipf.core.platform</groupId>
<artifactId>ipf-flo-scheduler-akka</artifactId>
</dependency>
Persistent Scheduler
La implementación de persistent scheduler usa ipf-persistent-scheduler de Icon como su implementación subyacente.
Toda la documentación sobre Quartz scheduling se puede encontrar en este enlace.
Quartz es una librería de programación de jobs rica en funcionalidades, de código abierto, que puede integrarse prácticamente en cualquier aplicación Java.
Si tu aplicación tiene tareas que deben ocurrir en momentos dados, o si tu sistema tiene jobs de mantenimiento recurrentes, Quartz puede ser tu solución ideal.
Además de la programación Quartz, IPF también soporta persistir jobs programados. En caso de un fallo del sistema, cuando se reinicie, el sistema puede rehidratar los jobs aún activos.
Para usar el Quartz scheduler, simplemente añade su dependencia:
<dependency>
<groupId>com.iconsolutions.ipf.core.platform</groupId>
<artifactId>ipf-flo-scheduler-persistent</artifactId>
</dependency>
Además, para la programación con Quartz es necesario proporcionar un datastore para la persistencia. La siguiente configuración muestra cómo hacerlo:
ipf-scheduler {
mongodb = {
uri = "mongodb://localhost/ipf"
database-name = "scheduled"
}
}
Akka Scheduler Vs Quartz Scheduler
| Scheduler Type | Pros | Cons |
|---|---|---|
Akka |
Short-term events that should last seconds to minutes. |
- Perhaps the name "Scheduler" was unfortunate, "Deferer" is probably more indicative of what it does. - The Akka Scheduler is designed to set up events to run at time intervals from the current date-time e.g. "fire this job in 15 minutes and every 30 minutes thereafter". It does not support fixed time scheduling e.g. "fire a job every day at 3pm". - Akka’s default scheduler is executed around a HashedWheelTimer – a potential precision loss for jobs, as it does not provide strong guarantees on the timeliness of execution. - Scheduled jobs get lost when the system restarts. |
Quartz |
- Jobs are scheduled to run when a given Trigger occurs. Triggers can be created with nearly any combination of the following directives.
At a certain time of day (to the second), on certain days of the week, and so on. - Jobs are given names by their creator and can also be organized into named groups. Triggers may also be given names and placed into groups, in order to easily organize them within the scheduler. Jobs can be added to the scheduler once, but registered with multiple Triggers. |
More complex for short "schedule once" jobs. |
Configuración
Action Timeouts
La configuración HOCON puede proporcionarse (generalmente en Akka application.conf) para configurar la duración de timeout de cada action. Cuando la duración ha expirado, el flujo recibirá un Action Timeout event para esa Action configurada.
El formato de los elementos de configuración está actualmente en flujo y sujeto a cambios
El formato actual para la configuración es
ipf.flow.FlowName.StateName.ActionName.Type=[Duration|Integer]
-
FlowName : identificador del flujo o Any como comodín
-
StateName : identificador del estado o Any como comodín
-
ActionName : identificador de la acción o Any como comodín
-
Type : uno de timeout-duration, initial-retry-interval, o max-retries
Donde Duration es cualquier formato soportado por HOCON
ipf.flow.OBCreditTransfer.ValidatingSchemeRules.ValidateAgainstSchemeRules.timeout-duration=2s
Esto equivale a:
La action Validate Against Scheme Rules en el estado ValidatingSchemeRules en el flujo OB Credit Transfer expira si no se responde en 2 segundos.
Cada parte de la configuración también soporta la palabra clave Any que coincidirá con cualquier cosa para esa parte dada.
Es aplicable a flujos, estados y actions.
Anyipf.flow {
Any.Any.ValidateAgainstSchemeRules.timeout-duration=10s (1)
Any.CheckingFraud.CheckFraud.timeout-duration=20s (2)
}
| 1 | La action Validate Against Scheme Rules en cualquier estado de cualquier flujo expira si no se responde dentro de 10 segundos. |
| 2 | La action Check Fraud en el estado Checking Fraud en cualquier flujo expira si no se responde dentro de 20 segundos. |
Tipos de backoff y jitter
La configuración permite determinar diferentes tipos de backoff:
-
EXPONENTIAL: escalado 2^n (dondenes el retraso inicial). Este es el tipo por defecto. -
LINEAR: escalado 2n (dondenes el retraso inicial). -
USER_DEFINED: intervalos personalizados definidos en la configuración.
El jitter está habilitado por defecto, pero la configuración también permite deshabilitarlo en caso de que los reintentos sean tan grandes que el jitter agregue una cantidad significativa de delta. Imagina un reintento dentro de un día; el jitter para eso sería +/- 5 horas en cada dirección, lo que puede no ser deseable.
Así, por ejemplo, para configurar 5 intentos de la action CheckFraud sin jitter, reintentando inicialmente después de 10 segundos pero luego cada 30 minutos, la configuración sería:
ipf.flow {
Any.CheckingFraud.CheckFraud.max-retries = 4
Any.CheckingFraud.CheckFraud.backoff-type = "USER_DEFINED"
Any.CheckingFraud.CheckFraud.custom-backoffs = [10s,30m] (1)
Any.CheckingFraud.CheckFraud.jitter-enabled = false
}
| 1 | Usando el formato de duración de HOCON |
Otro ejemplo muestra un reintento lineal (con jitter presente, por lo que se omite porque está activado por defecto):
ipf.flow {
Any.CheckingFraud.CheckFraud.initial-retry-interval = 1000
Any.CheckingFraud.CheckFraud.max-retries = 4
Any.CheckingFraud.CheckFraud.backoff-type = "LINEAR"
}
Este ejemplo reintentará a 1/2/3/4 segundos.
Precedencia
La configuración más específica tiene precedencia; es decir, si coincide en las 3 partes (flow, state y action). Para las actions, cuando hay múltiples elementos de configuración que podrían aplicar, el estado más específico tendrá prioridad sobre la configuración más específica del flujo.
Flow: Flow1 State: State1 Action: Action1
Este sería el orden de precedencia de todas las posibles configuraciones que podrían aplicar a esta action:
1. Flow1.State1.Action1.timeout-duration 2. Any.State1.Action1.timeout-duration 3. Flow1.Any.Action1.timeout-duration 4. Any.Any.Action1.timeout-duration 5. Flow1.State1.Any.timeout-duration 6. Any.State1.Any.timeout-duration 7. Flow1.Any.Any.timeout-duration 8. Any.Any.Any.timeout-duration
Processing Time
Duraciones
También puede proporcionarse configuración HOCON (generalmente en Akka application.conf) para configurar cuánto tiempo se permite dedicar al paso entre un par de estados, independientemente del recorrido tomado para alcanzar el destino.
Cuando la duración haya expirado, el flujo recibirá un ProcessingTimeElapsedEvent para el estado de destino.
El formato actual para la configuración es
ipf.flow.<flow-name>.processing-time.<source-state>.<destination-state>.timeout-duration=Duration|Integer]
-
flow-name : identificador del flujo
-
source-state : identificador del estado
-
destination-state : el estado que el flujo debe alcanzar en el tiempo asignado
Donde Duration es cualquier formato soportado por HOCON
ipf.flow.OBCreditTransfer.processing-time.ValidatingAgainstSchemeRules.Complete.timeout-duration=2s
Esto equivale a:
El flujo producirá un processing time elapsed event si el tiempo desde el estado ValidatingAgainstSchemeRules hasta el estado Complete excede 2 segundos.
Offsets
Los offsets proporcionan un mecanismo mejorado para definir la duración del timeout. El processing time básico mira simplemente el tiempo entre dos estados. Sin embargo, puede ser necesario que el timeout considere, por ejemplo, una hora de inicio definida por el cliente. En este caso podemos usar offsets como una forma de enriquecer la duración.
Por ejemplo, si el cliente proporciona un accepted timestamp y el flujo debe completar dentro de 5 segundos de ese tiempo, en lugar de 5 segundos después de que el flujo comience, IPF puede usar el accepted timestamp como un offset. El flujo se configura con una duración de 5 segundos, y el timeout se activa en relación con el offset proporcionado.
Hay dos tipos de offset:
-
System Offsets: cada vez que IPF alcanza un nuevo estado, se crea un nuevo offset. -
Custom Offsets: son definidos por el usuario y pueden proporcionarse a IPF.
Cada offset tiene dos atributos: un offset-id único y la hora del offset en sí.
Para usar un offset en la configuración de scheduling, simplemente es necesario definir en la configuración el offset-id a usar. Esto se hace así:
ipf.flow.OBCreditTransfer.processing-time.ValidatingAgainstSchemeRules.Complete.offset-id=anOffsetId
Para proporcionar un offset personalizado a cualquier flujo, simplemente necesitamos proporcionarlo en la entrada mediante el método withFlowOffset.
También es posible proporcionar los offsets de un flujo a otro.
Esto es útil en relaciones padre-hijo donde se requiere que un offset abarque múltiples flujos.
Para ello, en todas las actions se proporciona el mapa de offsets del flujo actual como parámetro y simplemente puede pasarse al nuevo flujo mediante su método withOffsetMap.