HTTP Receiving Flow Quickstart
When receiving messages from external systems over HTTP it is possible to leverage the benefits of the connector framework without necessarily using a receive connector. This page demonstrates this using Spring Rest Controllers and is possible by using the HttpReceiveFlow and HttpReceiveFlowService components.
Dependencies
Before building a HTTP Receive Flow/HTTP Receive Service, the connector-http library must be included as a dependency.
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-http</artifactId>
<version>${connector.version}</version>
</dependency>
The latest version of the connector library can be found using this Nexus search.
Getting Started: HTTP Receive Flow
HTTP Receive Flows can be defined to provide a standardised way to implement functionality such as logging, correlation and other functionality traditionally available when using a receive connector, when using another mechanism than the receive connector to expose an endpoint (e.g. spring controller)
Builder Pattern
HTTP Receive Flows are instantiated using the builder pattern. This is because http receive flows have many parameters to configure and most are optional or have default values.
Let’s see how we use the builder pattern to instantiate a http receive flow.
When building a http receive flow we set it up to be either an initiating receiver or a response receiver. An initiating receiver receives requests from an external system, whereas a response receiver expects messages to be responses to requests made previously via a sending connector.
Initiating Receiver
The following example demonstrates the minimum properties that must be provided when building an initiating receive flow.
HttpReceiveFlow<ExampleType> connector = HttpReceiveFlow
.<ExampleType>builder("ExampleSystem") (1)
.withReceiveTransportMessageConverter(converter) (3)
.withProcessingContextExtractor(processingContextExtractor) (4)
.withReceiveHandler(receiver) (5)
.withActorSystem(actorSystem) (6)
.build();
| 1 | Sets the name of the http receive flow. The name should represent the external system messages that the flow is processing messages from. |
| 2 | Provides an implementation of the ReceiveTransportMessageConverter interface.
Takes the received TransportMessage and converts it to the target type T (ExampleType in this instance). |
| 3 | Provides an implementation of the ProcessingContextExtractor interface.
This field is what makes this an initiating receiving connector as it extracts (or generates) a ProcessingContext from the message instead of fetching one from the correlation service as would be the case in a response receiving connector. |
| 4 | An implementation of ReceiveHandler.
This is where application logic would go to decide how to handle requests. |
| 5 | Sets the actor system used throughout the application. |
Response Receiver
The next example demonstrates how to build a minimal response receiving connector.
HttpReceiveFlow<ExampleType> connector = HttpReceiveFlow
.<ExampleType>builder("connector-name") (1)
.withReceiveTransportMessageConverter(converter) (3)
.withCorrelationIdExtractor(correlationIdExtractor) (4)
.withCorrelationService(correlationService) (5)
.withReceiveHandler(receiver) (6)
.withActorSystem(actorSystem) (7)
.build();
| 1 | Set the name of the http receive flow. The name should represent the external system messages that the flow is processing messages from. |
| 2 | Provides an implementation of the ReceiveTransportMessageConverter interface.
Takes the received TransportMessage and converts it to the target type, ExampleType in this case. |
| 3 | Provides an implementation of the CorrelationIdExtractor interface.
Takes the received message and extracts the correlation identifier so that we can correlate it with the original request made via a sending connector. |
| 4 | Provides an implementation of the CorrelationService interface.
The correlation service takes the extracted correlation identifier and returns the associated ProcessingContext used when the original request was sent via a sending connector. |
| 5 | An implementation of ReceiveHandler.
This is where application logic would go to decide how to handle responses. |
| 6 | Sets the actor system used throughout the application. |
HTTP Receive Flow Service
Once you have setup a HTTP Receive flow, you then need to define a HTTP Receive Flow Service bean. This service is responsible for sending the requests received from the spring controller onto the Akka Streams flow for processing and returning the response.
Below is an example of how to configure the bean
@Bean
HttpReceiveFlowService<RequestMessage> httpReceiveFlowService(ActorSystem actorSystem, HttpReceiveFlow<RequestMessage> httpReceiveFlow) {
return new HttpReceiveFlowService<>(actorSystem, httpReceiveFlow);
}
Processing Messages
Once the HttpReceiveFlow and HttpReceiveFlowService beans have been configured, you simply wire the HttpReceiveFlowService into your spring controller and invoke the process method, passing in the request to be processed. Invoking this method means that the request will be processed through all the configured functionality on the HttpReceiveFlow such as message logging and correlation. Optionally you can also pass MessageHeaders as the second parameter to the process method.
package com.iconsolutions.ipf.core.connector.example.app;
import com.iconsolutions.ipf.core.connector.HttpReceiveFlowService;
import com.iconsolutions.ipf.core.connector.example.model.RequestMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.Optional;
import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.ACK;
import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.HTTP_RECEIVE_FLOW_SERVICE_RESPONSE;
import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.NACK;
@RestController
@RequiredArgsConstructor
public class ExampleController {
private final HttpReceiveFlowService<RequestMessage> httpReceiveFlowService;
@RequestMapping(value = "/submit", method = RequestMethod.POST)
public Mono<ResponseEntity<Object>> submit(@RequestBody RequestMessage requestMessage) {
return httpReceiveFlowService.process(requestMessage) (1)
.map(response -> {
if(ACK.equals(Optional.ofNullable(response.getMessageHeaders().getHeaderMap().get(HTTP_RECEIVE_FLOW_SERVICE_RESPONSE)).map(Object::toString).orElse(NACK))) {
return ResponseEntity.accepted().build();
}
return ResponseEntity.internalServerError().build();
});
}
}
| 1 | The HttpReceiveFlowService returns a Mono<TransportMessage> making it technology agnostic.
The message will contain a MessageHeader to indicate the success or failure of the request (ack/nack) as well as details of exceptions that may have occurred.
If the message is an acknowledgement the payload will contain the receiveContext of the original request,
or in the case of negative acknowledgement it will contain the original request payload. In the above we example we translate this response into a spring specific ResponseEntity. |
Configuration
The values that can be configured via configuration properties are shown in the following table.
| Property | Description | Example |
|---|---|---|
|
If the value is set, limits the throughput to a specified number of consumed messages per time unit. If it is set. If this value is set, throttle-duration also needs to be set. |
10 |
|
If set, it is used along with |
1s |
|
If set, limits the number of concurrent mapping operations executed on consumed messages. |
number of available processors |
|
Defines the way messages are handled in parallel.
|
ORDERED_PARTITIONED |
|
If set, limits the number of mapped messages that are allowed to be processed concurrently. |
number of available processors |
|
Only applied if |
1 |
|
The resiliency settings that will be used when receiving. For more details, see the Resilience documentation. |