Documentation for a newer release is available. View Latest

Receiving Connector Quickstart

This page provides details on how to get started with receiving messages from external systems, using receiving connectors provided by the connector library.

Dependencies

Before building a receiving connector, the connector-core library must be included as a dependency.

 <dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-core</artifactId>
    <version>${connector.version}</version>
</dependency>

The latest version of the connector library can be found using this Nexus search.

Unless providing your own implementation, at least one transport library should be declared. The naming scheme for all transports included in the connector library is connector-[transport], where [transport] matches the transport scheme that this connector should use. For more details on connector transports check out the Connector Transports documentation.

Here’s an example of declaring the dependency to use JMS.

<dependency>
    <groupId>com.iconsolutions.ipf.core</groupId>
    <artifactId>connector-jms</artifactId>
    <version>${connector.version}</version>
</dependency>

Getting Started: Receiving Connector

Receiving connectors are used for receiving messages, either as a response to a previously sent message or a new request.

Builder Pattern

Receiving connectors are instantiated using the builder pattern. This is because connectors have many parameters to configure and most are optional or have default values.

Let’s see how we use the builder pattern to instantiate a receiving connector.

When building a receiving connector we set it up to be either an initiating receiver or a response receiver. An initiating receiver receives requests from an external system, whereas a response receiver expects messages to be responses to requests made previously via a sending connector.

Initiating Receiver

The following example demonstrates the minimum properties that must be provided when building an initiating receive connector.

ReceiveConnector<ExampleType> connector = ReceiveConnector
         .<ExampleType>builder("ExampleSystem") (1)
         .withConnectorTransport(transport) (2)
         .withReceiveTransportMessageConverter(converter) (3)
         .withProcessingContextExtractor(processingContextExtractor) (4)
         .withReceiveHandler(receiver) (5)
         .withActorSystem(actorSystem) (6)
         .build();
1 Sets the name of the connector. The name should represent what the connector is connecting to.
2 Provides an implementation of the ReceivingConnectorTransport interface.
3 Provides an implementation of the ReceiveTransportMessageConverter interface. Takes the received TransportMessage and converts it to the target type T (ExampleType in this instance).
4 Provides an implementation of the ProcessingContextExtractor interface. This field is what makes this an initiating receiving connector as it extracts (or generates) a ProcessingContext from the message instead of fetching one from the correlation service as would be the case in a response receiving connector.
5 An implementation of ReceiveHandler. This is where application logic would go to decide how to handle requests.
6 Sets the actor system used throughout the application.

Response Receiver

The next example demonstrates how to build a minimal response receiving connector.

ReceiveConnector<ExampleType> connector = ReceiveConnector
        .<ExampleType>builder("connector-name") (1)
        .withConnectorTransport(transport) (2)
        .withReceiveTransportMessageConverter(converter) (3)
        .withCorrelationIdExtractor(correlationIdExtractor) (4)
        .withCorrelationService(correlationService) (5)
        .withReceiveHandler(receiver) (6)
        .withActorSystem(actorSystem) (7)
        .build();
1 Set the name of the connector. The name should represent what the connector is connecting to.
2 Provides an implementation of the ReceivingConnectorTransport interface.
3 Provides an implementation of the ReceiveTransportMessageConverter interface. Takes the received TransportMessage and converts it to the target type, ExampleType in this case.
4 Provides an implementation of the CorrelationIdExtractor interface. Takes the received message and extracts the correlation identifier so that we can correlate it with the original request made via a sending connector.
5 Provides an implementation of the CorrelationService interface. The correlation service takes the extracted correlation identifier and returns the associated ProcessingContext used when the original request was sent via a sending connector.
6 An implementation of ReceiveHandler. This is where application logic would go to decide how to handle responses.
7 Sets the actor system used throughout the application.

Start Receiving Messages

The final step is to start the connector by calling its start method. At this point you should have a connector that can receive messages via the configured transport.

For reference, the default configuration is.

default-receive-connector {
  manual-start = true
  receiver-parallelism-type = ORDERED_PARTITIONED
  receiver-parallelism-per-partition = 1
  resiliency-settings = ${ipf.connector.default-resiliency-settings}
}
application.conf
example-receive-connector {
  manual-start = false
  throttle-count = 5
  throttle-duration = 10s
}

The values that can be configured via configuration properties are shown in the following table.

Property Description Example

manual-start

When set to false, the connector is started automatically after creation; otherwise, its start method must be invoked manually.

true

throttle-count

If the value is set, limits the throughput to a specified number of consumed messages per time unit. If it is set. If this value is set, throttle-duration also needs to be set.

10

throttle-duration

If set, it is used along with throttle-count to set the maximum rate for consuming messages. For more details, see doc.akka.io/japi/akka/2.6/akka/stream/javadsl/Flow.html#throttle(int,java.time.Duration)

1s

mapping-parallelism

If set, limits the number of concurrent mapping operations executed on consumed messages.

number of available processors

receiver-parallelism-type

Defines the way messages are handled in parallel.

  • ORDERED - messages are consumed in parallel in the order they are received, and they are acknowledged in the same order

  • ORDERED_PARTITIONED - messages are consumed in parallel in the order they are received, and they are acknowledged in the same order, but the parallelism for messages sharing the UnitOfWorkId is capped to a configurable degree

  • UNORDERED - messages are consumed in parallel in the order they are received, and they are acknowledged in the their completion order, which may impact some transports (e.g. Kafka)

ORDERED_PARTITIONED

receiver-parallelism

If set, limits the number of mapped messages that are allowed to be processed concurrently.

number of available processors

receiver-parallelism-per-partition

Only applied if receiver-parallelism-type is set to ORDERED_PARTITIONED. If set, limits the number of mapped messages per UnitOfWorkId that are allowed to be processed concurrently. Must be less than receiver-parallelism.

1

resiliency-settings

The resiliency settings that will be used when receiving. For more details, see the Resilience documentation.