Creando un Ingester de Configuraciones Personalizadas
Configuración del componente
Para ingerir/importar configuraciones en la plataforma de configuraciones de procesamiento dinámico, usted debe configurar lo siguiente:
-
Ingesta de Archivos Cluster Singleton
-
ReceiveConnector/SendConnector
-
Connector Transport
-
Convertidor de Archivos
-
Procesador de Archivos
-
Definición del Proceso
-
Manejador de Entradas
-
Habilitar/Deshabilitar el procesamiento de SWIFT archivos
Un ejemplo de configuración para la ingestión IbanPlus La configuración se incluye a continuación. Se necesitarían componentes similares si desea configurar la ingesta de configuraciones desde una nueva fuente.
Ingesta de Archivos Cluster Singleton
El clúster singleton acepta la configuración de reinicio de la singleton actor a través de la configuración. El clúster singleton acepta la configuración de reinicio de la singleton actor a través de la configuración.
ipf.csm-reachability.ingestion {
restart-settings {
min-backoff = 3s
max-backoff = 30s
random-factor = 0.25
max-restarts = 20
max-restarts-within = 30m
}
}
ReceiveConnector/SendConnector
Dependiendo de cómo se deba consumir la configuración, ya sea un SendConnector or a ReceiveConnector necesita ser definido. Al consumir archivos de un directorio local, un ReceiveConnector sería definido. Sin embargo, si consume configuraciones de un REST API, esto sería un SendConnector.
receiveConnector = ReceiveConnectorBuilderHelper.<IngestedFile>defaultBuilder(name + " File Ingestion Connector", actorSystem)
.withConnectorTransport(localDirectoryConnectorTransport)
.withReceiveTransportMessageConverter(ingestedFileReceiveTransportMessageConverter)
.withProcessingContextExtractor(connectorMessage -> InitiatingProcessingContextExtractor.<IngestedFile>builder().build().extract(connectorMessage))
.withReceiveHandler((aggregateId, payload) -> {
return processManager.process(payload)
.toFuture()
.whenComplete(((processedEntryResponses, throwable) -> {
if (throwable != null) {
log.error("Error occurred during processing of " + name + " File", throwable);
} else {
log.info("Successfully processed {} File", name);
}
})).thenApply(done -> null);
})
.withLoggingErrorHandler(exception -> {
log.error("Exception occurred during file ingestion for file : " + name, exception);
return CompletableFuture.completedStage(null);
})
.build();
Connector Transport
Una vez definido, deberá ser referenciado en el Conector. Soportado ConnectorTransports incluir,LocalDirectory(archivo), Http,Kafka y JMS.
Al definir un LocalDirectoryConnectorTransport, un nuevo directorio mapping se deberá añadir la entrada:
ipf.file-ingestion.directory-mappings += {
directory-id = "your-ingester-id" (1)
# has to match the job name used in the ProcessDefinition below
job-name = "your-ingester-job-name" (2)
}
<1>`directory-id` representa un identificador de directorio de la configuración del LocalDirectoryConnectorTransport.
<2>`job-name` representa el nombre del trabajo asociado con el ProcessDefinition#jobName
@Bean
LocalDirectoryConnectorTransport ibanPlusLocalDirectoryConnectorTransport(EventBus eventBus,
DirectoryConfiguration directoryConfiguration,
List<ProcessDefinition<? extends CanonicalFile, ? extends CanonicalFileEntry>> processDefinitions) {
var errorHandler = new FileHandlerCommonIngestionDefaultReceiveErrorHandler(eventBus,
directoryConfiguration,
processDefinitions,
actorSystem,
FILE_INGESTION_CONFIG_ROOT_PATH);
return LocalDirectoryConnectorTransport.builder()
.withActorSystem(actorSystem)
.withConfigRootPath(FILE_INGESTION_CONFIG_ROOT_PATH)
.withFileIngestionConfiguration(FileIngestionConfiguration.create(
FILE_INGESTION_CONFIG_ROOT_PATH,
actorSystem.classicSystem().settings().config()
))
.withName("Iban Plus Local Directory Transport")
.withTransportErrorHandler(errorHandler)
.build();
}
Convertidor de Archivos
Esto es responsable de convertir la fuente de ingestión propietaria (por ejemplo, archivo) en una representación canónica (IbanPlusFile)
@RequiredArgsConstructor
public class IbanPlusFileConverter implements FileConverter<IbanPlusFile> {
private final XmlMapper xmlObjectMapper;
private final IbanPlusLineParser ibanPlusLineParser;
@Override
public IbanPlusFile convert(IngestedFile ingestedFile) {
List<IbanPlusEntry> entries;
FileType fileType;
if ("xml".equalsIgnoreCase(ingestedFile.getMetaData().getExtension())) {
Dataexport dataexport = parseRootObjectXml(ingestedFile);
entries = getFileEntries(dataexport);
fileType = StringUtils.equalsIgnoreCase(dataexport.getFiletype(), FileType.FULL.name()) ? FileType.FULL : FileType.DELTA;
} else {
entries = getTxtFileEntries(ingestedFile);
fileType = StringUtils.containsIgnoreCase(ingestedFile.getMetaData().getName(), FileType.FULL.name())? FileType.FULL : FileType.DELTA;
}
CanonicalFileMetaData metaData = CanonicalFileMetaData.builder()
.name(ingestedFile.getMetaData().getName())
.dateTime(ingestedFile.getMetaData().getDateTime())
.extension(ingestedFile.getMetaData().getExtension())
.build();
return new IbanPlusFile(metaData, entries, fileType);
}
private List<IbanPlusEntry> getFileEntries(Dataexport dataexport) {
return dataexport.getIbanplusV3s()
.stream()
.map(ibanplusV3 -> IbanPlusEntry.builder()
.modificationFlag(ibanplusV3.getModificationFlag())
.ibanBic(ibanplusV3.getIbanBic())
.ibanIsoCountryCode(ibanplusV3.getIbanIsoCountryCode())
.isoCountryCode(ibanplusV3.getIsoCountryCode())
.ibanNationalId(ibanplusV3.getIbanNationalId())
.institutionName(ibanplusV3.getInstitutionName())
.routingBic(ibanplusV3.getRoutingBic())
.serviceContext(ibanplusV3.getServiceContext())
.build())
.collect(Collectors.toList());
}
private List<IbanPlusEntry> getTxtFileEntries(IngestedFile ingestedFile) {
String rawText = parseRootObjectTxt(ingestedFile);
List<String> textRows = List.of(rawText.split(System.lineSeparator()));
String resultsRow = textRows.stream()
.findFirst()
.orElseThrow(() -> new IconRuntimeException(String.format("Row missing or incomplete in ingested file: %s", ingestedFile.getMetaData().getName())));
List<String> entryLines = textRows.subList(textRows.indexOf(resultsRow) + 1, textRows.size());
return entryLines.stream()
.map(ibanPlusLineParser::parseLine)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
@SneakyThrows
private Dataexport parseRootObjectXml(IngestedFile ingestedFile) {
InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
return xmlObjectMapper.readValue(inputStream, Dataexport.class);
}
@SneakyThrows
private static String parseRootObjectTxt(IngestedFile ingestedFile) {
InputStream inputStream = new ByteArrayInputStream(ingestedFile.getContent());
return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
}
}
@Bean
IbanPlusFileConverter ibanPlusFileConverter(XmlMapper xmlObjectMapper, IbanPlusLineParser ibanPlusLineParser) {
return new IbanPlusFileConverter(xmlObjectMapper, ibanPlusLineParser);
}
Procesador de Archivos
Divide el archivo que se está procesando en entradas individuales (configuraciones) que pueden ser procesadas por el OutputEntryHandler
@Slf4j
public class IbanPlusFileProcessor extends AbstractSwiftFileProcessor<IbanPlusFile, IbanPlusEntry, IbanPlus> {
private final IbanPlusQuery ibanPlusQuery;
public IbanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> entryHandler,
IbanPlusQuery ibanPlusQuery,
SettingQuery settingQuery,
Integer limitRate,
Integer parallelism,
Duration throttleDuration,
Integer readQueryBatchSize,
ClassicActorSystemProvider actorSystem,
EventBus eventBus,
IdMapper idMapper) {
super(entryHandler, limitRate, parallelism, throttleDuration, readQueryBatchSize, settingQuery, actorSystem, eventBus);
this.ibanPlusQuery = ibanPlusQuery;
this.idMapper = idMapper;
}
@Override
protected String getPersistenceId(IbanPlusEntry entry) {
return IdProvider.getPersistenceId("ibanplus", getSettingId(entry));
}
@Override
protected String getSettingId(IbanPlusEntry entry) {
return entry.getIsoCountryCode() + "-" + entry.getIbanIsoCountryCode() + "-" + entry.getIbanNationalId();
}
@Override
protected boolean isFullFileImport(IbanPlusFile aFile) {
return aFile.getFileType() == FileType.FULL;
}
@Override
protected CompletionStage<Response<SettingsDTO<IbanPlus>>> getBatchedSettings(List<String> idList, Integer paginationSize) {
return ibanPlusQuery.getBatchIbanPlus(idList, paginationSize);
}
@Override
protected boolean shouldBeDeleted(IbanPlusEntry entry) {
return entry.getModificationFlag().equals("D");
}
@Override
protected boolean shouldBeUpserted(IbanPlusEntry entry) {
return entry.getModificationFlag().equals("A") || entry.getModificationFlag().equals("M");
}
}
@Bean
FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor(OutputEntryHandler<IbanPlusEntry> ibanPlusEntryHandler,
IbanPlusQuery ibanPlusQuery,
SettingQuery settingQuery,
@Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
@Value("${ipf.csm-reachability.settings-api.iban-plus.parallelism}") Integer parallelism,
@Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration,
@Value("${ipf.csm-reachability.settings-api.read-query-batch-size.iban-plus-processor}") Integer readQueryBatchSize,
EventBus eventBus) {
return new IbanPlusFileProcessor(ibanPlusEntryHandler, ibanPlusQuery, settingQuery, limitRate, parallelism, throttleDuration, readQueryBatchSize, actorSystem, eventBus);
}
@Bean
FilePostProcessor ibanPlusFilePostProcessor(SettingQuery settingQuery,
IbanPlusQuery ibanPlusQuery,
@Value("${ipf.csm-reachability.settings-api.limit-rate}") Integer limitRate,
@Value("${ipf.csm-reachability.settings-api.throttle-duration}") Duration throttleDuration) {
return new IbanPlusFilePostProcessor(ibanPlusQuery, settingQuery, limitRate, throttleDuration, actorSystem);
}
Definición del Proceso
Asocia el tipo de archivo canónico a un procesador de archivos y a un convertidor de archivos, así como opcionalmente proporciona una condición de predicado sobre cuándo procesar desde la fuente de ingestión.
@Bean
ProcessDefinition<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessDefinition(FileProcessor<IbanPlusFile, IbanPlusEntry> ibanPlusFileProcessor,
FilePostProcessor ibanPlusFilePostProcessor,
IbanPlusFileConverter ibanPlusFileConverter) {
return ProcessDefinition.<IbanPlusFile, IbanPlusEntry>builder()
.processName("IbanPlus Ingestion File Process")
.jobName("IbanPlus import")
.processGate(basicFileMetaData -> ValidateFileUtils.validateFileType(basicFileMetaData.getExtension()))
.fileProcessor(ibanPlusFileProcessor)
.filePostProcessor(ibanPlusFilePostProcessor)
.converter(ibanPlusFileConverter)
.build();
}
Manejador de Salida
Punto de integración entre la ingestión de archivos y los componentes de gestión de configuración. Invoca el Setting Management API para crear una configuración en el Dynamic Processing Settings plataforma DPS).
La configuración para la comunicación directa con DPS(DPS está incrustado dentro de la aplicación):
ipf.dps-api.client-type="direct"
La configuración para la comunicación http con DPS(DPS es una aplicación independiente):
ipf.dps-api {
client-type = "connector"
http.client {
host = "localhost"
endpoint-url = "/settings-objects/"
port = 8080
}
}
IMPORTANTE: Para comunicarse con el Setting Management API over HTTP, todo relacionado HTTP Las configuraciones del cliente se encuentran en los ajustes ipf.dps-api.http e ipf.dps-api.default-connector.
@Bean
@ConditionalOnProperty(name = "ipf.file-ingestion.settings-api.file-handling.connection", havingValue = "http")
OutputEntryHandler outputEntryHandlerViaHttp(DpsCrudClientPort dpsCrudClientPort,
DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting) {
log.info("Using http transport");
return new CanonicalEntryHandlerViaHttp<>(dpsCrudClientPort, canonicalEntryToSetting);
}
@Bean
@ConditionalOnProperty(name = "ipf.file-ingestion.settings-api.file-handling.connection", havingValue = "direct")
OutputEntryHandler outputEntryHandlerDirect(ClassicActorSystemProvider actorSystem,
DomainToTargetTypeConverter<CanonicalFileEntry, CreateSetting<?>> canonicalEntryToSetting,
List<SettingDefinition<?>> settingDefinitions, DpsCrudClientPort dpsCrudClientPort) {
log.info("Using direct transport");
CanonicalEntryHandlerDirect.Settings settings = CanonicalEntryHandlerDirect.Settings.from(actorSystem);
return new CanonicalEntryHandlerDirect<>(actorSystem, settings, canonicalEntryToSetting, settingDefinitions, dpsCrudClientPort);
}
La configuración puede ser cargada a través de archivos, por ejemplo, IBANPlus, IBANStructure. Pero también puede ser consumida a través de HTTP/REST como es el caso con SIC
Ejemplo de configuración para consumir archivos del directorio local:
ipf.csm-reachability {
participant {
tips {
process-participant.enabled = true
file-ingestion {
files-directory = "/import/csm-participant/eurosystem-tips-directory"
directory-id = "TIPS"
initial-delay = 5s
interval = 30s
}
}
rt1 {
process-participant.enabled = true
file-ingestion {
files-directory = "/import/csm-participant/eba-rt1-routing-tables"
directory-id = "RT1"
initial-delay = 5s
interval = 30s
}
}
}
iban-plus {
enabled = true
file-ingestion {
files-directory = "IBANPLUS"
files-directory = "/import/iban-plus-directory/swiftref-iban-plus"
initial-delay = 5s
interval = 30s
}
}
iban-structure {
enabled = true
file-ingestion {
files-directory = "/import/iban-plus-directory/swiftref-iban-structure"
directory-id = "iban-structure"
initial-delay = 5s
interval = 1h
}
}
party-entity.six-bankmaster {
enabled = true
file-ingestion {
directory-id = "party-entity-bank-master"
files-directory = "/import/party-entity-directory/six-bank-master-3.0/"
initial-delay = 5s
interval = 30s
}
}
party-entity.swift-bankplus {
enabled = true
file-ingestion {
directory-id = "party-entity-swift"
files-directory = "/import/party-entity-directory/swiftref-bank-directory-plus"
initial-delay = 5s
interval = 30s
}
}
exclusion-list {
enabled = true
file-ingestion {
files-directory = "/import/iban-plus-directory/swiftref-exclusion-list"
directory-id = "exclusion-list"
initial-delay = 5s
interval = 30s
}
}
bic-dir-2018 {
enabled = true
file-ingestion {
files-directory = "/import/bic-dir-2018/swiftref-bic-dir-2018"
directory-id = "BICDIR2018"
initial-delay = 5s
interval = 30s
}
}
party-entity.identifiers {
enabled = true
file-ingestion {
directory-id = "party-entity-identifiers"
files-directory = "/import/party-entity-directory/party-entity-identifiers/"
initial-delay = 5s
interval = 30s
}
}
iban-structure.formats {
enabled = true
file-ingestion {
directory-id = "iban-structure-formats"
files-directory = "import/iban-plus-directory/iban-structure-formats/"
initial-delay = 5s
interval = 30s
}
}
}
Parámetro de Configuración |
Descripción |
directorio-de-archivos |
Ubicación en el directorio local para sondear archivos |
directory-id |
Esto debería coincidir con un directoryID entrada en el directorio-mapping configuración |
retraso-inicial |
Retraso antes de la primera encuesta |
intervalo |
Tiempo entre sondeos para archivos. Evite valores muy pequeños para el intervalo, ya que pueden provocar condiciones de carrera en los hilos. |
Ejemplo de configuración para consumir archivos de Participante SIC a través de HTTP/REST:
ipf.csm-reachability.participant.sic {
process-participant {
# Specify where from SIC Participants can be fetched and processed.
# Two modes are available: `http` and `file-ingestion`. By default,
# http mode is enabled. Both modes are configured under `sic-participant` configuration section.
mode = http
# Enables SIC Participant processing by setting `true`
# or disables it by setting `false` flag.
enabled = true
}
resiliency-settings {
minimum-number-of-calls = 10
max-attempts = 10
reset-timeout = 3s
initial-retry-wait-duration = 1s
backoff-multiplier = 2
}
# Configures HTTP client how to talk to six group bank master API
http {
client {
endpoint-url = "https://api.six-group.com/api/epcd/bankmaster/v2/public"
}
}
}
Ejemplo de configuración para consumir archivos de Participante SIC a través de la ingestión de archivos mediante un directorio local:
ipf.csm-reachability.participant.sic {
process-participant {
# Specify where from SIC Participants can be fetched and processed.
# Two modes are available: `http` and `file-ingestion`. By default,
# http mode is enabled. Both modes are configured under `sic-participant` configuration section.
mode = file-ingestion
# Enables SIC Participant processing by setting `true`
# or disables it by setting `false` flag.
enabled = true
}
resiliency-settings {
minimum-number-of-calls = 10
max-attempts = 10
reset-timeout = 3s
initial-retry-wait-duration = 1s
backoff-multiplier = 2
}
# Configures ingesting SIC participants through a file
file-ingestion {
files-directory = "/import/csm-participant/six-bank-master-3.0/"
directory-id = "SIC"
initial-delay = 5s
interval = 30s
}
}
El scheduling configuraciones para HTTP se configuran por separado según:
ipf.csm-reachability.sic.scheduler-settings {
initial-delay = 10s
interval = 1d
}
Habilitar/Deshabilitar el procesamiento de archivos
Parámetro de configuración obsoleto |
Parámetro de configuración |
Descripción |
Valor predeterminado |
swift.ibanplus.process-ibanplus.enabled |
|
Habilite el procesamiento del archivo Swift IBAN Plus |
|
ibanstructure.process-ibanstructure.enabled |
|
Habilite el procesamiento del archivo de estructura de IBAN Swift |
|
swift.bankplus.process-bank-directory-plus.enabled |
|
Habilite el procesamiento del archivo Swift Bank Directory Plus |
|
exclusionlist.procesar-exclusiones.habilitado |
|
Habilite el procesamiento del archivo de la Lista de Exclusión de Swift |
|
bic-dir-2018.proceso-bics.habilitado |
|
Habilite el procesamiento del archivo Swift BICDir2018 |
|
party-entity.six.bankmaster.procesar-maestro-bancario.habilitado |
|
Habilite el procesamiento del archivo maestro de Six Bank |
|
tips.process-participant.enabled |
|
Habilite el procesamiento del archivo de participantes de TIPS |
|
rt1.participante-proceso.habilitado |
|
Habilite el procesamiento del archivo de participantes RT1 |
|
paso2.proceso-participante.habilitado |
|
Habilite el procesamiento del archivo de participantes de STEP2 |
|
sic.proceso-participante.habilitado |
|
Habilite el procesamiento del archivo de participantes SIC |
|
no hay propiedad obsoleta |
|
Habilite el procesamiento de SwiftRef IDENTIFICADORES-TODOS archivo |
|
no hay propiedad obsoleta |
|
Habilite el procesamiento de SwiftRef ARCHIVOS FORMATS-ALL/FORMATS-CTRY |
|
Límites de tasas de ingestión de archivos y configuración de limitación
Parámetros de configuración antiguos que estaban bajo settings-api están ahora obsoletos y programados para su eliminación en futuras versiones.
Esta configuración está ahora bajo ipf.csm-reachability.settings-api y sigue el estándar IPF.
|
Parámetros de configuración antiguos que estaban bajo ipf.csm-reachability.settings-api están ahora obsoletos y programados para su eliminación en futuras versiones.
Esta configuración está ahora bajo ipf.file-ingestion.settings-api y sigue el estándar IPF.
|
Parámetros de configuración antiguos que estaban bajo ipf.csm-reachability.default-file-ingestion están ahora obsoletos y programados para su eliminación en futuras versiones.
Esta configuración está ahora bajo ipf.file-ingestion.default-file-ingestion y sigue el estándar IPF.
|
Parámetro de Configuración |
Descripción |
Valor por defecto |
|
Limita el rendimiento a un número específico de registros consumidos. Cuando este valor se establece, |
|
|
Se utiliza con |
|
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de lotes paralelos API llamadas. |
1 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de lotes paralelos API llamadas. |
1 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Número máximo de lotes paralelos API llamadas. |
1 |
|
Número máximo de configuraciones a obtener de API llamada. |
1000 |
|
Cola de búfer |
3000 |
|
Limita el rendimiento a un número específico de registros consumidos. Cuando este valor se establece, |
0 |
|
Se utiliza con 'throttle-elements' para establecer la tasa máxima de consumo de registros. Cuando es 0, entonces está desactivado. |
0 |
|
Número máximo de paralelos API llamadas. |
3 |
El archivo de notificación de accesibilidad csm-reachability-file-notification-s3 está configurado de acuerdo con:
ipf.file-manager.s3 {
region = "us-east-1"
upload-parallelism = 1
credentials {
access-key-id = "accessKey"
secret-access-key = "secretAccessKey"
}
resiliency-settings {
# Determines the maximum number of retries to be made. Note that this includes the first failed attempt.
max-attempts = 2
# Retry if HTTP error code is in the list
retryable-status-codes = [500, 503]
attempt-timeout = 2s
call-timeout = 3s
}
}
ipf.file-manager.s3 {
region = "us-east-1"
upload-parallelism = 1
credentials {
access-key-id = "accessKey"
secret-access-key = "secretAccessKey"
}
resiliency {
retry = 3
api-call-timeout = 10s # duration
}
}
Configuración del servicio de notificación de ingestión de archivos
El Servicio de Notificación de Ingesta de Archivos admite los siguientes formatos:
-
Participante de CSM - RT1 (Participantes Directos e Indirectos)
-
Participante de CSM - PASO2 SCT (Participantes Directos e Indirectos)
-
Participante de CSM - directorio TIPS
-
Participante de CSM - SicInst
-
Directorio de Entidades de Parte - Banco Master 3.0
-
Directorio de Entidades de Parte - Directorio Bancario Plus
-
Directorio de Entidades de Parte - SwiftRef Identificadores Todos
-
IBAN Plus - Directorio Iban Plus
-
Estructura del IBAN - Directorio Iban Plus
-
Estructura del IBAN - SwiftRef Formatea Archivos
-
Lista de Exclusión - Directorio Iban Plus
-
Bic Dir 2018 - Bic Dir 2018
La función de ingestión de archivos desde AWS S3 puede ser habilitada/deshabilitada configurando ipf.csm-reachability.file-ingestion.s3.enabled propiedad a verdadero/falso.
Está configurado como falso por defecto.
Habilitar o deshabilitar la función de ingestión de archivos s3 no afecta la ingestión de archivos del directorio local.
Configuración para fileProcessedNotificationSendConnector y fileAvailableNotificationReceiveConnector puede ser anulado en las rutas respectivas,
ipf.csm-reachability.file-ingestion.notification-service.connector.file-processed`y `ipf.csm-reachability.file-ingestion.notification-service.connector.file-available.
El servicio de notificación de ingestión de archivos de accesibilidad de CSM está configurado de acuerdo con:
akka {
kafka {
consumer {
kafka-clients {
bootstrap.servers = "kafka:9092"
}
restart-settings = ${common-flow-restart-settings}
}
producer {
kafka-clients {
bootstrap.servers = "kafka:9092"
}
restart-settings = ${common-flow-restart-settings}
}
}
}
ipf.csm-reachability {
file-ingestion {
s3.enabled = false
notification-service {
kafka {
producer {
topic = FILE_PROCESSED_NOTIFICATION
restart-settings = ${common-flow-restart-settings}
kafka-clients {
group.id = file-processing-notification-group
}
}
consumer {
topic = FILE_AVAILABLE_NOTIFICATION
restart-settings = ${common-flow-restart-settings}
kafka-clients {
group.id = file-available-notification-group
}
}
}
default-file-ingestion {
# path which should be overriden
files-directory = "/import"
initial-delay = 5s
interval = 30s
timestamp-archived-and-failed-files = true
}
# directory id values for file ingestion. By default they are referencing respective values from local directory configuration
directory-ids {
RT02SCI = [${?ipf.csm-reachability.participant.rt1.file-ingestion.directory-id}]
S204SCT = [${?ipf.csm-reachability.participant.step2.file-ingestion.directory-id}]
TIPS = [${?ipf.csm-reachability.participant.tips.file-ingestion.directory-id}]
agreements = [${?ipf.csm-reachability.participant.stet.file-ingestion.directory-id}]
bankmaster = [
${?ipf.csm-reachability.party-entity.six-bankmaster.file-ingestion.directory-id},
${?ipf.csm-reachability.participant.sic.file-ingestion.directory-id}
]
IDENTIFIERS-ALL = [${?ipf.csm-reachability.party-entity.identifiers.file-ingestion.directory-id}]
FORMATS = [${?ipf.csm-reachability.iban-structure.formats.file-ingestion.directory-id}]
BANKDIRECTORYPLUS = [${?ipf.csm-reachability.party-entity.swift-bankplus.file-ingestion.directory-id}]
IBANPLUS = [${?ipf.csm-reachability.iban-plus.file-ingestion.directory-id}]
IBANSTRUCTURE = [${?ipf.csm-reachability.iban-structure.file-ingestion.directory-id}]
EXCLUSIONLIST = [${?ipf.csm-reachability.exclusion-list.file-ingestion.directory-id}]
BICDIR2018 = [${?ipf.csm-reachability.bic-dir-2018.file-ingestion.directory-id}]
}
}
}
}
common-flow-restart-settings {
min-backoff = 1s
max-backoff = 5s
random-factor = 0.25
max-restarts = 5
max-restarts-within = 10m
}