Ejecución de Flujo Sincrónico

Si bien IPF es principalmente asincrónico y basado en mensajes, ocasionalmente puede haber un requisito para ejecutar un flujo. sincronizadamente. El flujo podría ser:

  • Inicie un IPF flow usando un HTTP solicitud

  • Espere hasta que el flujo (o flujos) se complete(n)

  • Envíe un HTTP respuesta cuando el flujo se completa con éxito o hay un fallo.

Este documento explica algunas maneras de implementar este patrón.

¡No es seguro para producción!

Este enfoque es bueno para probar o experimentar con IPF, pero no es una solución lista para producción, porque solo puede trabaje con un solo nodo. Consulte Consideraciones de resiliencia y Alternativas a Este Enfoque.

Concepto

La idea es implementar un Request Tracker que haga referencia a una lista indexada por el unit of work ID y cuyo valor es a `CompletionStage`que se completa cuando el flujo termina (con éxito o no).

Solicitante de seguimiento

Considere la siguiente implementación de un Request Tracker:

.RequestTracker.java
public class RequestTracker<T> {

    private final Map<UnitOfWorkId, CompletionStage<T>> OUTSTANDING_REQUESTS = new ConcurrentHashMap<>();

    public CompletionStage<T> track(UnitOfWorkId unitOfWorkId) {
        CompletionStage<T> completionStage = new CompletableFuture<>();
        OUTSTANDING_REQUESTS.put(unitOfWorkId, completionStage);
        return completionStage;
    }

    public boolean complete(UnitOfWorkId unitOfWorkId, T result) {
        return OUTSTANDING_REQUESTS.remove(unitOfWorkId).toCompletableFuture().complete(result);
    }
}

Cuando se inicia un flujo, podemos llamar track con el UnitOfWorkId, y cuando este flujo se completa (o falla) llamamos complete. Tenga en cuenta que complete llamadas docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/Map.html#remove(java.lang. Object)[Map#remove] cuál ayuda a prevenir la acumulación de futuros completados y permite la recolección de basura de solicitudes completadas.

Tenga en cuenta que el valor del futuro a ser devuelto está parametrizado.T) que le permite crear múltiples RequestTracker s si es necesario, y no estar atado a un marco específico.

Si no tiene un requisito para cualquiera de estas cosas, entonces puede reemplazar el argumento parametrizado con el implementación concreta que desea utilizar.

Configuración del flujo

Escriba el flujo como normal. Los únicos cambios son:

  • Agregue una notificación final para completar el futuro (que es un gancho para permitirnos llamar el RequestTracker#complete método)

  • Añada un FlowErrorExtensions implementación que también toma RequestTracker que nos permite completarlo con un respuesta de fallo

Aquí está el flujo de ejemplo:

sync http

Código

Aquí están los fragmentos de código para registrar un UnitOfWorkId con el RequestTracker. El ejemplo a continuación utiliza Spring Marco y ResponseEntity, pero este patrón puede ser adaptado para cualquier otro marco.

Paso 1: Registre un Unit of Work ID

Cuando recibimos un HTTP solicitud para iniciar un flujo, registramos nuestra ejecución de la siguiente manera:

public class MyController {

    private final RequestTracker<ResponseEntity<?>> requestTracker;

    @PostMapping("/submit")
    public CompletionStage<ResponseEntity<?>> submit(@RequestBody Body myBody) {
        var processingContext =.. (1)
        var future = requestTracker.track(processingContext.getUnitOfWorkId()); (2)
        return HttpflowmodelDomain.initiation().handle(
                        new InitiateExampleFlowInput. Builder()
                                .withProcessingContext(processingContext)
                                .withPaymentJourneyType("PAYMENT")
                                .withCustomerCreditTransfer(cct)
                                .build())
                .thenCompose(__ -> future); (3)
    }
}
1 Construya el ProcessingContext según corresponda
2 Registre el ProcessingContext’s Unit of Work ID con el RequestTracker
3 Devuelva el futuro (aún no completado) al llamador

Paso 2: Complete el futuro cuando sea notificado

El fragmento a continuación es la implementación de la notificación "Notificar Finalización" del flujo anterior:

public class SampleActionAdapter implements SampleActionPort {

    private final RequestTracker<ResponseEntity<?>> requestTracker;

    public SampleActionAdapter(RequestTracker<ResponseEntity<?>> requestTracker) {
        this.requestTracker = requestTracker;
    }

    @Override
    public CompletionStage<Void> execute(NotifyCompletionAction action) {
        return CompletableFuture.runAsync(() -> requestTracker.complete(
                action.getProcessingContext().getUnitOfWorkId(), (1)
                ResponseEntity.ok(action.getPaymentStatusReport()) (2)
        ));
    }
}
1 El UnitOfWorkId suministrado por el flujo a través de NotifyCompletionAction
2 El resultado completado (en este caso un pacs. 002 Informe de Estado de Pago, pero puede ser cualquier otro tipo.

Paso 3: Manejador de Errores General

Puede manejar escenarios en los que se lanza una excepción en una etapa del flujo proporcionando un controlador de errores global en su flujo. definición de dominio.

Para hacer esto, pase un custom FlowErrorExtensions implementación a su definición de dominio a través de la withFallbackExtensionProvider`método. En este ejemplo, implementaremos un controlador de errores que llama a `complete en el RequestTracker`con un relevante `ProblemDetail explicando lo que salió mal.

También tenemos acceso a la Aggregate en el manejador de errores para que pueda devolver información parcial si es necesario.

El ejemplo a continuación simplemente devuelve un error 500 de servidor interno, junto con el Throwable que causó el error general manejador a invocar:

@Configuration
public class MyConfig {
    @Bean
    public HttpflowmodelDomain httpflowmodelDomain(ActorSystem actorSystem, FlowErrorExtensions<Aggregate> flowErrorExtensions,
                                                   RequestTracker<ResponseEntity<?>> requestTracker) {
        // All adapters should be added to the domain model
        return new HttpflowmodelDomain. Builder(actorSystem)
                .withFallbackExtensionProvider(ExtensionProvider.builder().flowErrorExtensions(flowErrorExtensions).build())
                //..other adapters, mappers etc..
                .build();
    }

    @Bean
    FlowErrorExtensions<Aggregate> flowErrorExtensions(RequestTracker<ResponseEntity<?>> requestTracker) {
        return (aggregate, t) -> {
            var problemDetail = ProblemDetail.forStatus(500);
            problemDetail.setDetail(t.getMessage());
            requestTracker.complete(aggregate.getProcessingContext().getUnitOfWorkId(), ResponseEntity.of(problemDetail).build());
        };
    }
}

Ejemplo de aplicación

Se ha proporcionado una aplicación de ejemplo que demuestra los conceptos discutidos en esta página. Si se invoca sin argumentos que devolverá un pacs. 002 Informe de Estado de Pago. Si se invoca con un monto de 1 entonces uno de los pasos será lance una excepción, y devolverá un error 500 de servidor interno desde el FlowErrorExtensions.

Ejecute la Clase Principal de la aplicación utilizando com.iconsolutions.example.httpflow.app. Application.

Ejemplo de invocación normal:

curl -X POST localhost:8080/submit

Ejemplo de invocación que devolverá una respuesta de error:

curl -H "Content-Type: application/json" -d '{"value":"1"}' -X POST localhost:8080/submit

Consideraciones de resiliencia

  • No ejecute flujos de larga duración (más de unos pocos segundos) como este, debido a varios HTTP timeouts en el cliente y servidor

  • El RequestTracker es local a un nodo específico, y si un nodo falla, entonces su mapa de UoW a futuro asociado deberá desaparecer con ello

  • Este enfoque no puede ser agrupado: No hay garantía de que un flujo se inicie en el nodo que recibió el HTTP solicitud

  • En el caso (altamente no recomendado) de que desee el RequestTracker para ser persistente, considere reemplazar el mapa con un almacén persistente como MongoDB para rastrear el Unit of Work IDs y sus futuros

Alternativas a Este Enfoque

Una mejor solución a este problema sería tener un servicio web sin estado ubicado frente a una cola o un tema. (ej. JMS or Kafka):

recommended

El RequestTracker puede seguir utilizándose en el HTTP Controlador, pero la adición de un bus de mensajería desacopla el nodo IPF. requisito de afinidad del cliente ascendente en espera de un HTTP respuesta.

Código de Ejemplo

Un ejemplo de implementación de este patrón se puede encontrar aquí:https://bitbucket.iconsolutions.com/projects/IPFV/repos/sync-http-flow-example/browse[aquí]. Por favor, contacte con el soporte de IPF si no puede acceder a esto.