IPF System Event Exporter Spring Boot starter packs

To standardise how system events are exported to consumers outside the originating JVM, the ipf-system-events-exporter module provides several Spring Boot starter modules (Java libraries that provide customisable Spring auto-configurations).

The ipf-system-events-exporter parent module currently contains the following starters:

  • ipf-system-events-exporter-starter-core, providing auto-configurations that enable this module’s users to send system events as JSON to an external system by providing their own connector transports.

  • ipf-system-events-exporter-starter-jms, providing auto-configurations that enable this module’s users to send system events as JSON to a JMS destination.

  • ipf-system-events-exporter-starter-kafka, providing auto-configurations that enable this module’s users to send system events as JSON to a Kafka topic.

IPF System Event Exporter Starter Core

This module provides the basic event-exporting capabilities. By default, it sets up two EventProcessor beans: one exports IPF system events to logs, the other exports them to a user-provided external system.

To use the module, add the following dependency to the dependencies section of your pom.xml:

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-core</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

Unless export to an external system has been disabled, users of the module must provide a ConnectorTransport<IPFSystemEvent<?>> bean in their configuration:

    @Configuration
    static class ProvideConnectorTransportConf {
        @Bean
        ConnectorTransport<IPFSystemEvent<?>> eventSendingConnectorTransport() {
            return new ConnectorTransport<IPFSystemEvent<?>>("name") {
                @Override
                public void startProcessingMessages(Integer maxConcurrentOffers) {
                    // nothing to start in this minimal example
                }

                @Override
                public void startHealthCheck() {
                    // no health check in this minimal example
                }

                @Override
                public CompletionStage<DeliveryOutcome> offer(MessageDelivery<IPFSystemEvent<?>> messageDelivery) {
                    // provide an actual implementation here
                    return CompletableFuture.completedStage(DeliveryOutcome.deliverySuccess(null));
                }
            };
        }
    }

How to customise included configuration

The provided auto-configuration can be customised in two main ways:

  • override the default configuration to disable certain exporters

  • override the default beans with your custom ones

Overriding the default configuration

The default configuration is shown below:

ipf.system-events.exporter {
  # Allows users of this module to enable or disable system event exporting altogether
  enabled = true

  # Allows users of this module to enable or disable exporting of system events to logs
  slf4j.enabled = true

  # Allows users of this module to enable or disable exporting of system events to external systems (JMS, Kafka, DB, etc)
  sending.enabled = true

  # Allows users to override the send connector configuration provided in `connector.default-send-connector`
  connector {
  }
}

Overriding the default beans

The following beans can be overridden. Each entry lists the bean name, followed by its description and an example.

eventSendingPredicate

A predicate that selects which events to send to an external endpoint. The default predicate accepts all events, so users of this module should override this bean if they wish to be more selective in their exports.

    @Configuration
    static class OverridePredicateConf {
        @Bean
        Predicate<IPFSystemEvent<?>> eventSendingPredicate() {
            // provide an actual implementation here
            return evt -> true;
        }
    }
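A more selective predicate is usually a composition of small rules. The sketch below illustrates the idea with plain java.util.function.Predicate composition. DemoEvent, its name and level fields, and the "Heartbeat" prefix are hypothetical stand-ins chosen to keep the example self-contained; they are not part of the IPF API, and a real predicate would inspect IPFSystemEvent instead.

```java
import java.util.List;
import java.util.function.Predicate;

final class EventPredicates {

    // Hypothetical stand-in for IPFSystemEvent; not an IPF class.
    static final class DemoEvent {
        final String name;
        final String level;

        DemoEvent(String name, String level) {
            this.name = name;
            this.level = level;
        }
    }

    private EventPredicates() {
    }

    // Accept only events whose level is one of the given levels.
    static Predicate<DemoEvent> hasLevel(String... levels) {
        List<String> allowed = List.of(levels);
        return evt -> allowed.contains(evt.level);
    }

    // Reject events whose name starts with the given prefix.
    static Predicate<DemoEvent> nameDoesNotStartWith(String prefix) {
        return evt -> !evt.name.startsWith(prefix);
    }

    // Example policy: export warnings and errors, but skip "Heartbeat" events.
    static Predicate<DemoEvent> exportPolicy() {
        return hasLevel("WARN", "ERROR").and(nameDoesNotStartWith("Heartbeat"));
    }
}
```

Keeping each rule in its own method makes the policy easy to unit-test before it is wired in as the eventSendingPredicate bean.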

eventSendingMapper

A transport mapper used by the default send connector to convert system events into a wire format accepted by the external system. The default mapper creates a JSON string representation of each event and supplies the event's Java type via the EventType message header. Users of this module should override this bean if their ConnectorTransport requires a different payload format (e.g. Protobuf or Avro) or if they want to include additional headers in their messages.

    @Configuration
    static class OverrideMapperConf {
        @Bean
        SendTransportMessageConverter<IPFSystemEvent<?>> eventSendingMapper() {
            // provide an actual implementation here
            return TransportMessage::new;
        }
    }

eventSendingConnector

A SendingConnector used to send the system events to an external system. The default send connector bean expects users of this module to provide a ConnectorTransport bean that performs the actual sending. However, the ConnectorTransport implementations that ship with IPF support only fire-and-forget endpoints, so users should override the SendingConnector when they need request-reply communication with the external system.

    @Configuration
    static class OverrideSendingConnectorConf {
        @Bean
        SendingConnector<IPFSystemEvent<?>, ?> eventSendingConnector(
                ClassicActorSystemProvider actorSystem) {
            // provide an actual RequestReplySendConnector configuration here
            return new RequestReplySendConnector
                    .Builder<IPFSystemEvent<?>, IPFSystemEvent<?>, String, String>("name")
                    .withActorSystem(actorSystem)
                    .withCorrelationIdExtractor(systemEvent -> CorrelationId.of(systemEvent.getProcessingContext().getUnitOfWorkId().getValue()))
                    .withSendTransportMessageConverter(TransportMessage::new)
                    .withReceiveTransportMessageConverter(TransportMessage::toString)
                    .withConnectorTransport(transport())
                    .build();
        }

        private ConnectorTransport<IPFSystemEvent<?>> transport() {
            // provide an actual transport, e.g. RequestReplyHttpConnectorTransport
            return mock(ConnectorTransport.class, RETURNS_MOCKS);
        }
    }

IPF System Event Exporter Starter JMS

This module includes the ipf-system-events-exporter-starter-core module and provides its users with the basic JMS ConnectorTransport implementation.

To use the module, add the following dependency to the dependencies section of your pom.xml:

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-jms</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

How to customise included configuration

The provided auto-configuration can be customised in two main ways:

  • override the default configuration to disable certain exporters

  • override the default JMS ConnectionFactory bean with your custom one

Overriding the default configuration

The default configuration is shown below:

ipf.system-events.exporter {
  # Used to select the type of exporter, in this case JMS
  type = jms

  # JMS transport-specific configuration
  jms {
    # The JMS queue to publish the system events onto
    queue = SystemEventQueue

    # The URL of the JMS broker that we're connecting to
    broker-url = "tcp://localhost:61616"

    # Sets the window size for flow control of the consumers created through this factory.
    # Value must be -1 (to disable flow control),
    # 0 (to not buffer any messages) 
    # or greater than 0 (to set the maximum size of the buffer).
    consumer-window-size = 0

    # Sets the blocking call failover timeout.
    # When the client is awaiting failover, this is over and above the normal call timeout.
    # Value must be greater than or equal to -1; -1 means wait forever.
    failover-timeout = 5000
  }
}

Overriding the default ConnectionFactory bean

To override the default ConnectionFactory bean, you need to provide a bean definition similar to the one listed below (which showcases setting up a basic IBM MQ ConnectionFactory configuration).

    @Configuration
    class ConnectionFactoryConfig {

        @Bean
        ConnectionFactory systemEventConnectionFactory(IbmMqConnectionFactoryProperties properties) {
            // provide the actual ConnectionFactory configuration here
            var connectionFactory = new MQQueueConnectionFactory();
            connectionFactory.setHostName(properties.getHost());
            connectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            connectionFactory.setCCSID(1208);
            connectionFactory.setChannel(properties.getChannel());
            connectionFactory.setPort(properties.getPort());
            connectionFactory.setQueueManager(properties.getQueueManager());
            return connectionFactory;
        }
    }

How to consume published events

Events can be consumed in the same way as any other JMS message, for example:

    static class TestConsumer {

        private static final BlockingQueue<ReceivedEvent> EVENTS = new ArrayBlockingQueue<>(10);

        private final ObjectMapper objectMapper = SerializationHelper.objectMapper();

        @SneakyThrows
        @JmsListener(destination = "SystemEventQueue")
        public void receiveEvents(@Payload String message, @Headers Map<String, Object> headers) {
            // read the Java type of the event from message headers
            var eventType = headers.get("EventType").toString(); 
            
            // use the eventType to deserialize the event into a proper POJO
            var event = objectMapper.readValue(message, Class.forName(eventType));
            
            // do something with the event here
            EVENTS.add(new ReceivedEvent(message, event));
        }
    }
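Because the consumer above passes a header value straight to Class.forName, it may be worth restricting which types can be loaded when the queue is not fully trusted. The following is a minimal sketch of an allow-list check, not part of IPF: EventTypeResolver and the package prefixes passed to it are hypothetical, and the right prefixes depend on where your event classes live.

```java
import java.util.Set;

final class EventTypeResolver {

    private final Set<String> allowedPackagePrefixes;

    // Prefixes should end with a dot (e.g. "com.example.events.") so that
    // "com.example.events" does not also match "com.example.eventsother".
    EventTypeResolver(Set<String> allowedPackagePrefixes) {
        this.allowedPackagePrefixes = Set.copyOf(allowedPackagePrefixes);
    }

    // True only if the class name sits under one of the trusted prefixes.
    boolean isAllowed(String className) {
        return className != null
                && allowedPackagePrefixes.stream().anyMatch(className::startsWith);
    }

    // Resolves the class named in the header, refusing anything outside the allow-list.
    Class<?> resolve(String className) {
        if (!isAllowed(className)) {
            throw new IllegalArgumentException("Untrusted event type: " + className);
        }
        try {
            // initialize=false: do not run static initialisers just to resolve the type
            return Class.forName(className, false, EventTypeResolver.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown event type: " + className, e);
        }
    }
}
```

In the listener, the call to Class.forName(eventType) could then be replaced with resolver.resolve(eventType), where resolver is an instance configured with the packages that hold your event classes.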

IPF System Event Exporter Starter Kafka

This module includes the ipf-system-events-exporter-starter-core module and provides its users with the basic Kafka ConnectorTransport implementation.

To use the module, add the following dependency to the dependencies section of your pom.xml:

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-kafka</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

How to customise included configuration

The provided auto-configuration can be customised through externalised configuration overrides.

The default configuration is shown below:

ipf.system-events.exporter {
  # Used to select the type of exporter, in this case Kafka
  type = kafka

  # Kafka transport-specific configuration, by default most configuration is 
  kafka {
    producer {
      # The Kafka topic to publish the system events onto
      topic = SYSTEM_EVENTS
      
      # Restart options for the connector transport
      restart-settings = {
        min-backoff = 1s
        max-backoff = 5s
        random-factor = 0.25
        max-restarts = 5
        max-restarts-within = 10m
      }
      
      # Overrides to the akka.kafka.producer.kafka-clients defaults,
      # which are inherited by this producer
      kafka-clients {
      }
    }
  }
}
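The restart-settings values are easier to reason about with concrete numbers. The sketch below assumes these settings follow the usual Akka exponential backoff semantics, which the configuration above does not state explicitly: the delay starts at min-backoff, doubles on each restart, is capped at max-backoff, and then has random jitter of up to random-factor added. BackoffSketch and its method names are hypothetical.

```java
import java.time.Duration;

final class BackoffSketch {

    private BackoffSketch() {
    }

    // Delay before jitter: minBackoff doubled per restart, capped at maxBackoff.
    static Duration baseDelay(Duration minBackoff, Duration maxBackoff, int restartCount) {
        double doubled = minBackoff.toMillis() * Math.pow(2, restartCount);
        double capped = Math.min((double) maxBackoff.toMillis(), doubled);
        return Duration.ofMillis((long) capped);
    }

    // Largest possible delay once jitter of up to randomFactor (0.25 = +25%) is added.
    static Duration maxDelayWithJitter(Duration minBackoff, Duration maxBackoff,
                                       double randomFactor, int restartCount) {
        long base = baseDelay(minBackoff, maxBackoff, restartCount).toMillis();
        return Duration.ofMillis(Math.round(base * (1.0 + randomFactor)));
    }
}
```

Under that assumption, the defaults (min-backoff = 1s, max-backoff = 5s, random-factor = 0.25) give a first restart delay between 1s and 1.25s, a second between 2s and 2.5s, a third between 4s and 5s, and every later one between 5s and 6.25s.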

How to consume published events

Events can be consumed in the same way as any other Kafka message, for example:

    static class TestConsumer {

        private static final BlockingQueue<ReceivedEvent> EVENTS = new ArrayBlockingQueue<>(10);

        private final ObjectMapper objectMapper = SerializationHelper.objectMapper();

        @SneakyThrows
        @KafkaListener(topics = "SYSTEM_EVENTS", groupId = "test")
        public void receiveEvents(@Payload String message, @Headers Map<String, Object> headers) {
            // read the Java type of the event from message headers
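            // Kafka header values arrive as raw bytes; decode with an explicit charset
            var eventType = new String((byte[]) headers.get("EventType"), java.nio.charset.StandardCharsets.UTF_8);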
            
            // use the eventType to deserialize the event into a proper POJO
            var event = objectMapper.readValue(message, Class.forName(eventType));
            
            // do something with the event here
            EVENTS.add(new ReceivedEvent(message, event));
        }
    }
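The JMS and Kafka consumers differ mainly in how the EventType header arrives: as a String-like object over JMS and as a byte array over Kafka. A small helper can hide that difference. This is a sketch only: HeaderValues is a hypothetical class, and decoding the bytes as UTF-8 is an assumption about how the header was written.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class HeaderValues {

    private HeaderValues() {
    }

    // Returns the named header as text, whether it was delivered as bytes or as an object.
    static String asString(Map<String, Object> headers, String name) {
        Object value = headers.get(name);
        if (value == null) {
            throw new IllegalArgumentException("Missing header: " + name);
        }
        if (value instanceof byte[]) {
            // Assumption: the producer encoded the header value as UTF-8
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return value.toString();
    }
}
```

Either listener could then read the type with HeaderValues.asString(headers, "EventType"), and a missing header fails with a clear message instead of a NullPointerException.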