Documentation for a newer release is available. View Latest

Creación de un ingestor de Settings personalizado

Configuración de componentes

Para ingerir/importar settings en la plataforma de dynamic processing settings debes configurar lo siguiente:

  • File Ingestion Cluster Singleton

  • ReceiveConnector/SendConnector

  • Connector Transport

  • File Converter

  • File Processor

  • Process Definition

  • Entry Handler

  • Habilitar/Deshabilitar el procesamiento de archivos SWIFT

A continuación se incluye un ejemplo de configuración para ingerir settings de IbanPlus. Se deberían configurar componentes similares si quisieras configurar la ingesta de settings desde una nueva fuente.

File Ingestion Cluster Singleton

El cluster singleton acepta los restart settings del actor singleton vía configuración. El cluster singleton acepta los restart settings del actor singleton vía configuración.

ipf.csm-reachability.ingestion {
  restart-settings {
    min-backoff = 3s
    max-backoff = 30s
    random-factor = 0.25
    max-restarts = 20
    max-restarts-within = 30m
  }
}

ReceiveConnector/SendConnector

Según cómo se vaya a consumir la configuración, se debe definir un SendConnector o un ReceiveConnector. Al consumir archivos desde un directorio local, se definiría un ReceiveConnector. Sin embargo, si se consumen settings desde una REST API, sería un SendConnector.

        receiveConnector = ReceiveConnectorBuilderHelper.<IngestedFile>defaultBuilder(name + " File Ingestion Connector", actorSystem)
                .withConnectorTransport(localDirectoryConnectorTransport)
                .withReceiveTransportMessageConverter(ingestedFileReceiveTransportMessageConverter)
                .withProcessingContextExtractor(connectorMessage -> InitiatingProcessingContextExtractor.<IngestedFile>builder().build().extract(connectorMessage))
                .withReceiveHandler((aggregateId, payload) -> {
                    return processManager.process(payload)
                            .toFuture()
                            .whenComplete(((processedEntryResponses, throwable) -> {
                                if (throwable != null) {
                                    log.error("Error occurred during processing of " + name + " File", throwable);
                                } else {
                                    log.info("Successfully processed {} File", name);
                                }
                            })).thenApply(done -> null);
                })
                .withLoggingErrorHandler(exception -> {
                    log.error("Exception occurred during file ingestion for file : " + name, exception);
                    return CompletableFuture.completedStage(null);
                })
                .build();

Connector Transport

Una vez definido, deberá referenciarse en el Connector. Los ConnectorTransports admitidos incluyen LocalDirectory (file), Http, Kafka y JMS.

Al definir un LocalDirectoryConnectorTransport, será necesario añadir una nueva entrada de mapeo de directorio:

ipf.file-ingestion.directory-mappings += {
    directory-id = "your-ingester-id" (1)
    # has to match the job name used in the ProcessDefinition below
    job-name = "your-ingester-job-name" (2)
}
1 directory-id representa un directory-id de la configuración de LocalDirectoryConnectorTransport.
2 job-name representa el nombre de job asociado con ProcessDefinition#jobName
    @Bean
    LocalDirectoryConnectorTransport ibanPlusLocalDirectoryConnectorTransport(EventBus eventBus,
                                                                              DirectoryConfiguration directoryConfiguration,
                                                                              List<ProcessDefinition<? extends CanonicalFile, ? extends CanonicalFileEntry>> processDefinitions) {

        var errorHandler = new FileHandlerCommonIngestionDefaultReceiveErrorHandler(eventBus,
                directoryConfiguration,
                processDefinitions,
                actorSystem,
                FILE_INGESTION_CONFIG_ROOT_PATH);

        return LocalDirectoryConnectorTransport.builder()
                .withActorSystem(actorSystem)
                .withConfigRootPath(FILE_INGESTION_CONFIG_ROOT_PATH)
                .withFileIngestionConfiguration(FileIngestionConfiguration.create(
                        FILE_INGESTION_CONFIG_ROOT_PATH,
                        actorSystem.classicSystem().settings().config()
                ))
                .withName("Iban Plus Local Directory Transport")
                .withTransportErrorHandler(errorHandler)
                .build();
    }

File Converter

Es responsable de convertir la fuente de ingesta propietaria (p. ej., archivo) en una representación canónica (IbanPlusFile)

@RequiredArgsConstructor
public class IbanPlusFileConverter implements FileConverter<IbanPlusFile> {

    private final XmlMapper xmlObjectMapper;
    private final IbanPlusLineParser ibanPlusLineParser;

    @Override
    public IbanPlusFile convert(IngestedFile ingestedFile) {

        List<IbanPlusEntry> entries;
        FileType fileType;

        if ("xml".equalsIgnoreCase(ingestedFile.getMetaData().getExtension())) {
            Dataexport dataexport = parseRootObjectXml(ingestedFile);
            entries = getFileEntries(dataexport);
            fileType = StringUtils.equalsIgnoreCase(dataexport.getFiletype(), FileType.FULL.name()) ? FileType.FULL : FileType.DELTA;

        } else {
            entries = getTxtFileEntries(ingestedFile);
            fileType = StringUtils.containsIgnoreCase(ingestedFile.getMetaData().getName(), FileType.FULL.name())? FileType.FULL : FileType.DELTA;
        }

        CanonicalFileMetaData metaData = CanonicalFileMetaData.builder()
                .name(ingestedFile.getMetaData().getName())
                .dateTime(ingestedFile.getMetaData().getDateTime())
                .extension(ingestedFile.getMetaData().getExtension())
                .build();

        return new IbanPlusFile(metaData, entries, fileType);
    }

    private List<IbanPlusEntry> getFileEntries(Dataexport dataexport) {
        return dataexport.getIbanplusV3s()
                .stream()
                .map(ibanplusV3 -> IbanPlusEntry.builder()
                        .modificationFlag(ibanplusV3.getModificationFlag())
                        .ibanBic(ibanplusV3.getIbanBic())
                        .ibanIsoCountryCode(ibanplusV3.getIbanIsoCountryCode())
                        .isoCountryCode(ibanplusV3.getIsoCountryCode())
                        .ibanNationalId(ibanplusV3.getIbanNationalId())
                        .institutionName(ibanplusV3.getInstitutionName())
                        .routingBic(ibanplusV3.getRoutingBic())
                        .serviceContext(ibanplusV3.getServiceContext())
                        .build())
                .collect(Collectors.toList());
    }

    private List<IbanPlusEntry> getTxtFileEntries(IngestedFile ingestedFile) {
        String rawText = parseRootObjectTxt(ingestedFile);
        List<String> textRows = List.of(rawText.split(System.lineSeparator()));
        String resultsRow = textRows.stream()
                .findFirst()
                .orElseThrow(() -> new IconRuntimeException(String.format("Row missing or incomplete in ingested file: %s", ingestedFile.getMetaData().getName())));

        List<String> entryLines = textRows.subList(textRows.indexOf(resultsRow) + 1, textRows.size());
        return entryLines.stream()
                .map(ibanPlusLineParser::parseLine)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    @SneakyThrows
    private Dataexport parseRootObjectXml(IngestedFile ingestedFile) {
        InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
        return xmlObjectMapper.readValue(inputStream, Dataexport.class);
    }

    @SneakyThrows
    private static String parseRootObjectTxt(IngestedFile ingestedFile) {
        InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
        return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    }
}
    @Bean
    IbanPlusFileConverter ibanPlusFileConverter(XmlMapper xmlObjectMapper, IbanPlusLineParser ibanPlusLineParser) {
        return new IbanPlusFileConverter(xmlObjectMapper, ibanPlusLineParser);
    }

File Processor

Divide el archivo que se está procesando en entradas individuales (settings) que luego pueden ser procesadas por el OutputEntryHandler

@Slf4j
public class IbanPlusFileProcessor extends AbstractSwiftFileProcessor<IbanPlusFile, IbanPlusEntry, IbanPlus> {

    private final IbanPlusQuery ibanPlusQuery;

    public IbanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> entryHandler,
                                 IbanPlusQuery ibanPlusQuery,
                                 SettingToCanonicalFileEntryConverter<IbanPlus> settingToCanonicalFileEntryConverter,
                                 SettingQuery settingQuery,
                                 Integer limitRate,
                                 Integer parallelism,
                                 Duration throttleDuration,
                                 Integer readQueryBatchSize,
                                 ClassicActorSystemProvider actorSystem,
                                 EventBus eventBus) {
        super(entryHandler, settingToCanonicalFileEntryConverter,
                limitRate, parallelism, throttleDuration, readQueryBatchSize,
                settingQuery, actorSystem, eventBus);
        this.ibanPlusQuery = ibanPlusQuery;
    }

    @Override
    protected String getPersistenceId(IbanPlusEntry entry) {
        return IdProvider.getPersistenceId("ibanplus", getSettingId(entry));
    }

    @Override
    protected String getSettingId(IbanPlusEntry entry) {
        return entry.getIsoCountryCode() + "-" +  entry.getIbanIsoCountryCode() + "-" + entry.getIbanNationalId();
    }

    @Override
    protected boolean isFullFileImport(IbanPlusFile aFile) {
        return aFile.getFileType() == FileType.FULL;
    }

    @Override
    protected CompletionStage<Response<SettingsDTO<IbanPlus>>> getBatchedSettings(List<String> idList, Integer paginationSize) {
        return ibanPlusQuery.getBatchIbanPlus(idList, paginationSize);
    }

    @Override
    protected boolean shouldBeDeleted(IbanPlusEntry entry) {
        return entry.getModificationFlag().equals("D");
    }

    @Override
    protected boolean shouldBeUpserted(IbanPlusEntry entry) {
        return entry.getModificationFlag().equals("A") || entry.getModificationFlag().equals("M");
    }
}
    @Bean
    FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> ibanPlusEntryHandler,
                                                                     IbanPlusQuery ibanPlusQuery,
                                                                     SettingToCanonicalFileEntryConverter<IbanPlus> settingToCanonicalFileEntryConverter,
                                                                     SettingQuery settingQuery,
                                                                     @Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
                                                                     @Value("${ipf.csm-reachability.settings-api.iban-plus.parallelism}") Integer parallelism,
                                                                     @Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration,
                                                                     @Value("${ipf.csm-reachability.settings-api.read-query-batch-size.iban-plus-processor}") Integer readQueryBatchSize,
                                                                     EventBus eventBus) {
        return new IbanPlusFileProcessor(ibanPlusEntryHandler, ibanPlusQuery, settingToCanonicalFileEntryConverter, settingQuery, limitRate, parallelism, throttleDuration, readQueryBatchSize, actorSystem, eventBus);
    }

    @Bean
    FilePostProcessor ibanPlusFilePostProcessor(SettingQuery settingQuery,
                                                IbanPlusQuery ibanPlusQuery,
                                                @Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
                                                @Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration) {
        return new IbanPlusFilePostProcessor(ibanPlusQuery, settingQuery, limitRate, throttleDuration, actorSystem);
    }

Process Definition

Asocia el tipo de archivo canónico a un file processor y un file converter, además de proporcionar opcionalmente una condición de predicado sobre cuándo procesar desde la fuente de ingesta

    @Bean
    ProcessDefinition<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessDefinition(FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor,
                                                                                 FilePostProcessor ibanPlusFilePostProcessor,
                                                                                 IbanPlusFileConverter ibanPlusFileConverter) {
        return ProcessDefinition.<IbanPlusFile, IbanPlusEntry>builder()
                .processName("IbanPlus Ingestion File Process")
                .jobName("IbanPlus import")
                .processGate(basicFileMetaData -> ValidateFileUtils.validateFileType(basicFileMetaData.getExtension()))
                .fileProcessor(ibanPlusFileProcessor)
                .filePostProcessor(ibanPlusFilePostProcessor)
                .converter(ibanPlusFileConverter)
                .build();
    }

Output Entry Handler

Punto de integración entre la ingesta de archivos y los componentes de gestión de settings. Invoca la Setting Management API para crear un setting en la plataforma Dynamic Processing Settings (DPS).

La configuración para comunicación directa con DPS (DPS está embebido dentro de la aplicación):

ipf.dps-api.client-type="direct"

La configuración para comunicación http con DPS (DPS es una aplicación standalone):

ipf.dps-api {
  client-type = "connector"
  http.client {
    host = "localhost"
    endpoint-url = "/settings-objects/"
    port = 8080
  }
}

IMPORTANTE: Para comunicarse con la Setting Management API sobre HTTP, todas las configuraciones del cliente HTTP relacionadas se ubican en los ajustes ipf.dps-api.http e ipf.dps-api.default-connector.

    @Bean
    @ConditionalOnProperty(name = "ipf.csm-reachability.settings-api.file-handling.connection", havingValue = "http")
    OutputEntryHandler outputEntryHandlerViaHttp(DpsCrudClientPort dpsCrudClientPort,
                                                 DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting) {
        log.info("Using http transport");
        return new CanonicalEntryHandlerViaHttp<>(dpsCrudClientPort, canonicalEntryToSetting);
    }

    @Bean
    @ConditionalOnProperty(name = "ipf.csm-reachability.settings-api.file-handling.connection", havingValue = "direct")
    OutputEntryHandler outputEntryHandlerDirect(ClassicActorSystemProvider actorSystem,
                                                DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting,
                                                List<SettingDefinition<?>> settingDefinitions, DpsCrudClientPort dpsCrudClientPort) {
        log.info("Using direct transport");
        CanonicalEntryHandlerDirect.Settings settings = CanonicalEntryHandlerDirect.Settings.from(actorSystem);
        return new CanonicalEntryHandlerDirect<>(actorSystem, settings, canonicalEntryToSetting, settingDefinitions, dpsCrudClientPort);
    }

La configuración puede cargarse mediante archivos, p. ej., IBANPlus, IBANStructure; pero también puede consumirse vía HTTP/REST, como es el caso de SIC.

Ejemplo de configuración para consumir archivos desde el directorio local:

ipf.csm-reachability {
  participant {
    tips {
      process-participant.enabled = true
      file-ingestion {
        files-directory = "/import/csm-participant/eurosystem-tips-directory"
        directory-id = "TIPS"
        initial-delay = 5s
        interval = 30s
      }
    }

    rt1 {
      process-participant.enabled = true
      file-ingestion {
        files-directory = "/import/csm-participant/eba-rt1-routing-tables"
        directory-id = "RT1"
        initial-delay = 5s
        interval = 30s
      }
    }
  }

  iban-plus {
    enabled = true
    file-ingestion {
      files-directory = "IBANPLUS"
      files-directory = "/import/iban-plus-directory/swiftref-iban-plus"
      initial-delay = 5s
      interval = 30s
    }
  }

  iban-structure {
    enabled = true
    file-ingestion {
      files-directory = "/import/iban-plus-directory/swiftref-iban-structure"
      directory-id = "iban-structure"
      initial-delay = 5s
      interval = 1h
    }
  }

  party-entity.six-bankmaster {
    enabled = true
    file-ingestion {
      directory-id = "party-entity-bank-master"
      files-directory = "/import/party-entity-directory/six-bank-master-3.0/"
      initial-delay = 5s
      interval = 30s
    }
  }

  party-entity.swift-bankplus {
    enabled = true
    file-ingestion {
      directory-id = "party-entity-swift"
      files-directory = "/import/party-entity-directory/swiftref-bank-directory-plus"
      initial-delay = 5s
      interval = 30s
    }
  }

  exclusion-list {
    enabled = true
    file-ingestion {
      files-directory = "/import/iban-plus-directory/swiftref-exclusion-list"
      directory-id = "exclusion-list"
      initial-delay = 5s
      interval = 30s
    }
  }

  bic-dir-2018 {
    enabled = true
    file-ingestion {
      files-directory = "/import/bic-dir-2018/swiftref-bic-dir-2018"
      directory-id = "BICDIR2018"
      initial-delay = 5s
      interval = 30s
    }
  }
}

Config Parameter

Descripción

files-directory

Ubicación en el directorio local para sondear archivos

directory-id

Esto debe coincidir con una entrada directoryID en la configuración directory-mapping

initial-delay

Retraso antes del primer sondeo

interval

Tiempo entre sondeos de archivos. Evita valores muy pequeños para el intervalo, ya que pueden provocar condiciones de carrera entre hilos

Ejemplo de configuración para consumir archivos de participantes SIC vía HTTP/REST:

ipf.csm-reachability.participant.sic {
  process-participant {
    # Specify where from SIC Participants can be fetched and processed.
    # Two modes are available: `http` and `file-ingestion`. By default,
    # http mode is enabled. Both modes are configured under `sic-participant` configuration section.
    mode = http

    # Enables SIC Participant processing by setting `true`
    # or disables it by setting `false` flag.
    enabled = true
  }

  resiliency-settings {
    minimum-number-of-calls = 10
    max-attempts = 10
    reset-timeout = 3s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
  }

  # Configures HTTP client how to talk to six group bank master API
  http {
    client {
      endpoint-url = "https://api.six-group.com/api/epcd/bankmaster/v2/public"
    }
  }
}

Ejemplo de configuración para consumir archivos de participantes SIC vía ingesta de archivos mediante directorio local:

ipf.csm-reachability.participant.sic {
  process-participant {
    # Specify where from SIC Participants can be fetched and processed.
    # Two modes are available: `http` and `file-ingestion`. By default,
    # http mode is enabled. Both modes are configured under `sic-participant` configuration section.
    mode = file-ingestion

    # Enables SIC Participant processing by setting `true`
    # or disables it by setting `false` flag.
    enabled = true
  }

  resiliency-settings {
    minimum-number-of-calls = 10
    max-attempts = 10
    reset-timeout = 3s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
  }

  # Configures ingesting SIC participants through a file
  file-ingestion {
    files-directory = "/import/csm-participant/six-bank-master-3.0/"
    directory-id = "SIC"
    initial-delay = 5s
    interval = 30s
  }
}

Los ajustes de planificación (scheduling) para HTTP se configuran por separado según:

ipf.csm-reachability.sic.scheduler-settings {
  initial-delay = 10s
  interval = 1d
}

Habilitar/Deshabilitar el procesamiento de archivos

Deprecated Config Parameter

Config Parameter

Descripción

Valor predeterminado

swift.ibanplus.process-ibanplus.enabled

ipf.csm-reachability.iban-plus.enabled

Habilitar el procesamiento del archivo Swift IBAN Plus

true

ibanstructure.process-ibanstructure.enabled

ipf.csm-reachability.iban-structure.enabled

Habilitar el procesamiento del archivo Swift IBAN Structure

true

swift.bankplus.process-bank-directory-plus.enabled

ipf.csm-reachability.party-entity.swift-bankplus.enabled

Habilitar el procesamiento del archivo Swift Bank Directory Plus

true

exclusionlist.process-exclusionlist.enabled

ipf.csm-reachability.exclusion-list.enabled

Habilitar el procesamiento del archivo Swift Exclusion List

true

bic-dir-2018.process-bics.enabled

ipf.csm-reachability.bic-dir-2018.enabled

Habilitar el procesamiento del archivo Swift BICDir2018

true

party-entity.six.bankmaster.process-bank-master.enabled

ipf.csm-reachability.party-entity.six-bankmaster.enabled

Habilitar el procesamiento del archivo Six Bank Master

true

tips.process-participant.enabled

ipf.csm-reachability.participant.tips.process-participant.enabled

Habilitar el procesamiento del archivo de participantes TIPS

true

rt1.process-participant.enabled

ipf.csm-reachability.participant.rt1.process-participant.enabled

Habilitar el procesamiento del archivo de participantes RT1

true

step2.process-participant.enabled

ipf.csm-reachability.participant.step2.process-participant.enabled

Habilitar el procesamiento del archivo de participantes STEP2

true

sic.process-participant.enabled

ipf.csm-reachability.participant.sic.process-participant.enabled

Habilitar el procesamiento del archivo de participantes SIC

true

Límites de tasa de ingesta de archivos y configuración de throttle

ADVERTENCIA: Los parámetros de configuración antiguos que estaban bajo settings-api ahora están en desuso y programados para su eliminación en futuras versiones. Esta configuración ahora está bajo ipf.csm-reachability.settings-api y sigue el estándar IPF.

Config Parameter

Descripción

Valor predeterminado

ipf.csm-reachability.settings-api.limit-rate

Limita el throughput a un número especificado de registros consumidos. Cuando se establece este valor, también se debe proporcionar throttle-duration.

ipf.csm-reachability.settings-api.throttle-duration

Se usa con limit-rate para establecer la tasa máxima de consumo de registros; Especificado como duración, p. ej. 1s, 1min

ipf.csm-reachability.settings-api.read-query-batch-size.participant-processor

Número máximo de settings a obtener por llamada a la API.

1000

ipf.csm-reachability.settings-api.read-query-batch-size.bicDir2018-processor

Número máximo de settings a obtener por llamada a la API.

1000

ipf.csm-reachability.settings-api.exclusion-list.parallelism

Número máximo de llamadas a la API por lotes en paralelo.

1

ipf.csm-reachability.settings-api.read-query-batch-size.exclusion-list-processor

Número máximo de settings a obtener por llamada a la API.

1000

ipf.csm-reachability.settings-api.iban-plus.parallelism

Número máximo de llamadas a la API por lotes en paralelo.

1

ipf.csm-reachability.settings-api.read-query-batch-size.iban-plus-processor

Número máximo de settings a obtener por llamada a la API.

1000

ipf.csm-reachability.settings-api.read-query-batch-size.iban-structure-processor

Número máximo de settings a obtener por llamada a la API.

1000

ipf.csm-reachability.settings-api.party-entity.parallelism

Número máximo de llamadas a la API por lotes en paralelo.

1

ipf.csm-reachability.settings-api.read-query-batch-size.party-entity-processor

Número máximo de settings a obtener por llamada a la API.

1000

ipf.csm-reachability.settings-api.direct.request-queue-size

Cola de búfer

3000

ipf.csm-reachability.settings-api.direct.throttle-elements

Limita el throughput a un número especificado de registros consumidos. Cuando se establece este valor, también se debe proporcionar throttle-duration. Cuando es 0, está desactivado.

0

ipf.csm-reachability.settings-api.direct.throttle-duration

Se usa con 'throttle-elements' para establecer la tasa máxima de consumo de registros. Cuando es 0, está desactivado.

0

ipf.csm-reachability.settings-api.direct.parallelism

Número máximo de llamadas a la API en paralelo.

3

El csm-reachability-file-notification-s3 se configura según:

ipf.file-manager.s3 {
  region = "us-east-1"
  upload-parallelism = 1
  credentials {
    access-key-id = "accessKey"
    secret-access-key = "secretAccessKey"
  }
  resiliency-settings {
    # Determines the maximum number of retries to be made. Note that this includes the first failed attempt.
    max-attempts = 2
    # Retry if HTTP error code is in the list
    retryable-status-codes = [500, 503]
    attempt-timeout = 2s
    call-timeout = 3s
  }
}

ipf.file-manager.s3 {
  region = "us-east-1"
  upload-parallelism = 1
  credentials {
    access-key-id = "accessKey"
    secret-access-key = "secretAccessKey"
  }

  resiliency {
    retry = 3
    api-call-timeout = 10s # duration
  }
}

Configuración del File Ingestion Notification Service

El File Ingestion Notification Service admite los siguientes formatos:

  • CSM Participant - RT1 (Direct and Indirect Participants)

  • CSM Participant - STEP2 SCT (Direct and Indirect Participants)

  • CSM Participant - TIPS directory

  • CSM Participant - STET directory

  • CSM Participant - SicInst

  • Party Entity Directory - Bank Master 3.0

  • Party Entity Directory - Bank Directory Plus

  • IBAN Plus

  • IBAN Structure

  • Exclusion List

  • Bic Dir 2018

La ingesta de archivos desde aws s3 se puede habilitar/deshabilitar estableciendo la propiedad ipf.csm-reachability.file-ingestion.s3.enabled a true/false. Está establecida en false de forma predeterminada. Habilitar o deshabilitar la función de ingesta desde s3 no afecta a la ingesta de archivos desde directorio local.

La configuración para fileProcessedNotificationSendConnector y fileAvailableNotificationReceiveConnector puede sobrescribirse en las rutas respectivas, ipf.csm-reachability.file-ingestion.notification-service.connector.file-processed y ipf.csm-reachability.file-ingestion.notification-service.connector.file-available.

El csm-reachability-file-notification-ingestion-notification-service se configura según:

akka {
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}
    }
    producer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}
    }
  }
}

ipf.csm-reachability {

  file-ingestion {
    s3.enabled = false

    notification-service {
      kafka {
        producer {
          topic = FILE_PROCESSED_NOTIFICATION
          restart-settings = ${common-flow-restart-settings}
          kafka-clients {
            group.id = file-processing-notification-group
          }
        }
        consumer {
          topic = FILE_AVAILABLE_NOTIFICATION
          restart-settings = ${common-flow-restart-settings}
          kafka-clients {
            group.id = file-available-notification-group
          }
        }
      }

      default-file-ingestion {
        # path which should be overriden
        files-directory = "/import"
        initial-delay = 5s
        interval = 30s
      }
      # directory id values for file ingestion. By default they are referencing respective values from local directory configuration
      directory-ids {
        RT02SCI = [${?ipf.csm-reachability.participant.rt1.file-ingestion.directory-id}]
        S204SCT = [${?ipf.csm-reachability.participant.step2.file-ingestion.directory-id}]
        TIPS = [${?ipf.csm-reachability.participant.tips.file-ingestion.directory-id}]
        agreements = [${?ipf.csm-reachability.participant.stet.file-ingestion.directory-id}]
        bankmaster = [
          ${?ipf.csm-reachability.party-entity.six-bankmaster.file-ingestion.directory-id},
          ${?ipf.csm-reachability.participant.sic.file-ingestion.directory-id}
        ]
        BANKDIRECTORYPLUS = [${?ipf.csm-reachability.party-entity.swift-bankplus.file-ingestion.directory-id}]
        IBANPLUS = [${?ipf.csm-reachability.iban-plus.file-ingestion.directory-id}]
        IBANSTRUCTURE = [${?ipf.csm-reachability.iban-structure.file-ingestion.directory-id}]
        EXCLUSIONLIST = [${?ipf.csm-reachability.exclusion-list.file-ingestion.directory-id}]
        BICDIR2018 = [${?ipf.csm-reachability.bic-dir-2018.file-ingestion.directory-id}]
      }
    }
  }
}

common-flow-restart-settings {
  min-backoff = 1s
  max-backoff = 5s
  random-factor = 0.25
  max-restarts = 5
  max-restarts-within = 10m
}