DPS Client Notification
The DPS Client Notification feature is the way to receive DPS notifications. So far, it supports receiving the DpsCrudNotification message.
Inside the dynamic-processing-settings-client-port module there is a DpsCrudNotificationHandlerPort interface that must be implemented:
public interface DpsCrudNotificationHandlerPort {
    CompletionStage<Void> handle(DpsCrudNotification notification);
}
DPS Client Notification Kafka
This is the Kafka implementation of the dynamic-processing-settings-client-port module.
It contains configuration for the Kafka receive connector transport, which looks like this:
ipf.dps-api {
  client.notification {
    kafka {
      consumer {
        topic = DPS_CRUD_NOTIFICATION
        restart-settings = ${common-flow-restart-settings}
        kafka-clients {
          group.id = dps-crud-notification-group
        }
      }
    }
  }
}
common-flow-restart-settings {
  min-backoff = 1s
  max-backoff = 5s
  random-factor = 0.25
  max-restarts = 5
  max-restarts-within = 10m
}
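To make the numbers in common-flow-restart-settings concrete, the following is a minimal sketch of how such values are commonly interpreted. It assumes the transport applies them with the usual Akka Streams exponential backoff semantics (delay doubles per restart starting at min-backoff, is capped at max-backoff, then stretched by a random jitter of up to random-factor); the source does not confirm this. The class RestartBackoffSketch and its methods are hypothetical and are not part of the DPS modules.

```java
import java.time.Duration;

public class RestartBackoffSketch {
    // Values copied from common-flow-restart-settings above.
    static final Duration MIN_BACKOFF = Duration.ofSeconds(1);
    static final Duration MAX_BACKOFF = Duration.ofSeconds(5);
    static final double RANDOM_FACTOR = 0.25;

    /** Delay before jitter: min-backoff doubled per restart, capped at max-backoff. */
    public static Duration baseDelay(int restartCount) {
        double millis = MIN_BACKOFF.toMillis() * Math.pow(2, restartCount);
        return Duration.ofMillis((long) Math.min(millis, MAX_BACKOFF.toMillis()));
    }

    /** Upper bound of the delay once jitter of up to random-factor is added (assumed semantics). */
    public static Duration maxDelayWithJitter(int restartCount) {
        return Duration.ofMillis((long) (baseDelay(restartCount).toMillis() * (1.0 + RANDOM_FACTOR)));
    }

    public static void main(String[] args) {
        // Print the delay window for the first five restarts (max-restarts = 5).
        for (int i = 0; i < 5; i++) {
            System.out.printf("restart %d: %d ms to %d ms%n",
                    i, baseDelay(i).toMillis(), maxDelayWithJitter(i).toMillis());
        }
    }
}
```

Under these assumptions the first restart waits between 1000 and 1250 ms, and from the fourth restart onward the base delay stays at the 5 s cap. With max-restarts = 5 and max-restarts-within = 10m, the consumer would presumably stop restarting after five failures inside a ten-minute window.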
The module also contains auto-configuration that registers the ReceiveConnector and ReceiveConnectorTransport beans:
@Bean
ReceiveConnector<DpsCrudNotification> fileAvailableNotificationReceiveConnector(ReceiveConnectorTransport dpsCrudNotificationReceiveConnectorTransport,
        ObjectProvider<MessageLogger> messageLoggerProvider,
        ClassicActorSystemProvider actorSystem,
        DpsCrudNotificationHandlerPort crudNotificationHandler) {
    return ReceiveConnector.<DpsCrudNotification>builder("dpsCrudNotificationReceiveConnector", CONFIG_ROOT_PATH + ".connector.dps-crud", actorSystem)
            .withConnectorTransport(dpsCrudNotificationReceiveConnectorTransport)
            .withProcessingContextExtractor(tm -> ProcessingContext.of(UnitOfWorkId.createRandom()))
            .withReceiveTransportMessageConverter(message -> SerializationHelper.stringToObject(message.getPayload().toString(), DpsCrudNotification.class))
            .withReceiveHandler((receivingContext, message) -> crudNotificationHandler.handle(message))
            .withManualStart(false)
            .withMessageLogger(messageLoggerProvider.getIfAvailable(this::fallbackMessageLogger))
            .build();
}

@Bean
ReceiveConnectorTransport dpsCrudNotificationReceiveConnectorTransport(ClassicActorSystemProvider actorSystem) {
    Config config = AlpakkaConfigProvider.getConsumerConfig(actorSystem.classicSystem(), CONFIG_ROOT_PATH);
    StringDeserializerConsumerConfig consumerConfig = new StringDeserializerConsumerConfig(config);
    return KafkaAckReceiveConnectorTransport.<String, String>builder()
            .withName("DpsCrudNotificationReceiveConnectorTransport")
            .withTopics(consumerConfig.topic())
            .withConsumerSettings(consumerConfig.consumerSettings())
            .withRestartSettings(consumerConfig.restartSettings())
            .withActorSystem(actorSystem)
            .build();
}

private MessageLogger fallbackMessageLogger() {
    return messageLogEntry -> CompletableFuture.completedFuture(null);
}
The user only needs to include the following dependency:
<dependency>
    <groupId>com.iconsolutions.ipf.core.dynamicsettings.v2</groupId>
    <artifactId>dynamic-processing-settings-client-notification-kafka</artifactId>
    <version>${icon-dynamic-processing-settings-api.version}</version>
</dependency>
and implement the DpsCrudNotificationHandlerPort interface. Here is an example:
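import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class DpsCrudNotificationKafkaHandler implements DpsCrudNotificationHandlerPort {
    @Override
    public CompletionStage<Void> handle(DpsCrudNotification notification) {
        // React to the notification here. Return a completed stage rather than null,
        // so the receive connector can chain on the result without a NullPointerException.
        return CompletableFuture.completedFuture(null);
    }
}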
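A handler will usually do some work with the notification before completing. The sketch below shows one possible shape: it records the latest operation per setting in a local map and signals failures through the returned stage instead of throwing. It is only an illustration under stated assumptions. The nested DpsCrudNotification and DpsCrudNotificationHandlerPort types are hypothetical stand-ins so the example compiles on its own, and the fields settingId and operation are invented, since the source does not show the real message's structure.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

public class CachingHandlerSketch {

    // Hypothetical stand-in for the real DpsCrudNotification; field names are assumptions.
    public static final class DpsCrudNotification {
        public final String settingId;
        public final String operation;

        public DpsCrudNotification(String settingId, String operation) {
            this.settingId = settingId;
            this.operation = operation;
        }
    }

    // Stand-in with the same method signature as the port shown in the documentation.
    public interface DpsCrudNotificationHandlerPort {
        CompletionStage<Void> handle(DpsCrudNotification notification);
    }

    public static final class CachingHandler implements DpsCrudNotificationHandlerPort {
        // Latest operation seen per setting id; thread-safe because handle() may run concurrently.
        public final Map<String, String> lastOperationById = new ConcurrentHashMap<>();

        @Override
        public CompletionStage<Void> handle(DpsCrudNotification notification) {
            if (notification == null || notification.settingId == null || notification.operation == null) {
                // Report bad input through the stage so the caller sees a failed completion.
                return CompletableFuture.failedFuture(
                        new IllegalArgumentException("notification is missing required fields"));
            }
            // Do the work off the calling thread; the stage completes when the update is done.
            return CompletableFuture.runAsync(
                    () -> lastOperationById.put(notification.settingId, notification.operation));
        }
    }

    public static void main(String[] args) {
        CachingHandler handler = new CachingHandler();
        handler.handle(new DpsCrudNotification("setting-1", "UPDATE")).toCompletableFuture().join();
        System.out.println(handler.lastOperationById);
    }
}
```

In a real application the handler would implement the port from dynamic-processing-settings-client-port and be registered as a Spring bean, so that the auto-configured ReceiveConnector can inject it.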