Documentation for a newer release is available.
View Latest
Notificación del Cliente DPS
La función DPS Client Notification representa la forma de recibir notificaciones de DPS. Actualmente soporta la recepción del mensaje DpsCrudNotification.
Dentro del módulo dynamic-processing-settings-client-port existe una interfaz DpsCrudNotificationHandlerPort que debe implementarse.
public interface DpsCrudNotificationHandlerPort {
CompletionStage<Void> handle(DpsCrudNotification notification);
}
DPS Client Notification Kafka
Esta es la implementación Kafka del módulo dynamic-processing-settings-client-port.
Contiene configuración para el conector de recepción de transporte de Kafka que luce así:
ipf.dps-api {
client.notification {
kafka {
consumer {
topic = DPS_CRUD_NOTIFICATION
restart-settings = ${common-flow-restart-settings}
kafka-clients {
group.id = dps-crud-notification-group
}
}
}
}
}
common-flow-restart-settings {
min-backoff = 1s
max-backoff = 5s
random-factor = 0.25
max-restarts = 5
max-restarts-within = 10m
}
También contiene autoconfiguración con los beans ReceiveConnector y ReceiveConnectorTransport:
@Bean
ReceiveConnector<DpsCrudNotification> fileAvailableNotificationReceiveConnector(ReceiveConnectorTransport dpsCrudNotificationReceiveConnectorTransport,
ObjectProvider<MessageLogger> messageLoggerProvider,
ClassicActorSystemProvider actorSystem,
DpsCrudNotificationHandlerPort crudNotificationHandler) {
return ReceiveConnector.<DpsCrudNotification>builder("dpsCrudNotificationReceiveConnector", CONFIG_ROOT_PATH + ".connector.dps-crud", actorSystem)
.withConnectorTransport(dpsCrudNotificationReceiveConnectorTransport)
.withProcessingContextExtractor(tm -> ProcessingContext.of(UnitOfWorkId.createRandom()))
.withReceiveTransportMessageConverter(message -> SerializationHelper.stringToObject(message.getPayload().toString(), DpsCrudNotification.class))
.withReceiveHandler((receivingContext, message) -> crudNotificationHandler.handle(message))
.withManualStart(false)
.withMessageLogger(messageLoggerProvider.getIfAvailable(this::fallbackMessageLogger))
.build();
}
@Bean
ReceiveConnectorTransport dpsCrudNotificationReceiveConnectorTransport(ClassicActorSystemProvider actorSystem) {
Config config = AlpakkaConfigProvider.getConsumerConfig(actorSystem.classicSystem(), CONFIG_ROOT_PATH);
StringDeserializerConsumerConfig consumerConfig = new StringDeserializerConsumerConfig(config);
return KafkaAckReceiveConnectorTransport.<String, String>builder()
.withName("DpsCrudNotificationReceiveConnectorTransport")
.withTopics(consumerConfig.topic())
.withConsumerSettings(consumerConfig.consumerSettings())
.withRestartSettings(consumerConfig.restartSettings())
.withActorSystem(actorSystem)
.build();
}
private MessageLogger fallbackMessageLogger() {
return messageLogEntry -> CompletableFuture.completedFuture(null);
}
El usuario solo necesita añadir la siguiente dependencia:
<dependency>
<groupId>com.iconsolutions.ipf.core.dynamicsettings.v2</groupId>
<artifactId>dynamic-processing-settings-client-notification-kafka</artifactId>
<version>${icon-dynamic-processing-settings-api.version}</version>
</dependency>
Y aplicar la interfaz DpsCrudNotificationHandlerPort. Aquí hay un ejemplo:
public class DpsCrudNotificationKafkaHandler implements DpsCrudNotificationHandlerPort {
@Override
public CompletionStage<Void> handle(DpsCrudNotification notification) {
return null;
}
}