Creando un Custom Configuración del Ingester
Configuración del componente
Con el fin de ingerir/importar configuraciones en el dynamic processing settings plataforma que necesita configurar lo siguiente:
-
Ingesta de Archivos Cluster Singleton
-
ReceiveConnector/SendConnector
-
Connector Transport
-
Convertidor de Archivos
-
Procesador de Archivos
-
Definición del Proceso
-
Manejador de Entradas
-
Habilitar/Deshabilitar el procesamiento de SWIFT archivos
A continuación se incluye un ejemplo de configuración para la ingestión de los ajustes de IbanPlus. Componentes similares tendrían que configurarse si usted desea establecer la ingesta de configuraciones desde una nueva fuente.
Ingesta de Archivos Cluster Singleton
El cluster singleton acepta el restart ajustes del singleton actor a través de la configuración. El cluster singleton acepta el restart ajustes del singleton actor a través de la configuración.
ipf.csm-reachability.ingestion {
restart-settings {
min-backoff = 3s
max-backoff = 30s
random-factor = 0.25
max-restarts = 20
max-restarts-within = 30m
}
}
ReceiveConnector/SendConnector
Dependiendo de cómo se deba consumir la configuración, ya sea un SendConnector o se debe definir un ReceiveConnector. Al consumir archivos de un directorio local, se definiría un ReceiveConnector. Sin embargo, si consume configuraciones de un REST API, esto sería un SendConnector.
receiveConnector = ReceiveConnectorBuilderHelper.<IngestedFile>defaultBuilder(name + " File Ingestion Connector", actorSystem)
.withConnectorTransport(localDirectoryConnectorTransport)
.withReceiveTransportMessageConverter(ingestedFileReceiveTransportMessageConverter)
.withProcessingContextExtractor(connectorMessage -> InitiatingProcessingContextExtractor.<IngestedFile>builder().build().extract(connectorMessage))
.withReceiveHandler((aggregateId, payload) -> {
return processManager.process(payload)
.toFuture()
.whenComplete(((processedEntryResponses, throwable) -> {
if (throwable != null) {
log.error("Error occurred during processing of " + name + " File", throwable);
} else {
log.info("Successfully processed {} File", name);
}
})).thenApply(done -> null);
})
.withLoggingErrorHandler(exception -> {
log.error("Exception occurred during file ingestion for file : " + name, exception);
return CompletableFuture.completedStage(null);
})
.build();
Connector Transport
Una vez definido, deberá ser referenciado en el Conector. Los Conectores de Transporte soportados incluyen, LocalDirectory (archivo), Http,Kafka y JMS.
Al definir un LocalDirectoryConnectorTransport, un nuevo directorio mapping se deberá añadir la entrada:
ipf.file-ingestion.directory-mappings += {
directory-id = "your-ingester-id" (1)
# has to match the job name used in the ProcessDefinition below
job-name = "your-ingester-job-name" (2)
}
<1>`directory-id` representa un identificador de directorio de la configuración del LocalDirectoryConnectorTransport.
<2>`job-name` representa el nombre del trabajo asociado con el ProcessDefinition#jobName
@Bean
LocalDirectoryConnectorTransport ibanPlusLocalDirectoryConnectorTransport(EventBus eventBus,
DirectoryConfiguration directoryConfiguration,
List<ProcessDefinition<? extends CanonicalFile, ? extends CanonicalFileEntry>> processDefinitions) {
var errorHandler = new FileHandlerCommonIngestionDefaultReceiveErrorHandler(eventBus,
directoryConfiguration,
processDefinitions,
actorSystem,
FILE_INGESTION_CONFIG_ROOT_PATH);
return LocalDirectoryConnectorTransport.builder()
.withActorSystem(actorSystem)
.withConfigRootPath(FILE_INGESTION_CONFIG_ROOT_PATH)
.withFileIngestionConfiguration(FileIngestionConfiguration.create(
FILE_INGESTION_CONFIG_ROOT_PATH,
actorSystem.classicSystem().settings().config()
))
.withName("Iban Plus Local Directory Transport")
.withTransportErrorHandler(errorHandler)
.build();
}
Convertidor de Archivos
Esto es responsable de convertir la fuente de ingestión propietaria (por ejemplo, archivo) en una representación canónica (IbanPlusFile).
@RequiredArgsConstructor
public class IbanPlusFileConverter implements FileConverter<IbanPlusFile> {
private final XmlMapper xmlObjectMapper;
private final IbanPlusLineParser ibanPlusLineParser;
@Override
public IbanPlusFile convert(IngestedFile ingestedFile) {
List<IbanPlusEntry> entries;
FileType fileType;
if ("xml".equalsIgnoreCase(ingestedFile.getMetaData().getExtension())) {
Dataexport dataexport = parseRootObjectXml(ingestedFile);
entries = getFileEntries(dataexport);
fileType = StringUtils.equalsIgnoreCase(dataexport.getFiletype(), FileType.FULL.name()) ? FileType.FULL : FileType.DELTA;
} else {
entries = getTxtFileEntries(ingestedFile);
fileType = StringUtils.containsIgnoreCase(ingestedFile.getMetaData().getName(), FileType.FULL.name())? FileType.FULL : FileType.DELTA;
}
CanonicalFileMetaData metaData = CanonicalFileMetaData.builder()
.name(ingestedFile.getMetaData().getName())
.dateTime(ingestedFile.getMetaData().getDateTime())
.extension(ingestedFile.getMetaData().getExtension())
.build();
return new IbanPlusFile(metaData, entries, fileType);
}
private List<IbanPlusEntry> getFileEntries(Dataexport dataexport) {
return dataexport.getIbanplusV3s()
.stream()
.map(ibanplusV3 -> IbanPlusEntry.builder()
.modificationFlag(ibanplusV3.getModificationFlag())
.ibanBic(ibanplusV3.getIbanBic())
.ibanIsoCountryCode(ibanplusV3.getIbanIsoCountryCode())
.isoCountryCode(ibanplusV3.getIsoCountryCode())
.ibanNationalId(ibanplusV3.getIbanNationalId())
.institutionName(ibanplusV3.getInstitutionName())
.routingBic(ibanplusV3.getRoutingBic())
.serviceContext(ibanplusV3.getServiceContext())
.build())
.collect(Collectors.toList());
}
private List<IbanPlusEntry> getTxtFileEntries(IngestedFile ingestedFile) {
String rawText = parseRootObjectTxt(ingestedFile);
List<String> textRows = List.of(rawText.split(System.lineSeparator()));
String resultsRow = textRows.stream()
.findFirst()
.orElseThrow(() -> new IconRuntimeException(String.format("Row missing or incomplete in ingested file: %s", ingestedFile.getMetaData().getName())));
List<String> entryLines = textRows.subList(textRows.indexOf(resultsRow) + 1, textRows.size());
return entryLines.stream()
.map(ibanPlusLineParser::parseLine)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
@SneakyThrows
private Dataexport parseRootObjectXml(IngestedFile ingestedFile) {
InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
return xmlObjectMapper.readValue(inputStream, Dataexport.class);
}
@SneakyThrows
private static String parseRootObjectTxt(IngestedFile ingestedFile) {
InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
}
}
@Bean
IbanPlusFileConverter ibanPlusFileConverter(XmlMapper xmlObjectMapper, IbanPlusLineParser ibanPlusLineParser) {
return new IbanPlusFileConverter(xmlObjectMapper, ibanPlusLineParser);
}
Procesador de Archivos
Divide el archivo que se está procesando en entradas individuales (configuraciones) que pueden ser procesadas por el OutputEntryHandler.
@Slf4j
public class IbanPlusFileProcessor extends AbstractSwiftFileProcessor<IbanPlusFile, IbanPlusEntry, IbanPlus> {
private final IbanPlusQuery ibanPlusQuery;
public IbanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> entryHandler,
IbanPlusQuery ibanPlusQuery,
SettingToCanonicalFileEntryConverter<IbanPlus> settingToCanonicalFileEntryConverter,
SettingQuery settingQuery,
Integer limitRate,
Integer parallelism,
Duration throttleDuration,
Integer readQueryBatchSize,
ClassicActorSystemProvider actorSystem,
EventBus eventBus) {
super(entryHandler, settingToCanonicalFileEntryConverter,
limitRate, parallelism, throttleDuration, readQueryBatchSize,
settingQuery, actorSystem, eventBus);
this.ibanPlusQuery = ibanPlusQuery;
}
@Override
protected String getPersistenceId(IbanPlusEntry entry) {
return IdProvider.getPersistenceId("ibanplus", getSettingId(entry));
}
@Override
protected String getSettingId(IbanPlusEntry entry) {
return entry.getIsoCountryCode() + "-" + entry.getIbanIsoCountryCode() + "-" + entry.getIbanNationalId();
}
@Override
protected boolean isFullFileImport(IbanPlusFile aFile) {
return aFile.getFileType() == FileType.FULL;
}
@Override
protected CompletionStage<Response<SettingsDTO<IbanPlus>>> getBatchedSettings(List<String> idList, Integer paginationSize) {
return ibanPlusQuery.getBatchIbanPlus(idList, paginationSize);
}
@Override
protected boolean shouldBeDeleted(IbanPlusEntry entry) {
return entry.getModificationFlag().equals("D");
}
@Override
protected boolean shouldBeUpserted(IbanPlusEntry entry) {
return entry.getModificationFlag().equals("A") || entry.getModificationFlag().equals("M");
}
}
@Bean
FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> ibanPlusEntryHandler,
IbanPlusQuery ibanPlusQuery,
SettingToCanonicalFileEntryConverter<IbanPlus> settingToCanonicalFileEntryConverter,
SettingQuery settingQuery,
@Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
@Value("${ipf.csm-reachability.settings-api.iban-plus.parallelism}") Integer parallelism,
@Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration,
@Value("${ipf.csm-reachability.settings-api.read-query-batch-size.iban-plus-processor}") Integer readQueryBatchSize,
EventBus eventBus) {
return new IbanPlusFileProcessor(ibanPlusEntryHandler, ibanPlusQuery, settingToCanonicalFileEntryConverter, settingQuery, limitRate, parallelism, throttleDuration, readQueryBatchSize, actorSystem, eventBus);
}
@Bean
FilePostProcessor ibanPlusFilePostProcessor(SettingQuery settingQuery,
IbanPlusQuery ibanPlusQuery,
@Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
@Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration) {
return new IbanPlusFilePostProcessor(ibanPlusQuery, settingQuery, limitRate, throttleDuration, actorSystem);
}
Definición del Proceso
Asocia el tipo de archivo canónico a un procesador de archivos y a un convertidor de archivos, así como opcionalmente proporciona una condición de predicado sobre cuándo procesar desde la fuente de ingestión.
@Bean
ProcessDefinition<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessDefinition(FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor,
FilePostProcessor ibanPlusFilePostProcessor,
IbanPlusFileConverter ibanPlusFileConverter) {
return ProcessDefinition.<IbanPlusFile, IbanPlusEntry>builder()
.processName("IbanPlus Ingestion File Process")
.jobName("IbanPlus import")
.processGate(basicFileMetaData -> ValidateFileUtils.validateFileType(basicFileMetaData.getExtension()))
.fileProcessor(ibanPlusFileProcessor)
.filePostProcessor(ibanPlusFilePostProcessor)
.converter(ibanPlusFileConverter)
.build();
}
Manejador de Salida
Punto de integración entre la ingestión de archivos y setting management componentes. Invoca el Setting Management API para crear una configuración en el Dynamic Processing Settings plataforma (DPS).
La configuración para la comunicación directa con DPS(DPS is embedded dentro de la aplicación):
ipf.dps-api.client-type="direct"
La configuración para http comunicación con DPS(DPS es una aplicación independiente):
ipf.dps-api {
client-type = "connector"
http.client {
host = "localhost"
endpoint-url = "/settings-objects/"
port = 8080
}
}
| Para comunicarse con el Setting Management API over HTTP, todos relacionados HTTP Las configuraciones del cliente se encuentran en el ipf.dps-api.http y ipf.dps-api.default-connector configuraciones. |
@Bean
@ConditionalOnProperty(name = "ipf.csm-reachability.settings-api.file-handling.connection", havingValue = "http")
OutputEntryHandler outputEntryHandlerViaHttp(DpsCrudClientPort dpsCrudClientPort,
DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting) {
log.info("Using http transport");
return new CanonicalEntryHandlerViaHttp<>(dpsCrudClientPort, canonicalEntryToSetting);
}
@Bean
@ConditionalOnProperty(name = "ipf.csm-reachability.settings-api.file-handling.connection", havingValue = "direct")
OutputEntryHandler outputEntryHandlerDirect(ClassicActorSystemProvider actorSystem,
DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting,
List<SettingDefinition<?>> settingDefinitions, DpsCrudClientPort dpsCrudClientPort) {
log.info("Using direct transport");
CanonicalEntryHandlerDirect.Settings settings = CanonicalEntryHandlerDirect.Settings.from(actorSystem);
return new CanonicalEntryHandlerDirect<>(actorSystem, settings, canonicalEntryToSetting, settingDefinitions, dpsCrudClientPort);
}
La configuración puede ser cargada a través de archivos, por ejemplo, IBANPlus, IBANStructure. Pero también puede ser consumida a través de HTTP/REST como es el caso con SIC
Ejemplo de configuración para consumir archivos del directorio local:
ipf.csm-reachability {
participant {
tips {
process-participant.enabled = true
file-ingestion {
files-directory = "/import/csm-participant/eurosystem-tips-directory"
directory-id = "TIPS"
initial-delay = 5s
interval = 30s
}
}
rt1 {
process-participant.enabled = true
file-ingestion {
files-directory = "/import/csm-participant/eba-rt1-routing-tables"
directory-id = "RT1"
initial-delay = 5s
interval = 30s
}
}
}
iban-plus {
enabled = true
file-ingestion {
files-directory = "IBANPLUS"
files-directory = "/import/iban-plus-directory/swiftref-iban-plus"
initial-delay = 5s
interval = 30s
}
}
iban-structure {
enabled = true
file-ingestion {
files-directory = "/import/iban-plus-directory/swiftref-iban-structure"
directory-id = "iban-structure"
initial-delay = 5s
interval = 1h
}
}
party-entity.six-bankmaster {
enabled = true
file-ingestion {
directory-id = "party-entity-bank-master"
files-directory = "/import/party-entity-directory/six-bank-master-3.0/"
initial-delay = 5s
interval = 30s
}
}
party-entity.swift-bankplus {
enabled = true
file-ingestion {
directory-id = "party-entity-swift"
files-directory = "/import/party-entity-directory/swiftref-bank-directory-plus"
initial-delay = 5s
interval = 30s
}
}
exclusion-list {
enabled = true
file-ingestion {
files-directory = "/import/iban-plus-directory/swiftref-exclusion-list"
directory-id = "exclusion-list"
initial-delay = 5s
interval = 30s
}
}
bic-dir-2018 {
enabled = true
file-ingestion {
files-directory = "/import/bic-dir-2018/swiftref-bic-dir-2018"
directory-id = "BICDIR2018"
initial-delay = 5s
interval = 30s
}
}
party-entity.identifiers {
enabled = true
file-ingestion {
directory-id = "party-entity-identifiers"
files-directory = "/import/party-entity-directory/party-entity-identifiers/"
initial-delay = 5s
interval = 30s
}
}
iban-structure.formats {
enabled = true
file-ingestion {
directory-id = "iban-structure-formats"
files-directory = "import/iban-plus-directory/iban-structure-formats/"
initial-delay = 5s
interval = 30s
}
}
}
Parámetro de Configuración |
Descripción |
directorio-de-archivos |
Ubicación en el directorio local para sondear archivos |
directory-id |
Esto debe coincidir con una entrada de directoryID en el directorio.mapping configuración |
retraso-inicial |
Retraso antes de la primera encuesta |
intervalo |
Tiempo entre sondeos para archivos. Evite valores muy pequeños para el intervalo, ya que pueden provocar condiciones de carrera en los hilos. |
Ejemplo de configuración para consumir archivos de Participante SIC a través de HTTP/REST:
ipf.csm-reachability.participant.sic {
process-participant {
# Specify where from SIC Participants can be fetched and processed.
# Two modes are available: `http` and `file-ingestion`. By default,
# http mode is enabled. Both modes are configured under `sic-participant` configuration section.
mode = http
# Enables SIC Participant processing by setting `true`
# or disables it by setting `false` flag.
enabled = true
}
resiliency-settings {
minimum-number-of-calls = 10
max-attempts = 10
reset-timeout = 3s
initial-retry-wait-duration = 1s
backoff-multiplier = 2
}
# Configures HTTP client how to talk to six group bank master API
http {
client {
endpoint-url = "https://api.six-group.com/api/epcd/bankmaster/v2/public"
}
}
}
Ejemplo de configuración para consumir archivos de Participantes SIC a través de la ingestión de archivos mediante un directorio local:
ipf.csm-reachability.participant.sic {
process-participant {
# Specify where from SIC Participants can be fetched and processed.
# Two modes are available: `http` and `file-ingestion`. By default,
# http mode is enabled. Both modes are configured under `sic-participant` configuration section.
mode = file-ingestion
# Enables SIC Participant processing by setting `true`
# or disables it by setting `false` flag.
enabled = true
}
resiliency-settings {
minimum-number-of-calls = 10
max-attempts = 10
reset-timeout = 3s
initial-retry-wait-duration = 1s
backoff-multiplier = 2
}
# Configures ingesting SIC participants through a file
file-ingestion {
files-directory = "/import/csm-participant/six-bank-master-3.0/"
directory-id = "SIC"
initial-delay = 5s
interval = 30s
}
}
El scheduling configuraciones para HTTP se configuran por separado según:
ipf.csm-reachability.sic.scheduler-settings {
initial-delay = 10s
interval = 1d
}
Habilitar/Deshabilitar el procesamiento de archivos
Parámetro de configuración obsoleto |
Parámetro de configuración |
Descripción |
Valor predeterminado |
swift.ibanplus.process-ibanplus.enabled |
|
Habilite el procesamiento de Swift Archivo IBAN Plus |
|
ibanstructure.process-ibanstructure.enabled |
|
Habilite el procesamiento de Swift Archivo de estructura de IBAN |
|
swift.bankplus.procesar-directorio-bancario-plus.habilitado |
|
Habilite el procesamiento de Swift Archivo del Directorio Bancario Plus |
|
exclusionlist.procesar-exclusionlist.habilitado |
|
Habilitar el procesamiento de Swift Archivo de la Lista de Exclusión |
|
bic-dir-2018.proceso-bics.habilitado |
|
Habilite el procesamiento de Swift Archivo BICDir2018 |
|
party-entity.six.bankmaster.procesar-maestro-bancario.habilitado |
|
Habilite el procesamiento del archivo maestro de Six Bank |
|
tips.process-participant.enabled |
|
Habilite el procesamiento del archivo de participantes de TIPS |
|
rt1.proceso-participante.habilitado |
|
Habilite el procesamiento del archivo de participantes RT1 |
|
paso2.participante-proceso.habilitado |
|
Habilite el procesamiento del archivo de participantes de STEP2 |
|
sic.proces-participante.habilitado |
|
Habilite el procesamiento del archivo de participantes SIC |
|
no propiedad obsoleta |
|
Habilite el procesamiento del archivo IDENTIFICADORES-ALL de SwiftRef |
|
no propiedad obsoleta |
|
Habilite el procesamiento de archivos SwiftRef FORMATS-ALL/FORMATS-CTRY |
|
Límites de tasas de ingestión de archivos y configuración de limitación
Parámetros de configuración antiguos que estaban bajo settings-api están ahora obsoletos y scheduled para su eliminación en futuras versiones.
Esta configuración está ahora bajo ipf.csm-reachability.settings-api y sigue el estándar IPF.
|
Parámetro de Configuración |
Descripción |
Valor por defecto |
|
Limita el rendimiento a un número específico de registros consumidos. Cuando este valor se establece, |
|
|
Se utiliza con |
|
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de lotes paralelos API llamadas. |
1 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de lotes paralelos API llamadas. |
1 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de lotes paralelos API llamadas. |
1 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Cola de búfer |
3000 |
|
Limita el rendimiento a un número específico de registros consumidos. Cuando este valor está establecido, |
0 |
|
Se utiliza con 'throttle-elements' para establecer la tasa máxima de consumo de registros. Cuando es 0, está desactivado. |
0 |
|
Número máximo de paralelos API llamadas. |
3 |
El csm-el-archivo-de-notificación-de-alcance-s3 está configurado de acuerdo con:
ipf.file-manager.s3 {
region = "us-east-1"
upload-parallelism = 1
credentials {
access-key-id = "accessKey"
secret-access-key = "secretAccessKey"
}
resiliency-settings {
# Determines the maximum number of retries to be made. Note that this includes the first failed attempt.
max-attempts = 2
# Retry if HTTP error code is in the list
retryable-status-codes = [500, 503]
attempt-timeout = 2s
call-timeout = 3s
}
}
ipf.file-manager.s3 {
region = "us-east-1"
upload-parallelism = 1
credentials {
access-key-id = "accessKey"
secret-access-key = "secretAccessKey"
}
resiliency {
retry = 3
api-call-timeout = 10s # duration
}
}
Servicio de notificación de ingestión de archivos configuración
El Servicio de Notificación de Ingesta de Archivos admite los siguientes formatos:
-
Participante de CSM - RT1 (Participantes Directos e Indirectos)
-
Participante de CSM - PASO2 SCT (Participantes Directos e Indirectos)
-
Participante de CSM - directorio TIPS
-
Participante de CSM - SicInst
-
Directorio de Entidades de Parte - Banco Master 3. 0
-
Directorio de Entidades de Parte - Directorio Bancario Plus
-
Directorio de Entidades de Parte - Identificadores de SwiftRef Todos
-
IBAN Plus - Directorio Iban Plus
-
Estructura del IBAN - Directorio Iban Plus
-
Estructura del IBAN - SwiftRef Formatos de Archivos
-
Lista de Exclusión - Directorio Iban Plus
-
Bic Dir 2018 - Bic Dir 2018
La función de ingestión de archivos desde AWS S3 puede ser habilitada/deshabilitada configurando ipf.csm-reachability.file-ingestion.s3.enabled propiedad a verdadero/falso.
Está configurado como falso por defecto.
Habilitar o deshabilitar la función de ingestión de archivos s3 no afecta la ingestión de archivos del directorio local.
La configuración para fileProcessedNotificationSendConnector y fileAvailableNotificationReceiveConnector puede ser anulada en las rutas respectivas,
ipf.csm-reachability.file-ingestion.notification-service.connector.file-processed`y `ipf.csm-reachability.file-ingestion.notification-service.connector.file-available.
El csm-el servicio de notificación de ingestión de archivos de alcanzabilidad está configurado de acuerdo con:
akka {
kafka {
consumer {
kafka-clients {
bootstrap.servers = "kafka:9092"
}
restart-settings = ${common-flow-restart-settings}
}
producer {
kafka-clients {
bootstrap.servers = "kafka:9092"
}
restart-settings = ${common-flow-restart-settings}
}
}
}
ipf.csm-reachability {
file-ingestion {
s3.enabled = false
notification-service {
kafka {
producer {
topic = FILE_PROCESSED_NOTIFICATION
restart-settings = ${common-flow-restart-settings}
kafka-clients {
group.id = file-processing-notification-group
}
}
consumer {
topic = FILE_AVAILABLE_NOTIFICATION
restart-settings = ${common-flow-restart-settings}
kafka-clients {
group.id = file-available-notification-group
}
}
}
default-file-ingestion {
# path which should be overriden
files-directory = "/import"
initial-delay = 5s
interval = 30s
timestamp-archived-and-failed-files = true
}
# directory id values for file ingestion. By default they are referencing respective values from local directory configuration
directory-ids {
RT02SCI = [${?ipf.csm-reachability.participant.rt1.file-ingestion.directory-id}]
S204SCT = [${?ipf.csm-reachability.participant.step2.file-ingestion.directory-id}]
TIPS = [${?ipf.csm-reachability.participant.tips.file-ingestion.directory-id}]
agreements = [${?ipf.csm-reachability.participant.stet.file-ingestion.directory-id}]
bankmaster = [
${?ipf.csm-reachability.party-entity.six-bankmaster.file-ingestion.directory-id},
${?ipf.csm-reachability.participant.sic.file-ingestion.directory-id}
]
IDENTIFIERS-ALL = [${?ipf.csm-reachability.party-entity.identifiers.file-ingestion.directory-id}]
FORMATS = [${?ipf.csm-reachability.iban-structure.formats.file-ingestion.directory-id}]
BANKDIRECTORYPLUS = [${?ipf.csm-reachability.party-entity.swift-bankplus.file-ingestion.directory-id}]
IBANPLUS = [${?ipf.csm-reachability.iban-plus.file-ingestion.directory-id}]
IBANSTRUCTURE = [${?ipf.csm-reachability.iban-structure.file-ingestion.directory-id}]
EXCLUSIONLIST = [${?ipf.csm-reachability.exclusion-list.file-ingestion.directory-id}]
BICDIR2018 = [${?ipf.csm-reachability.bic-dir-2018.file-ingestion.directory-id}]
}
}
}
}
common-flow-restart-settings {
min-backoff = 1s
max-backoff = 5s
random-factor = 0.25
max-restarts = 5
max-restarts-within = 10m
}