DSL 8 - Versioning
|
Iniciando
El paso del tutorial utiliza la solución "handling_timeouts" del proyecto como su punto de partida. Si en algún momento desea ver la solución a este paso, ¡esta se puede encontrar en la solución "add_version"! |
Versioning
Suponga que tenemos un flujo en producción. Luego, llega un requisito para añadir un nuevo paso al flujo.- por lo que queremos que las transacciones en curso permanezcan en el flujo actual, pero todas las nuevas transacciones deben comenzar a procesarse en el nuevo flujo actualizado. Podemos lograr esto mediante versioning. Con el propósito de este tutorial, vamos a insertar un paso para llamar a un CSM service, pero también queremos preservar el flujo original. Vamos a hacer esto creando una nueva versión de nuestro flujo de ejecución y añadiendo el paso adicional solo en esa versión.
Una visión general completa del flujo versioning está disponible en nuestro xref:core:flo-starter:guides/flow-versioning.adoc[Flujo versioning guía]. |
Versioning el Flujo
Tiempo para configurar nuestro versioning, si observamos nuestro flujo actual veremos:
Aquí podemos ver que dice "No Versioning ". Vamos a continuar y añadir versioning.
Comenzaremos estableciendo la versión actual en 1 y luego presionaremos ALT + ENTER en el flujo y seleccionaremos "Nueva Versión" del menú desplegable:
Si usted mira en nuestro navegador, ahora veremos:
Aquí elegimos cambiar la versión del flujo original a 1 en este punto. También podríamos haberlo dejado sin versión, en cuyo caso nuestra nueva versión se definiría como V1. En general, si usted sabe que un flujo es probable que sea versionado, puede ser mejor llamarlo versión 1 de inmediato, ya que hay un par de cambios menores en el código requeridos, como veremos en la sección de implementación. |
Abramos "Ipftutorialflow (V2)", lo que encontraremos es un clon exacto de nuestro flujo original, excepto que ahora tiene la versión especificada como V2 (nota-el flujo original ahora tiene la Versión especificada).
También puede ver aquí que un nuevo conjunto de mapping se han creado funciones en el nuevo flujo. Esto se debe a que las definimos a nivel de flujo. También es posible definirlas fuera de un flujo a través de un 'Mapping'Biblioteca' en cuyo caso el mismo mapping se utilizaría en ambos flujos. |
Ahora estamos listos para editar nuestro flujo de la versión dos. Sin embargo, antes de hacerlo, necesitamos crear el CSM service que hará ser utilizado en V2.
Definiendo el CSM Service
Lo primero que necesitamos hacer es llamar a nuestro CSM service. Nosotros:
-
Envíe el CSM a pacs. 008
-
Reciba un pacs002 de la CSM - esto puede contener un código aceptado o rechazado.
Para esto necesitaremos un nuevo dominio externo y una solicitud definida en él. Es muy similar a lo que hicimos en DSL 4 - Uso de un dominio externo. Usted llamará a nuestra solicitud "Aclarar y liquidar".
Vea si puede configurarlo usted mismo, y la solución se encuentra a continuación:
Actualizando el Flujo V2
Así que, mientras que anteriormente teníamos lo siguiente " Event Comportamiento":
Cambiará esto para que en lugar de pasar a "Completo" y aumentar nuestro adicional event, nos moveremos a un nuevo estado "Claro y Liquidando" y luego llamaremos a nuestra solicitud de Claro y Liquidación (definida en el CSM Service).
Si la solicitud de aclaración y resolución es exitosa, procederemos a completar y elevar nuestro event. Si la liquidación y el acuerdo fallan, pasaremos a Rechazado.
Intente agregar esas condiciones ahora y la solución se encuentra a continuación, recuerde que necesitaremos agregar los Estados apropiados,Event definiciones, Comportamiento de Entrada y por último Event¡comportamiento!
(Si necesita un resumen, revise:DSL 4 - Uso de un dominio externo)
Primero, aquí está el state definición:
Entonces necesitaremos un event para ambos, positivo y negativo CSM respuesta:
No olvide al configurar estos events, que el CSM service está enviándonos de vuelta el pacs002 y queremos almacenar eso como business data!
A continuación, se presenta nuestro comportamiento de entrada, es muy estándar:
Y finalmente nuestro event¡comportamiento!
Tenga en cuenta que hemos aplicado la lógica clara y de asentamiento tanto a los casos de "Omitir Fraude" como de "Fraude Requerido".
Eso es todo lo que necesitamos hacer, ahora hemos creado con éxito una nueva versión del flujo y la hemos actualizado para utilizar nuestro CSM Service. Si observamos el flujo para la versión 2, veremos:
Mientras que el flujo para la versión 1 permanece sin este paso.
Eso es todo nuestro trabajo de DSL, así que ahora pasemos a la parte de implementación.
Java Implementación
Como es habitual, comencemos nuestra implementación regenerando nuestra base de código.
mvn clean install
En este caso, la compilación debería fallar:
Si revisamos el IpfTutorialConfig.java vemos:
Esto está fallando porque el aggregate function es único para un flujo, y ahora que hemos creado nuestra nueva versión, necesitamos indicarle a la aplicación cómo aplicar el aggregate function para cada flujo-ya que es perfectamente posible que la lógica de implementación pueda cambiar. Sin embargo, en nuestro caso solo necesitamos hacer lo mismo, por lo que simplemente definiremos las nuevas funciones de la misma manera. Tenga en cuenta que las nuevas versiones se llamarán V1 y V2 respectivamente.
También hemos añadido el nuevo CSM Service, por lo que debemos recordar incluir eso en nuestra configuración de dominio. Para esto, simplemente utilizaremos el ejemplo proporcionado.
Intente actualizar y la solución se encuentra a continuación:
@Bean
public IpftutorialmodelDomain ipftutorialmodelDomain(ActorSystem actorSystem, SchedulerPort schedulerAdapter) {
// All adapters should be added to the domain model
return new IpftutorialmodelDomain. Builder(actorSystem)
.withTutorialDomainFunctionLibraryAdapter(input -> CompletableFuture.completedStage(new DuplicateCheckResponseInput. Builder(input.getId(), AcceptOrRejectCodes. Accepted).build()))
.withAccountingSystemActionAdapter(new SampleAccountingSystemActionAdapter())
.withFraudSystemActionAdapter(new FraudSystemActionAdapter())
.withDecisionLibraryAdapter(input ->
input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getIntrBkSttlmAmt().getValue().compareTo(BigDecimal. TEN)>0?
RunFraudCheckDecisionOutcomes. FRAUDREQUIRED: RunFraudCheckDecisionOutcomes. SKIPFRAUD)
.withIpftutorialflowV1MappingAdapter(input -> new ExtractCreditorAccountForFlowIpftutorialflowV1MappingOutput(input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getCdtrAcct()))
.withIpftutorialflowV2MappingAdapter(input -> new ExtractCreditorAccountForFlowIpftutorialflowV2MappingOutput(input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getCdtrAcct()))
.withSchedulerAdapter(schedulerAdapter)
.withCSMServiceActionAdapter(new SampleCSMServiceActionAdapter())
.build();
}
Si usted revisa domain-root/domain/target/classes/com/iconsolutions/ipf/tutorial/ipftutorialmodel/domain/IpftutorialmodelDomain.class, puede ver las versiones V1 y V2 que deben ser llamadas. |
Eso es todo lo que debemos hacer para que nuestro código funcione ahora, pero antes de continuar, volvamos a nuestro controlador de iniciación (ipf-tutorial-app/src/main/java/com/iconsolutions/ipf/tutorial/app/controller/InitiationController.java) y considere cómo está funcionando:
return Mono.fromCompletionStage(IpftutorialmodelDomain.initiation()
.handle(new InitiateIpftutorialflowInput. Builder(entityId)
.withProcessingContext(ProcessingContext.builder()
.unitOfWorkId(unitOfWorkId)
.clientRequestId(clientRequestId)
.build())
.withCustomerCreditTransfer(dummyPacs008)
.build())
.thenApply(done -> InitiationResponse.builder()
.requestId(clientRequestId)
.uowId(unitOfWorkId)
.aggregateId(done.getAggregateId())
.build()));
Así que lo estamos enviando aquí, simplemente un común "InitiateIpftutorialflowInput".-¿Cómo sabe la aplicación qué flujo utilizar? La respuesta es que, por defecto, siempre dirigirá el trabajo "Nuevo" al flujo más reciente (V2 en nuestro caso). Sin embargo, también podemos elegir a qué flujo enviar los datos.
Ahora, para fines de prueba, recordemos que tenemos un campo de versión en nuestro objeto InitiationRequest, utilicemos eso para poder seleccionar qué flujo enviar. Por lo tanto, actualicemos nuestra lógica de iniciación para que sea como se muestra a continuación:
return Mono.fromCompletionStage(IpftutorialmodelDomain.initiation()
.handle(new InitiateIpftutorialflowInput. Builder(entityId)
.withProcessingContext(ProcessingContext.builder()
.unitOfWorkId(unitOfWorkId)
.clientRequestId(clientRequestId)
.build())
.withCustomerCreditTransfer(samplePacs008)
.withVersion(Objects.isNull(request) || Objects.isNull(request.getVersion())
? null
: "V1".equals(request.getVersion())? IpftutorialflowFlowVersions. V1: IpftutorialflowFlowVersions. V2)
.build())
.thenApply(done -> InitiationResponse.builder()
.requestId(clientRequestId)
.uowId(unitOfWorkId)
.aggregateId(done.getAggregateId())
.build()));
Aquí podemos ver que hemos añadido una línea para establecer la versión en la solicitud en función de la propiedad de versión que hemos proporcionado (_nota-el uso del enum IpftutorialflowFlowVersions que contiene "V1" y "V2".
Además de especificar la versión como lo hicimos anteriormente, también hay entradas explícitas _(ipftutorialmodel/inputs)_ que puede utilizar. Así que en lugar de `InitiateIpfTutorialflowInput` en lo anterior podría haber utilizado `InitiateIpfTutorialflowV1Input` por ejemplo. |
En el tutorial anterior configuramos ActionTimeouts con la siguiente configuración;
ipf flow. Ipftutorialflow. CheckingFraud. CheckFraud.timeout-duration=2s
Aquí utilizamos el nombre del flujo, Ipftutorialflow, y funcionó perfectamente. Sin embargo, con el versioning de nuestros flujos, esto ha creado en realidad 2 flujos de diferentes nombres (con diferentes behaviours) y esto debe reflejarse en la configuración. Para cada flujo que desee mantener esta configuración, debe establecerlo de la siguiente manera (note la adición de V1 y V2 a los nombres):
ipf flow. IpftutorialflowV1. CheckingFraud. CheckFraud.timeout-duration=2s
ipf flow. IpftutorialflowV2. CheckingFraud. CheckFraud.timeout-duration=2s
Usted puede así tener diferentes configuraciones por flujo.
O si desea tener lo mismo para todos los flujos, puede utilizar el comodín 'Any' (que aplicará la configuración para TODOS los flujos para esta acción CheckFraud);
ipf flow. Any. CheckingFraud. CheckFraud.timeout-duration=2s
Verificando Nuestra Solución
Como es habitual, ahora verifiquemos que la solución funcione. Inicie la aplicación como se indicó anteriormente (instructions están disponibles en Revisando la solicitud inicial si necesita un repaso!)
Comenzará enviando un pago sin especificar una versión.
curl -X POST localhost:8080/submit | jq
Ahora, lo primero que debe tener en cuenta es la respuesta:
{
"requestId": "c16a5c43-1038-4311-9d3f-8bf34efa0c81",
"uowId": "0945fe73-521c-478e-9f62-df4ac6393091",
"aggregateId": "IpftutorialflowV2|239f3e48-8d26-4a2f-8241-0997dc25f1c2"
}
Aquí podemos ver que hemos alcanzado nuestro flujo V2 desde el id agregado "Ipftutorialflow V2|..". Si ahora verificamos nuevamente el events(recuerde actualizar el id agregado para que coincida con el suyo!):
Hablemos del pago en el Developer GUI y eleve el flujo events ver (buscar por unit of work id, haga clic en ver) y deberíamos ver:
Aquí podemos ver que el process flow se está ejecutando es efectivamente nuestro flujo V2, y al observar el events vista muestra lo mismo (haga clic domain events):
Ahora intentemos alcanzar nuestra versión V1:
curl -X POST localhost:8080/submit -H 'Content-Type: application/json' -d '{"version": "V1"}' | jq
Y nuevamente, al observar la respuesta:
{ "requestId": "492a0177-d9c3-4845-bc81-f54c9aae917d", "uowId": "d8cf8b99-448b-44e7-8207-a015dc41623a", "aggregateId": "IpftutorialflowV1|b0e19b4a-34f6-4584-8109-eef311fd2a13" }
Aquí podemos ver que hemos alcanzado nuestro flujo V1 desde el id agregado "Ipftutorialflow V1|..". Si ahora presentamos el pago en el Developer GUI y muestre la vista de flujo (search by unit of work id, haga clic en ver) y deberíamos ver:
Mostrándonos nuevamente que estamos acertando en lo correcto process flow. Si revisamos el domain events tab (haga clic domain events) entonces vemos que no hay CSM Service invocación:
Si lo desea, puede enviar ahora una solicitud de flujo V2 específicamente y ver que también funcione.
Manejo de Transacciones en Vuelo
Puede haber casos en los que una transacción se haya iniciado en V1 del flujo, pero esté en pausa esperando una respuesta/instrucción de un proceso externo de larga duración. Durante este tiempo, usted ha desplegado V2 del flujo, pero las transacciones en curso aún no se encuentran en un estado terminal.state.
Este escenario es perfectamente válido y cuando las transacciones antiguas en el flujo V1 se reanuden, continuarán en el flujo en el que comenzaron, en este caso el flujo V1. Cualquier nueva transacción que se haya iniciado desde la actualización se ejecutará en el flujo V2.
| No es posible que una transacción iniciada utilizando una versión de un flujo (por ejemplo, V1) sea reanudada utilizando otra (por ejemplo, V2). Debe reanudar el flujo en el flujo original (V1) o iniciar un nuevo flujo en la nueva versión (V2). |
Versioning y Rolling Upgrades
IPF y Flo Lang ya vienen con soporte básico para rolling upgrades. Al aprovechar Akka Cluster los roles de nodo bajo el capó, se garantiza que los flujos se inicien únicamente en nodos que los soporten y al enviar una entrada a un flujo,Akka se encargará de todo el enrutamiento necesario que debe aplicarse. Las transacciones en vuelo — una ocurrencia regular al realizar rolling upgrades-- se manejará de la misma manera que se describe en Manejo de Transacciones en Vuelo, lo que resulta en que su aplicación procese ambas versiones del flujo durante un tiempo, antes de establecerse en una nueva versión.
(Figura: Actualización en línea comenzando a las 15:36:00 y finalizando a las 15:37:30)
Sin embargo, todavía hay escenarios que requieren la atención del desarrollador, y los cubrimos en las siguientes secciones.
Reprocesamiento de Mensajes
La entrega exactamente una vez es muy difícil de lograr en sistemas distribuidos y, incluso cuando se puede conseguir, conlleva una penalización de rendimiento que a menudo es demasiado costosa de asumir. La alternativa es adoptar al menos una vez la semántica de entrega y asegurar que la lógica de procesamiento de mensajes sea idempotente.
Mientras que el reprocesamiento de mensajes puede causar problemas incluso cuando versioning no está involucrado — p. ej. cuando un ID de flujo (entityId en nuestros ejemplos de solución) es un ID único seudorrandom — los problemas causados por la falta de idempotencia son más propensos a surgir cuando usted está realizando rolling upgrades con múltiples versiones presentes.
Por defecto, si un mensaje de inicio se vuelve a procesar durante rolling upgrades, incluso cuando utiliza un valor derivado del mensaje como su entityId puede causar que se inicien flujos duplicados — la primera vez para la versión de flujo Vx y la versión de flujo Vy al reprocesar el mensaje.
Para protegerse contra este escenario, usted puede optar por utilizar los mecanismos de idempotencia que vienen integrados con Flo Lang y que aprovechan un CorrelationService implementación bajo el capó.
Uso de los guardias de idempotencia integrados
Primero, necesitamos agregar una dependencia a un CorrelationService implementación. Si decide utilizar su propia implementación, por favor asegúrese de que los datos en el almacenamiento de respaldo del servicio sobrevivan a un nodo restart. Para utilizar un MongoDB- implementación respaldada que viene con IPF, añada esto a la ipf-tutorial-app’s `pom.xml:
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-correlation-starter-mongodb</artifactId>
</dependency>
A continuación, necesitamos integrar esta implementación en su dominio:
@Bean
public IpftutorialmodelDomain init(ActorSystem<?> actorSystem,
SchedulerPort schedulerAdapter,
CorrelationService correlationService) {
return new IpftutorialmodelDomain. Builder(actorSystem)
.withTutorialDomainFunctionLibraryAdapter(input -> CompletableFuture.completedStage(new DuplicateCheckResponseInput. Builder(input.getId(), AcceptOrRejectCodes. Accepted).build()))
.withAccountingSystemActionAdapter(new SampleAccountingSystemActionAdapter())
.withFraudSystemActionAdapter(new FraudSystemActionAdapter())
.withDecisionLibraryAdapter(input ->
input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getIntrBkSttlmAmt().getValue().compareTo(BigDecimal. TEN) > 0?
RunFraudCheckDecisionOutcomes. FRAUDREQUIRED: RunFraudCheckDecisionOutcomes. SKIPFRAUD)
.withIpftutorialflowV1MappingAdapter(input -> new ExtractCreditorAccountForFlowIpftutorialflowV1MappingOutput(input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getCdtrAcct()))
.withIpftutorialflowV2MappingAdapter(input -> new ExtractCreditorAccountForFlowIpftutorialflowV2MappingOutput(input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getCdtrAcct()))
.withSchedulerAdapter(schedulerAdapter)
.withCSMServiceActionAdapter(new SampleCSMServiceActionAdapter())
// idempotency guards will be applied against this correlation service
.withCorrelationService(correlationService)
.build();
}
Los guardias de idempotencia integrados en las entradas de iniciación funcionan de manera similar, pero ligeramente diferente a los guardias de idempotencia para otras entradas. A diferencia de la idempotencia de entrada general, que debe ser capaz de distinguir entre envíos duplicados y reprocesamiento de mensajes (para lo cual utiliza el (inputId, physicalMessageId) tuple), la idempotencia de la entrada de iniciación inspeccionará varios campos --physicalMessageId,inputId,processingContext.unitOfWorkId, en ese orden — elija el primero que tenga un valor y utilice ese valor para determinar duplicados.
Si physicalMessageId en la entrada se completa a partir de la ReceivingContext, protegerá contra el reprocesamiento de mensajes en transportes que permiten que un único mensaje físico sea consumido dos veces (Kafka,JMS o de otro middleware de mensajería). En caso de HTTP, un solo mensaje no puede ser reprocesado y los mensajes generalmente se reenvían. Para protegerse contra duplicados de mensajes — independientemente del transporte subyacente — llenando el inputId se aconseja un valor derivado del mensaje.
|
| Una vez que haya elegido un valor para poblar su campo de idempotencia, ya no podrá cambiarlo en versiones futuras sin comprometer las salvaguardas de idempotencia y aceptar posibles duplicados, o renunciar a una implementación de actualización continua para esa versión. |
Como paso final, poblamos uno de los campos habilitados para idempotencia en la entrada:
return Mono.fromCompletionStage(IpftutorialmodelDomain.initiation()
.handle(new InitiateIpftutorialflowInput. Builder(entityId)
.withProcessingContext(ProcessingContext.builder()
.unitOfWorkId(unitOfWorkId)
.clientRequestId(clientRequestId)
.build())
.withCustomerCreditTransfer(samplePacs008)
// use clientRequestId for idempotency guards
.withInputId(clientRequestId)
.withVersion(Objects.isNull(request) || Objects.isNull(request.getVersion())
? null
: "V1".equals(request.getVersion())? IpftutorialflowFlowVersions. V1: IpftutorialflowFlowVersions. V2)
.build())
.thenApply(done -> InitiationResponse.builder()
.requestId(clientRequestId)
.uowId(unitOfWorkId)
.aggregateId(done.getAggregateId())
.build()));
Es recomendable utilizar entradas de iniciación no versionadas cuando se confíe en las garantías de idempotencia integradas, ya que le permitirán iniciar la versión que se utilizó en la solicitud original. Si utiliza entradas versionadas y los intentos posteriores no coinciden con la versión del intento original — por ejemplo, el intento original fue para V1, mientras que los posteriores son V2 — todos los intentos posteriores events fallará con un IllegalStateException.
|
Restrictions Causado por el Manejo de Respuestas Externas
Al manejar external responses-- ya sea consumiendo mensajes de intermediarios de mensajes o recibiendo HTTP requests — the restrictions impuesto por rolling upgrades son los mismos:
-
los inputs creados como resultado del manejo no deben cambiar entre versiones — al realizar rolling upgrades, ambas versiones de su aplicación serán responsables de procesar respuestas de sistemas externos y, al recibir una respuesta destinada a la nueva versión, la versión antigua de la aplicación debe ser capaz de enviar una entrada que la versión V2 del flujo entenderá.
-
si los inputs deben cambiar, depende de usted como desarrollador asegurarse de que la versión anterior de la aplicación no maneje respuestas destinadas a nuevas versiones del flujo — alcanzable mediante el uso de receive connector la capacidad de filtrado de, dedicando diferentes temas/colas a las respuestas destinadas a diferentes versiones del flujo, configurando reglas de enrutamiento en el equilibrador de carga para asegurar HTTP las solicitudes se dirigen a las versiones adecuadas del servicio, etc.