Documentation for a newer release is available. View Latest

Ejecución síncrona de flujos

Aunque IPF es principalmente asíncrono y basado en mensajes, ocasionalmente puede haber un requisito para ejecutar un flujo de forma síncrona. El flujo podría ser:

  • Iniciar un flujo de IPF usando una solicitud HTTP

  • Esperar hasta que el flujo (o flujos) termine(n)

  • Enviar una respuesta HTTP cuando el flujo complete correctamente o se produzca un fallo.

Este documento explica algunas formas de implementar este patrón.

No es seguro para producción

Este enfoque es bueno para probar o experimentar con IPF, pero no es una solución lista para producción, porque solo puede funcionar con un único nodo. Ver Consideraciones de resiliencia y Alternativas a este enfoque.

Concepto

La idea es implementar un Request Tracker que referencia una lista indexada por el Unit of Work ID y cuyo valor es un CompletionStage que se completa cuando el flujo termina (con éxito o no).

Request Tracker

Considera la siguiente implementación de un Request Tracker:

RequestTracker.java
public class RequestTracker<T> {

    private final Map<UnitOfWorkId, CompletionStage<T>> OUTSTANDING_REQUESTS = new ConcurrentHashMap<>();

    public CompletionStage<T> track(UnitOfWorkId unitOfWorkId) {
        CompletionStage<T> completionStage = new CompletableFuture<>();
        OUTSTANDING_REQUESTS.put(unitOfWorkId, completionStage);
        return completionStage;
    }

    public boolean complete(UnitOfWorkId unitOfWorkId, T result) {
        return OUTSTANDING_REQUESTS.remove(unitOfWorkId).toCompletableFuture().complete(result);
    }
}

Cuando se inicia un flujo, podemos llamar a track con el UnitOfWorkId, y cuando este flujo completa (o falla) llamamos a complete. Ten en cuenta que complete llama a Map#remove, lo que ayuda a evitar la acumulación de futures completados y permite la recolección de basura de solicitudes completadas.

Observa que el valor del future a devolver está parametrizado (T), lo que te permite crear múltiples RequestTrackers si es necesario, y no estar atado a un framework específico.

Si no tienes el requisito de ninguna de estas cosas, entonces puedes reemplazar el argumento parametrizado con la implementación concreta que quieras usar.

Configuración del flujo

Escribe el flujo como de costumbre. Los únicos cambios son:

  • Añadir una notificación final para completar el future (que es un hook para permitirnos llamar al método RequestTracker#complete)

  • Añadir una implementación de FlowErrorExtensions que también reciba RequestTracker, lo que nos permite completarlo con una respuesta de fallo

Aquí está el flujo de ejemplo:

sync http

Código

Aquí están los fragmentos de código para registrar un UnitOfWorkId con el RequestTracker. El ejemplo a continuación usa Spring Framework y ResponseEntity, pero este patrón puede adaptarse a cualquier otro framework.

Paso 1: Registrar un Unit of Work ID

Cuando recibimos una solicitud HTTP para iniciar un flujo, registramos nuestra ejecución así:

public class MyController {

    private final RequestTracker<ResponseEntity<?>> requestTracker;

    @PostMapping("/submit")
    public CompletionStage<ResponseEntity<?>> submit(@RequestBody Body myBody) {
        var processingContext = ... (1)
        var future = requestTracker.track(processingContext.getUnitOfWorkId()); (2)
        return HttpflowmodelDomain.initiation().handle(
                        new InitiateExampleFlowInput.Builder()
                                .withProcessingContext(processingContext)
                                .withPaymentJourneyType("PAYMENT")
                                .withCustomerCreditTransfer(cct)
                                .build())
                .thenCompose(__ -> future); (3)
    }
}
1 Construir el ProcessingContext según corresponda
2 Registrar el Unit of Work ID del ProcessingContext con el RequestTracker
3 Devolver el future (aún no completado) al llamante

Paso 2: Completar el future cuando se notifique

El fragmento siguiente es la implementación de la notificación "Notify Completion" del flujo anterior:

public class SampleActionAdapter implements SampleActionPort {

    private final RequestTracker<ResponseEntity<?>> requestTracker;

    public SampleActionAdapter(RequestTracker<ResponseEntity<?>> requestTracker) {
        this.requestTracker = requestTracker;
    }

    @Override
    public CompletionStage<Void> execute(NotifyCompletionAction action) {
        return CompletableFuture.runAsync(() -> requestTracker.complete(
                action.getProcessingContext().getUnitOfWorkId(), (1)
                ResponseEntity.ok(action.getPaymentStatusReport()) (2)
        ));
    }
}
1 El UnitOfWorkId suministrado por el flujo vía NotifyCompletionAction
2 El resultado completado (en este caso un pacs.002 Payment Status Report, pero puede ser cualquier otro tipo)

Paso 3: Manejador de errores catch-all

Puedes manejar escenarios donde se lanza una excepción en una etapa del flujo proporcionando un manejador de errores global en la definición de tu dominio de flujo.

Para hacerlo, pasa una implementación personalizada de FlowErrorExtensions a la definición de tu dominio a través del método withFallbackExtensionProvider. En este ejemplo, implementaremos un manejador de errores que llama a complete en el RequestTracker con un ProblemDetail relevante que explique qué salió mal.

También tenemos acceso al Aggregate en el manejador de errores, por lo que podrías devolver información parcial si fuera necesario.

El siguiente ejemplo simplemente devuelve un 500 Internal Server Error, junto con el Throwable que provocó que se invocara el manejador de errores catch-all:

@Configuration
public class MyConfig {
    @Bean
    public HttpflowmodelDomain httpflowmodelDomain(ActorSystem actorSystem, FlowErrorExtensions<Aggregate> flowErrorExtensions,
                                                   RequestTracker<ResponseEntity<?>> requestTracker) {
        // Todos los adapters deben añadirse al domain model
        return new HttpflowmodelDomain.Builder(actorSystem)
                .withFallbackExtensionProvider(ExtensionProvider.builder().flowErrorExtensions(flowErrorExtensions).build())
                //...otros adapters, mappers, etc...
                .build();
    }

    @Bean
    FlowErrorExtensions<Aggregate> flowErrorExtensions(RequestTracker<ResponseEntity<?>> requestTracker) {
        return (aggregate, t) -> {
            var problemDetail = ProblemDetail.forStatus(500);
            problemDetail.setDetail(t.getMessage());
            requestTracker.complete(aggregate.getProcessingContext().getUnitOfWorkId(), ResponseEntity.of(problemDetail).build());
        };
    }
}

Aplicación de ejemplo

Se ha proporcionado una aplicación de ejemplo que demuestra los conceptos discutidos en esta página. Si se invoca sin argumentos devolverá un pacs.002 Payment Status Report. Si se invoca con una cantidad de 1, una de las etapas lanzará una excepción, y devolverá un 500 Internal Server Error desde FlowErrorExtensions.

Ejecuta la clase principal de la aplicación usando com.iconsolutions.example.httpflow.app.Application.

Ejemplo de invocación normal:

curl -X POST localhost:8080/submit

Ejemplo de invocación que devolverá una respuesta de error:

curl -H "Content-Type: application/json" -d '{"value":"1"}' -X POST localhost:8080/submit

Consideraciones de resiliencia

  • No ejecutes flujos de larga duración (más de unos pocos segundos) de esta manera, debido a varios timeouts HTTP en el cliente y el servidor

  • El RequestTracker es local a un nodo específico, y si un nodo muere entonces su mapa UoW-a-future asociado desaparecerá con él

  • Este enfoque no puede ser clusterizado: no hay garantía de que un flujo se lance en el nodo que recibió la solicitud HTTP

  • En el caso (altamente no recomendado) de que desees que el RequestTracker sea persistente, considera reemplazar el mapa con un almacén persistente como MongoDB para rastrear los Unit of Work IDs y sus futures

Alternativas a este enfoque

Una mejor solución a este problema sería tener un servicio web sin estado delante de una cola o tópico (p. ej., JMS o Kafka):

recommended

El RequestTracker todavía puede usarse en el controlador HTTP, pero la adición de un bus de mensajería desacopla el requisito de afinidad de nodo de IPF del cliente ascendente que espera una respuesta HTTP.

Código de ejemplo

Se puede encontrar una implementación de ejemplo de este patrón aquí: aquí. Por favor, contacta con el soporte de IPF si no puedes acceder a esto.