Ejecución de Flujo Sincrónico
Si bien IPF es principalmente asincrónico y basado en mensajes, ocasionalmente puede haber un requisito para ejecutar un flujo. sincronizadamente. El flujo podría ser:
-
Inicie un IPF flow usando un HTTP solicitud
-
Espere hasta que el flujo (o flujos) se complete(n)
-
Envíe un HTTP respuesta cuando el flujo se completa con éxito o hay un fallo.
Este documento explica algunas maneras de implementar este patrón.
|
¡No es seguro para producción!
Este enfoque es bueno para probar o experimentar con IPF, pero no es una solución lista para producción, porque solo puede trabaje con un solo nodo. Consulte Consideraciones de resiliencia y Alternativas a Este Enfoque. |
Concepto
La idea es implementar un Request Tracker que haga referencia a una lista indexada por el unit of work ID y cuyo valor es a `CompletionStage`que se completa cuando el flujo termina (con éxito o no).
Solicitante de seguimiento
Considere la siguiente implementación de un Request Tracker:
.RequestTracker.java
public class RequestTracker<T> {
private final Map<UnitOfWorkId, CompletionStage<T>> OUTSTANDING_REQUESTS = new ConcurrentHashMap<>();
public CompletionStage<T> track(UnitOfWorkId unitOfWorkId) {
CompletionStage<T> completionStage = new CompletableFuture<>();
OUTSTANDING_REQUESTS.put(unitOfWorkId, completionStage);
return completionStage;
}
public boolean complete(UnitOfWorkId unitOfWorkId, T result) {
return OUTSTANDING_REQUESTS.remove(unitOfWorkId).toCompletableFuture().complete(result);
}
}
Cuando se inicia un flujo, podemos llamar track con el UnitOfWorkId, y cuando este flujo se completa (o falla) llamamos
complete. Tenga en cuenta que complete llamadas docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/Map.html#remove(java.lang. Object)[Map#remove] cuál
ayuda a prevenir la acumulación de futuros completados y permite la recolección de basura de solicitudes completadas.
Tenga en cuenta que el valor del futuro a ser devuelto está parametrizado.T) que le permite crear múltiples
RequestTracker s si es necesario, y no estar atado a un marco específico.
Si no tiene un requisito para cualquiera de estas cosas, entonces puede reemplazar el argumento parametrizado con el implementación concreta que desea utilizar.
Configuración del flujo
Escriba el flujo como normal. Los únicos cambios son:
-
Agregue una notificación final para completar el futuro (que es un gancho para permitirnos llamar el
RequestTracker#completemétodo) -
Añada un
FlowErrorExtensionsimplementación que también tomaRequestTrackerque nos permite completarlo con un respuesta de fallo
Aquí está el flujo de ejemplo:
Código
Aquí están los fragmentos de código para registrar un UnitOfWorkId con el RequestTracker. El ejemplo a continuación utiliza Spring
Marco y ResponseEntity, pero este patrón puede ser adaptado para cualquier otro marco.
Paso 1: Registre un Unit of Work ID
Cuando recibimos un HTTP solicitud para iniciar un flujo, registramos nuestra ejecución de la siguiente manera:
public class MyController {
private final RequestTracker<ResponseEntity<?>> requestTracker;
@PostMapping("/submit")
public CompletionStage<ResponseEntity<?>> submit(@RequestBody Body myBody) {
var processingContext =.. (1)
var future = requestTracker.track(processingContext.getUnitOfWorkId()); (2)
return HttpflowmodelDomain.initiation().handle(
new InitiateExampleFlowInput. Builder()
.withProcessingContext(processingContext)
.withPaymentJourneyType("PAYMENT")
.withCustomerCreditTransfer(cct)
.build())
.thenCompose(__ -> future); (3)
}
}
| 1 | Construya el ProcessingContext según corresponda |
| 2 | Registre el ProcessingContext’s Unit of Work ID con el RequestTracker |
| 3 | Devuelva el futuro (aún no completado) al llamador |
Paso 2: Complete el futuro cuando sea notificado
El fragmento a continuación es la implementación de la notificación "Notificar Finalización" del flujo anterior:
public class SampleActionAdapter implements SampleActionPort {
private final RequestTracker<ResponseEntity<?>> requestTracker;
public SampleActionAdapter(RequestTracker<ResponseEntity<?>> requestTracker) {
this.requestTracker = requestTracker;
}
@Override
public CompletionStage<Void> execute(NotifyCompletionAction action) {
return CompletableFuture.runAsync(() -> requestTracker.complete(
action.getProcessingContext().getUnitOfWorkId(), (1)
ResponseEntity.ok(action.getPaymentStatusReport()) (2)
));
}
}
| 1 | El UnitOfWorkId suministrado por el flujo a través de NotifyCompletionAction |
| 2 | El resultado completado (en este caso un pacs. 002 Informe de Estado de Pago, pero puede ser cualquier otro tipo. |
Paso 3: Manejador de Errores General
Puede manejar escenarios en los que se lanza una excepción en una etapa del flujo proporcionando un controlador de errores global en su flujo. definición de dominio.
Para hacer esto, pase un custom FlowErrorExtensions implementación a su definición de dominio a través de la
withFallbackExtensionProvider`método. En este ejemplo, implementaremos un controlador de errores que llama a `complete en el
RequestTracker`con un relevante `ProblemDetail explicando lo que salió mal.
También tenemos acceso a la Aggregate en el manejador de errores para que pueda devolver información parcial si es necesario.
El ejemplo a continuación simplemente devuelve un error 500 de servidor interno, junto con el Throwable que causó el error general
manejador a invocar:
@Configuration
public class MyConfig {
@Bean
public HttpflowmodelDomain httpflowmodelDomain(ActorSystem actorSystem, FlowErrorExtensions<Aggregate> flowErrorExtensions,
RequestTracker<ResponseEntity<?>> requestTracker) {
// All adapters should be added to the domain model
return new HttpflowmodelDomain. Builder(actorSystem)
.withFallbackExtensionProvider(ExtensionProvider.builder().flowErrorExtensions(flowErrorExtensions).build())
//..other adapters, mappers etc..
.build();
}
@Bean
FlowErrorExtensions<Aggregate> flowErrorExtensions(RequestTracker<ResponseEntity<?>> requestTracker) {
return (aggregate, t) -> {
var problemDetail = ProblemDetail.forStatus(500);
problemDetail.setDetail(t.getMessage());
requestTracker.complete(aggregate.getProcessingContext().getUnitOfWorkId(), ResponseEntity.of(problemDetail).build());
};
}
}
Ejemplo de aplicación
Se ha proporcionado una aplicación de ejemplo que demuestra los conceptos discutidos en esta página. Si se invoca sin
argumentos que devolverá un pacs. 002 Informe de Estado de Pago. Si se invoca con un monto de 1 entonces uno de los pasos será
lance una excepción, y devolverá un error 500 de servidor interno desde el FlowErrorExtensions.
Ejecute la Clase Principal de la aplicación utilizando com.iconsolutions.example.httpflow.app. Application.
Ejemplo de invocación normal:
curl -X POST localhost:8080/submit
Ejemplo de invocación que devolverá una respuesta de error:
curl -H "Content-Type: application/json" -d '{"value":"1"}' -X POST localhost:8080/submit
Consideraciones de resiliencia
-
No ejecute flujos de larga duración (más de unos pocos segundos) como este, debido a varios HTTP timeouts en el cliente y servidor
-
El
RequestTrackeres local a un nodo específico, y si un nodo falla, entonces su mapa de UoW a futuro asociado deberá desaparecer con ello -
Este enfoque no puede ser agrupado: No hay garantía de que un flujo se inicie en el nodo que recibió el HTTP solicitud
-
En el caso (altamente no recomendado) de que desee el
RequestTrackerpara ser persistente, considere reemplazar el mapa con un almacén persistente como MongoDB para rastrear el Unit of Work IDs y sus futuros
Alternativas a Este Enfoque
Una mejor solución a este problema sería tener un servicio web sin estado ubicado frente a una cola o un tema. (ej. JMS or Kafka):

El RequestTracker puede seguir utilizándose en el HTTP Controlador, pero la adición de un bus de mensajería desacopla el nodo IPF.
requisito de afinidad del cliente ascendente en espera de un HTTP respuesta.