Interacción con el Bulker

Existen dos maneras de interactuar con el Bulker:

  • Incorporado

  • Externo

Embedded

Para acceder a la funcionalidad masiva desde dentro de la misma aplicación (integrada), añada el siguiente módulo.

<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-aggregate-akka</artifactId>
</dependency>

Esto proporciona el API--ipf-bulker-aggregate-api) y un valor predeterminado Akka implementación para el Bulker Agregue. Más detalles de las operaciones disponibles a través de esto. API están disponibles desde Agregado a granel.

Externo

Para procesar solicitudes desde fuera de la aplicación para realizar Bulker operaciones, incluya el siguiente módulo:

<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>

Esto proporciona un ReceiveConnector que escucha en un Kafka tema para solicitudes de ejecución Bulk operaciones. Ahora, una implementación de BulkIngestionReceiveClientAdapter necesita ser proporcionado en el contexto de la aplicación para manejar los diversos tipos de mensajes de solicitud admitidos detallados a continuación.

A continuación se proporciona un ejemplo de implementación, este ejemplo también muestra cómo cada tipo de mensaje se asigna a la correspondiente llamada de agregado masivo:

BulkIngestionReceiveAdapter.java
public class BulkIngestionReceiveAdapter implements BulkIngestionReceiveClientAdapter {

    private final BulkAggregate bulkAggregate;
    private final RecurringBulkAggregate recurringBulkAggregate;
    private final BulkStatusAdapter bulkStatusAdapter;

    @Override
    public ProcessingContext determineContextFor(BulkIngestionMessage request) {
        return ProcessingContext.builder()
                .clientRequestId(request.getRequestId())
                .associationId(AssociationId.unknown())
                .unitOfWorkId(UnitOfWorkId.createRandom())
                .build();
    }

    @Override
    public CompletionStage<Void> handle(ReceivingContext context, BulkIngestionMessage request) {
        if (request instanceof CreateBulkMessage createBulkMessage) { (1)
            CreateBulkCommand createBulkCommand = new CreateBulkCommand(BulkId.of(createBulkMessage.getBulkId()),
                    createBulkMessage.getConfigName());
            return bulkAggregate.createBulk(createBulkCommand)
                    .thenCompose(bulkIdResponse -> sendStatusMessage(bulkIdResponse, request));
        } else if (request instanceof AddComponentMessage addComponentMessage) { (2)
            return bulkAggregate.addComponent(
                    new AddComponentCommand(BulkId.of(addComponentMessage.getBulkId()),
                            BulkComponentId.of(addComponentMessage.getParentId()),
                            BulkComponentId.of(addComponentMessage.getComponentId()),
                            addComponentMessage.getPath(),
                            addComponentMessage.getContent())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof AddComponentWithBulkAutoCreateMessage addComponentMessage) { (3)
            return bulkAggregate.addComponent(
                    new AddComponentWithAutoCreateCommand(BulkId.of(addComponentMessage.getBulkId()),
                            BulkComponentId.of(addComponentMessage.getParentId()),
                            BulkComponentId.of(addComponentMessage.getComponentId()),
                            addComponentMessage.getPath(),
                            addComponentMessage.getContent(),
                            addComponentMessage.getConfigName())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof UpdateComponentMessage updateComponentMessage) { (4)
            return bulkAggregate.updateComponent(
                    new UpdateComponentCommand(BulkId.of(updateComponentMessage.getBulkId()),
                            BulkComponentId.of(updateComponentMessage.getComponentId()),
                            updateComponentMessage.getContent())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof CloseBulkMessage closeBulkMessage) { (5)
            return bulkAggregate.closeBulk(new CloseBulkCommand(BulkId.of(closeBulkMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof RemoveComponentMessage removeComponentMessage) { (6)
            return bulkAggregate.removeComponent(new RemoveComponentCommand(BulkId.of(removeComponentMessage.getBulkId()),
                            BulkComponentId.of(removeComponentMessage.getComponentId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof FinaliseBulkMessage finaliseBulkMessage) { (7)
            return bulkAggregate.finaliseBulk(new FinaliseBulkCommand(BulkId.of(finaliseBulkMessage.getBulkId()), finaliseBulkMessage.getBulkFileName()))
                    .thenCompose(response -> sendStatusMessage(response, request))
                    .thenApply(ignored -> null);
        } else if (request instanceof TerminateBulkMessage terminateBulkMessage) { (8)
            return bulkAggregate.terminateBulk(new TerminateBulkCommand(BulkId.of(terminateBulkMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof GetBulkReportMessage getBulkReportMessage) { (9)
            return bulkAggregate.getBulkReport(new GetBulkReportCommand(BulkId.of(getBulkReportMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof CreateRecurringBulkMessage createRecurringBulkMessage) { (10)
            return recurringBulkAggregate.configureBulk(
                    new ConfigureBulkCommand(
                            BulkId.of(createRecurringBulkMessage.getBulkId()), createRecurringBulkMessage.getConfigName()
                    )
            ).thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
        } else if (request instanceof AddRecurringBulkComponentMessage addRecurringBulkComponentMessage) { (11)
            return recurringBulkAggregate.addComponent(
                    new com.iconsolutions.ipf.bulk.aggregate.api.recurring.command.AddComponentCommand(
                            CommandId.of(addRecurringBulkComponentMessage.getRequestId()),
                            BulkId.of(addRecurringBulkComponentMessage.getBulkId()),
                            BulkComponentId.of(addRecurringBulkComponentMessage.getComponentId()),
                            addRecurringBulkComponentMessage.getPath(),
                            addRecurringBulkComponentMessage.getContent(),
                            addRecurringBulkComponentMessage.getConfigName()
                    )
            ).thenCompose(recurringBulkComponentIdResponse -> sendStatusMessage(recurringBulkComponentIdResponse, request));
        } else if (request instanceof GetCurrentOpenBulkMessage getCurrentOpenBulkMessage) { (12)
            return recurringBulkAggregate.getCurrentOpenBulk(new GetCurrentOpenBulkCommand(BulkId.of(getCurrentOpenBulkMessage.getBulkId())))
                    .thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
        } else if (request instanceof RejectBulkMessage rejectBulkMessage) { (13)
            return bulkAggregate.rejectBulk(new RejectBulkCommand(
                            BulkId.of(rejectBulkMessage.getBulkId()),
                            rejectBulkMessage.getRejectBulkReason(),
                            rejectBulkMessage.getRejectDescription()))
                    .thenCompose(rejectBulkResponse -> sendStatusMessage(rejectBulkResponse, request));
        } else if (request instanceof ArchiveBulkMessage archiveBulkMessage) { (14)
            return bulkAggregate.archiveBulk(new ArchiveBulkCommand(
                            BulkId.of(archiveBulkMessage.getBulkId())))
                    .thenCompose(archiveBulkResponse -> sendStatusMessage(archiveBulkResponse, request));
        } else if (request instanceof CompleteBulkMessage completeBulkMessage) { (15)
            return bulkAggregate.completeBulk(new CompleteBulkCommand(
                            BulkId.of(completeBulkMessage.getBulkId())))
                    .thenCompose(completeBulkResponse -> sendStatusMessage(completeBulkResponse, request));
        }

        log.warn("Unhandled BulkIngestionMessage: {}", request);
        return CompletableFuture.completedStage(null);
    }
1 Crear Comando Masivo- crea un nuevo lote listo para que se añadan componentes
2 Agregar Comando De Componente- agrega un componente a un volumen ya abierto
3 Agregar Componente Con Comando Auto Crear- agrega un componente a un lote; si el lote no está creado, se creará uno nuevo automáticamente.
4 Actualizar Comando De Componente- actualiza un componente ya almacenado en un lote
5 Cerrar Comando Masivo- cierra un lote para que no se puedan añadir más componentes a él
6 Eliminar Comando De Componente- elimina un componente de un lote
7 Finalizar Comando Masivo- Una vez que un lote está cerrado y antes de que se escriba en el archivo, este comando permitirá que se actualice el encabezado (es decir, calcular totales y sumas de verificación, poblar fechas de creación, etc.).
8 Terminar Comando Masivo- Elimina un volumen abierto o cerrado.
9 Obtener Informe Masivo Comando- devuelve el tamaño del volumen y el número de componentes en cada nivel
10 Configurar Comando Masivo- este comando es procesado por el recurringBulkAggregate
11 Agregar Comando De Componente- este comando es procesado por el recurringBulkAggregate
12 Obtener Comando Masivo Abierto Actual- este comando es procesado por el recurringBulkAggregate
13 Rechazar Comando Masivo- rechaza un lote finalizado y elimina un archivo de lote producido
14 Comando Archivo Masivo- copia el archivo en bloque producido desde su ubicación de salida al archivo
15 Comando De Carga Completa- completa un lote finalizado

El agregado masivo recurrente es responsable de gestionar cargas individuales.

Mensajes de Solicitud Soportados

Se pueden encontrar más detalles sobre los mensajes de solicitud admitidos.aquí.

Configuración Aplicable

Config

Tipo

Comentario

Predeterminado

ipf.bulker.bulk-ingestion.kafka.consumer.topic

Cadena

El tema es escuchar para BulkIngestionMessage solicitudes

BULK_INGESTION_REQUEST