Documentation for a newer release is available. View Latest

Procesar componentes desagregados

Client Processing es un módulo acoplable cuyo propósito es enviar una notificación de que los componentes generados por el debulker están listos para ser procesados. Una vez enviada esa notificación, algo debe procesarla y comenzar a acceder a los componentes.

Notificación

De serie se proporciona una implementación en Kafka. Para la implementación en Kafka el tópico por defecto es CLIENT_PROCESSING_REQUEST (es decir, el Debulker solicita que el cliente procese los componentes).

El InitiateComponentProcessingCommand se envía a Kafka e informa a la aplicación/flujo cliente de que se ha recibido un archivo agrupado, se ha desagregado y los componentes están listos para su procesamiento.

El mensaje/evento que se encuentra en el tópico de Kafka debe tener la forma

{"bulkId":"pain.001.12345","configName":"pain.001.001.02"}

Procesamiento

Al recibir la notificación, la aplicación/flujo cliente puede procesar los componentes individuales accediendo al File Component Store.

Para ello, el Component Store proporciona una interfaz que permite al cliente llamar y solicitar todos los componentes; por ejemplo, llamando a findAllByBulkId:

Flow.Publisher<Component<T>> findAllByBulkId(BulkId bulkId);

Procesar cada componente

Como alternativa, puede ampliarse el procesador de componentes abstracto proporcionado en el módulo ipf-debulker-starter. Esta clase simplifica la creación de un procesador de componentes al tener ya implementada la funcionalidad estándar como la recuperación de componentes desde el component store, lo que significa que se puede crear un procesador de componentes implementando únicamente los métodos abstractos de la clase ComponentProcessor.

public abstract class ComponentProcessor<T,C> { (1)

    protected final ComponentStore<NoCustomData> componentStore;
    protected final Function<String, T> mapper; (2)
    protected final boolean rootComponentProcessor;

    protected ComponentProcessor(ComponentStore<NoCustomData> componentStore, Function<String,T> objectMapper, boolean rootComponentProcessor) {
        this.componentStore = componentStore;
        this.mapper = objectMapper;
        this.rootComponentProcessor = rootComponentProcessor; (3)
    }

    protected abstract Class<T> getTargetClass(); (4)

    protected abstract CompletionStage<Done> handle(C result); (5)

    protected abstract Function<C, ComponentId> getParentComponentId(); (6)

    public abstract Function<C,String> getMarker(); (7)

    public CompletionStage<Void> process(ProcessingContext processingContext, C resultWithContext) { (8)
        if(Objects.isNull(processingContext)) {
            return CompletableFuture.failedFuture(new IconRuntimeException("ProcessingContext is required"));
        }
        if(rootComponentProcessor) {
            return getEnrichedRootComponent(processingContext, getMarker(), resultWithContext)
                    .concatMap(result -> Mono.fromCompletionStage(handle(result))
                            .doOnNext(report -> log(result, report)))
                    .next()
                    .then()
                    .toFuture();
        } else {
            return getComponentsByParentId(getParentComponentId().apply(resultWithContext), getMarker().apply(resultWithContext))
                    .map(transformComponent(resultWithContext))
                    .concatMap(result -> Mono.fromCompletionStage(handle(result))
                            .doOnNext(report -> log(result, report)))
                    .then()
                    .toFuture();
        }
    }

    protected abstract Function<Component<NoCustomData>, C> transformComponent(C additionalData); (9)

    private Flux<C> getEnrichedRootComponent(ProcessingContext processingContext, Function<C,String> marker, C contextData) {
        return componentStore.findAllByBulkIdAndMarkerFlux(BulkId.of(processingContext.getClientRequestId().getValue()), marker.apply(contextData))
                .map(transformComponent(contextData));
    }

    protected void log(C result, Done report) {
        log.debug("Processed component from component store {} . Result {} ", result, report);
    }

    protected Flux<Component<NoCustomData>> getComponentsByParentId(ComponentId componentId, String marker) {
        return componentStore.findAllByParentIdAndMarkerFlux(componentId, marker);
    }
}
1 El primer parámetro de tipo genérico debe ser el tipo deserializado del componente.
El segundo parámetro de tipo genérico debe ser una clase record que incluya datos del componente así como datos contextuales adicionales. Este record sirve como una versión enriquecida de los datos del componente recuperados del component store.
2 Función que define cómo deserializar el contenido en bruto recibido del component store en un tipo estructurado
3 Indicador booleano para indicar si este procesador de componentes es para un componente raíz. Si es true, se recupera un único componente (componente raíz) del component store por bulkId y marker. Un componente raíz es aquel que no tiene padre. Si es false, los componentes se recuperan por parentId y marker
4 Este método debe devolver la clase asociada al contenido deserializado recuperado del component store
5 Este método se llama para cada componente recuperado del component store y debe contener qué procesamiento debe ocurrir para cada componente. Por ejemplo, se podría disparar un flow usando los datos recuperados del component store
6 Función que devuelve el parentComponentId dado un objeto de contexto (un objeto de contexto representado como el segundo argumento genérico mencionado en el punto 1). Para un procesador de componentes raíz esto sería null
7 Función que devuelve el marker asociado a los componentes procesados por este procesador de componentes
8 Cuando se quieran procesar componentes con un procesador de componentes concreto, se debe llamar a este método. Para un componente raíz, el bulkId se recuperará de processingContext.clientRequestId.value
9 Este método es responsable de tomar el componente recuperado y mapearlo a la clase record personalizada especificada en el segundo argumento de tipo genérico del punto 1. Al mapear el componente en este objeto personalizado, es posible pasar datos de contexto adicionales para cada componente recuperado del component store

Procesar componentes recuperados

A continuación se incluye un ejemplo de implementación del método handle descrito en el punto 5 anterior:
El siguiente método es para procesar componentes a nivel de transacción de pain001

@Override
protected CompletionStage<Done> handle(Pain001TransactionContext result) { (1)
        CreditTransferTransaction39 cdtTrfTxInf = xmlMapper.apply(result.rawContent()); (2)
        String unitOfWorkId = UUID.randomUUID().toString(); // UOW Id to uniquely identify a request.
        String clientRequestId = cdtTrfTxInf.getPmtId().getEndToEndId(); // Payment related Id

        log.info("cdtTrfTxInf is instance of CreditTransferTransaction - initiate flow to process, UOW Id {}", unitOfWorkId);

        return DebulkerModelDomain.initiation().handle(new InitiateProcessDebulkedComponentsInput.Builder() (3)
                .withProcessingContext(ProcessingContext.builder()
                        .unitOfWorkId(unitOfWorkId)
                        .clientRequestId(clientRequestId)
                        .build())
                .withIsoCreditTransferComponent(cdtTrfTxInf)
                .withPaymentJourneyType("PAYMENT")
                .build()
        ).thenApply(report -> {
            log.debug("{}: Transaction Validation outcome is {}", cdtTrfTxInf.getPmtId().getEndToEndId(), report.getResult());
            return report;
        });

    }
1 Pain001TransactionContext contiene datos de componentes recuperados del component store así como datos adicionales como parentMarker
2 El procesador de componentes puede opcionalmente especificar una función para deserializar el contenido recuperado del component store. En esta línea, se invoca la función para deserializar el contenido de modo que pueda pasarse al MPS flow
3 Invocar un MPS flow. Así, por cada componente gestionado por este procesador de componentes se invocará un MPS flow

Ejemplo de record de contexto

@Builder(toBuilder = true)
public record Pain001TransactionContext(ProcessingContext processingContext, String rawContent, ComponentId componentId, String marker, ComponentId parentId, String parentMarker) {
}

El fragmento anterior muestra el Pain001TransactionContext asociado al método de ejemplo handle mostrado previamente. El método transformComponent mapea varios campos diferentes recuperados del component store al objeto de contexto, así como elementos de datos adicionales que podrían no provenir del component store, por ejemplo processingContext

Mapeo del record de contexto de ejemplo

    @Override
    protected Function<Component<NoCustomData>, Pain001TransactionContext> transformComponent(Pain001TransactionContext additionalData) {
        return component -> additionalData.toBuilder() (1)
                .componentId(component.getId())
                .marker(component.getMarker())
                .rawContent(component.getContent())
                .build();
    }
1 toBuilder garantiza que cualquier dato de contexto existente, que podría haberse pasado por ejemplo a través de la llamada process, se preserve y enriquezca con información adicional del component store.

NOTA: Estos dos fragmentos anteriores están tomados del IPF Tutorial. El tutorial tiene toda una sección sobre el procesamiento de componentes que ofrece un ejemplo más completo Debulker Client Processing]

Así, es posible recibir el InitiateComponentProcessingCommand, informándonos de que los componentes de archivos agrupados están listos para procesarse. Con esto, podemos recuperar cada componente e iniciar una instancia de un flujo de procesamiento de IPF para procesar cada componente de forma independiente.