Creando un Custom Configuración del Ingester

Configuración del componente

Con el fin de ingerir/importar configuraciones en el dynamic processing settings plataforma que necesita configurar lo siguiente:

  • Ingesta de Archivos Cluster Singleton

  • ReceiveConnector/SendConnector

  • Connector Transport

  • Convertidor de Archivos

  • Procesador de Archivos

  • Definición del Proceso

  • Manejador de Entradas

  • Habilitar/Deshabilitar el procesamiento de SWIFT archivos

A continuación se incluye un ejemplo de configuración para la ingestión de los ajustes de IbanPlus. Componentes similares tendrían que configurarse si usted desea establecer la ingesta de configuraciones desde una nueva fuente.

Ingesta de Archivos Cluster Singleton

El cluster singleton acepta el restart ajustes del singleton actor a través de la configuración. El cluster singleton acepta el restart ajustes del singleton actor a través de la configuración.

ipf.csm-reachability.ingestion {
  restart-settings {
    min-backoff = 3s
    max-backoff = 30s
    random-factor = 0.25
    max-restarts = 20
    max-restarts-within = 30m
  }
}

ReceiveConnector/SendConnector

Dependiendo de cómo se deba consumir la configuración, ya sea un SendConnector o se debe definir un ReceiveConnector. Al consumir archivos de un directorio local, se definiría un ReceiveConnector. Sin embargo, si consume configuraciones de un REST API, esto sería un SendConnector.

        receiveConnector = ReceiveConnectorBuilderHelper.<IngestedFile>defaultBuilder(name + " File Ingestion Connector", actorSystem)
                .withConnectorTransport(localDirectoryConnectorTransport)
                .withReceiveTransportMessageConverter(ingestedFileReceiveTransportMessageConverter)
                .withProcessingContextExtractor(connectorMessage -> InitiatingProcessingContextExtractor.<IngestedFile>builder().build().extract(connectorMessage))
                .withReceiveHandler((aggregateId, payload) -> {
                    return processManager.process(payload)
                            .toFuture()
                            .whenComplete(((processedEntryResponses, throwable) -> {
                                if (throwable != null) {
                                    log.error("Error occurred during processing of " + name + " File", throwable);
                                } else {
                                    log.info("Successfully processed {} File", name);
                                }
                            })).thenApply(done -> null);
                })
                .withLoggingErrorHandler(exception -> {
                    log.error("Exception occurred during file ingestion for file : " + name, exception);
                    return CompletableFuture.completedStage(null);
                })
                .build();

Connector Transport

Una vez definido, deberá ser referenciado en el Conector. Los Conectores de Transporte soportados incluyen, LocalDirectory (archivo), Http,Kafka y JMS.

Al definir un LocalDirectoryConnectorTransport, un nuevo directorio mapping se deberá añadir la entrada:

ipf.file-ingestion.directory-mappings += {
    directory-id = "your-ingester-id" (1)
    # has to match the job name used in the ProcessDefinition below
    job-name = "your-ingester-job-name" (2)
}

<1>`directory-id` representa un identificador de directorio de la configuración del LocalDirectoryConnectorTransport. <2>`job-name` representa el nombre del trabajo asociado con el ProcessDefinition#jobName

    @Bean
    LocalDirectoryConnectorTransport ibanPlusLocalDirectoryConnectorTransport(EventBus eventBus,
                                                                              DirectoryConfiguration directoryConfiguration,
                                                                              List<ProcessDefinition<? extends CanonicalFile, ? extends CanonicalFileEntry>> processDefinitions) {

        var errorHandler = new FileHandlerCommonIngestionDefaultReceiveErrorHandler(eventBus,
                directoryConfiguration,
                processDefinitions,
                actorSystem,
                FILE_INGESTION_CONFIG_ROOT_PATH);

        return LocalDirectoryConnectorTransport.builder()
                .withActorSystem(actorSystem)
                .withConfigRootPath(FILE_INGESTION_CONFIG_ROOT_PATH)
                .withFileIngestionConfiguration(FileIngestionConfiguration.create(
                        FILE_INGESTION_CONFIG_ROOT_PATH,
                        actorSystem.classicSystem().settings().config()
                ))
                .withName("Iban Plus Local Directory Transport")
                .withTransportErrorHandler(errorHandler)
                .build();
    }

Convertidor de Archivos

Esto es responsable de convertir la fuente de ingestión propietaria (por ejemplo, archivo) en una representación canónica (IbanPlusFile).

@RequiredArgsConstructor
public class IbanPlusFileConverter implements FileConverter<IbanPlusFile> {

    private final XmlMapper xmlObjectMapper;
    private final IbanPlusLineParser ibanPlusLineParser;

    @Override
    public IbanPlusFile convert(IngestedFile ingestedFile) {

        List<IbanPlusEntry> entries;
        FileType fileType;

        if ("xml".equalsIgnoreCase(ingestedFile.getMetaData().getExtension())) {
            Dataexport dataexport = parseRootObjectXml(ingestedFile);
            entries = getFileEntries(dataexport);
            fileType = StringUtils.equalsIgnoreCase(dataexport.getFiletype(), FileType.FULL.name()) ? FileType.FULL : FileType.DELTA;

        } else {
            entries = getTxtFileEntries(ingestedFile);
            fileType = StringUtils.containsIgnoreCase(ingestedFile.getMetaData().getName(), FileType.FULL.name())? FileType.FULL : FileType.DELTA;
        }

        CanonicalFileMetaData metaData = CanonicalFileMetaData.builder()
                .name(ingestedFile.getMetaData().getName())
                .dateTime(ingestedFile.getMetaData().getDateTime())
                .extension(ingestedFile.getMetaData().getExtension())
                .build();

        return new IbanPlusFile(metaData, entries, fileType);
    }

    private List<IbanPlusEntry> getFileEntries(Dataexport dataexport) {
        return dataexport.getIbanplusV3s()
                .stream()
                .map(ibanplusV3 -> IbanPlusEntry.builder()
                        .modificationFlag(ibanplusV3.getModificationFlag())
                        .ibanBic(ibanplusV3.getIbanBic())
                        .ibanIsoCountryCode(ibanplusV3.getIbanIsoCountryCode())
                        .isoCountryCode(ibanplusV3.getIsoCountryCode())
                        .ibanNationalId(ibanplusV3.getIbanNationalId())
                        .institutionName(ibanplusV3.getInstitutionName())
                        .routingBic(ibanplusV3.getRoutingBic())
                        .serviceContext(ibanplusV3.getServiceContext())
                        .build())
                .collect(Collectors.toList());
    }

    private List<IbanPlusEntry> getTxtFileEntries(IngestedFile ingestedFile) {
        String rawText = parseRootObjectTxt(ingestedFile);
        List<String> textRows = List.of(rawText.split(System.lineSeparator()));
        String resultsRow = textRows.stream()
                .findFirst()
                .orElseThrow(() -> new IconRuntimeException(String.format("Row missing or incomplete in ingested file: %s", ingestedFile.getMetaData().getName())));

        List<String> entryLines = textRows.subList(textRows.indexOf(resultsRow) + 1, textRows.size());
        return entryLines.stream()
                .map(ibanPlusLineParser::parseLine)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    @SneakyThrows
    private Dataexport parseRootObjectXml(IngestedFile ingestedFile) {
        InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
        return xmlObjectMapper.readValue(inputStream, Dataexport.class);
    }

    @SneakyThrows
    private static String parseRootObjectTxt(IngestedFile ingestedFile) {
        InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
        return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    }
}
    @Bean
    IbanPlusFileConverter ibanPlusFileConverter(XmlMapper xmlObjectMapper, IbanPlusLineParser ibanPlusLineParser) {
        return new IbanPlusFileConverter(xmlObjectMapper, ibanPlusLineParser);
    }

Procesador de Archivos

Divide el archivo que se está procesando en entradas individuales (configuraciones) que pueden ser procesadas por el OutputEntryHandler.

@Slf4j
public class IbanPlusFileProcessor extends AbstractSwiftFileProcessor<IbanPlusFile, IbanPlusEntry, IbanPlus> {

    private final IbanPlusQuery ibanPlusQuery;

    public IbanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> entryHandler,
                                 IbanPlusQuery ibanPlusQuery,
                                 SettingToCanonicalFileEntryConverter<IbanPlus> settingToCanonicalFileEntryConverter,
                                 SettingQuery settingQuery,
                                 Integer limitRate,
                                 Integer parallelism,
                                 Duration throttleDuration,
                                 Integer readQueryBatchSize,
                                 ClassicActorSystemProvider actorSystem,
                                 EventBus eventBus) {
        super(entryHandler, settingToCanonicalFileEntryConverter,
                limitRate, parallelism, throttleDuration, readQueryBatchSize,
                settingQuery, actorSystem, eventBus);
        this.ibanPlusQuery = ibanPlusQuery;
    }

    @Override
    protected String getPersistenceId(IbanPlusEntry entry) {
        return IdProvider.getPersistenceId("ibanplus", getSettingId(entry));
    }

    @Override
    protected String getSettingId(IbanPlusEntry entry) {
        return entry.getIsoCountryCode() + "-" +  entry.getIbanIsoCountryCode() + "-" + entry.getIbanNationalId();
    }

    @Override
    protected boolean isFullFileImport(IbanPlusFile aFile) {
        return aFile.getFileType() == FileType.FULL;
    }

    @Override
    protected CompletionStage<Response<SettingsDTO<IbanPlus>>> getBatchedSettings(List<String> idList, Integer paginationSize) {
        return ibanPlusQuery.getBatchIbanPlus(idList, paginationSize);
    }

    @Override
    protected boolean shouldBeDeleted(IbanPlusEntry entry) {
        return entry.getModificationFlag().equals("D");
    }

    @Override
    protected boolean shouldBeUpserted(IbanPlusEntry entry) {
        return entry.getModificationFlag().equals("A") || entry.getModificationFlag().equals("M");
    }
}
    @Bean
    FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> ibanPlusEntryHandler,
                                                                     IbanPlusQuery ibanPlusQuery,
                                                                     SettingToCanonicalFileEntryConverter<IbanPlus> settingToCanonicalFileEntryConverter,
                                                                     SettingQuery settingQuery,
                                                                     @Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
                                                                     @Value("${ipf.csm-reachability.settings-api.iban-plus.parallelism}") Integer parallelism,
                                                                     @Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration,
                                                                     @Value("${ipf.csm-reachability.settings-api.read-query-batch-size.iban-plus-processor}") Integer readQueryBatchSize,
                                                                     EventBus eventBus) {
        return new IbanPlusFileProcessor(ibanPlusEntryHandler, ibanPlusQuery, settingToCanonicalFileEntryConverter, settingQuery, limitRate, parallelism, throttleDuration, readQueryBatchSize, actorSystem, eventBus);
    }

    @Bean
    FilePostProcessor ibanPlusFilePostProcessor(SettingQuery settingQuery,
                                                IbanPlusQuery ibanPlusQuery,
                                                @Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
                                                @Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration) {
        return new IbanPlusFilePostProcessor(ibanPlusQuery, settingQuery, limitRate, throttleDuration, actorSystem);
    }

Definición del Proceso

Asocia el tipo de archivo canónico a un procesador de archivos y a un convertidor de archivos, así como opcionalmente proporciona una condición de predicado sobre cuándo procesar desde la fuente de ingestión.

    @Bean
    ProcessDefinition<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessDefinition(FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor,
                                                                                 FilePostProcessor ibanPlusFilePostProcessor,
                                                                                 IbanPlusFileConverter ibanPlusFileConverter) {
        return ProcessDefinition.<IbanPlusFile, IbanPlusEntry>builder()
                .processName("IbanPlus Ingestion File Process")
                .jobName("IbanPlus import")
                .processGate(basicFileMetaData -> ValidateFileUtils.validateFileType(basicFileMetaData.getExtension()))
                .fileProcessor(ibanPlusFileProcessor)
                .filePostProcessor(ibanPlusFilePostProcessor)
                .converter(ibanPlusFileConverter)
                .build();
    }

Manejador de Salida

Punto de integración entre la ingestión de archivos y setting management componentes. Invoca el Setting Management API para crear una configuración en el Dynamic Processing Settings plataforma (DPS).

La configuración para la comunicación directa con DPS(DPS is embedded dentro de la aplicación):

ipf.dps-api.client-type="direct"

La configuración para http comunicación con DPS(DPS es una aplicación independiente):

ipf.dps-api {
  client-type = "connector"
  http.client {
    host = "localhost"
    endpoint-url = "/settings-objects/"
    port = 8080
  }
}
Para comunicarse con el Setting Management API over HTTP, todos relacionados HTTP Las configuraciones del cliente se encuentran en el ipf.dps-api.http y ipf.dps-api.default-connector configuraciones.
    @Bean
    @ConditionalOnProperty(name = "ipf.csm-reachability.settings-api.file-handling.connection", havingValue = "http")
    OutputEntryHandler outputEntryHandlerViaHttp(DpsCrudClientPort dpsCrudClientPort,
                                                 DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting) {
        log.info("Using http transport");
        return new CanonicalEntryHandlerViaHttp<>(dpsCrudClientPort, canonicalEntryToSetting);
    }

    @Bean
    @ConditionalOnProperty(name = "ipf.csm-reachability.settings-api.file-handling.connection", havingValue = "direct")
    OutputEntryHandler outputEntryHandlerDirect(ClassicActorSystemProvider actorSystem,
                                                DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting,
                                                List<SettingDefinition<?>> settingDefinitions, DpsCrudClientPort dpsCrudClientPort) {
        log.info("Using direct transport");
        CanonicalEntryHandlerDirect.Settings settings = CanonicalEntryHandlerDirect.Settings.from(actorSystem);
        return new CanonicalEntryHandlerDirect<>(actorSystem, settings, canonicalEntryToSetting, settingDefinitions, dpsCrudClientPort);
    }

La configuración puede ser cargada a través de archivos, por ejemplo, IBANPlus, IBANStructure. Pero también puede ser consumida a través de HTTP/REST como es el caso con SIC

Ejemplo de configuración para consumir archivos del directorio local:

ipf.csm-reachability {
  participant {
    tips {
      process-participant.enabled = true
      file-ingestion {
        files-directory = "/import/csm-participant/eurosystem-tips-directory"
        directory-id = "TIPS"
        initial-delay = 5s
        interval = 30s
      }
    }

    rt1 {
      process-participant.enabled = true
      file-ingestion {
        files-directory = "/import/csm-participant/eba-rt1-routing-tables"
        directory-id = "RT1"
        initial-delay = 5s
        interval = 30s
      }
    }
  }

  iban-plus {
    enabled = true
    file-ingestion {
      files-directory = "IBANPLUS"
      files-directory = "/import/iban-plus-directory/swiftref-iban-plus"
      initial-delay = 5s
      interval = 30s
    }
  }

  iban-structure {
    enabled = true
    file-ingestion {
      files-directory = "/import/iban-plus-directory/swiftref-iban-structure"
      directory-id = "iban-structure"
      initial-delay = 5s
      interval = 1h
    }
  }

  party-entity.six-bankmaster {
    enabled = true
    file-ingestion {
      directory-id = "party-entity-bank-master"
      files-directory = "/import/party-entity-directory/six-bank-master-3.0/"
      initial-delay = 5s
      interval = 30s
    }
  }

  party-entity.swift-bankplus {
    enabled = true
    file-ingestion {
      directory-id = "party-entity-swift"
      files-directory = "/import/party-entity-directory/swiftref-bank-directory-plus"
      initial-delay = 5s
      interval = 30s
    }
  }

  exclusion-list {
    enabled = true
    file-ingestion {
      files-directory = "/import/iban-plus-directory/swiftref-exclusion-list"
      directory-id = "exclusion-list"
      initial-delay = 5s
      interval = 30s
    }
  }

  bic-dir-2018 {
    enabled = true
    file-ingestion {
      files-directory = "/import/bic-dir-2018/swiftref-bic-dir-2018"
      directory-id = "BICDIR2018"
      initial-delay = 5s
      interval = 30s
    }
  }

  party-entity.identifiers {
    enabled = true
    file-ingestion {
      directory-id = "party-entity-identifiers"
      files-directory = "/import/party-entity-directory/party-entity-identifiers/"
      initial-delay = 5s
      interval = 30s
    }
  }

  iban-structure.formats {
    enabled = true
    file-ingestion {
      directory-id = "iban-structure-formats"
      files-directory = "import/iban-plus-directory/iban-structure-formats/"
      initial-delay = 5s
      interval = 30s
    }
  }
}

Parámetro de Configuración

Descripción

directorio-de-archivos

Ubicación en el directorio local para sondear archivos

directory-id

Esto debe coincidir con una entrada de directoryID en el directorio.mapping configuración

retraso-inicial

Retraso antes de la primera encuesta

intervalo

Tiempo entre sondeos para archivos. Evite valores muy pequeños para el intervalo, ya que pueden provocar condiciones de carrera en los hilos.

Ejemplo de configuración para consumir archivos de Participante SIC a través de HTTP/REST:

ipf.csm-reachability.participant.sic {
  process-participant {
    # Specify where from SIC Participants can be fetched and processed.
    # Two modes are available: `http` and `file-ingestion`. By default,
    # http mode is enabled. Both modes are configured under `sic-participant` configuration section.
    mode = http

    # Enables SIC Participant processing by setting `true`
    # or disables it by setting `false` flag.
    enabled = true
  }

  resiliency-settings {
    minimum-number-of-calls = 10
    max-attempts = 10
    reset-timeout = 3s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
  }

  # Configures HTTP client how to talk to six group bank master API
  http {
    client {
      endpoint-url = "https://api.six-group.com/api/epcd/bankmaster/v2/public"
    }
  }
}

Ejemplo de configuración para consumir archivos de Participantes SIC a través de la ingestión de archivos mediante un directorio local:

ipf.csm-reachability.participant.sic {
  process-participant {
    # Specify where from SIC Participants can be fetched and processed.
    # Two modes are available: `http` and `file-ingestion`. By default,
    # http mode is enabled. Both modes are configured under `sic-participant` configuration section.
    mode = file-ingestion

    # Enables SIC Participant processing by setting `true`
    # or disables it by setting `false` flag.
    enabled = true
  }

  resiliency-settings {
    minimum-number-of-calls = 10
    max-attempts = 10
    reset-timeout = 3s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
  }

  # Configures ingesting SIC participants through a file
  file-ingestion {
    files-directory = "/import/csm-participant/six-bank-master-3.0/"
    directory-id = "SIC"
    initial-delay = 5s
    interval = 30s
  }
}

El scheduling configuraciones para HTTP se configuran por separado según:

ipf.csm-reachability.sic.scheduler-settings {
  initial-delay = 10s
  interval = 1d
}

Habilitar/Deshabilitar el procesamiento de archivos

Parámetro de configuración obsoleto

Parámetro de configuración

Descripción

Valor predeterminado

swift.ibanplus.process-ibanplus.enabled

ipf.csm-reachability.iban-plus.enabled

Habilite el procesamiento de Swift Archivo IBAN Plus

true

ibanstructure.process-ibanstructure.enabled

ipf.csm-reachability.iban-structure.enabled

Habilite el procesamiento de Swift Archivo de estructura de IBAN

true

swift.bankplus.procesar-directorio-bancario-plus.habilitado

ipf.csm-reachability.party-entity.swift-bankplus.enabled

Habilite el procesamiento de Swift Archivo del Directorio Bancario Plus

true

exclusionlist.procesar-exclusionlist.habilitado

ipf.csm-reachability.exclusion-list.enabled

Habilitar el procesamiento de Swift Archivo de la Lista de Exclusión

true

bic-dir-2018.proceso-bics.habilitado

ipf.csm-reachability.bic-dir-2018.enabled

Habilite el procesamiento de Swift Archivo BICDir2018

true

party-entity.six.bankmaster.procesar-maestro-bancario.habilitado

ipf.csm-reachability.party-entity.six-bankmaster.enabled

Habilite el procesamiento del archivo maestro de Six Bank

true

tips.process-participant.enabled

ipf.csm-reachability.participant.tips.process-participant.enabled

Habilite el procesamiento del archivo de participantes de TIPS

true

rt1.proceso-participante.habilitado

ipf.csm-reachability.participant.rt1.process-participant.enabled

Habilite el procesamiento del archivo de participantes RT1

true

paso2.participante-proceso.habilitado

ipf.csm-reachability.participant.step2.process-participant.enabled

Habilite el procesamiento del archivo de participantes de STEP2

true

sic.proces-participante.habilitado

ipf.csm-reachability.participant.sic.process-participant.enabled

Habilite el procesamiento del archivo de participantes SIC

true

no propiedad obsoleta

ipf.csm-reachability.party-entity.identifiers.enabled

Habilite el procesamiento del archivo IDENTIFICADORES-ALL de SwiftRef

true

no propiedad obsoleta

ipf.csm-reachability.iban-structure.formats.enabled

Habilite el procesamiento de archivos SwiftRef FORMATS-ALL/FORMATS-CTRY

true

Límites de tasas de ingestión de archivos y configuración de limitación

Parámetros de configuración antiguos que estaban bajo settings-api están ahora obsoletos y scheduled para su eliminación en futuras versiones. Esta configuración está ahora bajo ipf.csm-reachability.settings-api y sigue el estándar IPF.

Parámetro de Configuración

Descripción

Valor por defecto

ipf.csm-reachability.settings-api.limit-rate

Limita el rendimiento a un número específico de registros consumidos. Cuando este valor se establece,throttle-duration debe también ser proporcionado.

ipf.csm-reachability.settings-api.throttle-duration

Se utiliza con limit-rate para establecer la tasa máxima para consumir registros; Especificado como duración, p. ej. 1s, 1min

ipf.csm-reachability.settings-api.read-query-batch-size.participant-processor

Número máximo de configuraciones a obtener de API llamada.

1000

ipf.csm-reachability.settings-api.read-query-batch-size.bicDir2018-processor

Número máximo de configuraciones a obtener de API llamada.

1000

ipf.csm-reachability.settings-api.exclusion-list.parallelism

Número máximo de lotes paralelos API llamadas.

1

ipf.csm-reachability.settings-api.read-query-batch-size.exclusion-list-processor

Número máximo de configuraciones a obtener de API llamada.

1000

ipf.csm-reachability.settings-api.iban-plus.parallelism

Número máximo de lotes paralelos API llamadas.

1

ipf.csm-reachability.settings-api.read-query-batch-size.iban-plus-processor

Número máximo de configuraciones a obtener de API llamada.

1000

ipf.csm-reachability.settings-api.read-query-batch-size.iban-structure-processor

Número máximo de configuraciones a obtener de API llamada.

1000

ipf.csm-reachability.settings-api.party-entity.parallelism

Número máximo de lotes paralelos API llamadas.

1

ipf.csm-reachability.settings-api.read-query-batch-size.party-entity-processor

Número máximo de configuraciones a obtener de API llamada.

1000

ipf.csm-reachability.settings-api.direct.request-queue-size

Cola de búfer

3000

ipf.csm-reachability.settings-api.direct.throttle-elements

Limita el rendimiento a un número específico de registros consumidos. Cuando este valor está establecido,throttle-duration también debe ser proporcionado. Cuando 0, entonces está apagado.

0

ipf.csm-reachability.settings-api.direct.throttle-duration

Se utiliza con 'throttle-elements' para establecer la tasa máxima de consumo de registros. Cuando es 0, está desactivado.

0

ipf.csm-reachability.settings-api.direct.parallelism

Número máximo de paralelos API llamadas.

3

El csm-el-archivo-de-notificación-de-alcance-s3 está configurado de acuerdo con:

ipf.file-manager.s3 {
  region = "us-east-1"
  upload-parallelism = 1
  credentials {
    access-key-id = "accessKey"
    secret-access-key = "secretAccessKey"
  }
  resiliency-settings {
    # Determines the maximum number of retries to be made. Note that this includes the first failed attempt.
    max-attempts = 2
    # Retry if HTTP error code is in the list
    retryable-status-codes = [500, 503]
    attempt-timeout = 2s
    call-timeout = 3s
  }
}

ipf.file-manager.s3 {
  region = "us-east-1"
  upload-parallelism = 1
  credentials {
    access-key-id = "accessKey"
    secret-access-key = "secretAccessKey"
  }

  resiliency {
    retry = 3
    api-call-timeout = 10s # duration
  }
}

Servicio de notificación de ingestión de archivos configuración

El Servicio de Notificación de Ingesta de Archivos admite los siguientes formatos:

  • Participante de CSM - RT1 (Participantes Directos e Indirectos)

  • Participante de CSM - PASO2 SCT (Participantes Directos e Indirectos)

  • Participante de CSM - directorio TIPS

  • Participante de CSM - SicInst

  • Directorio de Entidades de Parte - Banco Master 3. 0

  • Directorio de Entidades de Parte - Directorio Bancario Plus

  • Directorio de Entidades de Parte - Identificadores de SwiftRef Todos

  • IBAN Plus - Directorio Iban Plus

  • Estructura del IBAN - Directorio Iban Plus

  • Estructura del IBAN - SwiftRef Formatos de Archivos

  • Lista de Exclusión - Directorio Iban Plus

  • Bic Dir 2018 - Bic Dir 2018

La función de ingestión de archivos desde AWS S3 puede ser habilitada/deshabilitada configurando ipf.csm-reachability.file-ingestion.s3.enabled propiedad a verdadero/falso. Está configurado como falso por defecto. Habilitar o deshabilitar la función de ingestión de archivos s3 no afecta la ingestión de archivos del directorio local.

La configuración para fileProcessedNotificationSendConnector y fileAvailableNotificationReceiveConnector puede ser anulada en las rutas respectivas, ipf.csm-reachability.file-ingestion.notification-service.connector.file-processed`y `ipf.csm-reachability.file-ingestion.notification-service.connector.file-available.

El csm-el servicio de notificación de ingestión de archivos de alcanzabilidad está configurado de acuerdo con:

akka {
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}
    }
    producer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}
    }
  }
}

ipf.csm-reachability {

  file-ingestion {
    s3.enabled = false

    notification-service {
      kafka {
        producer {
          topic = FILE_PROCESSED_NOTIFICATION
          restart-settings = ${common-flow-restart-settings}
          kafka-clients {
            group.id = file-processing-notification-group
          }
        }
        consumer {
          topic = FILE_AVAILABLE_NOTIFICATION
          restart-settings = ${common-flow-restart-settings}
          kafka-clients {
            group.id = file-available-notification-group
          }
        }
      }

      default-file-ingestion {
        # path which should be overriden
        files-directory = "/import"
        initial-delay = 5s
        interval = 30s
        timestamp-archived-and-failed-files = true
      }
      # directory id values for file ingestion. By default they are referencing respective values from local directory configuration
      directory-ids {
        RT02SCI = [${?ipf.csm-reachability.participant.rt1.file-ingestion.directory-id}]
        S204SCT = [${?ipf.csm-reachability.participant.step2.file-ingestion.directory-id}]
        TIPS = [${?ipf.csm-reachability.participant.tips.file-ingestion.directory-id}]
        agreements = [${?ipf.csm-reachability.participant.stet.file-ingestion.directory-id}]
        bankmaster = [
          ${?ipf.csm-reachability.party-entity.six-bankmaster.file-ingestion.directory-id},
          ${?ipf.csm-reachability.participant.sic.file-ingestion.directory-id}
        ]
        IDENTIFIERS-ALL = [${?ipf.csm-reachability.party-entity.identifiers.file-ingestion.directory-id}]
        FORMATS = [${?ipf.csm-reachability.iban-structure.formats.file-ingestion.directory-id}]
        BANKDIRECTORYPLUS = [${?ipf.csm-reachability.party-entity.swift-bankplus.file-ingestion.directory-id}]
        IBANPLUS = [${?ipf.csm-reachability.iban-plus.file-ingestion.directory-id}]
        IBANSTRUCTURE = [${?ipf.csm-reachability.iban-structure.file-ingestion.directory-id}]
        EXCLUSIONLIST = [${?ipf.csm-reachability.exclusion-list.file-ingestion.directory-id}]
        BICDIR2018 = [${?ipf.csm-reachability.bic-dir-2018.file-ingestion.directory-id}]
      }
    }
  }
}

common-flow-restart-settings {
  min-backoff = 1s
  max-backoff = 5s
  random-factor = 0.25
  max-restarts = 5
  max-restarts-within = 10m
}