Creating a Custom Settings Ingester

Component setup

In order to ingest/import settings into the dynamic processing settings platform you need to configure the following:

  • File Ingestion Cluster Singleton

  • ReceiveConnector/SendConnector

  • Connector Transport

  • File Converter

  • File Processor

  • Process Definition

  • Entry Handler

  • Enable/Disable processing of SWIFT files

An example configuration for ingesting IbanPlus settings is included below. Similar components would need to be setup if you wanted to set up setting ingestion from a new source

File Ingestion Cluster Singleton

The cluster singleton accepts the restart settings of the singleton actor via config. The cluster singleton accepts the restart settings of the singleton actor via config.

ipf.csm-reachability.ingestion {
  restart-settings {
    min-backoff = 3s
    max-backoff = 30s
    random-factor = 0.25
    max-restarts = 20
    max-restarts-within = 30m
  }
}

ReceiveConnector/SendConnector

Depending on how the configuration is to be consumed either a SendConnector or a ReceiveConnector needs to be defined. When consuming files from a local directory, a ReceiveConnector would be defined. However, if consuming settings from a REST API, this would be a SendConnector.

        receiveConnector = ReceiveConnectorBuilderHelper.<IngestedFile>defaultBuilder(name + " File Ingestion Connector", actorSystem)
                .withConnectorTransport(localDirectoryConnectorTransport)
                .withReceiveTransportMessageConverter(ingestedFileReceiveTransportMessageConverter)
                .withProcessingContextExtractor(connectorMessage -> InitiatingProcessingContextExtractor.<IngestedFile>builder().build().extract(connectorMessage))
                .withReceiveHandler((aggregateId, payload) -> {
                    return processManager.process(payload)
                            .toFuture()
                            .whenComplete(((processedEntryResponses, throwable) -> {
                                if (throwable != null) {
                                    log.error("Error occurred during processing of " + name + " File", throwable);
                                } else {
                                    log.info("Successfully processed {} File", name);
                                }
                            })).thenApply(done -> null);
                })
                .withLoggingErrorHandler(exception -> {
                    log.error("Exception occurred during file ingestion for file : " + name, exception);
                    return CompletableFuture.completedStage(null);
                })
                .build();

Connector Transport

Once defined it will need to be referenced in the Connector. Supported ConnectorTransports include, LocalDirectory (file), Http, Kafka and JMS.

When defining a LocalDirectoryConnectorTransport, a new directory mapping entry will have to be added:

ipf.file-ingestion.directory-mappings += {
    directory-id = "your-ingester-id" (1)
    # has to match the job name used in the ProcessDefinition below
    job-name = "your-ingester-job-name" (2)
}
1 directory-id represents a directory-id from the configuration of the LocalDirectoryConnectorTransport.
2 job-name represents job name associated with the ProcessDefinition#jobName
    @Bean
    LocalDirectoryConnectorTransport ibanPlusLocalDirectoryConnectorTransport(EventBus eventBus,
                                                                              DirectoryConfiguration directoryConfiguration,
                                                                              List<ProcessDefinition<? extends CanonicalFile, ? extends CanonicalFileEntry>> processDefinitions) {

        var errorHandler = new FileHandlerCommonIngestionDefaultReceiveErrorHandler(eventBus,
                directoryConfiguration,
                processDefinitions,
                actorSystem,
                FILE_INGESTION_CONFIG_ROOT_PATH);

        return LocalDirectoryConnectorTransport.builder()
                .withActorSystem(actorSystem)
                .withConfigRootPath(FILE_INGESTION_CONFIG_ROOT_PATH)
                .withFileIngestionConfiguration(FileIngestionConfiguration.create(
                        FILE_INGESTION_CONFIG_ROOT_PATH,
                        actorSystem.classicSystem().settings().config()
                ))
                .withName("Iban Plus Local Directory Transport")
                .withTransportErrorHandler(errorHandler)
                .build();
    }

File Converter

This is responsible for converting the proprietary ingestion source (e.g. file) into a canonical representation (IbanPlusFile)

@RequiredArgsConstructor
public class IbanPlusFileConverter implements FileConverter<IbanPlusFile> {

    private final XmlMapper xmlObjectMapper;
    private final IbanPlusLineParser ibanPlusLineParser;

    @Override
    public IbanPlusFile convert(IngestedFile ingestedFile) {

        List<IbanPlusEntry> entries;
        FileType fileType;

        if ("xml".equalsIgnoreCase(ingestedFile.getMetaData().getExtension())) {
            Dataexport dataexport = parseRootObjectXml(ingestedFile);
            entries = getFileEntries(dataexport);
            fileType = StringUtils.equalsIgnoreCase(dataexport.getFiletype(), FileType.FULL.name()) ? FileType.FULL : FileType.DELTA;

        } else {
            entries = getTxtFileEntries(ingestedFile);
            fileType = StringUtils.containsIgnoreCase(ingestedFile.getMetaData().getName(), FileType.FULL.name())? FileType.FULL : FileType.DELTA;
        }

        CanonicalFileMetaData metaData = CanonicalFileMetaData.builder()
                .name(ingestedFile.getMetaData().getName())
                .dateTime(ingestedFile.getMetaData().getDateTime())
                .extension(ingestedFile.getMetaData().getExtension())
                .build();

        return new IbanPlusFile(metaData, entries, fileType);
    }

    private List<IbanPlusEntry> getFileEntries(Dataexport dataexport) {
        return dataexport.getIbanplusV3s()
                .stream()
                .map(ibanplusV3 -> IbanPlusEntry.builder()
                        .modificationFlag(ibanplusV3.getModificationFlag())
                        .ibanBic(ibanplusV3.getIbanBic())
                        .ibanIsoCountryCode(ibanplusV3.getIbanIsoCountryCode())
                        .isoCountryCode(ibanplusV3.getIsoCountryCode())
                        .ibanNationalId(ibanplusV3.getIbanNationalId())
                        .institutionName(ibanplusV3.getInstitutionName())
                        .routingBic(ibanplusV3.getRoutingBic())
                        .serviceContext(ibanplusV3.getServiceContext())
                        .build())
                .collect(Collectors.toList());
    }

    private List<IbanPlusEntry> getTxtFileEntries(IngestedFile ingestedFile) {
        String rawText = parseRootObjectTxt(ingestedFile);
        List<String> textRows = List.of(rawText.split(System.lineSeparator()));
        String resultsRow = textRows.stream()
                .findFirst()
                .orElseThrow(() -> new IconRuntimeException(String.format("Row missing or incomplete in ingested file: %s", ingestedFile.getMetaData().getName())));

        List<String> entryLines = textRows.subList(textRows.indexOf(resultsRow) + 1, textRows.size());
        return entryLines.stream()
                .map(ibanPlusLineParser::parseLine)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    @SneakyThrows
    private Dataexport parseRootObjectXml(IngestedFile ingestedFile) {
        InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
        return xmlObjectMapper.readValue(inputStream, Dataexport.class);
    }

    @SneakyThrows
    private static String parseRootObjectTxt(IngestedFile ingestedFile) {
        InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
        return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    }
}
    @Bean
    IbanPlusFileConverter ibanPlusFileConverter(XmlMapper xmlObjectMapper, IbanPlusLineParser ibanPlusLineParser) {
        return new IbanPlusFileConverter(xmlObjectMapper, ibanPlusLineParser);
    }

File Processor

Breaks up the file being processed into individual entries (settings) which can then be processed by the OutputEntryHandler

@Slf4j
public class IbanPlusFileProcessor extends AbstractSwiftFileProcessor<IbanPlusFile, IbanPlusEntry, IbanPlus> {

    private final IbanPlusQuery ibanPlusQuery;

    public IbanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> entryHandler,
                                 IbanPlusQuery ibanPlusQuery,
                                 SettingToCanonicalFileEntryConverter<IbanPlus> settingToCanonicalFileEntryConverter,
                                 SettingQuery settingQuery,
                                 Integer limitRate,
                                 Integer parallelism,
                                 Duration throttleDuration,
                                 Integer readQueryBatchSize,
                                 ClassicActorSystemProvider actorSystem,
                                 EventBus eventBus) {
        super(entryHandler, settingToCanonicalFileEntryConverter,
                limitRate, parallelism, throttleDuration, readQueryBatchSize,
                settingQuery, actorSystem, eventBus);
        this.ibanPlusQuery = ibanPlusQuery;
    }

    @Override
    protected String getPersistenceId(IbanPlusEntry entry) {
        return IdProvider.getPersistenceId("ibanplus", getSettingId(entry));
    }

    @Override
    protected String getSettingId(IbanPlusEntry entry) {
        return entry.getIsoCountryCode() + "-" +  entry.getIbanIsoCountryCode() + "-" + entry.getIbanNationalId();
    }

    @Override
    protected boolean isFullFileImport(IbanPlusFile aFile) {
        return aFile.getFileType() == FileType.FULL;
    }

    @Override
    protected CompletionStage<Response<SettingsDTO<IbanPlus>>> getBatchedSettings(List<String> idList, Integer paginationSize) {
        return ibanPlusQuery.getBatchIbanPlus(idList, paginationSize);
    }

    @Override
    protected boolean shouldBeDeleted(IbanPlusEntry entry) {
        return entry.getModificationFlag().equals("D");
    }

    @Override
    protected boolean shouldBeUpserted(IbanPlusEntry entry) {
        return entry.getModificationFlag().equals("A") || entry.getModificationFlag().equals("M");
    }
}
    @Bean
    FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> ibanPlusEntryHandler,
                                                                     IbanPlusQuery ibanPlusQuery,
                                                                     SettingToCanonicalFileEntryConverter<IbanPlus> settingToCanonicalFileEntryConverter,
                                                                     SettingQuery settingQuery,
                                                                     @Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
                                                                     @Value("${ipf.csm-reachability.settings-api.iban-plus.parallelism}") Integer parallelism,
                                                                     @Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration,
                                                                     @Value("${ipf.csm-reachability.settings-api.read-query-batch-size.iban-plus-processor}") Integer readQueryBatchSize,
                                                                     EventBus eventBus) {
        return new IbanPlusFileProcessor(ibanPlusEntryHandler, ibanPlusQuery, settingToCanonicalFileEntryConverter, settingQuery, limitRate, parallelism, throttleDuration, readQueryBatchSize, actorSystem, eventBus);
    }

    @Bean
    FilePostProcessor ibanPlusFilePostProcessor(SettingQuery settingQuery,
                                                IbanPlusQuery ibanPlusQuery,
                                                @Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
                                                @Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration) {
        return new IbanPlusFilePostProcessor(ibanPlusQuery, settingQuery, limitRate, throttleDuration, actorSystem);
    }

Process Definition

Associates the canonical file type to a file processor and file converter, as well as optionally providing a predicate condition as to when to process from the ingestion source

    @Bean
    ProcessDefinition<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessDefinition(FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor,
                                                                                 FilePostProcessor ibanPlusFilePostProcessor,
                                                                                 IbanPlusFileConverter ibanPlusFileConverter) {
        return ProcessDefinition.<IbanPlusFile, IbanPlusEntry>builder()
                .processName("IbanPlus Ingestion File Process")
                .jobName("IbanPlus import")
                .processGate(basicFileMetaData -> ValidateFileUtils.validateFileType(basicFileMetaData.getExtension()))
                .fileProcessor(ibanPlusFileProcessor)
                .filePostProcessor(ibanPlusFilePostProcessor)
                .converter(ibanPlusFileConverter)
                .build();
    }

Output Entry Handler

Integration point between file ingestion and setting management components. It invokes the Setting Management API to create a setting on the Dynamic Processing Settings platform (DPS).

The configuration for direct communication with DPS (DPS is embedded inside application):

ipf.dps-api.client-type="direct"

The configuration for http communication with DPS (DPS is standalone application):

ipf.dps-api {
  client-type = "connector"
  http.client {
    host = "localhost"
    endpoint-url = "/settings-objects/"
    port = 8080
  }
}
To communicate with the Setting Management API over HTTP, all related HTTP client configurations are located in the ipf.dps-api.http and ipf.dps-api.default-connector settings.
    @Bean
    @ConditionalOnProperty(name = "ipf.csm-reachability.settings-api.file-handling.connection", havingValue = "http")
    OutputEntryHandler outputEntryHandlerViaHttp(DpsCrudClientPort dpsCrudClientPort,
                                                 DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting) {
        log.info("Using http transport");
        return new CanonicalEntryHandlerViaHttp<>(dpsCrudClientPort, canonicalEntryToSetting);
    }

    @Bean
    @ConditionalOnProperty(name = "ipf.csm-reachability.settings-api.file-handling.connection", havingValue = "direct")
    OutputEntryHandler outputEntryHandlerDirect(ClassicActorSystemProvider actorSystem,
                                                DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting,
                                                List<SettingDefinition<?>> settingDefinitions, DpsCrudClientPort dpsCrudClientPort) {
        log.info("Using direct transport");
        CanonicalEntryHandlerDirect.Settings settings = CanonicalEntryHandlerDirect.Settings.from(actorSystem);
        return new CanonicalEntryHandlerDirect<>(actorSystem, settings, canonicalEntryToSetting, settingDefinitions, dpsCrudClientPort);
    }

Configuration can be loaded via files e.g. IBANPlus, IBANStructure But it can also be consumed via HTTP/REST as is the case with SIC

Example config for consuming files from the local directory:

ipf.csm-reachability {
  participant {
    tips {
      process-participant.enabled = true
      file-ingestion {
        files-directory = "/import/csm-participant/eurosystem-tips-directory"
        directory-id = "TIPS"
        initial-delay = 5s
        interval = 30s
      }
    }

    rt1 {
      process-participant.enabled = true
      file-ingestion {
        files-directory = "/import/csm-participant/eba-rt1-routing-tables"
        directory-id = "RT1"
        initial-delay = 5s
        interval = 30s
      }
    }
  }

  iban-plus {
    enabled = true
    file-ingestion {
      files-directory = "IBANPLUS"
      files-directory = "/import/iban-plus-directory/swiftref-iban-plus"
      initial-delay = 5s
      interval = 30s
    }
  }

  iban-structure {
    enabled = true
    file-ingestion {
      files-directory = "/import/iban-plus-directory/swiftref-iban-structure"
      directory-id = "iban-structure"
      initial-delay = 5s
      interval = 1h
    }
  }

  party-entity.six-bankmaster {
    enabled = true
    file-ingestion {
      directory-id = "party-entity-bank-master"
      files-directory = "/import/party-entity-directory/six-bank-master-3.0/"
      initial-delay = 5s
      interval = 30s
    }
  }

  party-entity.swift-bankplus {
    enabled = true
    file-ingestion {
      directory-id = "party-entity-swift"
      files-directory = "/import/party-entity-directory/swiftref-bank-directory-plus"
      initial-delay = 5s
      interval = 30s
    }
  }

  exclusion-list {
    enabled = true
    file-ingestion {
      files-directory = "/import/iban-plus-directory/swiftref-exclusion-list"
      directory-id = "exclusion-list"
      initial-delay = 5s
      interval = 30s
    }
  }

  bic-dir-2018 {
    enabled = true
    file-ingestion {
      files-directory = "/import/bic-dir-2018/swiftref-bic-dir-2018"
      directory-id = "BICDIR2018"
      initial-delay = 5s
      interval = 30s
    }
  }

  party-entity.identifiers {
    enabled = true
    file-ingestion {
      directory-id = "party-entity-identifiers"
      files-directory = "/import/party-entity-directory/party-entity-identifiers/"
      initial-delay = 5s
      interval = 30s
    }
  }

  iban-structure.formats {
    enabled = true
    file-ingestion {
      directory-id = "iban-structure-formats"
      files-directory = "import/iban-plus-directory/iban-structure-formats/"
      initial-delay = 5s
      interval = 30s
    }
  }
}

Config Parameter

Description

files-directory

Location on local directory to poll for files

directory-id

This should match to a directoryID entry in the directory-mapping configuration

initial-delay

Delay before first poll

interval

Time between polling for files. Avoid very small values for the interval, as they can lead to thread race

Example config for consuming SIC Participant files via HTTP/REST:

ipf.csm-reachability.participant.sic {
  process-participant {
    # Specify where from SIC Participants can be fetched and processed.
    # Two modes are available: `http` and `file-ingestion`. By default,
    # http mode is enabled. Both modes are configured under `sic-participant` configuration section.
    mode = http

    # Enables SIC Participant processing by setting `true`
    # or disables it by setting `false` flag.
    enabled = true
  }

  resiliency-settings {
    minimum-number-of-calls = 10
    max-attempts = 10
    reset-timeout = 3s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
  }

  # Configures HTTP client how to talk to six group bank master API
  http {
    client {
      endpoint-url = "https://api.six-group.com/api/epcd/bankmaster/v2/public"
    }
  }
}

Example config for consuming SIC Participant files via file ingestion through local directory:

ipf.csm-reachability.participant.sic {
  process-participant {
    # Specify where from SIC Participants can be fetched and processed.
    # Two modes are available: `http` and `file-ingestion`. By default,
    # http mode is enabled. Both modes are configured under `sic-participant` configuration section.
    mode = file-ingestion

    # Enables SIC Participant processing by setting `true`
    # or disables it by setting `false` flag.
    enabled = true
  }

  resiliency-settings {
    minimum-number-of-calls = 10
    max-attempts = 10
    reset-timeout = 3s
    initial-retry-wait-duration = 1s
    backoff-multiplier = 2
  }

  # Configures ingesting SIC participants through a file
  file-ingestion {
    files-directory = "/import/csm-participant/six-bank-master-3.0/"
    directory-id = "SIC"
    initial-delay = 5s
    interval = 30s
  }
}

The scheduling settings for HTTP are configured separately as per:

ipf.csm-reachability.sic.scheduler-settings {
  initial-delay = 10s
  interval = 1d
}

Enable/Disable processing of files

Deprecated Config Parameter

Config Parameter

Description

Default value

swift.ibanplus.process-ibanplus.enabled

ipf.csm-reachability.iban-plus.enabled

Enable processing of Swift IBAN Plus file

true

ibanstructure.process-ibanstructure.enabled

ipf.csm-reachability.iban-structure.enabled

Enable processing of Swift IBAN Structure file

true

swift.bankplus.process-bank-directory-plus.enabled

ipf.csm-reachability.party-entity.swift-bankplus.enabled

Enable processing of Swift Bank Directory Plus file

true

exclusionlist.process-exclusionlist.enabled

ipf.csm-reachability.exclusion-list.enabled

Enable processing of Swift Exclusion List file

true

bic-dir-2018.process-bics.enabled

ipf.csm-reachability.bic-dir-2018.enabled

Enable processing of Swift BICDir2018 file

true

party-entity.six.bankmaster.process-bank-master.enabled

ipf.csm-reachability.party-entity.six-bankmaster.enabled

Enable processing of Six Bank Master file

true

tips.process-participant.enabled

ipf.csm-reachability.participant.tips.process-participant.enabled

Enable processing of TIPS participant file

true

rt1.process-participant.enabled

ipf.csm-reachability.participant.rt1.process-participant.enabled

Enable processing of RT1 participant file

true

step2.process-participant.enabled

ipf.csm-reachability.participant.step2.process-participant.enabled

Enable processing of STEP2 participant file

true

sic.process-participant.enabled

ipf.csm-reachability.participant.sic.process-participant.enabled

Enable processing of SIC participant file

true

no deprecated property

ipf.csm-reachability.party-entity.identifiers.enabled

Enable processing of SwiftRef IDENTIFIERS-ALL file

true

no deprecated property

ipf.csm-reachability.iban-structure.formats.enabled

Enable processing of SwiftRef FORMATS-ALL/FORMATS-CTRY files

true

File ingestion limit rates and throttle configuration

Old configuration parameters that were under settings-api are now deprecated and scheduled for removal in future releases. This configuration is now under ipf.csm-reachability.settings-api and follows the IPF standard.

Config Parameter

Description

Default value

ipf.csm-reachability.settings-api.limit-rate

Limits the throughput to a specified number of consumed records. When this value is set, throttle-duration must also be provided.

ipf.csm-reachability.settings-api.throttle-duration

Is used with limit-rate to set the maximum rate for consuming records; Specified as duration, e.g. 1s, 1min

ipf.csm-reachability.settings-api.read-query-batch-size.participant-processor

Maximum number of settings to get from API call.

1000

ipf.csm-reachability.settings-api.read-query-batch-size.bicDir2018-processor

Maximum number of settings to get from API call.

1000

ipf.csm-reachability.settings-api.exclusion-list.parallelism

Maximum number of parallel batch API calls.

1

ipf.csm-reachability.settings-api.read-query-batch-size.exclusion-list-processor

Maximum number of settings to get from API call.

1000

ipf.csm-reachability.settings-api.iban-plus.parallelism

Maximum number of parallel batch API calls.

1

ipf.csm-reachability.settings-api.read-query-batch-size.iban-plus-processor

Maximum number of settings to get from API call.

1000

ipf.csm-reachability.settings-api.read-query-batch-size.iban-structure-processor

Maximum number of settings to get from API call.

1000

ipf.csm-reachability.settings-api.party-entity.parallelism

Maximum number of parallel batch API calls.

1

ipf.csm-reachability.settings-api.read-query-batch-size.party-entity-processor

Maximum number of settings to get from API call.

1000

ipf.csm-reachability.settings-api.direct.request-queue-size

Buffer queue

3000

ipf.csm-reachability.settings-api.direct.throttle-elements

Limits the throughput to a specified number of consumed records. When this value is set, throttle-duration must also be provided. When 0 then it is off.

0

ipf.csm-reachability.settings-api.direct.throttle-duration

Is used with 'throttle-elements' to set the maximum rate for consuming records. When 0 then it is off.

0

ipf.csm-reachability.settings-api.direct.parallelism

Maximum number of parallel API calls.

3

The csm-reachability-file-notification-s3 is configured as per:

ipf.file-manager.s3 {
  region = "us-east-1"
  upload-parallelism = 1
  credentials {
    access-key-id = "accessKey"
    secret-access-key = "secretAccessKey"
  }
  resiliency-settings {
    # Determines the maximum number of retries to be made. Note that this includes the first failed attempt.
    max-attempts = 2
    # Retry if HTTP error code is in the list
    retryable-status-codes = [500, 503]
    attempt-timeout = 2s
    call-timeout = 3s
  }
}

ipf.file-manager.s3 {
  region = "us-east-1"
  upload-parallelism = 1
  credentials {
    access-key-id = "accessKey"
    secret-access-key = "secretAccessKey"
  }

  resiliency {
    retry = 3
    api-call-timeout = 10s # duration
  }
}

File Ingestion Notification Service configuration

File Ingestion Notification Service supports the following formats:

  • CSM Participant - RT1 (Direct and Indirect Participants)

  • CSM Participant - STEP2 SCT (Direct and Indirect Participants)

  • CSM Participant - TIPS directory

  • CSM Participant - SicInst

  • Party Entity Directory - Bank Master 3.0

  • Party Entity Directory - Bank Directory Plus

  • Party Entity Directory - SwiftRef Identifiers All

  • IBAN Plus - Iban Plus Directory

  • IBAN Structure - Iban Plus Directory

  • IBAN Structure - SwiftRef Formats Files

  • Exclusion List - Iban Plus Directory

  • Bic Dir 2018 - Bic Dir 2018

File ingestion from aws s3 feature can be enabled/disabled by setting ipf.csm-reachability.file-ingestion.s3.enabled property to true/false. It is set to false by default. Enabling or disabling the s3 file ingestion feature doesn’t affect local directory file ingestion.

Configuration for fileProcessedNotificationSendConnector and fileAvailableNotificationReceiveConnector can be overridden on respective paths, ipf.csm-reachability.file-ingestion.notification-service.connector.file-processed and ipf.csm-reachability.file-ingestion.notification-service.connector.file-available.

The csm-reachability-file-notification-ingestion-notification-service is configured as per:

akka {
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}
    }
    producer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
      restart-settings = ${common-flow-restart-settings}
    }
  }
}

ipf.csm-reachability {

  file-ingestion {
    s3.enabled = false

    notification-service {
      kafka {
        producer {
          topic = FILE_PROCESSED_NOTIFICATION
          restart-settings = ${common-flow-restart-settings}
          kafka-clients {
            group.id = file-processing-notification-group
          }
        }
        consumer {
          topic = FILE_AVAILABLE_NOTIFICATION
          restart-settings = ${common-flow-restart-settings}
          kafka-clients {
            group.id = file-available-notification-group
          }
        }
      }

      default-file-ingestion {
        # path which should be overriden
        files-directory = "/import"
        initial-delay = 5s
        interval = 30s
        timestamp-archived-and-failed-files = true
      }
      # directory id values for file ingestion. By default they are referencing respective values from local directory configuration
      directory-ids {
        RT02SCI = [${?ipf.csm-reachability.participant.rt1.file-ingestion.directory-id}]
        S204SCT = [${?ipf.csm-reachability.participant.step2.file-ingestion.directory-id}]
        TIPS = [${?ipf.csm-reachability.participant.tips.file-ingestion.directory-id}]
        agreements = [${?ipf.csm-reachability.participant.stet.file-ingestion.directory-id}]
        bankmaster = [
          ${?ipf.csm-reachability.party-entity.six-bankmaster.file-ingestion.directory-id},
          ${?ipf.csm-reachability.participant.sic.file-ingestion.directory-id}
        ]
        IDENTIFIERS-ALL = [${?ipf.csm-reachability.party-entity.identifiers.file-ingestion.directory-id}]
        FORMATS = [${?ipf.csm-reachability.iban-structure.formats.file-ingestion.directory-id}]
        BANKDIRECTORYPLUS = [${?ipf.csm-reachability.party-entity.swift-bankplus.file-ingestion.directory-id}]
        IBANPLUS = [${?ipf.csm-reachability.iban-plus.file-ingestion.directory-id}]
        IBANSTRUCTURE = [${?ipf.csm-reachability.iban-structure.file-ingestion.directory-id}]
        EXCLUSIONLIST = [${?ipf.csm-reachability.exclusion-list.file-ingestion.directory-id}]
        BICDIR2018 = [${?ipf.csm-reachability.bic-dir-2018.file-ingestion.directory-id}]
      }
    }
  }
}

common-flow-restart-settings {
  min-backoff = 1s
  max-backoff = 5s
  random-factor = 0.25
  max-restarts = 5
  max-restarts-within = 10m
}