Interacting With the Bulker
There are two ways to interact with the Bulker
-
Embedded
-
External
Embedded
To access bulk functionality from within the same application (embedded), add the following module
<dependency>
<groupId>com.iconsolutions.ipf.bulk</groupId>
<artifactId>ipf-bulker-aggregate-akka</artifactId>
</dependency>
This provides the API (from ipf-bulker-aggregate-api) and a default Akka implementation for the Bulker Aggregate. More details of the operations available via this API are available from Bulk Aggregate.
External
To process requests from outside the application to perform Bulker operations, include the following module:
<dependency>
<groupId>com.iconsolutions.ipf.bulk</groupId>
<artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>
This provides a ReceiveConnector listening on a Kafka topic for requests to perform Bulk operations. Now, an implementation of BulkIngestionReceiveClientAdapter needs to provided in the application context to handle the various supported request message types detailed below.
An example implementation is provided below, this example also shows how each message type maps to the corresponding bulk aggregate call:
BulkIngestionReceiveAdapter.java
public class BulkIngestionReceiveAdapter implements BulkIngestionReceiveClientAdapter {
private final BulkAggregate bulkAggregate;
private final RecurringBulkAggregate recurringBulkAggregate;
private final BulkStatusAdapter bulkStatusAdapter;
@Override
public ProcessingContext determineContextFor(BulkIngestionMessage request) {
return ProcessingContext.builder()
.clientRequestId(request.getRequestId())
.associationId(AssociationId.unknown())
.unitOfWorkId(UnitOfWorkId.createRandom())
.build();
}
@Override
public CompletionStage<Void> handle(ReceivingContext context, BulkIngestionMessage request) {
if (request instanceof CreateBulkMessage) { (1)
CreateBulkMessage createBulkMessage = (CreateBulkMessage) request;
CreateBulkCommand createBulkCommand = new CreateBulkCommand(BulkId.of(createBulkMessage.getBulkId()),
createBulkMessage.getConfigName());
return bulkAggregate.createBulk(createBulkCommand)
.thenCompose(bulkIdResponse -> sendStatusMessage(bulkIdResponse, request));
} else if (request instanceof AddComponentMessage) { (2)
AddComponentMessage addComponentMessage = (AddComponentMessage) request;
return bulkAggregate.addComponent(
new AddComponentCommand(BulkId.of(addComponentMessage.getBulkId()),
BulkComponentId.of(addComponentMessage.getParentId()),
addComponentMessage.getPath(),
addComponentMessage.getContent())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof AddComponentWithBulkAutoCreateMessage) { (3)
AddComponentWithBulkAutoCreateMessage addComponentMessage = (AddComponentWithBulkAutoCreateMessage) request;
return bulkAggregate.addComponent(
new AddComponentWithAutoCreateCommand(BulkId.of(addComponentMessage.getBulkId()),
BulkComponentId.of(addComponentMessage.getParentId()),
addComponentMessage.getPath(),
addComponentMessage.getContent(),
addComponentMessage.getConfigName())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof UpdateComponentMessage) { (4)
UpdateComponentMessage updateComponentMessage = (UpdateComponentMessage) request;
return bulkAggregate.updateComponent(
new UpdateComponentCommand(BulkId.of(updateComponentMessage.getBulkId()),
BulkComponentId.of(updateComponentMessage.getComponentId()),
updateComponentMessage.getContent())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof CloseBulkMessage) { (5)
CloseBulkMessage closeBulkMessage = (CloseBulkMessage) request;
return bulkAggregate.closeBulk(new CloseBulkCommand(BulkId.of(closeBulkMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof RemoveComponentMessage) { (6)
RemoveComponentMessage removeComponentMessage = (RemoveComponentMessage) request;
return bulkAggregate.removeComponent(new RemoveComponentCommand(BulkId.of(removeComponentMessage.getBulkId()),
BulkComponentId.of(removeComponentMessage.getComponentId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof FinaliseBulkMessage) { (7)
FinaliseBulkMessage finaliseBulkMessage = (FinaliseBulkMessage) request;
return bulkAggregate.finaliseBulk(new FinaliseBulkCommand(BulkId.of(finaliseBulkMessage.getBulkId()), finaliseBulkMessage.getBulkFileName()))
.thenCompose(response -> sendStatusMessage(response, request))
.thenApply(ignored -> null);
} else if (request instanceof TerminateBulkMessage) { (8)
TerminateBulkMessage terminateBulkMessage = (TerminateBulkMessage) request;
return bulkAggregate.terminateBulk(new TerminateBulkCommand(BulkId.of(terminateBulkMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof GetBulkReportMessage) { (9)
GetBulkReportMessage getBulkReportMessage = (GetBulkReportMessage) request;
return bulkAggregate.getBulkReport(new GetBulkReportCommand(BulkId.of(getBulkReportMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof CreateRecurringBulkMessage) { (10)
CreateRecurringBulkMessage createRecurringBulkMessage = (CreateRecurringBulkMessage) request;
return recurringBulkAggregate.configureBulk(
new ConfigureBulkCommand(
BulkId.of(createRecurringBulkMessage.getBulkId()), createRecurringBulkMessage.getConfigName()
)
).thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
} else if (request instanceof AddRecurringBulkComponentMessage) { (11)
AddRecurringBulkComponentMessage addRecurringBulkComponentMessage = (AddRecurringBulkComponentMessage) request;
return recurringBulkAggregate.addComponent(
new com.iconsolutions.ipf.bulk.aggregate.api.recurring.command.AddComponentCommand(
CommandId.of(addRecurringBulkComponentMessage.getRequestId()),
BulkId.of(addRecurringBulkComponentMessage.getBulkId()),
addRecurringBulkComponentMessage.getPath(),
addRecurringBulkComponentMessage.getContent(),
addRecurringBulkComponentMessage.getConfigName()
)
).thenCompose(recurringBulkComponentIdResponse -> sendStatusMessage(recurringBulkComponentIdResponse, request));
} else if (request instanceof GetCurrentOpenBulkMessage) { (12)
GetCurrentOpenBulkMessage getCurrentOpenBulkMessage = (GetCurrentOpenBulkMessage) request;
return recurringBulkAggregate.getCurrentOpenBulk(new GetCurrentOpenBulkCommand(BulkId.of(getCurrentOpenBulkMessage.getBulkId())))
.thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
} else if (request instanceof RejectBulkMessage) { (13)
RejectBulkMessage rejectBulkMessage = (RejectBulkMessage) request;
return bulkAggregate.rejectBulk(new RejectBulkCommand(
BulkId.of(rejectBulkMessage.getBulkId()),
rejectBulkMessage.getRejectBulkReason(),
rejectBulkMessage.getRejectDescription()))
.thenCompose(rejectBulkResponse -> sendStatusMessage(rejectBulkResponse, request));
} else if (request instanceof ArchiveBulkMessage) { (14)
ArchiveBulkMessage archiveBulkMessage = (ArchiveBulkMessage) request;
return bulkAggregate.archiveBulk(new ArchiveBulkCommand(
BulkId.of(archiveBulkMessage.getBulkId())))
.thenCompose(archiveBulkResponse -> sendStatusMessage(archiveBulkResponse, request));
}
log.warn("Unhandled BulkIngestionMessage: {}", request);
return CompletableFuture.completedStage(null);
}
| 1 | CreateBulkCommand |
| 2 | AddComponentCommand |
| 3 | AddComponentWithAutoCreateCommand |
| 4 | UpdateComponentCommand |
| 5 | CloseBulkCommand |
| 6 | RemoveComponentCommand |
| 7 | FinaliseBulkCommand |
| 8 | TerminateBulkCommand |
| 9 | GetBulkReportCommand |
| 10 | ConfigureBulkCommand - this command is processed by the recurringBulkAggregate |
| 11 | AddComponentCommand - this command is processed by the recurringBulkAggregate |
| 12 | GetCurrentOpenBulkCommand - this command is processed by the recurringBulkAggregate |
| 13 | RejectBulkCommand |
| 14 | ArchiveBulkCommand |
The recurring bulk aggregate is responsible for managing single bulks
Supported Request Messages
Further details of the supported request messages can be found here