Documentation for a newer release is available. View Latest

Interacting With the Bulker

There are two ways to interact with the Bulker:

  • Embedded

  • External

Embedded

To access bulk functionality from within the same application (embedded), add the following module

<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-aggregate-akka</artifactId>
</dependency>

This provides the API (from ipf-bulker-aggregate-api) and a default Akka implementation for the Bulker Aggregate. More details of the operations available via this API are available from Bulk Aggregate.

External

To process requests from outside the application to perform Bulker operations, include the following module:

<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>

This provides a ReceiveConnector listening on a Kafka topic for requests to perform Bulk operations. Now, an implementation of BulkIngestionReceiveClientAdapter needs to provided in the application context to handle the various supported request message types detailed below.

An example implementation is provided below, this example also shows how each message type maps to the corresponding bulk aggregate call:

BulkIngestionReceiveAdapter.java
public class BulkIngestionReceiveAdapter implements BulkIngestionReceiveClientAdapter {

    private final BulkAggregate bulkAggregate;
    private final RecurringBulkAggregate recurringBulkAggregate;
    private final BulkStatusAdapter bulkStatusAdapter;

    @Override
    public ProcessingContext determineContextFor(BulkIngestionMessage request) {
        return ProcessingContext.builder()
                .clientRequestId(request.getRequestId())
                .associationId(AssociationId.unknown())
                .unitOfWorkId(UnitOfWorkId.createRandom())
                .build();
    }

    @Override
    public CompletionStage<Void> handle(ReceivingContext context, BulkIngestionMessage request) {
        if (request instanceof CreateBulkMessage createBulkMessage) { (1)
            CreateBulkCommand createBulkCommand = new CreateBulkCommand(BulkId.of(createBulkMessage.getBulkId()),
                    createBulkMessage.getConfigName());
            return bulkAggregate.createBulk(createBulkCommand)
                    .thenCompose(bulkIdResponse -> sendStatusMessage(bulkIdResponse, request));
        } else if (request instanceof AddComponentMessage addComponentMessage) { (2)
            return bulkAggregate.addComponent(
                    new AddComponentCommand(BulkId.of(addComponentMessage.getBulkId()),
                            BulkComponentId.of(addComponentMessage.getParentId()),
                            BulkComponentId.of(addComponentMessage.getComponentId()),
                            addComponentMessage.getPath(),
                            addComponentMessage.getContent())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof AddComponentWithBulkAutoCreateMessage addComponentMessage) { (3)
            return bulkAggregate.addComponent(
                    new AddComponentWithAutoCreateCommand(BulkId.of(addComponentMessage.getBulkId()),
                            BulkComponentId.of(addComponentMessage.getParentId()),
                            BulkComponentId.of(addComponentMessage.getComponentId()),
                            addComponentMessage.getPath(),
                            addComponentMessage.getContent(),
                            addComponentMessage.getConfigName())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof UpdateComponentMessage updateComponentMessage) { (4)
            return bulkAggregate.updateComponent(
                    new UpdateComponentCommand(BulkId.of(updateComponentMessage.getBulkId()),
                            BulkComponentId.of(updateComponentMessage.getComponentId()),
                            updateComponentMessage.getContent())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof CloseBulkMessage closeBulkMessage) { (5)
            return bulkAggregate.closeBulk(new CloseBulkCommand(BulkId.of(closeBulkMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof RemoveComponentMessage removeComponentMessage) { (6)
            return bulkAggregate.removeComponent(new RemoveComponentCommand(BulkId.of(removeComponentMessage.getBulkId()),
                            BulkComponentId.of(removeComponentMessage.getComponentId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof FinaliseBulkMessage finaliseBulkMessage) { (7)
            return bulkAggregate.finaliseBulk(new FinaliseBulkCommand(BulkId.of(finaliseBulkMessage.getBulkId()), finaliseBulkMessage.getBulkFileName()))
                    .thenCompose(response -> sendStatusMessage(response, request))
                    .thenApply(ignored -> null);
        } else if (request instanceof TerminateBulkMessage terminateBulkMessage) { (8)
            return bulkAggregate.terminateBulk(new TerminateBulkCommand(BulkId.of(terminateBulkMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof GetBulkReportMessage getBulkReportMessage) { (9)
            return bulkAggregate.getBulkReport(new GetBulkReportCommand(BulkId.of(getBulkReportMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof CreateRecurringBulkMessage createRecurringBulkMessage) { (10)
            return recurringBulkAggregate.configureBulk(
                    new ConfigureBulkCommand(
                            BulkId.of(createRecurringBulkMessage.getBulkId()), createRecurringBulkMessage.getConfigName()
                    )
            ).thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
        } else if (request instanceof AddRecurringBulkComponentMessage addRecurringBulkComponentMessage) { (11)
            return recurringBulkAggregate.addComponent(
                    new com.iconsolutions.ipf.bulk.aggregate.api.recurring.command.AddComponentCommand(
                            CommandId.of(addRecurringBulkComponentMessage.getRequestId()),
                            BulkId.of(addRecurringBulkComponentMessage.getBulkId()),
                            BulkComponentId.of(addRecurringBulkComponentMessage.getComponentId()),
                            addRecurringBulkComponentMessage.getPath(),
                            addRecurringBulkComponentMessage.getContent(),
                            addRecurringBulkComponentMessage.getConfigName()
                    )
            ).thenCompose(recurringBulkComponentIdResponse -> sendStatusMessage(recurringBulkComponentIdResponse, request));
        } else if (request instanceof GetCurrentOpenBulkMessage getCurrentOpenBulkMessage) { (12)
            return recurringBulkAggregate.getCurrentOpenBulk(new GetCurrentOpenBulkCommand(BulkId.of(getCurrentOpenBulkMessage.getBulkId())))
                    .thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
        } else if (request instanceof RejectBulkMessage rejectBulkMessage) { (13)
            return bulkAggregate.rejectBulk(new RejectBulkCommand(
                            BulkId.of(rejectBulkMessage.getBulkId()),
                            rejectBulkMessage.getRejectBulkReason(),
                            rejectBulkMessage.getRejectDescription()))
                    .thenCompose(rejectBulkResponse -> sendStatusMessage(rejectBulkResponse, request));
        } else if (request instanceof ArchiveBulkMessage archiveBulkMessage) { (14)
            return bulkAggregate.archiveBulk(new ArchiveBulkCommand(
                            BulkId.of(archiveBulkMessage.getBulkId())))
                    .thenCompose(archiveBulkResponse -> sendStatusMessage(archiveBulkResponse, request));
        } else if (request instanceof CompleteBulkMessage completeBulkMessage) { (15)
            return bulkAggregate.completeBulk(new CompleteBulkCommand(
                            BulkId.of(completeBulkMessage.getBulkId())))
                    .thenCompose(completeBulkResponse -> sendStatusMessage(completeBulkResponse, request));
        }

        log.warn("Unhandled BulkIngestionMessage: {}", request);
        return CompletableFuture.completedStage(null);
    }
1 CreateBulkCommand - creates a new bulk ready for components to be added to
2 AddComponentCommand - adds a component to an already opened bulk
3 AddComponentWithAutoCreateCommand - adds a component to a bulk, if the bulk is not created, a new one will be created automatically
4 UpdateComponentCommand - updates a component already stored in a bulk
5 CloseBulkCommand - closes a bulk so that no more components can be added to it
6 RemoveComponentCommand - removes a component from a bulk
7 FinaliseBulkCommand - once a bulk is closed and before it is written to file, this command will allow the header to be updated (ie to calculate totals and checksums, populate creations dates etc)
8 TerminateBulkCommand - Deletes am open or closed bulk
9 GetBulkReportCommand - returns the size of the bulk and the number of components at each level
10 ConfigureBulkCommand - this command is processed by the recurringBulkAggregate
11 AddComponentCommand - this command is processed by the recurringBulkAggregate
12 GetCurrentOpenBulkCommand - this command is processed by the recurringBulkAggregate
13 RejectBulkCommand - rejects a finalised bulk and deletes a produced bulk file
14 ArchiveBulkCommand - copies the produced bulk file from its output location to the archive
15 CompleteBulkCommand - completes a finalised bulk

The recurring bulk aggregate is responsible for managing single bulks.

Supported Request Messages

Further details of the supported request messages can be found here.

Applicable Configuration

Config

Type

Comment

Default

ipf.bulker.bulk-ingestion.kafka.consumer.topic

String

Topic is listen for BulkIngestionMessage requests

BULK_INGESTION_REQUEST