Documentation for a newer release is available. View Latest
Esta página no está disponible actualmente en Español. Si lo necesita, póngase en contacto con el servicio de asistencia de Icon (correo electrónico)

Sending Connector Quickstart

This page explains details on how to get started with sending messages to external systems, using sending connectors provided by creating the connector library.

Dependencies

Before building a sending connector, the connector-core library must be included as a dependency.

<dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-core</artifactId>
    <version>${connector.version}</version>
</dependency>

The latest version of the connector library can be found using this Nexus search.

Unless providing your own implementation, at least one transport library should be declared. The naming scheme for all transports included in the connector library is connector-[transport], where [transport] matches the transport scheme that this connector should use. For more details on connector transports check out the Connector Transports documentation.

Here’s an example of declaring the dependency to use JMS.

<dependency>
    <groupId>com.iconsolutions.ipf.core</groupId>
    <artifactId>connector-jms</artifactId>
    <version>${connector.version}</version>
</dependency>

Getting Started: Sending Connector

Sending connectors are used for sending messages to some destination, either as a reply to a previously received message or to make a request.

Builder Pattern

Sending connectors are instantiated using the builder pattern. This is because connectors have many parameters to configure and most are optional or have default values.

Let’s see how we use the builder pattern to instantiate a sending connector.

SendConnector<ExampleType, ExampleType> connector = SendConnector
        .<ExampleType, ExampleType>builder("ExampleSystem") (1)
        .withConnectorTransport(transport) (2)
        .withSendTransportMessageConverter(converter)(3)
        .withCorrelationIdExtractor(correlationIdExtractor) (4)
        .withCorrelationService(correlationService) (5)
        .withActorSystem(actorSystem) (6)
        .build();
1 Sets the name of the connector. The name should represent what the connector is connecting to.
2 Provides an implementation of the SendingConnectorTransport interface.
3 Provides an implementation of the SendTransportMessageConverter interface. Takes the message payload of type T (ExampleType in this instance) and converts it to a TransportMessage object.
4 Provides an implementation of the CorrelationIdExtractor interface. Takes the message payload and extracts (or generates) a correlation identifier to use when persisting the ProcessingContext via the correlation service.
5 Provides an implementation of the CorrelationService. The correlation service takes a ProcessingContext and a CorrelationId and persists them using the services' configured repository. This is used to correlate a request with its response which will be handled by a separate process.
6 Sets the actor system used throughout the application.

Start Sending Messages

At this point we should have successfully instantiated a SendConnector<ExampleType>, which can be used to send messages over the configured transport.

The SendingConnector interface defines two methods for sending messages, the signatures for these are.

public interface SendingConnector<D, R> extends OperableConnector {

    /**
     * Send a request via the connector without a supporting context
     */
    default CompletionStage<R> send(@NotNull ProcessingContext processingContext,
                                    @NotNull D domainMessage) {
        return send(processingContext, domainMessage, SupportingContext.empty());
    }

    /**
     * Send a request via the connector with a full sending context
     * that allows users to supply a supporting context as well
     */
    CompletionStage<R> send(@NotNull ProcessingContext processingContext,
                            @NotNull D domainMessage,
                            @NotNull SupportingContext supportingContext);

    /**
     * Allows setting of the meterRegistry implementation for a connector
     * after connector build
     * This allows implementations to set the registry for all SendingConnector instances
     * at once instead of having to set them at a per connector bases at build time
     */
    void setMeterRegistry(MeterRegistry meterRegistry);

    SendingConnectorConfig getConfig();
}

The generic domain message type D in this instance is our ExampleType. ExampleType has the following definition.

@Data
@AllArgsConstructor
@EqualsAndHashCode
public static class ExampleType implements UpdatebleProcessingContextHolder {
    private final String name;
    private final LocalDate dob;
    private final int shoeSize;
    private ProcessingContext processingContext;

    public ExampleType(String name, LocalDate dob, int shoeSize) {
        this(name, dob, shoeSize, null);
    }
}

We can now create an instance of the domain message and send it to the transport via the connector.

ExampleType message = new ExampleType(
        "Isaac Newton",
        LocalDate.of(1642, 12, 25),
        8
);
CompletionStage<DeliveryOutcome> future = connector.send(processingContext, message);

This returns a CompletionStage<DeliveryOutcome> which we can either block to await the DeliveryOutcome (not ideal), or use various methods on CompletionStage to perform other tasks asynchronously when the future completes.

Getting Started: Request-Reply Sending Connector

The default sending connector works in a one-way fashion where the response of the request is the delivery result. However, for some protocols (namely HTTP) it is desirable and expected that a well-formed reply would be returned from a request.

A RequestReplySendConnector is built similarly to other connectors, using the builder pattern.

RequestReplySendConnector<ExampleRequest, ExampleRequest, ExampleResponse, ExampleResponse> connector;
connector = RequestReplySendConnector
        .<ExampleRequest, ExampleRequest, ExampleResponse, ExampleResponse>builder("ExampleSystem")
        .withConnectorTransport(transport)
        .withCorrelationIdExtractor(correlationIdExtractor)
        .withSendTransportMessageConverter(sendMessageConverter)
        .withReceiveTransportMessageConverter(receiveMessageConverter)
        .withActorSystem(actorSystem)
        .build();

This connector is slightly more complex as it requires converting to and from the TransportMessage. However, some complexity is reduced because there is no longer any need for correlation related parameters.

The signature of the send method now returns a future containing the response. Since the transport is unreliable, a fail-safe exists to ensure that the request eventually completes, albeit with an exception after some timeout duration. A default is provided, but can be configured with the builder method, .withCallTimeoutSeconds.

When a message send times out, the response future will complete exceptionally with a TimeoutException as the cause.

For reference, the default configuration is.

default-send-connector {
  manual-start = false
  call-timeout = 30s
  queue-size = 50
  max-concurrent-offers = 500
  parallelism = 500
  parallelism-per-partition = 1
  send-message-association = true
  resiliency-settings = ${ipf.connector.default-resiliency-settings}
}

The values that can be configured via configuration properties are shown in the following table.

Property Description Example

manual-start

When set to false, the connector is started automatically after creation; otherwise, its start method must be invoked manually.

false

queue-size

Size of a source queue which can be used to handle backpressure, for example fast producer situations.

50

call-timeout

Maximum duration to wait for an acknowledgement before completing the returned future exceptionally with a TimeoutException.

30s

max-concurrent-offers

Maximum number of pending offers when buffer is full.

500

parallelism

Maximum number of parallel calls for send connector

500

parallelism-per-partition

Maximum number of parallel sends per UnitOfWorkId. Must be less than parallelism.

1

throttle-count

Limits the throughput to a specified number of consumed messages per time unit. When this value is set, throttle-duration must also be provided.

10

throttle-duration

Is used with 'throttle-count' to set the maximum rate for consuming messages. For more details, see the Message Throttling documentation.

1s

resiliency-settings

The resiliency settings that will be used when sending. For more details, see the Resilience documentation.