Inicio rápido de flujo de recepción HTTP
Al recibir mensajes de sistemas externos por HTTP, es posible aprovechar los beneficios del framework de connector sin usar necesariamente un receive connector. Esta página lo demuestra usando Spring REST Controllers y es posible mediante los componentes HttpReceiveFlow y HttpReceiveFlowService.
Dependencias
Antes de construir un HTTP Receive Flow/HTTP Receive Service, la biblioteca connector-http debe incluirse como dependencia.
<dependency>
<groupId>com.iconsolutions.ipf.core.connector</groupId>
<artifactId>connector-http</artifactId>
<version>${connector.version}</version>
</dependency>
La última versión de la biblioteca del connector puede encontrarse usando esta búsqueda en Nexus.
Primeros pasos: HTTP Receive Flow
Los HTTP Receive Flows se pueden definir para proporcionar una forma estandarizada de implementar funcionalidades como logging, correlation y otras funcionalidades tradicionalmente disponibles cuando se usa un receive connector, al usar otro mecanismo distinto del receive connector para exponer un endpoint (por ejemplo, un controlador de Spring).
Patrón Builder
Los HTTP Receive Flows se instancian utilizando el patrón builder. Esto se debe a que los HTTP receive flows tienen muchos parámetros que configurar, y la mayoría son opcionales o tienen valores predeterminados.
Veamos cómo usamos el patrón builder para instanciar un HTTP receive flow.
Al construir un HTTP receive flow lo configuramos para que sea un initiating receiver o un response receiver. Un initiating receiver recibe solicitudes de un sistema externo, mientras que un response receiver espera que los mensajes sean respuestas a solicitudes hechas previamente mediante un sending connector.
Initiating Receiver
El siguiente ejemplo muestra las propiedades mínimas que deben proporcionarse al construir un initiating receive flow.
HttpReceiveFlow<ExampleType> connector = HttpReceiveFlow
.<ExampleType>builder("ExampleSystem") (1)
.withReceiveTransportMessageConverter(converter) (3)
.withProcessingContextExtractor(processingContextExtractor) (4)
.withReceiveHandler(receiver) (5)
.withActorSystem(actorSystem) (6)
.build();
| 1 | Establece el nombre del HTTP receive flow. El nombre debe representar el sistema externo del que el flow está procesando mensajes. |
| 2 | Proporciona una implementación de la interfaz ReceiveTransportMessageConverter.
Toma el TransportMessage recibido y lo convierte al tipo objetivo T (ExampleType en este caso). |
| 3 | Proporciona una implementación de la interfaz ProcessingContextExtractor.
Este campo es lo que hace que esto sea un initiating receiving connector ya que extrae (o genera) un ProcessingContext del mensaje en lugar de obtener uno del correlation service como sería el caso en un response receiving connector. |
| 4 | Una implementación de ReceiveHandler.
Aquí iría la lógica de la aplicación para decidir cómo manejar las solicitudes. |
| 5 | Establece el actor system utilizado en toda la aplicación. |
Response Receiver
El siguiente ejemplo muestra cómo construir un response receiving connector mínimo.
HttpReceiveFlow<ExampleType> connector = HttpReceiveFlow
.<ExampleType>builder("connector-name") (1)
.withReceiveTransportMessageConverter(converter) (3)
.withCorrelationIdExtractor(correlationIdExtractor) (4)
.withCorrelationService(correlationService) (5)
.withReceiveHandler(receiver) (6)
.withActorSystem(actorSystem) (7)
.build();
| 1 | Establece el nombre del HTTP receive flow. El nombre debe representar el sistema externo del que el flow está procesando mensajes. |
| 2 | Proporciona una implementación de la interfaz ReceiveTransportMessageConverter.
Toma el TransportMessage recibido y lo convierte al tipo objetivo, ExampleType en este caso. |
| 3 | Proporciona una implementación de la interfaz CorrelationIdExtractor.
Toma el mensaje recibido y extrae el identificador de correlación para que podamos correlacionarlo con la solicitud original realizada mediante un sending connector. |
| 4 | Proporciona una implementación de la interfaz CorrelationService.
El correlation service toma el identificador de correlación extraído y devuelve el ProcessingContext asociado utilizado cuando la solicitud original se envió mediante un sending connector. |
| 5 | Una implementación de ReceiveHandler.
Aquí iría la lógica de la aplicación para decidir cómo manejar las respuestas. |
| 6 | Establece el actor system utilizado en toda la aplicación. |
HTTP Receive Flow Service
Una vez que haya configurado un HTTP Receive flow, debe definir un bean HTTP Receive Flow Service. Este servicio es responsable de enviar las solicitudes recibidas desde el controlador de Spring hacia el flujo de Akka Streams para su procesamiento y devolver la respuesta.
A continuación se muestra un ejemplo de cómo configurar el bean
@Bean
HttpReceiveFlowService<RequestMessage> httpReceiveFlowService(ActorSystem actorSystem, HttpReceiveFlow<RequestMessage> httpReceiveFlow) {
return new HttpReceiveFlowService<>(actorSystem, httpReceiveFlow);
}
Procesamiento de mensajes
Una vez que los beans HttpReceiveFlow y HttpReceiveFlowService se han configurado, simplemente conecte el HttpReceiveFlowService en su controlador de Spring e invoque el método process, pasando la solicitud a procesar. Invocar este método significa que la solicitud se procesará a través de toda la funcionalidad configurada en el HttpReceiveFlow como message logging y correlation. Opcionalmente también puede pasar MessageHeaders como segundo parámetro del método process.
package com.iconsolutions.ipf.core.connector.example.app;
import com.iconsolutions.ipf.core.connector.HttpReceiveFlowService;
import com.iconsolutions.ipf.core.connector.example.model.RequestMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.Optional;
import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.ACK;
import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.HTTP_RECEIVE_FLOW_SERVICE_RESPONSE;
import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.NACK;
@RestController
@RequiredArgsConstructor
public class ExampleController {
private final HttpReceiveFlowService<RequestMessage> httpReceiveFlowService;
@RequestMapping(value = "/submit", method = RequestMethod.POST)
public Mono<ResponseEntity<Object>> submit(@RequestBody RequestMessage requestMessage) {
return httpReceiveFlowService.process(requestMessage) (1)
.map(response -> {
if(ACK.equals(Optional.ofNullable(response.getMessageHeaders().getHeaderMap().get(HTTP_RECEIVE_FLOW_SERVICE_RESPONSE)).map(Object::toString).orElse(NACK))) {
return ResponseEntity.accepted().build();
}
return ResponseEntity.internalServerError().build();
});
}
}
| 1 | El HttpReceiveFlowService devuelve un Mono<TransportMessage> haciéndolo independiente de la tecnología.
El mensaje contendrá un MessageHeader para indicar el éxito o fracaso de la solicitud (ack/nack) así como detalles de las excepciones que puedan haber ocurrido.
Si el mensaje es un acknowledgement el payload contendrá el receiveContext de la solicitud original,
o en el caso de un negative acknowledgement contendrá el payload de la solicitud original. En el ejemplo anterior traducimos esta respuesta a un ResponseEntity específico de Spring. |
Configuración
Los valores que pueden configurarse mediante propiedades de configuración se muestran en la siguiente tabla.
| Property | Description | Example |
|---|---|---|
|
Si se establece el valor, limita el throughput a un número especificado de mensajes consumidos por unidad de tiempo. Si se establece. Si este valor se establece, también es necesario establecer throttle-duration. |
10 |
|
Si se establece, se usa junto con |
1s |
|
Si se establece, limita el número de operaciones de mapping concurrentes ejecutadas sobre los mensajes consumidos. |
número de procesadores disponibles |
|
Define la manera en que los mensajes se manejan en paralelo.
|
ORDERED_PARTITIONED |
|
Si se establece, limita el número de mensajes mapeados que pueden procesarse concurrentemente. |
número de procesadores disponibles |
|
Solo se aplica si |
1 |
|
La configuración de resiliencia que se utilizará al recibir. Para más detalles, consulte la documentación de Resilience. |