DSL 8 - Versioning
|
Iniciando
El paso del tutorial utiliza la solución "handling_timeouts" del proyecto como su punto de partida. Si en algún momento desea ver la solución a este paso, ¡esta se puede encontrar en la solución "add_version"! |
Versioning
Suponga que tenemos un flujo en producción. Luego, llega un requisito para añadir un nuevo paso al flujo.- por lo que queremos que las transacciones en curso permanezcan en el flujo actual, pero que todas las nuevas transacciones comiencen a procesarse en el nuevo flujo actualizado. Podemos lograr esto mediante la versionado. Con el propósito de este tutorial, vamos a insertar un paso para llamar a un CSM servicio, pero también queremos preservar el flujo original. Vamos a hacer esto creando una nueva versión de nuestro flujo de ejecución y añadiendo el paso adicional solo en esa versión.
Una visión general completa de la versión de flujo está disponible en nuestro xref:core:flo-starter:guides/flow-versioning.adoc[Guía de versionado de flujo]. |
Versioning el Flujo
Es hora de establecer nuestra versionado, si observamos nuestro flujo actual veremos:
Aquí podemos ver que dice "No Versioning ". Procedamos a añadir versionado."
Comenzaremos estableciendo la versión actual en 1 y luego presionaremos ALT + ENTER en el flujo y seleccionaremos "Nueva Versión" del menú desplegable:
Si usted mira en nuestro navegador, ahora veremos:
Aquí elegimos cambiar la versión del flujo original a 1 en este punto. También podríamos haberlo dejado sin versión, en cuyo caso nuestra nueva versión se definiría como V1. En general, si usted sabe que un flujo es probable que sea versionado, puede ser mejor llamarlo versión 1 de inmediato, ya que hay un par de cambios menores en el código requeridos, como veremos en la sección de implementación. |
Abramos "Ipftutorialflow (V2)", lo que encontraremos es un clon exacto de nuestro flujo original, excepto que ahora tiene la versión especificada como V2 (nota-el flujo original ahora tiene la Versión especificada).
También puede ver aquí que un nuevo conjunto de mapping se han creado funciones en el nuevo flujo. Esto se debe a que las definimos a nivel de flujo. También es posible definirlas fuera de un flujo a través de una 'Biblioteca de Mapeo', en cuyo caso la misma mapping se utilizaría en ambos flujos. |
Ahora estamos listos para editar nuestro flujo de la versión dos. Sin embargo, antes de hacerlo, necesitamos crear el CSM servicio que permitirá ser utilizado en V2.
Definiendo el CSM Service
Lo primero que necesitamos hacer es llamar a nuestro CSM servicio. Nosotros:
-
Envíe el CSM a pacs.008
-
Reciba un pacs002 de la CSM - esto puede contener un código aceptado o rechazado.
Para esto necesitaremos un nuevo dominio externo y una solicitud definida en él. Es muy similar a lo que hicimos en DSL 4 - Uso de un dominio externo. Usted llamará a nuestra solicitud "Aclarar y liquidar".
Vea si puede configurarlo usted mismo, y la solución se encuentra a continuación:
Actualizando el Flujo V2
Así que, mientras que anteriormente teníamos lo siguiente " Event Comportamiento":
Cambiará esto para que, en lugar de pasar a "Completo" y generar nuestro evento adicional, pasemos a un nuevo estado "Claro y Liquidando" y luego llamemos a nuestra solicitud de Claro y Liquidación (definida en el CSM Service).
Si la solicitud de liquidación y compensación es exitosa, procederemos a completar y elevar nuestro evento. Si la liquidación y compensación falla, procederemos a Rechazado.
Intente agregar esas condiciones ahora y la solución se encuentra a continuación, recuerde que necesitaremos agregar los Estados apropiados,Event definiciones, Comportamiento de Entrada y por último Event¡comportamiento!
(Si necesita un resumen, revise:DSL 4 - Uso de un dominio externo)
Primero, aquí está la definición del estado:
Entonces necesitaremos un evento tanto para lo positivo como para lo negativo. CSM respuesta:
No olvide al configurar estos eventos, que el CSM el servicio nos está enviando de vuelta el pacs002 y queremos almacenar eso como datos empresariales.
A continuación, se presenta nuestro comportamiento de entrada, es muy estándar:
Y finalmente, ¡nuestro comportamiento de eventos!
Tenga en cuenta que hemos aplicado la lógica clara y de asentamiento tanto a los casos de "Omitir Fraude" como de "Fraude Requerido".
Eso es todo lo que necesitamos hacer, ahora hemos creado con éxito una nueva versión del flujo y la hemos actualizado para utilizar nuestro CSM Service. Si observamos el flujo para la versión 2, veremos:
Mientras que el flujo para la versión 1 permanece sin este paso.
Eso es todo nuestro trabajo de DSL, así que ahora pasemos a la parte de implementación.
Java Implementación
Como es habitual, comencemos nuestra implementación regenerando nuestra base de código.
mvn clean install
En este caso, la compilación debe fallar:
Si revisamos el IpfTutorialConfig.java, vemos:
Esto está fallando porque la función de agregado es única para un flujo, y ahora que hemos creado nuestra nueva versión, necesitamos indicarle a la aplicación cómo aplicar la función de agregado para cada flujo.- ya que es perfectamente posible que la lógica de implementación pueda cambiar. Sin embargo, en nuestro caso solo necesitamos hacer lo mismo, por lo que simplemente definiremos las nuevas funciones de la misma manera. Tenga en cuenta que las nuevas versiones se llamarán V1 y V2 respectivamente.
También hemos añadido el nuevo CSM Service, por lo que debemos recordar incluir eso en nuestra configuración de dominio. Para esto, simplemente utilizaremos el ejemplo proporcionado.
Intente actualizar y la solución se encuentra a continuación:
@Bean
public IpftutorialmodelDomain ipftutorialmodelDomain(ActorSystem actorSystem, SchedulerPort schedulerAdapter) {
// All adapters should be added to the domain model
return new IpftutorialmodelDomain. Builder(actorSystem)
.withTutorialDomainFunctionLibraryAdapter(input -> CompletableFuture.completedStage(new DuplicateCheckResponseInput. Builder(input.getId(), AcceptOrRejectCodes. Accepted).build()))
.withAccountingSystemActionAdapter(new SampleAccountingSystemActionAdapter())
.withFraudSystemActionAdapter(new FraudSystemActionAdapter())
.withDecisionLibraryAdapter(input ->
input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getIntrBkSttlmAmt().getValue().compareTo(BigDecimal. TEN)>0?
RunFraudCheckDecisionOutcomes. FRAUDREQUIRED: RunFraudCheckDecisionOutcomes. SKIPFRAUD)
.withIpftutorialflowV1MappingAdapter(input -> new ExtractCreditorAccountForFlowIpftutorialflowV1MappingOutput(input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getCdtrAcct()))
.withIpftutorialflowV2MappingAdapter(input -> new ExtractCreditorAccountForFlowIpftutorialflowV2MappingOutput(input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getCdtrAcct()))
.withSchedulerAdapter(schedulerAdapter)
.withCSMServiceActionAdapter(new SampleCSMServiceActionAdapter())
.build();
}
Si usted revisa domain-root/domain/target/classes/com/iconsolutions/ipf/tutorial/ipftutorialmodel/domain/IpftutorialmodelDomain.class, puede ver las versiones V1 y V2 que deben ser llamadas. |
Eso es todo lo que debemos hacer para que nuestro código funcione ahora, pero antes de continuar, volvamos a nuestro controlador de iniciación (ipf-tutorial-app/src/main/java/com/iconsolutions/ipf/tutorial/app/controller/InitiationController.java) y consideremos cómo está funcionando:
return Mono.fromCompletionStage(IpftutorialmodelDomain.initiation()
.handle(new InitiateIpftutorialflowInput. Builder(entityId)
.withProcessingContext(ProcessingContext.builder()
.unitOfWorkId(unitOfWorkId)
.clientRequestId(clientRequestId)
.build())
.withCustomerCreditTransfer(dummyPacs008)
.build())
.thenApply(done -> InitiationResponse.builder()
.requestId(clientRequestId)
.uowId(unitOfWorkId)
.aggregateId(done.getAggregateId())
.build()));
Así que lo estamos enviando aquí, simplemente un "InitiateIpftutorialflowInput".-¿Cómo sabe la aplicación qué flujo utilizar? La respuesta es que, por defecto, siempre dirigirá el trabajo "Nuevo" al flujo más reciente (V2 en nuestro caso). Sin embargo, también podemos elegir a qué flujo enviar los datos.
Ahora, para fines de prueba, recordemos que tenemos un campo de versión en nuestro objeto InitiationRequest, utilicemos eso para poder seleccionar qué flujo enviar. Así que actualicemos nuestra lógica de iniciación para que sea como se muestra a continuación:
return Mono.fromCompletionStage(IpftutorialmodelDomain.initiation()
.handle(new InitiateIpftutorialflowInput. Builder(entityId)
.withProcessingContext(ProcessingContext.builder()
.unitOfWorkId(unitOfWorkId)
.clientRequestId(clientRequestId)
.build())
.withCustomerCreditTransfer(samplePacs008)
.withVersion(Objects.isNull(request) || Objects.isNull(request.getVersion())
? null
: "V1".equals(request.getVersion())? IpftutorialflowFlowVersions. V1: IpftutorialflowFlowVersions. V2)
.build())
.thenApply(done -> InitiationResponse.builder()
.requestId(clientRequestId)
.uowId(unitOfWorkId)
.aggregateId(done.getAggregateId())
.build()));
Aquí podemos ver que hemos añadido una línea para establecer la versión en la solicitud en función de la propiedad de versión que hemos proporcionado (_nota-el uso del enum IpftutorialflowFlowVersions que contiene "V1" y "V2".
|
Además de especificar la versión como lo hicimos anteriormente, también hay entradas explícitas (ipftutorialmodel/inputs) que puede utilizar. Así que en lugar de |
En el tutorial anterior configuramos ActionTimeouts con la siguiente configuración;
ipf.flow.Ipftutorialflow.CheckingFraud.CheckFraud.timeout-duration=2s
Aquí utilizamos el nombre del flujo, Ipftutorialflow, y funcionó correctamente. Sin embargo, con la versionado de nuestros flujos, esto ha creado en realidad 2 flujos de diferentes nombres (con comportamientos diferentes) y esto debe reflejarse en la configuración. Para cada flujo del cual desee mantener esta configuración, debe establecerlo de la siguiente manera (tenga en cuenta la adición de V1 y V2 a los nombres):
ipf.flow.IpftutorialflowV1.CheckingFraud.CheckFraud.timeout-duration=2s
ipf.flow.IpftutorialflowV2.CheckingFraud.CheckFraud.timeout-duration=2s
Usted puede así tener diferentes configuraciones por flujo.
O si desea tener lo mismo para todos los flujos, puede utilizar el comodín 'Any' (que aplicará la configuración para TODOS los flujos para esta acción CheckFraud);
ipf.flow.Any.CheckingFraud.CheckFraud.timeout-duration=2s
Verificando Nuestra Solución
Como es habitual, ahora verifiquemos que la solución funcione. Inicie la aplicación como se indicó anteriormente (las instrucciones están disponibles en Revisando la solicitud inicial si necesita un repaso!)
Comenzará enviando un pago sin especificar una versión.
curl -X POST localhost:8080/submit | jq
Ahora, lo primero que debe tener en cuenta es la respuesta:
{
"requestId": "c16a5c43-1038-4311-9d3f-8bf34efa0c81",
"uowId": "0945fe73-521c-478e-9f62-df4ac6393091",
"aggregateId": "IpftutorialflowV2|239f3e48-8d26-4a2f-8241-0997dc25f1c2"
}
Aquí podemos ver que hemos alcanzado nuestro flujo V2 desde el id agregado "Ipftutorialflow V2|..". Si ahora verificamos los eventos (¡recuerde actualizar el id agregado para que coincida con el suyo!):
Hablemos del pago en el Developer GUI y abra la vista de eventos de flujo (busque por id de unidad de trabajo, haga clic en ver) y deberíamos ver:
Aquí podemos ver que el flujo de proceso que se está ejecutando es, de hecho, nuestro flujo V2, y al observar la vista de eventos se muestra lo mismo.domain events_):
Ahora intentemos alcanzar nuestra versión V1:
curl -X POST localhost:8080/submit -H 'Content-Type: application/json' -d '{"version": "V1"}' | jq
Y nuevamente, al observar la respuesta:
{ "requestId": "492a0177-d9c3-4845-bc81-f54c9aae917d", "uowId": "d8cf8b99-448b-44e7-8207-a015dc41623a", "aggregateId": "IpftutorialflowV1|b0e19b4a-34f6-4584-8109-eef311fd2a13" }
Así que aquí podemos ver que hemos alcanzado nuestro flujo V1 desde el id agregado "Ipftutorialflow V1|..". Si ahora presentamos el pago en el Developer GUI y abra la vista de flujo (search by unit of work id, click view) y deberíamos ver:
Mostrándonos nuevamente que estamos siguiendo el flujo de proceso correcto. Si revisamos el domain events tab (haga clic domain events) entonces vemos que no hay CSM Service invocación:
Si lo desea, puede enviar ahora una solicitud de flujo V2 específicamente y ver que también funcione.
Manejo de Transacciones en Vuelo
Puede haber casos en los que una transacción se haya iniciado en V1 del flujo, pero esté en pausa esperando una respuesta/instrucción a un proceso de larga duración externo al flujo. Durante este tiempo, usted ha desplegado V2 del flujo, pero las transacciones en curso aún no se encuentran en un estado terminal.
Este escenario es perfectamente válido y cuando las transacciones antiguas en el flujo V1 se reanuden, continuarán en el flujo en el que comenzaron, en este caso el flujo V1. Cualquier nueva transacción que se haya iniciado desde la actualización se ejecutará en el flujo V2.
| No es posible que una transacción iniciada utilizando una versión de un flujo (por ejemplo, V1) sea reanudada utilizando otra (por ejemplo, V2). Debe reanudar el flujo en el flujo original (V1) o iniciar un nuevo flujo en la nueva versión (V2). |
Versioning y Actualizaciones Continuas
IPF y Flo Lang ya vienen con soporte básico para actualizaciones en línea. Al aprovechar Akka Cluster los roles de nodo bajo el capó, se garantiza que los flujos se inicien únicamente en nodos que los soporten y al enviar una entrada a un flujo,Akka se encargará de todo el enrutamiento necesario que debe aplicarse. Las transacciones en vuelo — una ocurrencia regular al realizar actualizaciones continuas — serán manejadas de la misma manera que se describe en Manejo de Transacciones en Vuelo, lo que resulta en que su aplicación procese ambas versiones del flujo durante un tiempo, antes de establecerse en una nueva versión.
(Figura: Actualización en línea comenzando a las 15:36:00 y finalizando a las 15:37:30)
Sin embargo, todavía hay escenarios que requieren la atención del desarrollador, y los cubrimos en las siguientes secciones.
Reprocesamiento de Mensajes
La entrega exactamente una vez es muy difícil de lograr en sistemas distribuidos y, incluso cuando se puede alcanzar, conlleva una penalización de rendimiento que a menudo es demasiado costosa de asumir. La alternativa es adoptar al menos una vez la semántica de entrega y asegurar que la lógica de procesamiento de mensajes sea idempotente.
Mientras que el reprocesamiento de mensajes puede causar problemas incluso cuando no se involucra la versionación — por ejemplo, cuando un ID de flujo (entityId en nuestros ejemplos de solución) es un ID único seudoaleatorio — los problemas causados por la falta de idempotencia son más propensos a surgir cuando usted realiza actualizaciones progresivas con múltiples versiones presentes.
Por defecto, si un mensaje iniciador se vuelve a procesar durante las actualizaciones continuas, incluso cuando utiliza un valor derivado del mensaje como su entityId puede causar que se inicien flujos duplicados — la primera vez para la versión de flujo Vx y la versión de flujo Vy al reprocesar el mensaje.
Para protegerse contra este escenario, usted puede optar por utilizar los mecanismos de idempotencia que vienen integrados con Flo Lang y que aprovechan un CorrelationService implementación bajo el capó.
Uso de los guardias de idempotencia integrados
Primero, necesitamos agregar una dependencia a un CorrelationService implementación. Si decide utilizar su propia implementación, asegúrese de que los datos en el almacenamiento de respaldo del servicio sobrevivan a un reinicio del nodo. Para utilizar un MongoDB-implementación respaldada que viene con IPF, añada esto a la ipf-tutorial-app’s `pom.xml:
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-correlation-starter-mongodb</artifactId>
</dependency>
A continuación, necesitamos integrar esta implementación en su dominio:
@Bean
public IpftutorialmodelDomain init(ActorSystem<?> actorSystem,
SchedulerPort schedulerAdapter,
CorrelationService correlationService) {
return new IpftutorialmodelDomain. Builder(actorSystem)
.withTutorialDomainFunctionLibraryAdapter(input -> CompletableFuture.completedStage(new DuplicateCheckResponseInput. Builder(input.getId(), AcceptOrRejectCodes. Accepted).build()))
.withAccountingSystemActionAdapter(new SampleAccountingSystemActionAdapter())
.withFraudSystemActionAdapter(new FraudSystemActionAdapter())
.withDecisionLibraryAdapter(input ->
input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getIntrBkSttlmAmt().getValue().compareTo(BigDecimal. TEN) > 0?
RunFraudCheckDecisionOutcomes. FRAUDREQUIRED: RunFraudCheckDecisionOutcomes. SKIPFRAUD)
.withIpftutorialflowV1MappingAdapter(input -> new ExtractCreditorAccountForFlowIpftutorialflowV1MappingOutput(input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getCdtrAcct()))
.withIpftutorialflowV2MappingAdapter(input -> new ExtractCreditorAccountForFlowIpftutorialflowV2MappingOutput(input.getCustomerCreditTransfer().getCdtTrfTxInf().get(0).getCdtrAcct()))
.withSchedulerAdapter(schedulerAdapter)
.withCSMServiceActionAdapter(new SampleCSMServiceActionAdapter())
// idempotency guards will be applied against this correlation service
.withCorrelationService(correlationService)
.build();
}
Los guardias de idempotencia integrados en las entradas de iniciación funcionan de manera similar, pero ligeramente diferente a los guardias de idempotencia para otras entradas. A diferencia de la idempotencia de entrada general, que necesita ser capaz de distinguir entre envíos duplicados y reprocesamiento de mensajes (para lo cual utiliza el (inputId, physicalMessageId) tuple), la idempotencia de la entrada de iniciación inspeccionará varios campos --physicalMessageId,inputId,processingContext.unitOfWorkId, en ese orden — elija el primero que tenga un valor y utilice ese valor para determinar duplicados.
Si physicalMessageId en la entrada se completa a partir de la ReceivingContext, protegerá contra el reprocesamiento de mensajes en transportes que permiten que un único mensaje físico sea consumido dos veces (Kafka,JMS o de otro middleware de mensajería). En caso de HTTP, un solo mensaje no puede ser reprocesado y los mensajes generalmente se reenvían. Para protegerse contra duplicados de mensajes — independientemente del transporte subyacente — se debe completar el inputId se aconseja un valor derivado del mensaje.
|
| Una vez que haya elegido un valor para poblar su campo de idempotencia, ya no podrá cambiarlo en versiones futuras sin comprometer las salvaguardas de idempotencia y aceptar posibles duplicados, o renunciar a un despliegue de actualización continua para esa versión. |
Como paso final, poblamos uno de los campos habilitados para idempotencia en la entrada:
return Mono.fromCompletionStage(IpftutorialmodelDomain.initiation()
.handle(new InitiateIpftutorialflowInput. Builder(entityId)
.withProcessingContext(ProcessingContext.builder()
.unitOfWorkId(unitOfWorkId)
.clientRequestId(clientRequestId)
.build())
.withCustomerCreditTransfer(samplePacs008)
// use clientRequestId for idempotency guards
.withInputId(clientRequestId)
.withVersion(Objects.isNull(request) || Objects.isNull(request.getVersion())
? null
: "V1".equals(request.getVersion())? IpftutorialflowFlowVersions. V1: IpftutorialflowFlowVersions. V2)
.build())
.thenApply(done -> InitiationResponse.builder()
.requestId(clientRequestId)
.uowId(unitOfWorkId)
.aggregateId(done.getAggregateId())
.build()));
Se recomienda utilizar entradas de iniciación no versionadas cuando se confíe en las garantías de idempotencia integradas, ya que le permitirán iniciar la versión que se utilizó en la solicitud original. Si utiliza entradas versionadas y los intentos posteriores no coinciden con la versión del intento original — por ejemplo, el intento original fue para V1, mientras que los posteriores son V2 — todos los eventos subsiguientes fallarán con un IllegalStateException.
|
Restricciones Causadas por el Manejo de Respuestas Externas
Al manejar respuestas externas — ya sea consumiendo mensajes de intermediarios de mensajes o recibiendo HTTP solicitudes — las restricciones impuestas por las actualizaciones continuas son las mismas:
-
Los inputs creados como resultado del manejo no deben cambiar entre versiones — al realizar actualizaciones continuas, ambas versiones de su aplicación serán responsables de procesar respuestas de sistemas externos y, al recibir una respuesta destinada a la nueva versión, la versión anterior de la aplicación debe ser capaz de enviar un input que la versión V2 del flujo entenderá.
-
si los inputs deben cambiar, depende de usted como desarrollador asegurarse de que la versión anterior de la aplicación no maneje respuestas destinadas a nuevas versiones del flujo — alcanzable mediante el uso de receive connector la capacidad de filtrado, dedicando diferentes temas/colas a las respuestas destinadas a diferentes versiones del flujo, configurando reglas de enrutamiento en el equilibrador de carga para asegurar HTTP las solicitudes se dirigen a las versiones adecuadas del servicio, etc.