Documentation for a newer release is available. View Latest

Interactuando con el Bulker

Hay dos formas de interactuar con el Bulker:

  • Integrado (Embedded)

  • Externo

Integrado

Para acceder a la funcionalidad de bulk desde dentro de la misma aplicación (embedded), añade el siguiente módulo

<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-aggregate-akka</artifactId>
</dependency>

Esto proporciona la API (desde ipf-bulker-aggregate-api) y una implementación Akka por defecto para el Bulk Aggregate. Más detalles de las operaciones disponibles a través de esta API están disponibles en Bulk Aggregate.

Externo

Para procesar solicitudes desde fuera de la aplicación para realizar operaciones del Bulker, incluye el siguiente módulo:

<dependency>
    <groupId>com.iconsolutions.ipf.bulk</groupId>
    <artifactId>ipf-bulker-ingestion-connector-kafka</artifactId>
</dependency>

Esto proporciona un ReceiveConnector que escucha en un tópico de Kafka las solicitudes para realizar operaciones de Bulk. Ahora, se debe proporcionar una implementación de BulkIngestionReceiveClientAdapter en el contexto de la aplicación para manejar los diferentes tipos de mensajes de solicitud soportados que se detallan a continuación.

A continuación se proporciona una implementación de ejemplo; este ejemplo también muestra cómo cada tipo de mensaje se asigna a la llamada correspondiente del bulk aggregate:

BulkIngestionReceiveAdapter.java
public class BulkIngestionReceiveAdapter implements BulkIngestionReceiveClientAdapter {

    private final BulkAggregate bulkAggregate;
    private final RecurringBulkAggregate recurringBulkAggregate;
    private final BulkStatusAdapter bulkStatusAdapter;

    @Override
    public ProcessingContext determineContextFor(BulkIngestionMessage request) {
        return ProcessingContext.builder()
                .clientRequestId(request.getRequestId())
                .associationId(AssociationId.unknown())
                .unitOfWorkId(UnitOfWorkId.createRandom())
                .build();
    }

    @Override
    public CompletionStage<Void> handle(ReceivingContext context, BulkIngestionMessage request) {
        if (request instanceof CreateBulkMessage createBulkMessage) { (1)
            CreateBulkCommand createBulkCommand = new CreateBulkCommand(BulkId.of(createBulkMessage.getBulkId()),
                    createBulkMessage.getConfigName());
            return bulkAggregate.createBulk(createBulkCommand)
                    .thenCompose(bulkIdResponse -> sendStatusMessage(bulkIdResponse, request));
        } else if (request instanceof AddComponentMessage addComponentMessage) { (2)
            return bulkAggregate.addComponent(
                    new AddComponentCommand(BulkId.of(addComponentMessage.getBulkId()),
                            BulkComponentId.of(addComponentMessage.getParentId()),
                            BulkComponentId.of(addComponentMessage.getComponentId()),
                            addComponentMessage.getPath(),
                            addComponentMessage.getContent())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof AddComponentWithBulkAutoCreateMessage addComponentMessage) { (3)
            return bulkAggregate.addComponent(
                    new AddComponentWithAutoCreateCommand(BulkId.of(addComponentMessage.getBulkId()),
                            BulkComponentId.of(addComponentMessage.getParentId()),
                            BulkComponentId.of(addComponentMessage.getComponentId()),
                            addComponentMessage.getPath(),
                            addComponentMessage.getContent(),
                            addComponentMessage.getConfigName())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof UpdateComponentMessage updateComponentMessage) { (4)
            return bulkAggregate.updateComponent(
                    new UpdateComponentCommand(BulkId.of(updateComponentMessage.getBulkId()),
                            BulkComponentId.of(updateComponentMessage.getComponentId()),
                            updateComponentMessage.getContent())
            ).thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof CloseBulkMessage closeBulkMessage) { (5)
            return bulkAggregate.closeBulk(new CloseBulkCommand(BulkId.of(closeBulkMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof RemoveComponentMessage removeComponentMessage) { (6)
            return bulkAggregate.removeComponent(new RemoveComponentCommand(BulkId.of(removeComponentMessage.getBulkId()),
                            BulkComponentId.of(removeComponentMessage.getComponentId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof FinaliseBulkMessage finaliseBulkMessage) { (7)
            return bulkAggregate.finaliseBulk(new FinaliseBulkCommand(BulkId.of(finaliseBulkMessage.getBulkId()), finaliseBulkMessage.getBulkFileName()))
                    .thenCompose(response -> sendStatusMessage(response, request))
                    .thenApply(ignored -> null);
        } else if (request instanceof TerminateBulkMessage terminateBulkMessage) { (8)
            return bulkAggregate.terminateBulk(new TerminateBulkCommand(BulkId.of(terminateBulkMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof GetBulkReportMessage getBulkReportMessage) { (9)
            return bulkAggregate.getBulkReport(new GetBulkReportCommand(BulkId.of(getBulkReportMessage.getBulkId())))
                    .thenCompose(response -> sendStatusMessage(response, request));
        } else if (request instanceof CreateRecurringBulkMessage createRecurringBulkMessage) { (10)
            return recurringBulkAggregate.configureBulk(
                    new ConfigureBulkCommand(
                            BulkId.of(createRecurringBulkMessage.getBulkId()), createRecurringBulkMessage.getConfigName()
                    )
            ).thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
        } else if (request instanceof AddRecurringBulkComponentMessage addRecurringBulkComponentMessage) { (11)
            return recurringBulkAggregate.addComponent(
                    new com.iconsolutions.ipf.bulk.aggregate.api.recurring.command.AddComponentCommand(
                            CommandId.of(addRecurringBulkComponentMessage.getRequestId()),
                            BulkId.of(addRecurringBulkComponentMessage.getBulkId()),
                            BulkComponentId.of(addRecurringBulkComponentMessage.getComponentId()),
                            addRecurringBulkComponentMessage.getPath(),
                            addRecurringBulkComponentMessage.getContent(),
                            addRecurringBulkComponentMessage.getConfigName()
                    )
            ).thenCompose(recurringBulkComponentIdResponse -> sendStatusMessage(recurringBulkComponentIdResponse, request));
        } else if (request instanceof GetCurrentOpenBulkMessage getCurrentOpenBulkMessage) { (12)
            return recurringBulkAggregate.getCurrentOpenBulk(new GetCurrentOpenBulkCommand(BulkId.of(getCurrentOpenBulkMessage.getBulkId())))
                    .thenCompose(currentOpenBulkResponse -> sendStatusMessage(currentOpenBulkResponse, request));
        } else if (request instanceof RejectBulkMessage rejectBulkMessage) { (13)
            return bulkAggregate.rejectBulk(new RejectBulkCommand(
                            BulkId.of(rejectBulkMessage.getBulkId()),
                            rejectBulkMessage.getRejectBulkReason(),
                            rejectBulkMessage.getRejectDescription()))
                    .thenCompose(rejectBulkResponse -> sendStatusMessage(rejectBulkResponse, request));
        } else if (request instanceof ArchiveBulkMessage archiveBulkMessage) { (14)
            return bulkAggregate.archiveBulk(new ArchiveBulkCommand(
                            BulkId.of(archiveBulkMessage.getBulkId())))
                    .thenCompose(archiveBulkResponse -> sendStatusMessage(archiveBulkResponse, request));
        } else if (request instanceof CompleteBulkMessage completeBulkMessage) { (15)
            return bulkAggregate.completeBulk(new CompleteBulkCommand(
                            BulkId.of(completeBulkMessage.getBulkId())))
                    .thenCompose(completeBulkResponse -> sendStatusMessage(completeBulkResponse, request));
        }

        log.warn("Unhandled BulkIngestionMessage: {}", request);
        return CompletableFuture.completedStage(null);
    }
1 CreateBulkCommand - crea un nuevo bulk listo para agregarle componentes
2 AddComponentCommand - agrega un componente a un bulk ya abierto
3 AddComponentWithAutoCreateCommand - agrega un componente a un bulk; si el bulk no existe, se creará uno nuevo automáticamente
4 UpdateComponentCommand - actualiza un componente ya almacenado en un bulk
5 CloseBulkCommand - cierra un bulk para que no se le puedan añadir más componentes
6 RemoveComponentCommand - elimina un componente de un bulk
7 FinaliseBulkCommand - una vez que un bulk está cerrado y antes de que se escriba en archivo, este comando permite actualizar el encabezado (por ejemplo, calcular totales y checksums, completar fechas de creación, etc.)
8 TerminateBulkCommand - elimina un bulk abierto o cerrado
9 GetBulkReportCommand - devuelve el tamaño del bulk y el número de componentes en cada nivel
10 ConfigureBulkCommand - este comando es procesado por el recurringBulkAggregate
11 AddComponentCommand - este comando es procesado por el recurringBulkAggregate
12 GetCurrentOpenBulkCommand - este comando es procesado por el recurringBulkAggregate
13 RejectBulkCommand - rechaza un bulk finalizado y elimina el archivo de bulk producido
14 ArchiveBulkCommand - copia el archivo de bulk producido desde su ubicación de salida al archivo de archivo (archive)
15 CompleteBulkCommand - completa un bulk finalizado

El recurring bulk aggregate es responsable de gestionar bulks individuales.

Mensajes de solicitud soportados

Puedes encontrar más detalles de los mensajes de solicitud soportados aquí.

Configuración aplicable

Config

Type

Comentario

Predeterminado

ipf.bulker.bulk-ingestion.kafka.consumer.topic

String

El tópico donde se escuchan solicitudes de BulkIngestionMessage

BULK_INGESTION_REQUEST