Documentation for a newer release is available. View Latest

DPS Client Notification

DPS Client Notification feature represents the way of receiving dps notifications. It supports receiving DpsCrudNotification message so far.

Within the dynamic-processing-settings-client-port module there is a DpsCrudNotificationHandlerPort interface that should be implemented.

public interface DpsCrudNotificationHandlerPort {

    CompletionStage<Void> handle(DpsCrudNotification notification);
}

DPS Client Notification Kafka

This is the kafka implementation of the dynamic-processing-settings-client-port module.

It contains configuration for kafka receive connector transport that looks like this:

ipf.dps-api {
  client.notification {
    kafka {
      consumer {
        topic = DPS_CRUD_NOTIFICATION
        restart-settings = ${common-flow-restart-settings}
        kafka-clients {
          group.id = dps-crud-notification-group
        }
      }
    }
  }
}
common-flow-restart-settings {
  min-backoff = 1s
  max-backoff = 5s
  random-factor = 0.25
  max-restarts = 5
  max-restarts-within = 10m
}

It also contains autoconfiguration with ReceiveConnector and ReceiveConnectorTransport beans:

    @Bean
    ReceiveConnector<DpsCrudNotification> fileAvailableNotificationReceiveConnector(ReceiveConnectorTransport dpsCrudNotificationReceiveConnectorTransport,
                                                                                    ObjectProvider<MessageLogger> messageLoggerProvider,
                                                                                    ClassicActorSystemProvider actorSystem,
                                                                                    DpsCrudNotificationHandlerPort crudNotificationHandler) {
        return ReceiveConnector.<DpsCrudNotification>builder("dpsCrudNotificationReceiveConnector", CONFIG_ROOT_PATH + ".connector.dps-crud", actorSystem)
                .withConnectorTransport(dpsCrudNotificationReceiveConnectorTransport)
                .withProcessingContextExtractor(tm -> ProcessingContext.of(UnitOfWorkId.createRandom()))
                .withReceiveTransportMessageConverter(message -> SerializationHelper.stringToObject(message.getPayload().toString(), DpsCrudNotification.class))
                .withReceiveHandler((receivingContext, message) -> crudNotificationHandler.handle(message))
                .withManualStart(false)
                .withMessageLogger(messageLoggerProvider.getIfAvailable(this::fallbackMessageLogger))
                .build();
    }

    @Bean
    ReceiveConnectorTransport dpsCrudNotificationReceiveConnectorTransport(ClassicActorSystemProvider actorSystem) {
        Config config = AlpakkaConfigProvider.getConsumerConfig(actorSystem.classicSystem(), CONFIG_ROOT_PATH);
        StringDeserializerConsumerConfig consumerConfig = new StringDeserializerConsumerConfig(config);

        return KafkaAckReceiveConnectorTransport.<String, String>builder()
                .withName("DpsCrudNotificationReceiveConnectorTransport")
                .withTopics(consumerConfig.topic())
                .withConsumerSettings(consumerConfig.consumerSettings())
                .withRestartSettings(consumerConfig.restartSettings())
                .withActorSystem(actorSystem)
                .build();
    }

    private MessageLogger fallbackMessageLogger() {
        return messageLogEntry -> CompletableFuture.completedFuture(null);
    }

User just needs to put in the following dependency:

<dependency>
    <groupId>com.iconsolutions.ipf.core.dynamicsettings.v2</groupId>
    <artifactId>dynamic-processing-settings-client-notification-kafka</artifactId>
    <version>${icon-dynamic-processing-settings-api.version}</version>
</dependency>

and implement the DpsCrudNotificationHandlerPort interface. Here is the example:

public class DpsCrudNotificationKafkaHandler implements DpsCrudNotificationHandlerPort {

    @Override
    public CompletionStage<Void> handle(DpsCrudNotification notification) {
        return null;
    }
}