IPF System Event Exporter Spring Boot starter packs
To standardise the way system events are exported to consumers outside the originating JVM, the ipf-system-events-exporter module provides several Spring Boot starter modules (Java libraries providing customisable Spring auto-configurations).
At the moment, the ipf-system-events-exporter parent module contains the following starters:

- ipf-system-events-exporter-starter-core, providing auto-configurations that enable this module's users to send system events as JSON to an external system by providing their own connector transports.
- ipf-system-events-exporter-starter-jms, providing auto-configurations that enable this module's users to send system events as JSON to a JMS destination.
- ipf-system-events-exporter-starter-kafka, providing auto-configurations that enable this module's users to send system events as JSON to a Kafka topic.
IPF System Event Exporter Starter Core
This module provides its users with the basic event-exporting capabilities: by default, it sets up two EventProcessor beans that export IPF system events to logs and to a user-provided external system.
To use the module, add the following snippet to your `pom.xml`'s dependencies section:
```xml
<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-core</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>
```
Unless the export to an external system has been disabled, the module users will need to provide a ConnectorTransport<IPFSystemEvent<?>> bean in their configurations:
```java
@Configuration
static class ProvideConnectorTransportConf {

    @Bean
    ConnectorTransport<IPFSystemEvent<?>> eventSendingConnectorTransport() {
        return new ConnectorTransport<IPFSystemEvent<?>>("name") {

            @Override
            public void startProcessingMessages(Integer maxConcurrentOffers) {
                // no-op in this example
            }

            @Override
            public void startHealthCheck() {
                // no-op in this example
            }

            @Override
            public CompletionStage<DeliveryOutcome> offer(MessageDelivery<IPFSystemEvent<?>> messageDelivery) {
                // provide an actual implementation here
                return CompletableFuture.completedStage(DeliveryOutcome.deliverySuccess(null));
            }
        };
    }
}
```
How to customise included configuration
The provided auto-configuration allows customisations via two major approaches:

- overriding the default configuration to disable certain exporters
- overriding the default beans with your custom ones
Overriding the default configuration
The default configuration is included below:
```
ipf.system-events.exporter {
    # Allows users of this module to enable or disable system event exporting altogether
    enabled = true
    # Allows users of this module to enable or disable exporting of system events to logs
    slf4j.enabled = true
    # Allows users of this module to enable or disable exporting of system events to external systems (JMS, Kafka, DB, etc.)
    sending.enabled = true
    # Allows users to override the send connector configuration provided in `connector.default-send-connector`
    connector {
    }
}
```
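For example, an application that only wants log output could turn off the external send path by overriding just the relevant flag in its own configuration (a sketch using the keys shown above):

```
ipf.system-events.exporter {
    # keep exporting system events to logs
    slf4j.enabled = true
    # disable sending system events to an external system
    sending.enabled = false
}
```

With `sending.enabled = false`, no `ConnectorTransport<IPFSystemEvent<?>>` bean needs to be provided.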
Overriding the default beans
| Bean Name | Description | Example |
|---|---|---|
| | A predicate which chooses which events to send to an external endpoint. The default predicate accepts all events, so users of this module should override this bean if they wish to be more selective in their exports. | |
| | A transport mapper used by the default send connector to convert the system events into a wire format acceptable to an external system. The default mapper creates a JSON string representation of the events and supplies the Java type of the event via the EventType message header. | |
| | A | |
IPF System Event Exporter Starter JMS
This module includes the ipf-system-events-exporter-starter-core module and provides its users with the basic JMS ConnectorTransport implementation.
To use the module, add the following snippet to your `pom.xml`'s dependencies section:
```xml
<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-jms</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>
```
How to customise included configuration
The provided auto-configuration allows customisations via two major approaches:

- overriding the default configuration to disable certain exporters
- overriding the default JMS `ConnectionFactory` bean with your custom one
Overriding the default configuration
The default configuration is included below:
```
ipf.system-events.exporter {
    # Used to select the type of exporter, in this case JMS
    type = jms
    # JMS transport-specific configuration
    jms {
        # The JMS queue to publish the system events onto
        queue = SystemEventQueue
        # The URL of the JMS broker that we're connecting to
        broker-url = "tcp://localhost:61616"
        # Sets the window size for flow control of the consumers created through this factory.
        # Value must be -1 (to disable flow control),
        # 0 (to not buffer any messages)
        # or greater than 0 (to set the maximum size of the buffer).
        consumer-window-size = 0
        # Sets the blocking call failover timeout.
        # When the client is awaiting failover, this is over and above the normal call timeout.
        # Value must be greater than or equal to -1; -1 means forever.
        failover-timeout = 5000
    }
}
```
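Only the keys being changed need to be overridden. For instance, to point the exporter at a different broker and queue, an application's configuration could contain something like the following (the host and queue names here are hypothetical):

```
ipf.system-events.exporter.jms {
    # publish to a differently named queue
    queue = MySystemEventQueue
    # connect to a remote broker instead of localhost
    broker-url = "tcp://mq.example.com:61616"
}
```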
Overriding the default ConnectionFactory bean
To override the default `ConnectionFactory` bean, provide a bean definition similar to the one listed below, which showcases a basic IBM MQ `ConnectionFactory` configuration.
```java
@Configuration
class ConnectionFactoryConfig {

    @Bean
    ConnectionFactory systemEventConnectionFactory(IbmMqConnectionFactoryProperties properties) {
        // provide the actual ConnectionFactory configuration here
        var connectionFactory = new MQQueueConnectionFactory();
        connectionFactory.setHostName(properties.getHost());
        // connect to the queue manager as a client over TCP
        connectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        // CCSID 1208 = UTF-8
        connectionFactory.setCCSID(1208);
        connectionFactory.setChannel(properties.getChannel());
        connectionFactory.setPort(properties.getPort());
        connectionFactory.setQueueManager(properties.getQueueManager());
        return connectionFactory;
    }
}
```
How to consume published events
Events can be consumed in the same way you would consume any other JMS message, e.g.
```java
static class TestConsumer {

    private static final BlockingQueue<ReceivedEvent> EVENTS = new ArrayBlockingQueue<>(10);
    private final ObjectMapper objectMapper = SerializationHelper.objectMapper();

    @SneakyThrows
    @JmsListener(destination = "SystemEventQueue")
    public void receiveEvents(@Payload String message, @Headers Map<String, Object> headers) {
        // read the Java type of the event from the message headers
        var eventType = headers.get("EventType").toString();
        // use the eventType to deserialize the event into a proper POJO
        var event = objectMapper.readValue(message, Class.forName(eventType));
        // do something with the event here
        EVENTS.add(new ReceivedEvent(message, event));
    }
}
```
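The consumer above hinges on one mechanism: the EventType header carries a fully qualified class name, which `Class.forName` resolves back to a Java type before deserialization. A minimal stdlib-only sketch of that lookup, with `java.lang.String` standing in for a real event class:

```java
public class EventTypeLookup {

    // resolve a fully qualified class name, as carried in the EventType header,
    // back to a Java type
    public static Class<?> resolve(String eventType) throws ClassNotFoundException {
        return Class.forName(eventType);
    }

    public static void main(String[] args) throws Exception {
        var type = resolve("java.lang.String");
        System.out.println(type.getName()); // prints "java.lang.String"
    }
}
```

Note that `Class.forName` throws `ClassNotFoundException` when the consumer's classpath does not contain the event class, so consumers need the event model on their classpath.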
IPF System Event Exporter Starter Kafka
This module includes the ipf-system-events-exporter-starter-core module and provides its users with the basic Kafka ConnectorTransport implementation.
To use the module, add the following snippet to your `pom.xml`'s dependencies section:
```xml
<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-kafka</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>
```
How to customise included configuration
The provided autoconfiguration allows customisations via externalized configuration overrides.
The default configuration is included below:
```
ipf.system-events.exporter {
    # Used to select the type of exporter, in this case Kafka
    type = kafka
    # Kafka transport-specific configuration; by default most configuration is
    # inherited from the akka.kafka.producer defaults
    kafka {
        producer {
            # The Kafka topic to publish the system events onto
            topic = SYSTEM_EVENTS
            # Restart options for the connector transport
            restart-settings = {
                min-backoff = 1s
                max-backoff = 5s
                random-factor = 0.25
                max-restarts = 5
                max-restarts-within = 10m
            }
            # Overrides to the akka.kafka.producer.kafka-clients defaults,
            # which are inherited by this producer
            kafka-clients {
            }
        }
    }
}
```
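Standard Kafka producer client properties, such as `bootstrap.servers`, can be supplied through the `kafka-clients` section. A sketch (the broker address below is hypothetical):

```
ipf.system-events.exporter.kafka.producer {
    topic = SYSTEM_EVENTS
    kafka-clients {
        # standard Kafka producer client properties go here
        bootstrap.servers = "kafka.example.com:9092"
    }
}
```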
How to consume published events
Events can be consumed in the same way you would consume any other Kafka message, e.g.
```java
static class TestConsumer {

    private static final BlockingQueue<ReceivedEvent> EVENTS = new ArrayBlockingQueue<>(10);
    private final ObjectMapper objectMapper = SerializationHelper.objectMapper();

    @SneakyThrows
    @KafkaListener(topics = "SYSTEM_EVENTS", groupId = "test")
    public void receiveEvents(@Payload String message, @Headers Map<String, Object> headers) {
        // read the Java type of the event from the message headers
        // (Kafka header values arrive as byte arrays)
        var eventType = new String((byte[]) headers.get("EventType"));
        // use the eventType to deserialize the event into a proper POJO
        var event = objectMapper.readValue(message, Class.forName(eventType));
        // do something with the event here
        EVENTS.add(new ReceivedEvent(message, event));
    }
}
```