Proceso Debulked Componentes

Procesamiento de Clientes es un módulo enchufable cuyo propósito es enviar una notificación de que los componentes generados por debulker están listos para ser procesados. Una vez que se envíe esa notificación, algo debe procesarlo y comenzar a acceder a los componentes.

Notificación

Fuera de la caja un Implementación de Kafka de esto se proporciona. Para el Kafka la implementación del tema predeterminado es CLIENT_PROCESSING_REQUEST (es decir, el Debulker solicita que el cliente procese los componentes).

El InitiateComponentProcessingCommand se envía a Kafka y esto informa la aplicación/flujo del cliente a un bulk ha sido recibido,debulked y los componentes están listos para su procesamiento.

El mensaje/event encontrado en el Kafka el tema debe ser de la forma

{"bulkId":"pain. 001. 12345","configName":"pain. 001. 001. 02"}

Procesando

Al recibir la notificación, la aplicación/flujo del cliente puede procesar los componentes individuales accediendo a la File Component Store.

Para hacer esto, el Component Store proporciona una interfaz que permite al cliente llamar y solicitar todos los componentes-por ejemplo, llamando a finalAllByBulkId:

Flow. Publisher<Component<T>> findAllByBulkId(BulkId bulkId);

Procesando Cada Componente

Alternativamente, el procesador de componentes abstractos proporcionado en el ipf-debulker-starter el módulo puede ser extendido. Esta clase simplifica la creación de un procesador de componentes al tener funcionalidades estándar como la recuperación de componentes de la component store ya implementado, lo que significa que se puede crear un procesador de componentes simplemente implementando los métodos abstractos del ComponentProcessor clase.

public abstract class ComponentProcessor<T,C> { (1)

    protected final ComponentStore<NoCustomData> componentStore;
    protected final Function<String, T> mapper; (2)
    protected final boolean rootComponentProcessor;

    protected ComponentProcessor(ComponentStore<NoCustomData> componentStore, Function<String,T> objectMapper, boolean rootComponentProcessor) {
        this.componentStore = componentStore;
        this.mapper = objectMapper;
        this.rootComponentProcessor = rootComponentProcessor; (3)
    }

    protected abstract Class<T> getTargetClass(); (4)

    protected abstract CompletionStage<Done> handle(C result); (5)

    protected abstract Function<C, ComponentId> getParentComponentId(); (6)

    public abstract Function<C,String> getMarker(); (7)

    public CompletionStage<Void> process(ProcessingContext processingContext, C resultWithContext) { (8)
        if(Objects.isNull(processingContext)) {
            return CompletableFuture.failedFuture(new IconRuntimeException("ProcessingContext is required"));
        }
        if(rootComponentProcessor) {
            return getEnrichedRootComponent(processingContext, getMarker(), resultWithContext)
                    .concatMap(result -> Mono.fromCompletionStage(handle(result))
                            .doOnNext(report -> log(result, report)))
                    .next()
                    .then()
                    .toFuture();
        } else {
            return getComponentsByParentId(getParentComponentId().apply(resultWithContext), getMarker().apply(resultWithContext))
                    .map(transformComponent(resultWithContext))
                    .concatMap(result -> Mono.fromCompletionStage(handle(result))
                            .doOnNext(report -> log(result, report)))
                    .then()
                    .toFuture();
        }
    }

    protected abstract Function<Component<NoCustomData>, C> transformComponent(C additionalData); (9)

    private Flux<C> getEnrichedRootComponent(ProcessingContext processingContext, Function<C,String> marker, C contextData) {
        return componentStore.findAllByBulkIdAndMarkerFlux(BulkId.of(processingContext.getClientRequestId().getValue()), marker.apply(contextData))
                .map(transformComponent(contextData));
    }

    protected void log(C result, Done report) {
        log.debug("Processed component from component store {} . Result {} ", result, report);
    }

    protected Flux<Component<NoCustomData>> getComponentsByParentId(ComponentId componentId, String marker) {
        return componentStore.findAllByParentIdAndMarkerFlux(componentId, marker);
    }
}
1 El primer parámetro de tipo genérico debe ser el tipo deserializado del componente.
El segundo parámetro de tipo genérico debe ser una clase de registro que incluya datos del componente, así como datos contextuales adicionales. Este registro sirve como una versión enriquecida de los datos del componente recuperados de la component store.
2 Función que define cómo deserializar el contenido en bruto recibido de la component store en un tipo estructurado
3 Indicador booleano para indicar si este procesador de componentes es para un componente raíz. Si es verdadero, se recupera un único componente (componente raíz) de la component store por bulkId y marker. Un componente raíz es aquel que no tiene un padre. Si es falso, los componentes se recuperan mediante parentId y marker
4 Este método debe devolver la clase asociada al contenido deserializado recuperado de la component store
5 Este método se llama cuando se recupera cada componente de la component store y debe contener qué procesamiento debe ocurrir para cada componente. Por ejemplo, un flujo podría ser triggered utilizando los datos recuperados de la component store
6 Función que devuelve el parentComponentId dado un objeto de contexto (un objeto de contexto representado como el segundo argumento genérico mencionado en el punto 1). Para un procesador de componente raíz, esto sería nulo.
7 Función que devuelve el marker asociados a los componentes procesados por este procesador de componentes
8 Al desear procesar componentes con un procesador de componentes particular, este método debe ser llamado. Para un componente raíz, el bulkId se recuperará de processingContext.clientRequestId.value
9 Este método es responsable de tomar el componente recuperado y mapping it to the custom clase de registro especificada en el segundo argumento de tipo genérico desde el punto 1. Por mapping el componente en esto custom objeto, es posible pasar datos de contexto adicionales para cada componente recuperado de la component store

Componentes del Proceso Recuperados

Un ejemplo de implementación del handle método descrito en el punto 5 above is included below:
El método a continuación es para procesar los componentes a nivel de transacción pain001.

@Override
protected CompletionStage<Done> handle(Pain001TransactionContext result) { (1)
        CreditTransferTransaction39 cdtTrfTxInf = xmlMapper.apply(result.rawContent()); (2)
        String unitOfWorkId = UUID.randomUUID().toString(); // UOW Id to uniquely identify a request.
        String clientRequestId = cdtTrfTxInf.getPmtId().getEndToEndId(); // Payment related Id

        log.info("cdtTrfTxInf is instance of CreditTransferTransaction-initiate flow to process, UOW Id {}", unitOfWorkId);

        return DebulkerModelDomain.initiation().handle(new InitiateProcessDebulkedComponentsInput. Builder() (3)
                .withProcessingContext(ProcessingContext.builder()
                        .unitOfWorkId(unitOfWorkId)
                        .clientRequestId(clientRequestId)
                        .build())
                .withIsoCreditTransferComponent(cdtTrfTxInf)
                .withPaymentJourneyType("PAYMENT")
                .build()
        ).thenApply(report -> {
            log.debug("{}: Transaction Validation outcome is {}", cdtTrfTxInf.getPmtId().getEndToEndId(), report.getResult());
            return report;
        });

    }

<1>`Pain001TransactionContext` contiene datos de componentes recuperados de la component store así como datos adicionales como parentMarker <2> El procesador de componentes puede opcionalmente especificar una función para deserializar el contenido recuperado de la component store. En esta línea, se invoca la función para deserializar el contenido de manera que pueda ser pasado a la MPS flujo <3> Invoque un MPS flujo. Así que para cada componente manejado por este procesador de componentes, un MPS el flujo será invocado

Ejemplo de Registro de Contexto

@Builder(toBuilder = true)
public record Pain001TransactionContext(ProcessingContext processingContext, String rawContent, ComponentId componentId, String marker, ComponentId parentId, String parentMarker) {
}

El fragmento anterior muestra el Pain001TransactionContext asociado al ejemplo handle método mostrado anteriormente. El transformComponent método mapea varios campos diferentes recuperados de la component store al objeto de contexto, así como elementos de datos adicionales que podrían no provenir de la component store, p. ej.processingContext

Ejemplo de Registro de Contexto Mapping

    @Override
    protected Function<Component<NoCustomData>, Pain001TransactionContext> transformComponent(Pain001TransactionContext additionalData) {
        return component -> additionalData.toBuilder() (1)
                .componentId(component.getId())
                .marker(component.getMarker())
                .rawContent(component.getContent())
                .build();
    }
1 El toBuilder asegura que cualquier dato de contexto existente, que podría haber sido pasado a través del process la llamada, por ejemplo, se preserva y enriquece con información adicional de la component store.
Los dos fragmentos anteriores se han extraído del Tutorial de IPF. El tutorial tiene una sección completa sobre el Procesamiento de Componentes que ofrece un ejemplo más completo.Procesamiento del Cliente Debulker

Así es posible recibir el InitiateComponentProcessingCommand, indicándonos bulk Los componentes del archivo están listos para su procesamiento. Utilizando esto, podemos recuperar cada componente y luego iniciar una instancia de un flujo de procesamiento IPF para procesar cada componente de manera independiente.