DPS Adapter Deployment

This is a guide on how to properly set up the IPF Studio Configuration DPS Adapter in a deployment where multiple instances/nodes rely on the DPS Adapter for dynamic variable consistency.

Below are examples of a docker-compose file and configuration, as well as Kubernetes manifests, with comments explaining the importance of each property. In these files, the DPS Adapter is part of an example project application (flow), not a standalone application.

It is important to note that in an environment with multiple instances of the application using the DPS adapter, the consumer group id (in this case ipf.cps-api.client.notification.kafka.consumer.kafka-clients.group.id) needs to be unique for each instance in the deployment. This way, each instance receives all CPS kafka notifications and keeps its dynamic registry consistent.

Docker

Docker Compose
version: "3.0"
services:

  # MongoDB started as a single-node replica set (required by IPF persistence)
  ipf-mongo:
    image: registry.ipf.iconsolutions.com/ipf-docker-mongodb:latest
    container_name: ipf-mongo
    ports:
      # quoted for consistency with the other services and to avoid YAML
      # implicit-typing surprises on "host:container" mappings
      - "27017:27017"
    command: --replSet test --bind_ip_all
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongo localhost:27017/test --quiet
    volumes:
      - "./config/mongodb/init-replication.sh:/docker-entrypoint-initdb.d/01_init_replication.sh"

  zookeeper:
    image: zookeeper
    container_name: zookeeper
    restart: unless-stopped
    ports:
      - "2181:2181"
    healthcheck:
      test: [ "CMD", "zkServer.sh", "status" ]
      interval: 5s
      timeout: 10s
      retries: 10

  kafka:
    image: registry.ipf.iconsolutions.com/kafka-icon:2.13-2.7.1
    container_name: kafka
    restart: always
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      - KAFKA_BROKER_ID=0
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LOG_RETENTION_MINUTES=10
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
      - KAFKA_LISTENERS=PLAINTEXT://:9092,LOCAL://:9093
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LOCAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,LOCAL://localhost:9093
      # topic carrying the CPS CRUD notifications consumed by the DPS adapter
      - KAFKA_CREATE_TOPICS=CPS_CRUD_NOTIFICATION:3:1
    healthcheck:
      test: nc -z localhost 9092 || exit -1
      interval: 5s
      timeout: 10s
      retries: 10
    depends_on:
      zookeeper:
        condition: service_healthy

  ipf-developer-app:
    image: registry.ipf.iconsolutions.com/ipf-developer-app:latest
    container_name: ipf-developer-app
    ports:
      # quoted for consistency with the other services
      - "8081:8081"
    volumes:
      - ./config/ipf-developer-app:/ipf-developer-app/conf
      - ./logs:/ipf/logs
    user: "${UID:-1000}:${GID:-1000}"
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false -Dconfig.override_with_env_vars=true
    depends_on:
      - ipf-mongo
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:8081/actuator/health" ]

  custom-processing-settings-app:
    image: registry.ipf.iconsolutions.com/custom-processing-settings-application:latest
    container_name: custom-processing-settings-app
    ports:
      - "8089:8080"
      - "5006:5005"
    user: "${UID:-root}:${GID:-root}"
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false
        -Dspring.data.mongodb.uri=mongodb://ipf-mongo:27017/ipf
        -Dactor-system-name=custom-processing-settings
        -Dakka.cluster.seed-nodes.0=akka://custom-processing-settings@custom-processing-settings-app:55002
        -Dakka.remote.artery.canonical.hostname=custom-processing-settings-app
        -Dakka.remote.artery.canonical.port=55002
        -Dakka.remote.artery.bind.hostname=0.0.0.0
        -Dakka.remote.artery.bind.port=55002
        -Dakka.management.http.bind-hostname=0.0.0.0
        -Dakka.kafka.producer.kafka-clients.bootstrap.servers=kafka:9092
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:8080/actuator/health" ]
      interval: 5s
    depends_on:
      ipf-mongo:
        condition: service_healthy
      kafka:
        condition: service_healthy

  # below are the instances of the example project application, each relying on the dps adapter
  # for dynamic variable consistency; each instance is its own consumer group so that all
  # nodes receive every kafka notification and update their registries accordingly

  example-project-app-1:
    image: registry.ipf.iconsolutions.com/example-project-app:latest
    container_name: example-project-app-1
    ports:
      - "8082:8080"
      - "5003:5005"
    user: "${UID:-root}:${GID:-root}"
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false
    volumes:
      - ./config/example-project-app-1:/example-project-app/conf
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:8080/actuator/health" ]
      interval: 5s
    depends_on:
      custom-processing-settings-app:
        condition: service_healthy

  example-project-app-2:
    image: registry.ipf.iconsolutions.com/example-project-app:latest
    container_name: example-project-app-2
    ports:
      - "8083:8080"
      - "5004:5005"
    user: "${UID:-root}:${GID:-root}"
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false
    volumes:
      - ./config/example-project-app-2:/example-project-app/conf
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:8080/actuator/health" ]
      interval: 5s
    depends_on:
      custom-processing-settings-app:
        condition: service_healthy

  example-project-app-3:
    image: registry.ipf.iconsolutions.com/example-project-app:latest
    container_name: example-project-app-3
    ports:
      - "8084:8080"
      - "5005:5005"
    user: "${UID:-root}:${GID:-root}"
    environment:
      - IPF_JAVA_ARGS=-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false
    volumes:
      - ./config/example-project-app-3:/example-project-app/conf
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:8080/actuator/health" ]
      interval: 5s
    depends_on:
      custom-processing-settings-app:
        condition: service_healthy
application.conf
ipf {
  # ipf configuration for example project flow
  # consisting of different models with entities and configurable values
  studio = {
    models = [
      {
        name = "exampleModel",
        configurable-values-global = {
          bankId = "fromConfig_global_MyBank",
        }
        entities = [
          {
            name = "BANK_ENTITY_1",
            configurable-values = {
              bankId = "fromConfig_BankEntity1_MyBank",
            }
          },
          {
            name = "BANK_ENTITY_2",
            configurable-values = {
              bankId = "fromConfig_BankEntity2_MyBank",
              domesticPaymentLimit = 300000
            }
          }
        ]
      }
    ]
  }

  # example of dps adapter configuration
  # configuring dynamic config types, configuration entries and setting categories
  dynamic-config-adapter {
    dynamic-config-types = ["customProcessingSettings"]
    setting-categories = ["pain001DeterminePaymentTypeVariables"]
    # each entry maps a source dynamic config value (by type and name)
    # onto a destination dynamic variable key with an expected value type
    correlation-entries = [
      {
        destination-variable-key = "exampleModel_limitCheckEnabled"
        variable-value-type: BOOLEAN
        source-dynamic-config: {
          config-type: "customProcessingSettings"
          config-name: "domesticLimitChecks"
        }
      },
      {
        destination-variable-key = "exampleModel_domesticPaymentLimit"
        variable-value-type: NUMBER
        source-dynamic-config: {
          config-type: "customProcessingSettings"
          config-name: "domesticPaymentLimit"
        }
      }
    ]
    # the period at which the values are refreshed automatically
    # if we don't want to rely on CPS kafka notifications too much
    # this property can be set to 10s/30s
    value-refresh-period: 1m
  }

  # configuration for retrieval of custom processing settings
  custom-processings-settings service endpoint
  custom-processing-settings-api {
    http.client {
      host = "custom-processing-settings-app"
      port = 8080
      endpoint-url = "/api/v1/"
    }
  }

  # important to note that if there are multiple nodes in the environment,
  # for each node to be able to receive all of these notifications and update their registries accordingly,
  # the consumer group id needs to be set up differently for each node in the deployment
  # other instances should, for example, have this property set as cps-crud-notification-group-2, cps-crud-notification-group-3
  cps-api.client.notification.kafka.consumer.kafka-clients.group.id = cps-crud-notification-group-1

  # ipf processing data egress set up
  processing-data.egress {
    enabled = true
    transport = http
    http {
      client {
        host = "ipf-developer-app"
        port = 8081
        endpoint-url = "/ipf-processing-data"
      }
    }
  }

  system-events.exporter {
    type = ipf-processing-data-egress
  }

  // default timeout
  behaviour.retries {
    initial-timeout = 1s
  }

  # mongo set up
  mongodb.url = "mongodb://ipf-mongo:27017/ipf"
}

# HOCON substitution: must stay UNQUOTED - ${...} is not resolved inside quoted strings
spring.data.mongodb.uri = ${ipf.mongodb.url}

# actor system is the same for all instances of example project app
actor-system-name = "example-project-actor-system"

# akka cluster set up with seed nodes
akka {
  cluster.bootstrap.enabled = false
  cluster.seed-nodes = [
    // one seed node that is the same for all instances (example-project-app-1)
    "akka://"${actor-system-name}"@example-project-app-1:55001"
  ]
  remote.artery {
    canonical.port = 55001
    # this hostname should be different for other instances of the example project app,
    # e.g. example-project-app-2, example-project-app-3
    canonical.hostname = example-project-app-1
    bind.hostname = 0.0.0.0
    bind.port = 55001
  }

  # kafka configuration
  kafka {
    consumer {
      kafka-clients {
        bootstrap.servers = "kafka:9092"
      }
    }
  }
}

Kubernetes

Deployment Manifest
apiVersion: apps/v1
kind: Deployment
metadata:
  name: example-project
  annotations:
    # expose the akka metrics port (9001) to prometheus scraping
    prometheus.io/scrape: "true"
    prometheus.io/path: "/"
    prometheus.io/port: "9001"
spec:
  # three application instances, each with a unique POD_NAME-based consumer group
  replicas: 3
  selector:
    matchLabels:
      app: example-project
      product: ipfv2
  template:
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/"
        prometheus.io/port: "9001"
      labels:
        # the "app" label is also what akka kubernetes-api discovery selects on
        # (pod-label-selector "app=example-project" in the ConfigMap application.conf)
        app: example-project
        product: ipfv2
    spec:
      imagePullSecrets:
        - name: "registrysecret"
      containers:
        - name: example-project
          image: ${docker.registry}/example-project-application:latest
          imagePullPolicy: Always
          # NOTE(review): runs as root with privilege escalation allowed;
          # acceptable for an example, but should be locked down for production
          securityContext:
            runAsUser: 0
            allowPrivilegeEscalation: true
          ports:
            - name: http
              containerPort: 8080
            - name: debug-port
              containerPort: 5005
            - name: akka-artery
              containerPort: 55001
            - name: akka-management
              containerPort: 8558
            - name: akka-metrics
              containerPort: 9001
          # both probes target the akka management health check endpoints
          livenessProbe:
            failureThreshold: 5
            httpGet:
              path: /health/alive
              port: akka-management
              scheme: HTTP
            initialDelaySeconds: 60
            periodSeconds: 2
            successThreshold: 1
            timeoutSeconds: 1
          readinessProbe:
            failureThreshold: 3
            httpGet:
              path: /health/ready
              port: akka-management
              scheme: HTTP
            initialDelaySeconds: 60
            periodSeconds: 2
            successThreshold: 10
            timeoutSeconds: 1
          env:
            # unique per pod; the ConfigMap application.conf uses it as the CPS
            # notification kafka consumer group id so every instance sees all notifications
            - name: "POD_NAME"
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: "IPF_JAVA_ARGS"
              value: "-Dma.glasnost.orika.writeClassFiles=false -Dma.glasnost.orika.writeSourceFiles=false"
            # used as the akka artery canonical hostname in the ConfigMap application.conf
            - name: "POD_IP"
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: "KUBERNETES_NAMESPACE"
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          resources:
            requests:
              memory: ${example-project.min-mem}
              cpu: ${example-project.min-cpu}
          volumeMounts:
            - mountPath: /example-project-application/conf/logback.xml
              name: config-volume
              subPath: logback.xml
            - mountPath: /example-project-application/conf/application.conf
              name: config-volume
              subPath: application.conf
      volumes:
        - name: config-volume
          configMap:
            name: example-project-cm
        # NOTE(review): this keystore volume is declared but never referenced in
        # volumeMounts above - confirm whether it is still needed
        - name: keystore
          secret:
            secretName: keystore
Service Manifest
apiVersion: v1
kind: Service
metadata:
  name: example-project
  labels:
    product: ipfv2
spec:
  selector:
    app: example-project
    product: ipfv2
  ports:
    # application / actuator http endpoint
    - protocol: TCP
      port: 80
      targetPort: 8080
      name: actuator
    # prometheus-scraped akka metrics
    - protocol: TCP
      port: 9001
      targetPort: 9001
      name: akka-metrics
    # akka artery remoting between cluster nodes
    - protocol: TCP
      port: 55001
      targetPort: 55001
      name: remoting
    # jvm remote debug
    - protocol: TCP
      port: 5005
      targetPort: 5005
      name: debug
ConfigMap Manifest
apiVersion: v1
kind: ConfigMap
metadata:
  name: example-project-cm
data:
  application.conf: |-
    # mongo configuration setup
    # note: HOCON substitutions must stay UNQUOTED - ${...} is not resolved
    # inside quoted strings, so "${ipf.mongodb.url}" would be a literal string
    spring.data.mongodb.uri = ${ipf.mongodb.url}
    iconsolutions.akka.persistence.mongodb {
      class = "com.iconsolutions.akka.persistence.mongodb.MongoDBAsyncWriteJournal"
      plugin-dispatcher = "akka.actor.default-dispatcher"
      url = ${ipf.mongodb.url}
    }
    # akka configuration setup
    akka {
      # Use Kubernetes API to discover the cluster
      discovery {
        kubernetes-api {
          pod-label-selector = "app=example-project"
        }
      }
      actor.provider = cluster
      cluster {
        # no static seed nodes - cluster bootstrap discovers peers instead
        seed-nodes = []
        downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
        split-brain-resolver {
          active-strategy = keep-majority
          stable-after = 20s
        }
        # sharding
        sharding {
          handoff-timeout = 8s
          least-shard-allocation-strategy.rebalance-absolute-limit = 20
          rebalance-interval = 2s
          number-of-shards = 10
          passivation {
            strategy = "default-strategy"
            default-strategy.active-entity-limit = 20000
          }
        }
      }
      # using specific pod IP as hostname for each instance
      remote.artery.canonical.hostname = ${POD_IP}
      management {
        health-checks {
          readiness-path = "health/ready"
          liveness-path = "health/alive"
        }
        # use the Kubernetes API to create the cluster
        cluster.bootstrap {
          contact-point-discovery {
            service-name = "example-project"
            discovery-method = kubernetes-api
            required-contact-point-nr = 1
            required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
          }
        }
      }
      # kafka configuration
      kafka {
        consumer {
          kafka-clients {
            bootstrap.servers = "kafka:9092"
          }
        }
      }
    }

    ipf {
      # ipf configuration for example project flow
      # consisting of different models with entities and configurable values
      studio = {
        models = [
          {
            name = "exampleModel",
            configurable-values-global = {
              bankId = "fromConfig_global_MyBank",
            }
            entities = [
              {
                name = "BANK_ENTITY_1",
                configurable-values = {
                  bankId = "fromConfig_BankEntity1_MyBank",
                }
              },
              {
                name = "BANK_ENTITY_2",
                configurable-values = {
                  bankId = "fromConfig_BankEntity2_MyBank",
                  domesticPaymentLimit = 300000
                }
              }
            ]
          }
        ]
      }

      # example of dps adapter configuration
      # configuring dynamic config types, configuration entries and setting categories
      dynamic-config-adapter {
        dynamic-config-types = ["customProcessingSettings"]
        setting-categories = ["pain001DeterminePaymentTypeVariables"]
        correlation-entries = [
          {
            destination-variable-key = "exampleModel_limitCheckEnabled"
            variable-value-type: BOOLEAN
            source-dynamic-config: {
              config-type: "customProcessingSettings"
              config-name: "domesticLimitChecks"
            }
          },
          {
            destination-variable-key = "exampleModel_domesticPaymentLimit"
            variable-value-type: NUMBER
            source-dynamic-config: {
              config-type: "customProcessingSettings"
              config-name: "domesticPaymentLimit"
            }
          }
        ]
        # the period at which the values are refreshed automatically
        # if we don't want to rely on CPS kafka notifications too much
        # this property can be set to 10s/30s
        value-refresh-period: 1m
      }

      # configuration for retrieval of custom processing settings
      custom-processing-settings-api {
        http.client {
          host = "custom-processing-settings-app"
          port = 8080
          endpoint-url = "/api/v1/"
        }
      }

      cps-api.client.notification.kafka.consumer.kafka-clients {
        # important to note that if there are multiple instances in the environment,
        # for each instance to be able to receive all of these notifications and update their registries accordingly,
        # the consumer group id needs to be set up differently - it needs to be unique
        # that is why here we rely on POD_NAME as consumer group id
        # POD_NAME is coming through environment in deployment manifest
        group.id = ${POD_NAME}
        # since each time pod is restarted we will have different consumer group created
        # it is important to set auto.offset.reset to latest
        # so we always read the latest offset, latest message
        auto.offset.reset = latest
      }

      # ipf processing data egress set up
      processing-data.egress {
        enabled = true
        transport = http
        http {
          client {
            host = "ipf-developer-app"
            port = 8081
            endpoint-url = "/ipf-processing-data"
          }
        }
      }

      system-events.exporter {
        type = ipf-processing-data-egress
      }

      // default timeout
      behaviour.retries {
        initial-timeout = 1s
      }

      # ipf mongo set up
      mongodb.url = "mongodb://ipf-mongo:27017/ipf"
    }

  logback.xml: |-
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>

        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>
                    %d{dd-MM-yyyy HH:mm:ss.SSS} %magenta([%thread]) %highlight(%-5level) %logger{36}.%M - %msg%n
                </pattern>
            </encoder>
            <immediateFlush>false</immediateFlush>
        </appender>

        <appender name="ASYNC_STDOUT" class="ch.qos.logback.classic.AsyncAppender">
            <queueSize>1000</queueSize>
            <discardingThreshold>0</discardingThreshold>
            <appender-ref ref="STDOUT"/>
            <includeCallerData>true</includeCallerData>
        </appender>

        <logger name="akka.stream.scaladsl.RestartWithBackoffSource" level="OFF"/>

        <root level="INFO">
            <appender-ref ref="ASYNC_STDOUT"/>
        </root>

    </configuration>