IPF System Event Exportador Spring Boot paquetes de inicio

Con el fin de estandarizar la forma en que exportamos system events a consumidores fuera del origen JVM,ipf-system-events-exporter el módulo proporciona varios Spring Boot módulos de inicio (Java bibliotecas que proporcionan customisable Configuraciones automáticas de Spring).

En este momento,ipf-system-events-exporter el módulo principal contiene los siguientes iniciadores:

  • ipf-system-events-exporter-starter-core, proporcionando autoconfiguraciones que permiten a los usuarios de este módulo enviar system event as JSON a un sistema externo proporcionando su propio connector transport s.

  • ipf-system-events-exporter-starter-jms, proporcionando autoconfiguraciones que permiten a los usuarios de este módulo enviar system events as JSON a un JMS destino.

  • ipf-system-events-exporter-starter-kafka, proporcionando autoconfiguraciones que permiten a los usuarios de este módulo enviar system events as JSON a un Kafka tema.

IPF System Event Exportador Inicial Core

Este módulo proporciona a sus usuarios los conceptos básicos event-capacidades de exportación-por defecto, configura dos EventProcessor beans que exporta IPF system events a los registros y a un sistema externo proporcionado por el usuario.

Para utilizar el módulo, añada el siguiente fragmento de código a su pom.xml la sección de dependencias de

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-core</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

A menos que la exportación a un sistema externo haya sido deshabilitada, los usuarios del módulo deberán proporcionar un ConnectorTransport<IPFSystemEvent<?>> bean en sus configuraciones:

    @Configuration
    static class ProvideConnectorTransportConf {
        @Bean
        ConnectorTransport<IPFSystemEvent<?>> eventSendingConnectorTransport() {
            return new ConnectorTransport<IPFSystemEvent<?>>("name") {
                @Override
                public void startProcessingMessages(Integer maxConcurrentOffers) {

                }


                @Override
                public void startHealthCheck() {

                }

                @Override
                public CompletionStage<DeliveryOutcome> offer(MessageDelivery<IPFSystemEvent<?>> messageDelivery) {
                    // provide an actual implementation here
                    return CompletableFuture.completedStage(DeliveryOutcome.deliverySuccess(null));
                }
            };
        }
    }

Cómo customise configuración incluida

La autoconfiguración proporcionada permite customisations a través de dos enfoques principales:

  • anule la configuración predeterminada para deshabilitar ciertos exportadores

  • sobrescribir el predeterminado beans con su custom ones

Anulando la configuración predeterminada

Configuración predeterminada incluida a continuación

ipf.system-events.exporter {
  # Allows users of this module to enable or disable system event exporting altogether
  enabled = true

  # Allows users of this module to enable or disable exporting of system events to logs
  slf4j.enabled = true

  # Allows users of this module to enable or disable exporting of system events to external systems (JMS, Kafaka, DB, etc)
  sending.enabled = true

  # Allows users to override the send connector configuration provided in `connector.default-send-connector`
  connector {
  }
}

Sobrescribiendo el valor predeterminado beans

Bean Name Descripción Ejemplo

eventSendingPredicate

Un predicado que elige cuál events para enviar a un punto final externo. El predicado por defecto acepta todos events por lo que los usuarios de este módulo deben anular esto bean si desean ser más selectivos en sus exportaciones.

    @Configuration
    static class OverridePredicateConf {
        @Bean
        Predicate<IPFSystemEvent<?>> eventSendingPredicate() {
            // provide an actual implementation here
            return evt -> true;
        }
    }

eventSendingMapper

Un transporte mapper utilizado por el predeterminado send connector para convertir el system events en un formato de transferencia aceptable por un sistema externo. El predeterminado mapper crea un JSON representación de cadena del events y suministra el Java tipo de la event a través de la EventType encabezado del mensaje. Por lo tanto, los usuarios de este módulo deben anular esto.bean si su ConnectorTransport requiere un formato de carga diferente (por ejemplo,Protobuf,Avro, etc.) o si desean incluir encabezados adicionales en sus mensajes.

    @Configuration
    static class OverrideMapperConf {
        @Bean
        SendTransportMessageConverter<IPFSystemEvent<?>> eventSendingMapper() {
            // provide an actual implementation here
            return TransportMessage::new;
        }
    }

eventSendingConnector

A SendingConnector utilizado para enviar el system events a un sistema externo. El predeterminado send connector bean espera que los usuarios de este módulo proporcionen un ConnectorTransport bean que realiza el envío real. Sin embargo, el ConnectorTransport las implementaciones que vienen listas para usar con IPF solo admiten tipos de endpoints de "fire and forget", por lo que los usuarios deben anular el SendingConnector en los casos en que necesiten un tipo de comunicación de solicitud-respuesta con el sistema externo.

    @Configuration
    static class OverrideSendingConnectorConf {
        @Bean
        SendingConnector<IPFSystemEvent<?>, ?> eventSendingConnector(
                ClassicActorSystemProvider actorSystem) {
            // provide an actual RequestReplySendConnector configuration here
            return new RequestReplySendConnector
                    .Builder<IPFSystemEvent<?>, IPFSystemEvent<?>, String, String>("name")
                    .withActorSystem(actorSystem)
                    .withCorrelationIdExtractor(systemEvent -> CorrelationId.of(systemEvent.getProcessingContext().getUnitOfWorkId().getValue()))
                    .withSendTransportMessageConverter(TransportMessage::new)
                    .withReceiveTransportMessageConverter(TransportMessage::toString)
                    .withConnectorTransport(transport())
                    .build();
        }

        private ConnectorTransport<IPFSystemEvent<?>> transport() {
            // provide an actual transport, e.g. RequestReplyHttpConnectorTransport
            return mock(ConnectorTransport.class, RETURNS_MOCKS);
        }
    }

IPF System Event Exportador Inicial JMS

Este módulo incluye el ipf-system-events-exporter-starter-core módulo y proporciona a sus usuarios lo básico JMS ConnectorTransport implementación.

Para utilizar el módulo, añada el siguiente fragmento de código a su pom.xml la sección de dependencias de

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-jms</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

Cómo customise configuración incluida

La autoconfiguración proporcionada permite customisations a través de dos enfoques principales:

  • anule la configuración predeterminada para deshabilitar ciertos exportadores

  • sobrescribir el predeterminado JMS ConnectionFactory bean con su custom uno

Anulando la configuración predeterminada

Configuración predeterminada incluida a continuación.

ipf.system-events.exporter {
  # Used to select the type of exporter, in this case JMS
  type = jms

  # JMS transport-specific configuration
  jms {
    # The JMS queue to publish the system events onto
    queue = SystemEventQueue

    # The URL of the JMS broker that we're connecting to
    broker-url = "tcp://localhost:61616"

    # Sets the window size for flow control of the consumers created through this factory.
    # Value must be -1 (to disable flow control),
    # 0 (to not buffer any messages) 
    # or greater than 0 (to set the maximum size of the buffer).
    consumer-window-size = 0

    # Sets the blocking call failover timeout.
    # When the client is awaiting failover, this is over and above the normal call timeout.
    # Value must be greater or equals to -1, -1 means forever.
    failover-timeout = 5000
  }
}

Sobrescribiendo el valor predeterminado ConnectionFactory bean

Para anular el valor predeterminado ConnectionFactory bean, necesita proporcionar un bean definition similar al que se enumera a continuación (que muestra la configuración de un básico IBM MQ ConnectionFactory configuración).

    @Configuration
    class ConnectionFactoryConfig {

        @Bean
        ConnectionFactory systemEventConnectionFactory(IbmMqConnectionFactoryProperties properties) {
            // provide the actual ConnectionFactory configuration here
            var connectionFactory = new MQQueueConnectionFactory();
            connectionFactory.setHostName(properties.getHost());
            connectionFactory.setTransportType(WMQConstants. WMQ_CM_CLIENT);
            connectionFactory.setCCSID(1208);
            connectionFactory.setChannel(properties.getChannel());
            connectionFactory.setPort(properties.getPort());
            connectionFactory.setQueueManager(properties.getQueueManager());
            return connectionFactory;
        }
    }

Cómo consumir publicado events

Events puede ser consumido de la misma manera en que consumiría cualquier otro JMS mensaje, p. ej.
    static class TestConsumer {

        private static final BlockingQueue<ReceivedEvent> EVENTS = new ArrayBlockingQueue<>(10);

        private final ObjectMapper objectMapper = SerializationHelper.objectMapper();

        @SneakyThrows
        @JmsListener(destination = "SystemEventQueue")
        public void receiveEvents(@Payload String message, @Headers Map<String, Object> headers) {
            // read the Java type of the event from message headers
            var eventType = headers.get("EventType").toString(); 
            
            // use the eventType to deserialize the event into a proper POJO
            var event = objectMapper.readValue(message, Class.forName(eventType));
            
            // do something with the event here
            EVENTS.add(new ReceivedEvent(message, event));
        }
    }

IPF System Event Exportador Inicial Kafka

Este módulo incluye el ipf-system-events-exporter-starter-kafka módulo y proporciona a sus usuarios las funciones básicas Kafka ConnectorTransport implementación.

Para utilizar el módulo, añada el siguiente fragmento de código a su pom.xml la sección de dependencias de

<dependency>
    <groupId>com.iconsolutions.ipf.core.systemevents</groupId>
    <artifactId>ipf-system-events-exporter-starter-kafka</artifactId>
    <version>${ipf-system-events.version}</version>
</dependency>

Cómo customise configuración incluida

La autoconfiguración proporcionada permite customisations a través de configuraciones externalizadas que anulan.

Configuración predeterminada incluida a continuación.

ipf.system-events.exporter {
  # Used to select the type of exporter, in this case Kafka
  type = kafka

  # Kafka transport-specific configuration, by default most configuration is 
  kafka {
    producer {
      # The Kafka topic to publish the system events onto
      topic = SYSTEM_EVENTS
      
      # Restart options for the connector transport
      restart-settings = {
        min-backoff = 1s
        max-backoff = 5s
        random-factor = 0.25
        max-restarts = 5
        max-restarts-within = 10m
      }
      
      # Overrides to the akka.kafka.producer.kafka-clients defaults,
      # which are inherited by this producer
      kafka-clients {
      }
    }
  }
}

Cómo consumir publicado events

Events puede ser consumido de la misma manera en que consumiría cualquier otro Kafka mensaje, p. ej.
    static class TestConsumer {

        private static final BlockingQueue<ReceivedEvent> EVENTS = new ArrayBlockingQueue<>(10);

        private final ObjectMapper objectMapper = SerializationHelper.objectMapper();

        @SneakyThrows
        @KafkaListener(topics = "SYSTEM_EVENTS", groupId = "test")
        public void receiveEvents(@Payload String message, @Headers Map<String, Object> headers) {
            // read the Java type of the event from message headers
            var eventType = new String((byte[]) headers.get("EventType"));
            
            // use the eventType to deserialize the event into a proper POJO
            var event = objectMapper.readValue(message, Class.forName(eventType));
            
            // do something with the event here
            EVENTS.add(new ReceivedEvent(message, event));
        }
    }