Documentation for a newer release is available. View Latest
Esta página no está disponible actualmente en Español. Si lo necesita, póngase en contacto con el servicio de asistencia de Icon (correo electrónico)

Process Debulked Components

Client Processing is a pluggable module whose purpose is sending a notification that components generated by debulker are ready to be processed. Once that notification is sent, something needs to process it and start accessing the components.

Notification

Out of the box a Kafka implementation of this is provided. For the Kafka implementation the default topic is CLIENT_PROCESSING_REQUEST (i.e. the Debulker requests the client processes the components).

The InitiateComponentProcessingCommand is sent to Kafka and this informs the client application/flow a bulk has been received, debulked and the components are ready for processing.

The message/event found in the Kafka topic should be of the form

{"bulkId":"pain.001.12345","configName":"pain.001.001.02"}

Processing

On receipt of the notification, the client application/flow can process the individual components by accessing the File Component Store.

To do this, the Component Store provides an interface which allows the client to call and ask for all components - for example calling finalAllByBulkId:

Flow.Publisher<Component<T>> findAllByBulkId(BulkId bulkId);

Processing Each Component

Alternatively, the abstract component processor provided in the ipf-debulker-starter module can be extended. This class simplifies the creation of a component processor by having standard functionality such as the retrieval of components from the component store already implemented, meaning a component processor can be created by just implementing the abstract methods of the ComponentProcessor class.

public abstract class ComponentProcessor<T,C> { (1)

    protected final ComponentStore<NoCustomData> componentStore;
    protected final Function<String, T> mapper; (2)
    protected final boolean rootComponentProcessor;

    protected ComponentProcessor(ComponentStore<NoCustomData> componentStore, Function<String,T> objectMapper, boolean rootComponentProcessor) {
        this.componentStore = componentStore;
        this.mapper = objectMapper;
        this.rootComponentProcessor = rootComponentProcessor; (3)
    }

    protected abstract Class<T> getTargetClass(); (4)

    protected abstract CompletionStage<Done> handle(C result); (5)

    protected abstract Function<C, ComponentId> getParentComponentId(); (6)

    public abstract Function<C,String> getMarker(); (7)

    public CompletionStage<Void> process(ProcessingContext processingContext, C resultWithContext) { (8)
        if(Objects.isNull(processingContext)) {
            return CompletableFuture.failedFuture(new IconRuntimeException("ProcessingContext is required"));
        }
        if(rootComponentProcessor) {
            return getEnrichedRootComponent(processingContext, getMarker(), resultWithContext)
                    .concatMap(result -> Mono.fromCompletionStage(handle(result))
                            .doOnNext(report -> log(result, report)))
                    .next()
                    .then()
                    .toFuture();
        } else {
            return getComponentsByParentId(getParentComponentId().apply(resultWithContext), getMarker().apply(resultWithContext))
                    .map(transformComponent(resultWithContext))
                    .concatMap(result -> Mono.fromCompletionStage(handle(result))
                            .doOnNext(report -> log(result, report)))
                    .then()
                    .toFuture();
        }
    }

    protected abstract Function<Component<NoCustomData>, C> transformComponent(C additionalData); (9)

    private Flux<C> getEnrichedRootComponent(ProcessingContext processingContext, Function<C,String> marker, C contextData) {
        return componentStore.findAllByBulkIdAndMarkerFlux(BulkId.of(processingContext.getClientRequestId().getValue()), marker.apply(contextData))
                .map(transformComponent(contextData));
    }

    protected void log(C result, Done report) {
        log.debug("Processed component from component store {} . Result {} ", result, report);
    }

    protected Flux<Component<NoCustomData>> getComponentsByParentId(ComponentId componentId, String marker) {
        return componentStore.findAllByParentIdAndMarkerFlux(componentId, marker);
    }
}
1 The first generic type parameter should be the unmarshalled type of the component.
The second generic type parameter should be a record class that includes data from the component as well as additional contextual data. This record serves as an enriched version of the component data retrieved from the component store.
2 Function which defines how to unmarshal the raw content received from the component store into a structured type
3 Boolean indicator to indicate whether this component processor is for a root component. If true, a single component (root component) is retrieved from the component store by bulkId and marker. A root component is one which does not have a parent. If false, components are retrieved by parentId and marker
4 This method should return the class associated to the unmarshalled content retrieved from the component store
5 This method is called when for each component retrieved from the component store and should contain what processing should occur for each component. E.g. a flow could be triggered using the data retrieved from the component store
6 Function which returns the parentComponentId given a context object (a context object represented as the second generic argument mentioned in point 1). For a root component processor this would be null
7 Function which returns the marker associated to components processed by this component processor
8 When wanting to process components with a particular component processor, this method should be called. For a root component, the bulkId will be retrieved from processingContext.clientRequestId.value
9 This method is responsible for taking the retrieved component and mapping it to the custom record class specified in the second generic type argument from point 1. By mapping the component into this custom object, it’s possible to pass additional context data for each component retrieved from the component store

Process Components Retrieved

An example implementation of the handle method described in point 5 above is included below:
The below method is for processing pain001 transaction level components

@Override
protected CompletionStage<Done> handle(Pain001TransactionContext result) { (1)
        CreditTransferTransaction39 cdtTrfTxInf = xmlMapper.apply(result.rawContent()); (2)
        String unitOfWorkId = UUID.randomUUID().toString(); // UOW Id to uniquely identify a request.
        String clientRequestId = cdtTrfTxInf.getPmtId().getEndToEndId(); // Payment related Id

        log.info("cdtTrfTxInf is instance of CreditTransferTransaction - initiate flow to process, UOW Id {}", unitOfWorkId);

        return DebulkerModelDomain.initiation().handle(new InitiateProcessDebulkedComponentsInput.Builder() (3)
                .withProcessingContext(ProcessingContext.builder()
                        .unitOfWorkId(unitOfWorkId)
                        .clientRequestId(clientRequestId)
                        .build())
                .withIsoCreditTransferComponent(cdtTrfTxInf)
                .withPaymentJourneyType("PAYMENT")
                .build()
        ).thenApply(report -> {
            log.debug("{}: Transaction Validation outcome is {}", cdtTrfTxInf.getPmtId().getEndToEndId(), report.getResult());
            return report;
        });

    }
1 Pain001TransactionContext contains component data retrieved from the component store as well as additional data such as parentMarker
2 The component processor can optionally specify a function for unmarshalling content retrieved from the component store. In this line, the function is invoked to unmarshal the content so that it can be passed to the MPS flow
3 Invoke an MPS flow. So for each component handled by this component processor an MPS flow will be invoked

Example Context Record

@Builder(toBuilder = true)
public record Pain001TransactionContext(ProcessingContext processingContext, String rawContent, ComponentId componentId, String marker, ComponentId parentId, String parentMarker) {
}

The above snippet shows the Pain001TransactionContext associated to the example handle method shown previously. The transformComponent method maps various different fields retrieved from the component store to the context object, as well as additional data items that might not come from the component store, e.g. processingContext

Example Context Record Mapping

    @Override
    protected Function<Component<NoCustomData>, Pain001TransactionContext> transformComponent(Pain001TransactionContext additionalData) {
        return component -> additionalData.toBuilder() (1)
                .componentId(component.getId())
                .marker(component.getMarker())
                .rawContent(component.getContent())
                .build();
    }
1 The toBuilder ensures that any existing context data, that might have been passed in via the process call for example, is preserved and enriched with additional information from the component store.
This above two snippets are lifted from the IPF Tutorial. The tutorial has a whole section on Component Processing which gives a more complete example Debulker Client Processing]

Thus it is possible to receive the InitiateComponentProcessingCommand, telling us bulk file components are ready for processing. Using this, we can retrieve each component and then initiate an instance of a IPF processing flow to process each component independently.