Proceso Debulked Componentes

Procesamiento de Clientes es un módulo enchufable cuyo propósito es enviar una notificación de que los componentes generados por el depurador están listos para ser procesados. Una vez que se envía esa notificación, algo debe procesarla y comenzar a acceder a los componentes.

Notificación

Fuera de la caja un Implementación de Kafka de esto se proporciona. Para el Kafka la implementación del tema predeterminado es CLIENT_PROCESSING_REQUEST (es decir, el Debulker solicita que el cliente procese los componentes).

El InitiateComponentProcessingCommand se envía a Kafka y esto informa a la aplicación/flujo del cliente que se ha recibido un lote, se ha desagregado y los componentes están listos para su procesamiento.

El mensaje/evento encontrado en el Kafka el tema debe ser de la forma

{"bulkId":"pain.001.12345","configName":"pain.001.001.02"}

Procesando

Al recibir la notificación, la aplicación/flujo del cliente puede procesar los componentes individuales accediendo a la File Component Store.

Para hacer esto, el Component Store proporciona una interfaz que permite al cliente llamar y solicitar todos los componentes-por ejemplo, llamando a finalAllByBulkId:

Flow. Publisher<Component<T>> findAllByBulkId(BulkId bulkId);

Procesando Cada Componente

Alternativamente, el procesador de componentes abstractos proporcionado en el ipf-debulker-starter el módulo puede ser extendido. Esta clase simplifica la creación de un procesador de componentes al tener funcionalidades estándar, como la recuperación de componentes de la tienda de componentes, ya implementadas, lo que significa que se puede crear un procesador de componentes simplemente implementando los métodos abstractos de la ComponentProcessor clase.

public abstract class ComponentProcessor<T,C> { (1)

    protected final ComponentStore<NoCustomData> componentStore;
    protected final Function<String, T> mapper; (2)
    protected final boolean rootComponentProcessor;

    protected ComponentProcessor(ComponentStore<NoCustomData> componentStore, Function<String,T> objectMapper, boolean rootComponentProcessor) {
        this.componentStore = componentStore;
        this.mapper = objectMapper;
        this.rootComponentProcessor = rootComponentProcessor; (3)
    }

    protected abstract Class<T> getTargetClass(); (4)

    protected abstract CompletionStage<Done> handle(C result); (5)

    protected abstract Function<C, ComponentId> getParentComponentId(); (6)

    public abstract Function<C,String> getMarker(); (7)

    public CompletionStage<Void> process(ProcessingContext processingContext, C resultWithContext) { (8)
        if(Objects.isNull(processingContext)) {
            return CompletableFuture.failedFuture(new IconRuntimeException("ProcessingContext is required"));
        }
        if(rootComponentProcessor) {
            return getEnrichedRootComponent(processingContext, getMarker(), resultWithContext)
                    .concatMap(result -> Mono.fromCompletionStage(handle(result))
                            .doOnNext(report -> log(result, report)))
                    .next()
                    .then()
                    .toFuture();
        } else {
            return getComponentsByParentId(getParentComponentId().apply(resultWithContext), getMarker().apply(resultWithContext))
                    .map(transformComponent(resultWithContext))
                    .concatMap(result -> Mono.fromCompletionStage(handle(result))
                            .doOnNext(report -> log(result, report)))
                    .then()
                    .toFuture();
        }
    }

    protected abstract Function<Component<NoCustomData>, C> transformComponent(C additionalData); (9)

    private Flux<C> getEnrichedRootComponent(ProcessingContext processingContext, Function<C,String> marker, C contextData) {
        return componentStore.findAllByBulkIdAndMarkerFlux(BulkId.of(processingContext.getClientRequestId().getValue()), marker.apply(contextData))
                .map(transformComponent(contextData));
    }

    protected void log(C result, Done report) {
        log.debug("Processed component from component store {} . Result {} ", result, report);
    }

    protected Flux<Component<NoCustomData>> getComponentsByParentId(ComponentId componentId, String marker) {
        return componentStore.findAllByParentIdAndMarkerFlux(componentId, marker);
    }
}
1 El primer parámetro de tipo genérico debe ser el tipo deserializado del componente.
El segundo parámetro de tipo genérico debe ser una clase de registro que incluya datos del componente, así como datos contextuales adicionales. Este registro sirve como una versión enriquecida de los datos del componente recuperados de la tienda de componentes.
2 Función que define cómo deserializar el contenido en bruto recibido del almacén de componentes en un tipo estructurado.
3 Indicador booleano para indicar si este procesador de componentes es para un componente raíz. Si es verdadero, se recupera un único componente (componente raíz) de la tienda de componentes mediante bulkId y marker. Un componente raíz es aquel que no tiene un padre. Si es falso, los componentes se recuperan mediante parentId y marker
4 Este método debe devolver la clase asociada al contenido deserializado recuperado del almacén de componentes.
5 Este método se llama para cada componente recuperado del almacén de componentes y debe contener qué procesamiento debe ocurrir para cada componente. Por ejemplo, un flujo podría ser triggered utilizando los datos recuperados del almacén de componentes
6 Función que devuelve el parentComponentId dado un objeto de contexto (un objeto de contexto representado como el segundo argumento genérico mencionado en el punto 1). Para un procesador de componente raíz, esto sería nulo.
7 Función que devuelve el marcador asociado a los componentes procesados por este procesador de componentes.
8 Al desear procesar componentes con un procesador de componentes particular, este método debe ser llamado. Para un componente raíz, el bulkId se recuperará de processingContext.clientRequestId.value
9 Este método es responsable de tomar el componente recuperado y mapping it to the custom clase de registro especificada en el segundo argumento de tipo genérico desde el punto 1. Por mapping el componente en esto custom objeto, es posible pasar datos de contexto adicionales para cada componente recuperado de la tienda de componentes

Componentes del Proceso Recuperados

Un ejemplo de implementación del handle método descrito en el punto 5 arriba está incluido a continuación:
El método a continuación es para procesar los componentes a nivel de transacción pain001.

@Override
protected CompletionStage<Done> handle(Pain001TransactionContext result) { (1)
        CreditTransferTransaction39 cdtTrfTxInf = xmlMapper.apply(result.rawContent()); (2)
        String unitOfWorkId = UUID.randomUUID().toString(); // UOW Id to uniquely identify a request.
        String clientRequestId = cdtTrfTxInf.getPmtId().getEndToEndId(); // Payment related Id

        log.info("cdtTrfTxInf is instance of CreditTransferTransaction-initiate flow to process, UOW Id {}", unitOfWorkId);

        return DebulkerModelDomain.initiation().handle(new InitiateProcessDebulkedComponentsInput. Builder() (3)
                .withProcessingContext(ProcessingContext.builder()
                        .unitOfWorkId(unitOfWorkId)
                        .clientRequestId(clientRequestId)
                        .build())
                .withIsoCreditTransferComponent(cdtTrfTxInf)
                .withPaymentJourneyType("PAYMENT")
                .build()
        ).thenApply(report -> {
            log.debug("{}: Transaction Validation outcome is {}", cdtTrfTxInf.getPmtId().getEndToEndId(), report.getResult());
            return report;
        });

    }

<1>`Pain001TransactionContext` contiene datos de componentes recuperados de la tienda de componentes, así como datos adicionales como parentMarker <2> El procesador de componentes puede opcionalmente especificar una función para deserializar el contenido recuperado del almacén de componentes. En esta línea, se invoca la función para deserializar el contenido de modo que pueda ser pasado al MPS flujo <3> Invoque un MPS flujo. Así que para cada componente gestionado por este procesador de componentes, un MPS el flujo será invocado

Ejemplo de Registro de Contexto

@Builder(toBuilder = true)
public record Pain001TransactionContext(ProcessingContext processingContext, String rawContent, ComponentId componentId, String marker, ComponentId parentId, String parentMarker) {
}

El fragmento anterior muestra el Pain001TransactionContext asociado al ejemplo handle método mostrado anteriormente. El transformComponent el método mapea varios campos diferentes recuperados del almacén de componentes al objeto de contexto, así como elementos de datos adicionales que podrían no provenir del almacén de componentes, por ejemplo.processingContext

Ejemplo de Mapeo de Registros de Contexto

    @Override
    protected Function<Component<NoCustomData>, Pain001TransactionContext> transformComponent(Pain001TransactionContext additionalData) {
        return component -> additionalData.toBuilder() (1)
                .componentId(component.getId())
                .marker(component.getMarker())
                .rawContent(component.getContent())
                .build();
    }
1 El toBuilder asegura que cualquier dato de contexto existente, que podría haber sido pasado a través del process La llamada, por ejemplo, se preserva y se enriquece con información adicional del almacén de componentes.
Los dos fragmentos anteriores se han extraído del Tutorial de IPF. El tutorial tiene una sección completa sobre el Procesamiento de Componentes que ofrece un ejemplo más completo.Procesamiento del Cliente Debulker

Así, es posible recibir el InitiateComponentProcessingCommand, informándonos que los componentes de archivo en bloque están listos para su procesamiento. Utilizando esto, podemos recuperar cada componente y luego iniciar una instancia de un flujo de procesamiento IPF para procesar cada componente de manera independiente.