
DPS Client Notification

The DPS Client Notification feature is the way to receive notifications from DPS. It currently supports receiving the DpsCrudNotification message.

The dynamic-processing-settings-client-port module contains a DpsCrudNotificationHandlerPort interface that must be implemented.

public interface DpsCrudNotificationHandlerPort {

    CompletionStage<Void> handle(DpsCrudNotification notification);
}

DPS Client Notification Kafka

This is the Kafka implementation of the dynamic-processing-settings-client-port module.

It contains configuration for the Kafka receive connector transport, which looks like this:

ipf.dps-api {
  client.notification {
    kafka {
      consumer {
        topic = DPS_CRUD_NOTIFICATION
        restart-settings = ${common-flow-restart-settings}
        kafka-clients {
          group.id = dps-crud-notification-group
        }
      }
    }
  }
}
common-flow-restart-settings {
  min-backoff = 1s
  max-backoff = 5s
  random-factor = 0.25
  max-restarts = 5
  max-restarts-within = 10m
}
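
To make the restart settings concrete, the sketch below computes the delay range before each restart. It assumes that common-flow-restart-settings follows the semantics of Akka's RestartSettings: the delay doubles from min-backoff, is capped at max-backoff, and is then stretched by up to random-factor. RestartBackoffSketch and its methods are hypothetical names used only for illustration; they are not part of IPF or Akka.

```java
import java.time.Duration;

// Hypothetical helper: mirrors the usual exponential-backoff formula
// so the configured values can be read as concrete delays.
final class RestartBackoffSketch {

    /** Delay before restart number restartCount (0-based), before jitter. */
    static Duration baseDelay(Duration minBackoff, Duration maxBackoff, int restartCount) {
        double exponential = minBackoff.toMillis() * Math.pow(2, restartCount);
        long capped = (long) Math.min(exponential, (double) maxBackoff.toMillis());
        return Duration.ofMillis(capped);
    }

    /** Largest delay once the random factor has been applied to the base delay. */
    static Duration maxDelayWithJitter(Duration base, double randomFactor) {
        return Duration.ofMillis(Math.round(base.toMillis() * (1.0 + randomFactor)));
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(1);   // min-backoff = 1s
        Duration max = Duration.ofSeconds(5);   // max-backoff = 5s
        double randomFactor = 0.25;             // random-factor = 0.25
        int maxRestarts = 5;                    // max-restarts = 5

        for (int restart = 0; restart < maxRestarts; restart++) {
            Duration base = baseDelay(min, max, restart);
            Duration upper = maxDelayWithJitter(base, randomFactor);
            System.out.println("restart " + restart + ": "
                    + base.toMillis() + " ms to " + upper.toMillis() + " ms");
        }
    }
}
```

Under that assumption, the values above give delays of 1s to 1.25s, 2s to 2.5s, 4s to 5s, and then 5s to 6.25s for each further restart, with no more than 5 restarts attempted within any 10 minute window.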

It also contains autoconfiguration with the ReceiveConnector and ReceiveConnectorTransport beans:

    @Bean
    ReceiveConnector<DpsCrudNotification> fileAvailableNotificationReceiveConnector(ReceiveConnectorTransport dpsCrudNotificationReceiveConnectorTransport,
                                                                                    ObjectProvider<MessageLogger> messageLoggerProvider,
                                                                                    ClassicActorSystemProvider actorSystem,
                                                                                    DpsCrudNotificationHandlerPort crudNotificationHandler) {
        return ReceiveConnector.<DpsCrudNotification>builder("dpsCrudNotificationReceiveConnector", CONFIG_ROOT_PATH + ".connector.dps-crud", actorSystem)
                .withConnectorTransport(dpsCrudNotificationReceiveConnectorTransport)
                .withProcessingContextExtractor(tm -> ProcessingContext.of(UnitOfWorkId.createRandom()))
                .withReceiveTransportMessageConverter(message -> SerializationHelper.stringToObject(message.getPayload().toString(), DpsCrudNotification.class))
                .withReceiveHandler((receivingContext, message) -> crudNotificationHandler.handle(message))
                .withManualStart(false)
                .withMessageLogger(messageLoggerProvider.getIfAvailable(this::fallbackMessageLogger))
                .build();
    }

    @Bean
    ReceiveConnectorTransport dpsCrudNotificationReceiveConnectorTransport(ClassicActorSystemProvider actorSystem) {
        Config config = AlpakkaConfigProvider.getConsumerConfig(actorSystem.classicSystem(), CONFIG_ROOT_PATH);
        StringDeserializerConsumerConfig consumerConfig = new StringDeserializerConsumerConfig(config);

        return KafkaAckReceiveConnectorTransport.<String, String>builder()
                .withName("DpsCrudNotificationReceiveConnectorTransport")
                .withTopics(consumerConfig.topic())
                .withConsumerSettings(consumerConfig.consumerSettings())
                .withRestartSettings(consumerConfig.restartSettings())
                .withActorSystem(actorSystem)
                .build();
    }

    private MessageLogger fallbackMessageLogger() {
        return messageLogEntry -> CompletableFuture.completedFuture(null);
    }

The user only needs to add the following dependency:

<dependency>
    <groupId>com.iconsolutions.ipf.core.dynamicsettings.v2</groupId>
    <artifactId>dynamic-processing-settings-client-notification-kafka</artifactId>
    <version>${icon-dynamic-processing-settings-api.version}</version>
</dependency>

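And implement the DpsCrudNotificationHandlerPort interface. The implementation must be available as a Spring bean, because the autoconfigured ReceiveConnector takes it as a parameter. Here is an example: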

public class DpsCrudNotificationKafkaHandler implements DpsCrudNotificationHandlerPort {

    @Override
    public CompletionStage<Void> handle(DpsCrudNotification notification) {
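        // Process the notification here, then return a completed stage (never null).
        return CompletableFuture.completedFuture(null);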
    }
}
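
A slightly fuller handler might look like the sketch below. It is self-contained for illustration only: the DpsCrudNotification class and the DpsCrudNotificationHandlerPort interface declared here are simplified stand-ins for the real types in dynamic-processing-settings-client-port (the payload field is an assumption), and CountingDpsCrudNotificationHandler is a hypothetical name. The point it shows is that handle should always return a non-null CompletionStage and report failures through that stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the real notification type; its actual fields are not shown here.
final class DpsCrudNotification {
    final String payload; // hypothetical field, for illustration only

    DpsCrudNotification(String payload) {
        this.payload = payload;
    }
}

// Stand-in with the same method signature as the port interface above.
interface DpsCrudNotificationHandlerPort {
    CompletionStage<Void> handle(DpsCrudNotification notification);
}

// Hypothetical handler: counts notifications and never returns null.
final class CountingDpsCrudNotificationHandler implements DpsCrudNotificationHandlerPort {

    private final AtomicInteger handled = new AtomicInteger();

    @Override
    public CompletionStage<Void> handle(DpsCrudNotification notification) {
        if (notification == null) {
            // Signal the problem through the stage instead of throwing or returning null.
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("notification must not be null"));
        }
        // Do the work asynchronously; the stage completes when the work is done.
        return CompletableFuture.runAsync(handled::incrementAndGet);
    }

    int handledCount() {
        return handled.get();
    }

    public static void main(String[] args) {
        CountingDpsCrudNotificationHandler handler = new CountingDpsCrudNotificationHandler();
        handler.handle(new DpsCrudNotification("example")).toCompletableFuture().join();
        System.out.println("handled: " + handler.handledCount());
    }
}
```

In a real application the class would implement the port interface from the client-port module and be registered as a Spring bean (for example with @Component), so that the autoconfigured ReceiveConnector can pick it up.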