Documentation for a newer release is available. View Latest

Inicio rápido de flujo de recepción HTTP

Al recibir mensajes de sistemas externos por HTTP, es posible aprovechar los beneficios del framework de connector sin usar necesariamente un receive connector. Esta página lo demuestra usando Spring REST Controllers y es posible mediante los componentes HttpReceiveFlow y HttpReceiveFlowService.

Dependencias

Antes de construir un HTTP Receive Flow/HTTP Receive Service, la biblioteca connector-http debe incluirse como dependencia.

 <dependency>
    <groupId>com.iconsolutions.ipf.core.connector</groupId>
    <artifactId>connector-http</artifactId>
    <version>${connector.version}</version>
</dependency>

La última versión de la biblioteca del connector puede encontrarse usando esta búsqueda en Nexus.

Primeros pasos: HTTP Receive Flow

Los HTTP Receive Flows se pueden definir para proporcionar una forma estandarizada de implementar funcionalidades como logging, correlation y otras funcionalidades tradicionalmente disponibles cuando se usa un receive connector, al usar otro mecanismo distinto del receive connector para exponer un endpoint (por ejemplo, un controlador de Spring).

Patrón Builder

Los HTTP Receive Flows se instancian utilizando el patrón builder. Esto se debe a que los HTTP receive flows tienen muchos parámetros que configurar, y la mayoría son opcionales o tienen valores predeterminados.

Veamos cómo usamos el patrón builder para instanciar un HTTP receive flow.

Al construir un HTTP receive flow lo configuramos para que sea un initiating receiver o un response receiver. Un initiating receiver recibe solicitudes de un sistema externo, mientras que un response receiver espera que los mensajes sean respuestas a solicitudes hechas previamente mediante un sending connector.

Initiating Receiver

El siguiente ejemplo muestra las propiedades mínimas que deben proporcionarse al construir un initiating receive flow.

HttpReceiveFlow<ExampleType> connector = HttpReceiveFlow
        .<ExampleType>builder("ExampleSystem") (1)
        .withReceiveTransportMessageConverter(converter) (3)
        .withProcessingContextExtractor(processingContextExtractor) (4)
        .withReceiveHandler(receiver) (5)
        .withActorSystem(actorSystem) (6)
        .build();
1 Establece el nombre del HTTP receive flow. El nombre debe representar el sistema externo del que el flow está procesando mensajes.
2 Proporciona una implementación de la interfaz ReceiveTransportMessageConverter. Toma el TransportMessage recibido y lo convierte al tipo objetivo T (ExampleType en este caso).
3 Proporciona una implementación de la interfaz ProcessingContextExtractor. Este campo es lo que hace que esto sea un initiating receiving connector ya que extrae (o genera) un ProcessingContext del mensaje en lugar de obtener uno del correlation service como sería el caso en un response receiving connector.
4 Una implementación de ReceiveHandler. Aquí iría la lógica de la aplicación para decidir cómo manejar las solicitudes.
5 Establece el actor system utilizado en toda la aplicación.

Response Receiver

El siguiente ejemplo muestra cómo construir un response receiving connector mínimo.

HttpReceiveFlow<ExampleType> connector = HttpReceiveFlow
        .<ExampleType>builder("connector-name") (1)
        .withReceiveTransportMessageConverter(converter) (3)
        .withCorrelationIdExtractor(correlationIdExtractor) (4)
        .withCorrelationService(correlationService) (5)
        .withReceiveHandler(receiver) (6)
        .withActorSystem(actorSystem) (7)
        .build();
1 Establece el nombre del HTTP receive flow. El nombre debe representar el sistema externo del que el flow está procesando mensajes.
2 Proporciona una implementación de la interfaz ReceiveTransportMessageConverter. Toma el TransportMessage recibido y lo convierte al tipo objetivo, ExampleType en este caso.
3 Proporciona una implementación de la interfaz CorrelationIdExtractor. Toma el mensaje recibido y extrae el identificador de correlación para que podamos correlacionarlo con la solicitud original realizada mediante un sending connector.
4 Proporciona una implementación de la interfaz CorrelationService. El correlation service toma el identificador de correlación extraído y devuelve el ProcessingContext asociado utilizado cuando la solicitud original se envió mediante un sending connector.
5 Una implementación de ReceiveHandler. Aquí iría la lógica de la aplicación para decidir cómo manejar las respuestas.
6 Establece el actor system utilizado en toda la aplicación.

HTTP Receive Flow Service

Una vez que haya configurado un HTTP Receive flow, debe definir un bean HTTP Receive Flow Service. Este servicio es responsable de enviar las solicitudes recibidas desde el controlador de Spring hacia el flujo de Akka Streams para su procesamiento y devolver la respuesta.

A continuación se muestra un ejemplo de cómo configurar el bean

@Bean
HttpReceiveFlowService<RequestMessage> httpReceiveFlowService(ActorSystem actorSystem, HttpReceiveFlow<RequestMessage> httpReceiveFlow) {
    return new HttpReceiveFlowService<>(actorSystem, httpReceiveFlow);
}

Procesamiento de mensajes

Una vez que los beans HttpReceiveFlow y HttpReceiveFlowService se han configurado, simplemente conecte el HttpReceiveFlowService en su controlador de Spring e invoque el método process, pasando la solicitud a procesar. Invocar este método significa que la solicitud se procesará a través de toda la funcionalidad configurada en el HttpReceiveFlow como message logging y correlation. Opcionalmente también puede pasar MessageHeaders como segundo parámetro del método process.

package com.iconsolutions.ipf.core.connector.example.app;

import com.iconsolutions.ipf.core.connector.HttpReceiveFlowService;
import com.iconsolutions.ipf.core.connector.example.model.RequestMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.Optional;

import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.ACK;
import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.HTTP_RECEIVE_FLOW_SERVICE_RESPONSE;
import static com.iconsolutions.ipf.core.connector.HttpReceiveFlowService.NACK;

@RestController
@RequiredArgsConstructor
public class ExampleController {

    private final HttpReceiveFlowService<RequestMessage> httpReceiveFlowService;
    @RequestMapping(value = "/submit", method = RequestMethod.POST)
    public Mono<ResponseEntity<Object>> submit(@RequestBody RequestMessage requestMessage) {
        return httpReceiveFlowService.process(requestMessage) (1)
                .map(response -> {
                    if(ACK.equals(Optional.ofNullable(response.getMessageHeaders().getHeaderMap().get(HTTP_RECEIVE_FLOW_SERVICE_RESPONSE)).map(Object::toString).orElse(NACK))) {
                        return ResponseEntity.accepted().build();
                    }
                    return ResponseEntity.internalServerError().build();
                });
    }

}
1 El HttpReceiveFlowService devuelve un Mono<TransportMessage> haciéndolo independiente de la tecnología. El mensaje contendrá un MessageHeader para indicar el éxito o fracaso de la solicitud (ack/nack) así como detalles de las excepciones que puedan haber ocurrido. Si el mensaje es un acknowledgement el payload contendrá el receiveContext de la solicitud original, o en el caso de un negative acknowledgement contendrá el payload de la solicitud original. En el ejemplo anterior traducimos esta respuesta a un ResponseEntity específico de Spring.

Configuración

Los valores que pueden configurarse mediante propiedades de configuración se muestran en la siguiente tabla.

Property Description Example

throttle-count

Si se establece el valor, limita el throughput a un número especificado de mensajes consumidos por unidad de tiempo. Si se establece. Si este valor se establece, también es necesario establecer throttle-duration.

10

throttle-duration

Si se establece, se usa junto con throttle-count para fijar la velocidad máxima de consumo de mensajes. Para más detalles, vea doc.akka.io/japi/akka/2.6/akka/stream/javadsl/Flow.html#throttle(int,java.time.Duration)

1s

mapping-parallelism

Si se establece, limita el número de operaciones de mapping concurrentes ejecutadas sobre los mensajes consumidos.

número de procesadores disponibles

receiver-parallelism-type

Define la manera en que los mensajes se manejan en paralelo.

  • ORDERED - los mensajes se consumen en paralelo en el orden en que se reciben, y se reconocen (ack) en el mismo orden

  • ORDERED_PARTITIONED - los mensajes se consumen en paralelo en el orden en que se reciben, y se reconocen en el mismo orden, pero el paralelismo para mensajes que comparten el UnitOfWorkId se limita a un grado configurable

  • UNORDERED - los mensajes se consumen en paralelo en el orden en que se reciben, y se reconocen en el orden de finalización, lo cual puede impactar algunos transports (por ejemplo, Kafka)

ORDERED_PARTITIONED

receiver-parallelism

Si se establece, limita el número de mensajes mapeados que pueden procesarse concurrentemente.

número de procesadores disponibles

receiver-parallelism-per-partition

Solo se aplica si receiver-parallelism-type está establecido en ORDERED_PARTITIONED. Si se establece, limita el número de mensajes mapeados por UnitOfWorkId que pueden procesarse concurrentemente. Debe ser menor que receiver-parallelism.

1

resiliency-settings

La configuración de resiliencia que se utilizará al recibir. Para más detalles, consulte la documentación de Resilience.