Interacción con el Bulker
Existen dos maneras de interactuar con el Bulker:
-
Incorporado
-
Externo
Embedded
Para acceder bulk funcionalidad desde dentro de la misma aplicación (embedded), añada el siguiente módulo
<dependency>
<groupId>com.iconsolutions.ipf.bulk</groupId>
<artifactId>ipf-bulker-aggregate-akka</artifactId>
</dependency>
Esto proporciona el API--ipf-bulker-aggregate-api) y un valor predeterminado Akka implementación para el Bulker Agregue. Más detalles de las operaciones disponibles a través de esto. API están disponibles desde Agregado a Granel.
Externo
Para procesar solicitudes desde fuera de la aplicación para realizar Bulker operaciones, incluya el siguiente módulo:
<dependency>
<groupId>com.iconsolutions.ipf.bulk</groupId>
<artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>
Esto proporciona un ReceiveConnector que escucha en un Kafka tema para solicitudes de ejecución Bulk operaciones. Ahora, una implementación de BulkIngestionReceiveClientAdapter needs to provided in the application context para manejar las diversas solicitudes soportadas message type s detallado a continuación.
A continuación se proporciona un ejemplo de implementación, este ejemplo también muestra cómo cada message type se asigna a lo correspondiente bulk llamada agregada:
BulkIngestionReceiveAdapter.java
public class BulkIngestionReceiveAdapter implements BulkIngestionReceiveClientAdapter {
private final BulkAggregate bulkAggregate;
private final RecurringBulkAggregate recurringBulkAggregate;
private final BulkStatusAdapter bulkStatusAdapter;
@Override
public ProcessingContext determineContextFor(BulkIngestionMessage request) {
return ProcessingContext.builder()
.clientRequestId(request.getRequestId())
.associationId(AssociationId.unknown())
.unitOfWorkId(UnitOfWorkId.createRandom())
.build();
}
@Override
public CompletionStage<Void> handle(ReceivingContext context, BulkIngestionMessage request) {
if (request instanceof CreateBulkMessage createBulkMessage) { (1)
CreateBulkCommand createBulkCommand = new CreateBulkCommand(BulkId.of(createBulkMessage.getBulkId()),
createBulkMessage.getConfigName());
return bulkAggregate.createBulk(createBulkCommand)
.thenCompose(bulkIdResponse -> sendStatusMessage(bulkIdResponse, request));
} else if (request instanceof AddComponentMessage addComponentMessage) { (2)
return bulkAggregate.addComponent(
new AddComponentCommand(BulkId.of(addComponentMessage.getBulkId()),
BulkComponentId.of(addComponentMessage.getParentId()),
BulkComponentId.of(addComponentMessage.getComponentId()),
addComponentMessage.getPath(),
addComponentMessage.getContent())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof AddComponentWithBulkAutoCreateMessage addComponentMessage) { (3)
return bulkAggregate.addComponent(
new AddComponentWithAutoCreateCommand(BulkId.of(addComponentMessage.getBulkId()),
BulkComponentId.of(addComponentMessage.getParentId()),
BulkComponentId.of(addComponentMessage.getComponentId()),
addComponentMessage.getPath(),
addComponentMessage.getContent(),
addComponentMessage.getConfigName())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof UpdateComponentMessage updateComponentMessage) { (4)
return bulkAggregate.updateComponent(
new UpdateComponentCommand(BulkId.of(updateComponentMessage.getBulkId()),
BulkComponentId.of(updateComponentMessage.getComponentId()),
updateComponentMessage.getContent())
).thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof CloseBulkMessage closeBulkMessage) { (5)
return bulkAggregate.closeBulk(new CloseBulkCommand(BulkId.of(closeBulkMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof RemoveComponentMessage removeComponentMessage) { (6)
return bulkAggregate.removeComponent(new RemoveComponentCommand(BulkId.of(removeComponentMessage.getBulkId()),
BulkComponentId.of(removeComponentMessage.getComponentId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof FinaliseBulkMessage finaliseBulkMessage) { (7)
return bulkAggregate.finaliseBulk(new FinaliseBulkCommand(BulkId.of(finaliseBulkMessage.getBulkId()), finaliseBulkMessage.getBulkFileName()))
.thenCompose(response -> sendStatusMessage(response, request))
.thenApply(ignored -> null);
} else if (request instanceof TerminateBulkMessage terminateBulkMessage) { (8)
return bulkAggregate.terminateBulk(new TerminateBulkCommand(BulkId.of(terminateBulkMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof GetBulkReportMessage getBulkReportMessage) { (9)
return bulkAggregate.getBulkReport(new GetBulkReportCommand(BulkId.of(getBulkReportMessage.getBulkId())))
.thenCompose(response -> sendStatusMessage(response, request));
} else if (request instanceof CreateRecurringBulkMessage createRecurringBulkMessage) { (10)
return recurringBulkAggregate.configureBulk(
new ConfigureBulkCommand(
BulkId.of(createRecurringBulkMessage.getBulkId()), createRecurringBulkMessage.getConfigName()
)
).thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
} else if (request instanceof AddRecurringBulkComponentMessage addRecurringBulkComponentMessage) { (11)
return recurringBulkAggregate.addComponent(
new com.iconsolutions.ipf.bulk.aggregate.api.recurring.command.AddComponentCommand(
CommandId.of(addRecurringBulkComponentMessage.getRequestId()),
BulkId.of(addRecurringBulkComponentMessage.getBulkId()),
BulkComponentId.of(addRecurringBulkComponentMessage.getComponentId()),
addRecurringBulkComponentMessage.getPath(),
addRecurringBulkComponentMessage.getContent(),
addRecurringBulkComponentMessage.getConfigName()
)
).thenCompose(recurringBulkComponentIdResponse -> sendStatusMessage(recurringBulkComponentIdResponse, request));
} else if (request instanceof GetCurrentOpenBulkMessage getCurrentOpenBulkMessage) { (12)
return recurringBulkAggregate.getCurrentOpenBulk(new GetCurrentOpenBulkCommand(BulkId.of(getCurrentOpenBulkMessage.getBulkId())))
.thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
} else if (request instanceof RejectBulkMessage rejectBulkMessage) { (13)
return bulkAggregate.rejectBulk(new RejectBulkCommand(
BulkId.of(rejectBulkMessage.getBulkId()),
rejectBulkMessage.getRejectBulkReason(),
rejectBulkMessage.getRejectDescription()))
.thenCompose(rejectBulkResponse -> sendStatusMessage(rejectBulkResponse, request));
} else if (request instanceof ArchiveBulkMessage archiveBulkMessage) { (14)
return bulkAggregate.archiveBulk(new ArchiveBulkCommand(
BulkId.of(archiveBulkMessage.getBulkId())))
.thenCompose(archiveBulkResponse -> sendStatusMessage(archiveBulkResponse, request));
} else if (request instanceof CompleteBulkMessage completeBulkMessage) { (15)
return bulkAggregate.completeBulk(new CompleteBulkCommand(
BulkId.of(completeBulkMessage.getBulkId())))
.thenCompose(completeBulkResponse -> sendStatusMessage(completeBulkResponse, request));
}
log.warn("Unhandled BulkIngestionMessage: {}", request);
return CompletableFuture.completedStage(null);
}
| 1 | Crear Comando Masivo- crea un nuevo bulk listo para que se añadan componentes a |
| 2 | Agregar Comando De Componente- agrega un componente a un ya abierto bulk |
| 3 | Agregar Componente Con Comando Auto Crear- añade un componente a un bulk, si el bulk no se crea, se creará uno nuevo automáticamente |
| 4 | Actualizar Comando De Componente- actualiza un componente ya almacenado en un bulk |
| 5 | Cerrar Comando Masivo- cierra un bulk para que no se puedan añadir más componentes a él |
| 6 | Eliminar Comando De Componente- elimina un componente de un bulk |
| 7 | Finalice Comando Masivo- una vez un bulk está cerrado y antes de que se escriba en el archivo, este comando permitirá que se actualice el encabezado (es decir, calcular totales y sumas de verificación, poblar fechas de creación, etc.) |
| 8 | Terminar Comando Masivo- Elimina am abierto o cerrado bulk |
| 9 | Obtener Informe Masivo Comando- devuelve el tamaño de la bulk y el número de componentes en cada nivel |
| 10 | Configurar Comando Masivo- este comando es procesado por el recurringBulkAggregate |
| 11 | Agregar Comando De Componente- este comando es procesado por el recurringBulkAggregate |
| 12 | Obtener Comando Masivo Abierto Actual- este comando es procesado por el recurringBulkAggregate |
| 13 | Rechazar Comando Masivo- rechaza un finalizado bulk y elimina un producido bulk archivo |
| 14 | Comando Archivo Masivo- copia los producidos bulk archivo desde su ubicación de salida al archivo |
| 15 | Complete Bulk Command- completa un finalizado bulk |
La recurrente bulk aggregate es responsable de gestionar único bulks.
Mensajes de Solicitud Soportados
Se pueden encontrar más detalles sobre los mensajes de solicitud admitidos.aquí.