Documentation for a newer release is available. View Latest

IPF System Event Exporter Spring Boot starter packs

Para estandarizar la manera en que exportamos eventos del sistema a consumidores fuera de la JVM de origen, el módulo ipf-system-events-exporter proporciona varios módulos Starter de Spring Boot (bibliotecas de Java que ofrecen autoconfiguraciones de Spring personalizables).

En este momento, el módulo padre ipf-system-events-exporter contiene los siguientes starters:

  • ipf-system-events-exporter-starter-core, que proporciona autoconfiguraciones que permiten a los usuarios de este módulo enviar eventos del sistema como JSON a un sistema externo proporcionando sus propios Connector Transports.

  • ipf-system-events-exporter-starter-jms, que proporciona autoconfiguraciones que permiten a los usuarios de este módulo enviar eventos del sistema como JSON a un destino JMS.

  • ipf-system-events-exporter-starter-kafka, que proporciona autoconfiguraciones que permiten a los usuarios de este módulo enviar eventos del sistema como JSON a un tópico de Kafka.

IPF System Event Exporter Starter Core

Este módulo ofrece a sus usuarios las capacidades básicas de exportación de eventos: de forma predeterminada, configura dos beans EventProcessor que exportan eventos del sistema de IPF a los registros (logs) y a un sistema externo proporcionado por el usuario.

Para usar el módulo, añade el siguiente fragmento de código a la sección de dependencias de tu pom.xml:

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-core</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

A menos que el envío a un sistema externo se haya deshabilitado, los usuarios del módulo deberán proporcionar un bean ConnectorTransport<IPFSystemEvent<?>> en sus configuraciones:

    @Configuration
    static class ProvideConnectorTransportConf {
        @Bean
        ConnectorTransport<IPFSystemEvent<?>> eventSendingConnectorTransport() {
            return new ConnectorTransport<IPFSystemEvent<?>>("name") {
                @Override
                public void startProcessingMessages(Integer maxConcurrentOffers) {

                }


                @Override
                public void startHealthCheck() {

                }

                @Override
                public CompletionStage<DeliveryOutcome> offer(MessageDelivery<IPFSystemEvent<?>> messageDelivery) {
                    // provide an actual implementation here
                    return CompletableFuture.completedStage(DeliveryOutcome.deliverySuccess(null));
                }
            };
        }
    }

Cómo personalizar la configuración incluida

La autoconfiguración proporcionada permite personalizaciones a través de dos enfoques principales:

  • sobrescribir la configuración predeterminada para deshabilitar ciertos exportadores

  • sobrescribir los beans predeterminados con los tuyos

Sobrescribir la configuración predeterminada

Configuración predeterminada incluida a continuación

ipf.system-events.exporter {
  # Allows users of this module to enable or disable system event exporting altogether
  enabled = true

  # Allows users of this module to enable or disable exporting of system events to logs
  slf4j.enabled = true

  # Allows users of this module to enable or disable exporting of system events to external systems (JMS, Kafaka, DB, etc)
  sending.enabled = true

  # Allows users to override the send connector configuration provided in `connector.default-send-connector`
  connector {
  }
}

Sobrescribir los beans predeterminados

Bean Name Descripción Ejemplo

eventSendingPredicate

Un predicado que elige qué eventos enviar a un endpoint externo. El predicado predeterminado acepta todos los eventos, por lo que los usuarios de este módulo deberían sobrescribir este bean si desean ser más selectivos en sus exportaciones.

    @Configuration
    static class OverridePredicateConf {
        @Bean
        Predicate<IPFSystemEvent<?>> eventSendingPredicate() {
            // provide an actual implementation here
            return evt -> true;
        }
    }

eventSendingMapper

Un transport mapper utilizado por el send connector predeterminado para convertir los eventos del sistema a un formato de transmisión aceptable por un sistema externo. El mapper predeterminado crea una representación JSON de los eventos y suministra el tipo Java del evento mediante el encabezado de mensaje EventType. Así que los usuarios de este módulo deberían sobrescribir este bean si su ConnectorTransport requiere un formato de carga útil diferente (por ejemplo, Protobuf, Avro, etc.) o si desean incluir encabezados adicionales en sus mensajes.

    @Configuration
    static class OverrideMapperConf {
        @Bean
        SendTransportMessageConverter<IPFSystemEvent<?>> eventSendingMapper() {
            // provide an actual implementation here
            return TransportMessage::new;
        }
    }

eventSendingConnector

Un SendingConnector utilizado para enviar los eventos del sistema a un sistema externo. El bean de send connector predeterminado espera que los usuarios de este módulo proporcionen un bean ConnectorTransport que realice el envío real. Sin embargo, las implementaciones de ConnectorTransport que vienen listas para usar con IPF solo soportan endpoints de tipo fire‑and‑forget, por lo que los usuarios deberían sobrescribir el SendingConnector en casos donde necesiten una comunicación de tipo request‑reply con el sistema externo.

    @Configuration
    static class OverrideSendingConnectorConf {
        @Bean
        SendingConnector<IPFSystemEvent<?>, ?> eventSendingConnector(
                ClassicActorSystemProvider actorSystem) {
            // provide an actual RequestReplySendConnector configuration here
            return new RequestReplySendConnector
                    .Builder<IPFSystemEvent<?>, IPFSystemEvent<?>, String, String>("name")
                    .withActorSystem(actorSystem)
                    .withCorrelationIdExtractor(systemEvent -> CorrelationId.of(systemEvent.getProcessingContext().getUnitOfWorkId().getValue()))
                    .withSendTransportMessageConverter(TransportMessage::new)
                    .withReceiveTransportMessageConverter(TransportMessage::toString)
                    .withConnectorTransport(transport())
                    .build();
        }

        private ConnectorTransport<IPFSystemEvent<?>> transport() {
            // provide an actual transport, e.g. RequestReplyHttpConnectorTransport
            return mock(ConnectorTransport.class, RETURNS_MOCKS);
        }
    }

IPF System Event Exporter Starter JMS

Este módulo incluye el módulo ipf-system-events-exporter-starter-core y proporciona a sus usuarios la implementación básica de ConnectorTransport para JMS.

Para usar el módulo, añade el siguiente fragmento de código a la sección de dependencias de tu pom.xml:

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-jms</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

Cómo personalizar la configuración incluida

La autoconfiguración proporcionada permite personalizaciones mediante dos enfoques principales:

  • sobrescribir la configuración predeterminada para deshabilitar ciertos exportadores

  • sobrescribir el bean ConnectionFactory de JMS predeterminado con uno propio

Sobrescribir la configuración predeterminada

Configuración predeterminada incluida a continuación.

ipf.system-events.exporter {
  # Used to select the type of exporter, in this case JMS
  type = jms

  # JMS transport-specific configuration
  jms {
    # The JMS queue to publish the system events onto
    queue = SystemEventQueue

    # The URL of the JMS broker that we're connecting to
    broker-url = "tcp://localhost:61616"

    # Sets the window size for flow control of the consumers created through this factory.
    # Value must be -1 (to disable flow control),
    # 0 (to not buffer any messages) 
    # or greater than 0 (to set the maximum size of the buffer).
    consumer-window-size = 0

    # Sets the blocking call failover timeout.
    # When the client is awaiting failover, this is over and above the normal call timeout.
    # Value must be greater or equals to -1, -1 means forever.
    failover-timeout = 5000
  }
}

Sobrescribir el bean ConnectionFactory predeterminado

Para sobrescribir el bean ConnectionFactory predeterminado, necesitas proporcionar una definición de bean similar a la que se lista a continuación (que muestra cómo configurar una ConnectionFactory básica de IBM MQ).

    @Configuration
    class ConnectionFactoryConfig {

        @Bean
        ConnectionFactory systemEventConnectionFactory(IbmMqConnectionFactoryProperties properties) {
            // provide the actual ConnectionFactory configuration here
            var connectionFactory = new MQQueueConnectionFactory();
            connectionFactory.setHostName(properties.getHost());
            connectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            connectionFactory.setCCSID(1208);
            connectionFactory.setChannel(properties.getChannel());
            connectionFactory.setPort(properties.getPort());
            connectionFactory.setQueueManager(properties.getQueueManager());
            return connectionFactory;
        }
    }

Cómo consumir los eventos publicados

Los eventos pueden consumirse de la misma manera que consumirías cualquier otro mensaje JMS, p. ej.

    static class TestConsumer {

        private static final BlockingQueue<ReceivedEvent> EVENTS = new ArrayBlockingQueue<>(10);

        private final ObjectMapper objectMapper = SerializationHelper.objectMapper();

        @SneakyThrows
        @JmsListener(destination = "SystemEventQueue")
        public void receiveEvents(@Payload String message, @Headers Map<String, Object> headers) {
            // read the Java type of the event from message headers
            var eventType = headers.get("EventType").toString(); 
            
            // use the eventType to deserialize the event into a proper POJO
            var event = objectMapper.readValue(message, Class.forName(eventType));
            
            // do something with the event here
            EVENTS.add(new ReceivedEvent(message, event));
        }
    }

IPF System Event Exporter Starter Kafka

Este módulo incluye el módulo ipf-system-events-exporter-starter-kafka y proporciona a sus usuarios la implementación básica de ConnectorTransport para Kafka.

Para usar el módulo, añade el siguiente fragmento de código a la sección de dependencias de tu pom.xml:

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-kafka</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

Cómo personalizar la configuración incluida

La autoconfiguración proporcionada permite personalizaciones mediante overrides de configuración externalizada.

Configuración predeterminada incluida a continuación.

ipf.system-events.exporter {
  # Used to select the type of exporter, in this case Kafka
  type = kafka

  # Kafka transport-specific configuration, by default most configuration is 
  kafka {
    producer {
      # The Kafka topic to publish the system events onto
      topic = SYSTEM_EVENTS
      
      # Restart options for the connector transport
      restart-settings = {
        min-backoff = 1s
        max-backoff = 5s
        random-factor = 0.25
        max-restarts = 5
        max-restarts-within = 10m
      }
      
      # Overrides to the akka.kafka.producer.kafka-clients defaults,
      # which are inherited by this producer
      kafka-clients {
      }
    }
  }
}

Cómo consumir los eventos publicados

Los eventos pueden consumirse de la misma manera que consumirías cualquier otro mensaje de Kafka, p. ej.

    static class TestConsumer {

        private static final BlockingQueue<ReceivedEvent> EVENTS = new ArrayBlockingQueue<>(10);

        private final ObjectMapper objectMapper = SerializationHelper.objectMapper();

        @SneakyThrows
        @KafkaListener(topics = "SYSTEM_EVENTS", groupId = "test")
        public void receiveEvents(@Payload String message, @Headers Map<String, Object> headers) {
            // read the Java type of the event from message headers
            var eventType = new String((byte[]) headers.get("EventType"));
            
            // use the eventType to deserialize the event into a proper POJO
            var event = objectMapper.readValue(message, Class.forName(eventType));
            
            // do something with the event here
            EVENTS.add(new ReceivedEvent(message, event));
        }
    }