Documentation for a newer release is available. View Latest
Esta página no está disponible actualmente en Español. Si lo necesita, póngase en contacto con el servicio de asistencia de Icon (correo electrónico)

Interacting With the Bulker

There are two ways to interact with the Bulker

  • Embedded

  • External

Embedded

To access bulk functionality from within the same application (embedded), add the following module

<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-aggregate-akka</artifactId>
</dependency>

This provides the API (from ipf-bulker-aggregate-api) and a default Akka implementation for the Bulker Aggregate. More details of the operations available via this API are available from Bulk Aggregate.

External

To process requests from outside the application to perform Bulker operations, include the following module:

<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>

This provides a ReceiveConnector listening on a Kafka topic for requests to perform Bulk operations. Now, an implementation of BulkIngestionReceiveClientAdapter needs to provided in the application context to handle the various supported request message types detailed below.

An example implementation is provided below, this example also shows how each message type maps to the corresponding bulk aggregate call:

BulkIngestionReceiveAdapter.java
public class BulkIngestionReceiveAdapter implements BulkIngestionReceiveClientAdapter {

    private final BulkAggregate bulkAggregate;
    private final RecurringBulkAggregate recurringBulkAggregate;
    private final BulkStatusAdapter bulkStatusAdapter;

    @Override
    public ProcessingContext determineContextFor(BulkIngestionMessage request) {
        return ProcessingContext.builder()
                .clientRequestId(request.getRequestId())
                .associationId(AssociationId.unknown())
                .unitOfWorkId(UnitOfWorkId.createRandom())
                .build();
    }

    @Override
    public CompletionStage<Void> handle(ReceivingContext context, BulkIngestionMessage request) {
        if (request instanceof CreateBulkMessage) { (1)
            CreateBulkMessage createBulkMessage = (CreateBulkMessage) request;
            CreateBulkCommand createBulkCommand = new CreateBulkCommand(BulkId.of(createBulkMessage.getBulkId()),
                    createBulkMessage.getConfigName());
            return bulkAggregate.createBulk(createBulkCommand)
                    .thenCompose(bulkIdResponse -> sendStatusMessage(bulkIdResponse, request));
        } else if (request instanceof AddComponentMessage) { (2)
            AddComponentMessage addComponentMessage = (AddComponentMessage) request;
            return bulkAggregate.addComponent(
                    new AddComponentCommand(BulkId.of(addComponentMessage.getBulkId()),
                            BulkComponentId.of(addComponentMessage.getParentId()),
                            addComponentMessage.getPath(),
                            addComponentMessage.getContent())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof AddComponentWithBulkAutoCreateMessage) { (3)
            AddComponentWithBulkAutoCreateMessage addComponentMessage = (AddComponentWithBulkAutoCreateMessage) request;
            return bulkAggregate.addComponent(
                    new AddComponentWithAutoCreateCommand(BulkId.of(addComponentMessage.getBulkId()),
                            BulkComponentId.of(addComponentMessage.getParentId()),
                            addComponentMessage.getPath(),
                            addComponentMessage.getContent(),
                            addComponentMessage.getConfigName())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof UpdateComponentMessage) { (4)
            UpdateComponentMessage updateComponentMessage = (UpdateComponentMessage) request;
            return bulkAggregate.updateComponent(
                    new UpdateComponentCommand(BulkId.of(updateComponentMessage.getBulkId()),
                            BulkComponentId.of(updateComponentMessage.getComponentId()),
                            updateComponentMessage.getContent())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof CloseBulkMessage) { (5)
            CloseBulkMessage closeBulkMessage = (CloseBulkMessage) request;
            return bulkAggregate.closeBulk(new CloseBulkCommand(BulkId.of(closeBulkMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof RemoveComponentMessage) { (6)
            RemoveComponentMessage removeComponentMessage = (RemoveComponentMessage) request;
            return bulkAggregate.removeComponent(new RemoveComponentCommand(BulkId.of(removeComponentMessage.getBulkId()),
                            BulkComponentId.of(removeComponentMessage.getComponentId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof FinaliseBulkMessage) { (7)
            FinaliseBulkMessage finaliseBulkMessage = (FinaliseBulkMessage) request;
            return bulkAggregate.finaliseBulk(new FinaliseBulkCommand(BulkId.of(finaliseBulkMessage.getBulkId()), finaliseBulkMessage.getBulkFileName()))
                    .thenCompose(response -> sendStatusMessage(response, request))
                    .thenApply(ignored -> null);
        } else if (request instanceof TerminateBulkMessage) { (8)
            TerminateBulkMessage terminateBulkMessage = (TerminateBulkMessage) request;
            return bulkAggregate.terminateBulk(new TerminateBulkCommand(BulkId.of(terminateBulkMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof GetBulkReportMessage) { (9)
            GetBulkReportMessage getBulkReportMessage = (GetBulkReportMessage) request;
            return bulkAggregate.getBulkReport(new GetBulkReportCommand(BulkId.of(getBulkReportMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof CreateRecurringBulkMessage) { (10)
            CreateRecurringBulkMessage createRecurringBulkMessage = (CreateRecurringBulkMessage) request;
            return recurringBulkAggregate.configureBulk(
                    new ConfigureBulkCommand(
                            BulkId.of(createRecurringBulkMessage.getBulkId()), createRecurringBulkMessage.getConfigName()
                    )
            ).thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
        } else if (request instanceof AddRecurringBulkComponentMessage) { (11)
            AddRecurringBulkComponentMessage addRecurringBulkComponentMessage = (AddRecurringBulkComponentMessage) request;
            return recurringBulkAggregate.addComponent(
                    new com.iconsolutions.ipf.bulk.aggregate.api.recurring.command.AddComponentCommand(
                            CommandId.of(addRecurringBulkComponentMessage.getRequestId()),
                            BulkId.of(addRecurringBulkComponentMessage.getBulkId()),
                            addRecurringBulkComponentMessage.getPath(),
                            addRecurringBulkComponentMessage.getContent(),
                            addRecurringBulkComponentMessage.getConfigName()
                    )
            ).thenCompose(recurringBulkComponentIdResponse -> sendStatusMessage(recurringBulkComponentIdResponse, request));
        } else if (request instanceof GetCurrentOpenBulkMessage) { (12)
            GetCurrentOpenBulkMessage getCurrentOpenBulkMessage = (GetCurrentOpenBulkMessage) request;
            return recurringBulkAggregate.getCurrentOpenBulk(new GetCurrentOpenBulkCommand(BulkId.of(getCurrentOpenBulkMessage.getBulkId())))
                    .thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
        } else if (request instanceof RejectBulkMessage) { (13)
            RejectBulkMessage rejectBulkMessage = (RejectBulkMessage) request;
            return bulkAggregate.rejectBulk(new RejectBulkCommand(
                            BulkId.of(rejectBulkMessage.getBulkId()),
                            rejectBulkMessage.getRejectBulkReason(),
                            rejectBulkMessage.getRejectDescription()))
                    .thenCompose(rejectBulkResponse -> sendStatusMessage(rejectBulkResponse, request));
        } else if (request instanceof ArchiveBulkMessage) { (14)
            ArchiveBulkMessage archiveBulkMessage = (ArchiveBulkMessage) request;
            return bulkAggregate.archiveBulk(new ArchiveBulkCommand(
                            BulkId.of(archiveBulkMessage.getBulkId())))
                    .thenCompose(archiveBulkResponse -> sendStatusMessage(archiveBulkResponse, request));
        }

        log.warn("Unhandled BulkIngestionMessage: {}", request);
        return CompletableFuture.completedStage(null);
    }
1 CreateBulkCommand
2 AddComponentCommand
3 AddComponentWithAutoCreateCommand
4 UpdateComponentCommand
5 CloseBulkCommand
6 RemoveComponentCommand
7 FinaliseBulkCommand
8 TerminateBulkCommand
9 GetBulkReportCommand
10 ConfigureBulkCommand - this command is processed by the recurringBulkAggregate
11 AddComponentCommand - this command is processed by the recurringBulkAggregate
12 GetCurrentOpenBulkCommand - this command is processed by the recurringBulkAggregate
13 RejectBulkCommand
14 ArchiveBulkCommand

The recurring bulk aggregate is responsible for managing single bulks

Supported Request Messages

Further details of the supported request messages can be found here

Applicable Configuration

Config

Type

Comment

Default

ipf.bulker.bulk-ingestion.kafka.consumer.topic

String

Topic is listen for BulkIngestionMessage requests

BULK_INGESTION_REQUEST