Interactuando con el Bulker
Hay dos formas de interactuar con el Bulker:
-
Integrado (Embedded)
-
Externo
Integrado
Para acceder a la funcionalidad de bulk desde dentro de la misma aplicación (embedded), añade el siguiente módulo
<dependency>
<groupId>com.iconsolutions.ipf.bulk</groupId>
<artifactId>ipf-bulker-aggregate-akka</artifactId>
</dependency>
Esto proporciona la API (desde ipf-bulker-aggregate-api) y una implementación Akka por defecto para el Bulk Aggregate. Más detalles de las operaciones disponibles a través de esta API están disponibles en Bulk Aggregate.
Externo
Para procesar solicitudes desde fuera de la aplicación para realizar operaciones del Bulker, incluye el siguiente módulo:
<dependency>
<groupId>com.iconsolutions.ipf.bulk</groupId>
<artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>
Esto proporciona un ReceiveConnector que escucha en un tópico de Kafka las solicitudes para realizar operaciones de Bulk. Ahora, se debe proporcionar una implementación de BulkIngestionReceiveClientAdapter en el contexto de la aplicación para manejar los diferentes tipos de mensajes de solicitud soportados que se detallan a continuación.
A continuación se proporciona una implementación de ejemplo; este ejemplo también muestra cómo cada tipo de mensaje se asigna a la llamada correspondiente del bulk aggregate:
BulkIngestionReceiveAdapter.java
public class BulkIngestionReceiveAdapter implements BulkIngestionReceiveClientAdapter {
private final BulkAggregate bulkAggregate;
private final RecurringBulkAggregate recurringBulkAggregate;
private final BulkStatusAdapter bulkStatusAdapter;
@Override
public ProcessingContext determineContextFor(BulkIngestionMessage request) {
return ProcessingContext.builder()
.clientRequestId(request.getRequestId())
.associationId(AssociationId.unknown())
.unitOfWorkId(UnitOfWorkId.createRandom())
.build();
}
@Override
public CompletionStage<Void> handle(ReceivingContext context, BulkIngestionMessage request) {
if (request instanceof CreateBulkMessage createBulkMessage) { (1)
CreateBulkCommand createBulkCommand = new CreateBulkCommand(BulkId.of(createBulkMessage.getBulkId()),
createBulkMessage.getConfigName());
return bulkAggregate.createBulk(createBulkCommand)
.thenCompose(bulkIdResponse -> sendStatusMessage(bulkIdResponse, request));
} else if (request instanceof AddComponentMessage addComponentMessage) { (2)
return bulkAggregate.addComponent(
new AddComponentCommand(BulkId.of(addComponentMessage.getBulkId()),
BulkComponentId.of(addComponentMessage.getParentId()),
BulkComponentId.of(addComponentMessage.getComponentId()),
addComponentMessage.getPath(),
addComponentMessage.getContent())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof AddComponentWithBulkAutoCreateMessage addComponentMessage) { (3)
return bulkAggregate.addComponent(
new AddComponentWithAutoCreateCommand(BulkId.of(addComponentMessage.getBulkId()),
BulkComponentId.of(addComponentMessage.getParentId()),
BulkComponentId.of(addComponentMessage.getComponentId()),
addComponentMessage.getPath(),
addComponentMessage.getContent(),
addComponentMessage.getConfigName())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof UpdateComponentMessage updateComponentMessage) { (4)
return bulkAggregate.updateComponent(
new UpdateComponentCommand(BulkId.of(updateComponentMessage.getBulkId()),
BulkComponentId.of(updateComponentMessage.getComponentId()),
updateComponentMessage.getContent())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof CloseBulkMessage closeBulkMessage) { (5)
return bulkAggregate.closeBulk(new CloseBulkCommand(BulkId.of(closeBulkMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof RemoveComponentMessage removeComponentMessage) { (6)
return bulkAggregate.removeComponent(new RemoveComponentCommand(BulkId.of(removeComponentMessage.getBulkId()),
BulkComponentId.of(removeComponentMessage.getComponentId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof FinaliseBulkMessage finaliseBulkMessage) { (7)
return bulkAggregate.finaliseBulk(new FinaliseBulkCommand(BulkId.of(finaliseBulkMessage.getBulkId()), finaliseBulkMessage.getBulkFileName()))
.thenCompose(response -> sendStatusMessage(response, request))
.thenApply(ignored -> null);
} else if (request instanceof TerminateBulkMessage terminateBulkMessage) { (8)
return bulkAggregate.terminateBulk(new TerminateBulkCommand(BulkId.of(terminateBulkMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof GetBulkReportMessage getBulkReportMessage) { (9)
return bulkAggregate.getBulkReport(new GetBulkReportCommand(BulkId.of(getBulkReportMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof CreateRecurringBulkMessage createRecurringBulkMessage) { (10)
return recurringBulkAggregate.configureBulk(
new ConfigureBulkCommand(
BulkId.of(createRecurringBulkMessage.getBulkId()), createRecurringBulkMessage.getConfigName()
)
).thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
} else if (request instanceof AddRecurringBulkComponentMessage addRecurringBulkComponentMessage) { (11)
return recurringBulkAggregate.addComponent(
new com.iconsolutions.ipf.bulk.aggregate.api.recurring.command.AddComponentCommand(
CommandId.of(addRecurringBulkComponentMessage.getRequestId()),
BulkId.of(addRecurringBulkComponentMessage.getBulkId()),
BulkComponentId.of(addRecurringBulkComponentMessage.getComponentId()),
addRecurringBulkComponentMessage.getPath(),
addRecurringBulkComponentMessage.getContent(),
addRecurringBulkComponentMessage.getConfigName()
)
).thenCompose(recurringBulkComponentIdResponse -> sendStatusMessage(recurringBulkComponentIdResponse, request));
} else if (request instanceof GetCurrentOpenBulkMessage getCurrentOpenBulkMessage) { (12)
return recurringBulkAggregate.getCurrentOpenBulk(new GetCurrentOpenBulkCommand(BulkId.of(getCurrentOpenBulkMessage.getBulkId())))
.thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
} else if (request instanceof RejectBulkMessage rejectBulkMessage) { (13)
return bulkAggregate.rejectBulk(new RejectBulkCommand(
BulkId.of(rejectBulkMessage.getBulkId()),
rejectBulkMessage.getRejectBulkReason(),
rejectBulkMessage.getRejectDescription()))
.thenCompose(rejectBulkResponse -> sendStatusMessage(rejectBulkResponse, request));
} else if (request instanceof ArchiveBulkMessage archiveBulkMessage) { (14)
return bulkAggregate.archiveBulk(new ArchiveBulkCommand(
BulkId.of(archiveBulkMessage.getBulkId())))
.thenCompose(archiveBulkResponse -> sendStatusMessage(archiveBulkResponse, request));
} else if (request instanceof CompleteBulkMessage completeBulkMessage) { (15)
return bulkAggregate.completeBulk(new CompleteBulkCommand(
BulkId.of(completeBulkMessage.getBulkId())))
.thenCompose(completeBulkResponse -> sendStatusMessage(completeBulkResponse, request));
}
log.warn("Unhandled BulkIngestionMessage: {}", request);
return CompletableFuture.completedStage(null);
}
| 1 | CreateBulkCommand - crea un nuevo bulk listo para agregarle componentes |
| 2 | AddComponentCommand - agrega un componente a un bulk ya abierto |
| 3 | AddComponentWithAutoCreateCommand - agrega un componente a un bulk; si el bulk no existe, se creará uno nuevo automáticamente |
| 4 | UpdateComponentCommand - actualiza un componente ya almacenado en un bulk |
| 5 | CloseBulkCommand - cierra un bulk para que no se le puedan añadir más componentes |
| 6 | RemoveComponentCommand - elimina un componente de un bulk |
| 7 | FinaliseBulkCommand - una vez que un bulk está cerrado y antes de que se escriba en archivo, este comando permite actualizar el encabezado (por ejemplo, calcular totales y checksums, completar fechas de creación, etc.) |
| 8 | TerminateBulkCommand - elimina un bulk abierto o cerrado |
| 9 | GetBulkReportCommand - devuelve el tamaño del bulk y el número de componentes en cada nivel |
| 10 | ConfigureBulkCommand - este comando es procesado por el recurringBulkAggregate |
| 11 | AddComponentCommand - este comando es procesado por el recurringBulkAggregate |
| 12 | GetCurrentOpenBulkCommand - este comando es procesado por el recurringBulkAggregate |
| 13 | RejectBulkCommand - rechaza un bulk finalizado y elimina el archivo de bulk producido |
| 14 | ArchiveBulkCommand - copia el archivo de bulk producido desde su ubicación de salida al archivo de archivo (archive) |
| 15 | CompleteBulkCommand - completa un bulk finalizado |
El recurring bulk aggregate es responsable de gestionar bulks individuales.
Mensajes de solicitud soportados
Puedes encontrar más detalles de los mensajes de solicitud soportados aquí.