Transaction Caching
IPF es una aplicación basada en event sourcing y, como resultado, el lado de lectura ("query") es eventualmente consistente. Esto significa que, para casos donde necesitamos mirar el histórico muy reciente de la aplicación, puede que no sea representativo mirar el lado de lectura, ya que puede que aún no haya alcanzado todos los eventos que han tenido lugar en el lado de escritura.
Como resultado, los usuarios tienen la capacidad de implementar una transaction cache, usando el servicio de transaction cache proporcionado. Puede utilizarse para satisfacer requisitos de negocio como:
-
Verificaciones funcionales de duplicados
-
Verificaciones técnicas de duplicados
Población de la Transaction Cache
La transaction cache requiere algo de ensamblaje, ¡pero no es tan complicado comenzar! Esto es lo que necesitas hacer:
1. Añadir la dependencia
Si usas el Icon BOM, añade la siguiente dependencia a tu módulo -app:
<dependency>
<groupId>com.iconsolutions.ipf.core.platform</groupId>
<artifactId>ipf-transaction-cache</artifactId>
</dependency>
2. Identificar los tipos de transacción a persistir
Debes identificar qué deseas persistir en la cache.
PersistentTransactionCacheService tiene un tipo genérico T
que puedes usar para insertar cualquier POJO persistible en MongoDB.
En nuestro ejemplo queremos persistir este objeto de pago:
public static class Payment {
private final String from;
private final String to;
private final BigDecimal amount;
private final LocalDate paymentDate;
}
3. Seleccionar los datos de negocio importantes
Necesitamos implementar un "ID extractor" para determinar qué campos son importantes al determinar si ya hemos visto esta transacción antes. Algunos ejemplos pueden ser:
-
Amount
-
End-to-end ID
-
From Identifier
-
To Identifier
-
Unstructured data
Esto se implementa como Function<T, List<String>>.
Esto le da al servicio una manera de extraer los campos relevantes y hacerles un hash en conjunto para intentar buscarlos más adelante de forma eficiente.
Aquí hay un ejemplo de cómo inicializar el servicio de transaction cache:
var transactionCacheService = new PersistentTransactionCacheService<>(
payment -> List.of(payment.getFrom(), payment.getTo(), payment.getAmount().toString())
, repo, repositoryRetryProvider);
El primer argumento consiste en la lista de campos (¡que deben estar protegidos contra nulls!) que forman parte del hash. La implementación particular de cache service que hemos seleccionado aquí está basada en MongoDB, por lo que el segundo argumento recibe un repositorio para almacenar entradas de caché. Diferentes implementaciones pueden tener firmas diferentes dependiendo de sus requisitos.
4. Crear un enum de tipo de entrada
La cache puede contener potencialmente diferentes tipos de transacciones.
Por esa razón necesitamos poder enumerar los diferentes tipos.
Esto está representado por la interfaz TransactionCacheEntryType.
He aquí su definición:
public interface TransactionCacheEntryType {
String getName();
}
Podemos ver que realmente es solo una forma de poder diferenciar entre diferentes tipos de transacciones almacenadas en caché. Necesitamos esto porque algunos flujos de transacción pueden compartir el mismo tipo de mensaje raíz (piensa en mensajes entrantes y salientes del mismo tipo, p. ej., pacs.008).
Aquí hay una implementación de ejemplo de TransactionCacheEntryType:
public enum ExampleTransactionCacheEntryType implements TransactionCacheEntryType {
TYPE_ONE,
TYPE_TWO;
@Override
public String getName() {
return name();
}
}
Este es un enum que implementa la interfaz TransactionCacheEntryType y puede soportar dos tipos diferentes de entradas de caché: TYPE_ONE y TYPE_TWO.
Luego podemos usar el servicio para persistir nuestros tipos a persistir.
5. Envolver y guardar
Ahora podemos llamar al servicio de transaction cache para guardar nuestro Payment con su tipo así:
var payment = new Payment("Me", "You", new BigDecimal("4.20"), LocalDate.now());
var saveFuture = transactionCacheService.saveToCache(TYPE_ONE, payment, "messageId");
Para evitar guardar dos entradas para el mismo mensaje físico (por ejemplo, en caso de un reintento o reactivación), llamamos al método saveToCache con un parámetro messageId.
MessageId debe ser un identificador único para el contenido que almacenamos en la cache y típicamente sería el persistenceId en un flujo.
| El messageId no se refiere al msgId en un mensaje Iso20022 |
No se guardará una nueva entrada si ya existe en la cache, para el tipo dado, una entrada con el mismo hash y messageId; en su lugar se devolverá el registro existente.
Comprobación de la Transaction Cache
El servicio de cache tiene el siguiente método para recuperar datos de la cache:
CompletionStage<List<TransactionCacheEntry<T>>> findInCache(TransactionCacheEntryType type, T content);
Necesita el tipo de entrada que deseas encontrar, seguido del tipo T que deseas comprobar para ver si es un duplicado funcional.
Devuelve un future que contiene una lista de entradas de cache coincidentes.
Quizás quieras inspeccionar su creationDate para comprobar duplicados funcionales dentro de una ventana de tiempo.
Consideraciones de implementación
Purga (TTL)
Para la implementación con MongoDB, considera usar un MongoDB TTL index
sobre el campo creationDate para expirar entradas.
Un índice para buscar por hash se crea por defecto, pero puede que quieras añadir un índice TTL para expirar (eliminar) entradas después de un periodo de tiempo específico si ya no se requieren.
La creación de índices por defecto puede deshabilitarse con:
ipf.transaction-cache.mongodb.create-indexes=false
Los índices pueden deshabilitarse globalmente con:
ipf.mongodb.create-indexes=false
Para deshabilitar la indexación globalmente pero mantenerla para la transaction cache, aplica lo siguiente, respetando el orden:
ipf.mongodb.create-indexes=false
ipf.transaction-cache.mongodb.create-indexes=true
Commit Quorum
El commit quorum puede controlarse de forma similar con:
ipf.transaction-cache.mongodb.commit-quorum=1
O sobrescribirse globalmente con:
ipf.mongodb.commit-quorum=1
Purga (TransactionCachePurgingScheduler)
Para la implementación con MongoDB, también existe la opción de programar un job repetitivo para eliminar todas las entradas de un tipo y antigüedad específicos usando TransactionCachePurgingScheduler.
TransactionCachePurgingScheduler está disponible dentro del módulo Transaction Cache y no es necesario añadir dependencias adicionales.
1. Proporciona tu configuración:
your.purging.config.path{
transaction-cache-entry-type = "TYPE_ONE"
retain-from-time = "17:00:00"
retain-from-offset = "1 day"
scheduling-specification = "0 0 17 ? * *"
}
-
transaction-cache-entry-type: Debe coincidir con la cadena proporcionada por el método getName() de tu TransactionCacheEntryType. -
retain-from-time: La hora del día de ejecución de la purga desde la cual quieres retener entradas. -
retain-from-offset: Proporciona una duración que se restará de retain-from-time. Debe ser una duración hocon. Usa 0 days si no se necesita compensación desde retain-from-time. -
scheduling-specification: Una cadena que representa una expresión cron de cuándo ejecutar el job de purga. Para ayuda con la construcción de una expresión cron, usa un generador online como este.
El job de ejemplo anterior se ejecutará a las 5pm todos los días ("0 0 17 ? * *") y eliminará todas las entradas TYPE_ONE de tu transaction cache que sean anteriores a las 5pm del día anterior a la ejecución del job ("17:00:00" menos el offset de "1 day"). P. ej., si se ejecuta a las 17:00 el 23 de abril de 2024, se purgarán las entradas anteriores a las 17:00 del 22 de abril de 2024.
| Ten cuidado con la relación entre tu especificación de programación y tu hora de retención. Es posible que el job se ejecute antes de tu hora de retención. |
| Ejecutar un job de purga podría resultar en que se elimine de una vez un número muy grande de entradas que cumplan con el criterio. Esta gran carga de trabajo puede causar problemas de rendimiento. Considera programar jobs para horas valle/periodos tranquilos para reducir este riesgo. |
2. Crea un bean
Define un bean para TransactionCachePurgingScheduler dentro de una clase de configuración de Spring relevante o de autoconfiguración.
@Configuration
public class TransactionCachePurgingConfig {
@Bean
public TransactionCachePurgingScheduler<Payment> transactionCachePurgingScheduler(
TransactionCacheService<Payment> transactionCacheService,
SchedulingModuleInterface schedulingModule,
Clock clock,
Config config) {
return new TransactionCachePurgingScheduler<>(
transactionCacheService,
schedulingModule,
clock,
config.getConfig("your.purging.config.path"));
}
}
Sustituye <Payment> por el nombre de clase relevante para tu transaction cache.
Enfoque de purga a usar
El enfoque de purga que querrás usar dependerá de tu caso de uso. Las diferencias entre los enfoques y la recomendación de su uso se resumen en la tabla siguiente.
| Type | Action | Recommended usage |
|---|---|---|
TTL |
Purges all data from the collection based on its age in the collection |
Purging all data at a specific age where you have no requirement to be selective |
TransactionCachePurgingScheduler |
Purges data by type and age (at a specified time) |
Purging datasets at different frequencies & controlling when the purge happens |