¿Cómo migrar un existente?IPF Application¿para la purga de Journal y Snapshot?
Si tiene un sistema IPF existente y desea implementar la purga de Journal y Snapshot, debe habilitar la purga en su existente IPF Flow Aplique y migre los documentos de Journal y Snapshot existentes para que sean eliminados correctamente por la rutina de purga de la base de datos.
Habilite la purga de persistencia para sus aplicaciones Flow existentes
Cada uno de sus IPF Flow Las aplicaciones deben configurarse para habilitar la purga de persistencia y evitar el riesgo de que una base de datos pueda volverse no disponible debido a llenar todo el almacenamiento disponible.
La siguiente configuración debe ser habilitada explícitamente en cada IPF Flow Aplicación:
ipf.behaviour.config.persistence {
snapshot-when-terminal=true
delete-events-on-snapshot=true
}
Esto asegura que se creen instantáneas para las transacciones que han alcanzado un terminal.state, y que el Akka Persistence El complemento marca los documentos del diario terminal como eliminados.
Una vez que usted tenga restarted cada IPF Flow Aplicación con la nueva configuración, todos los nuevos documentos de Journal y Snapshot contendrán los campos necesarios para determinar cuándo deben ser eliminados. Puede elegir ejecutar los siguientes scripts de migración mientras su IPF Applications están inactivos, o después de que hayan sido exitosamente restarted con la nueva configuración.
Migrando Datos Existentes de Diario y Captura
Lo siguiente asume que ha construido sus Aplicaciones de Flujo utilizando el Akka Persistence Plugin for MongoDB. Esta documentación se refiere a campos y funcionalidades que son específicos de esa implementación del plugin para que la purga funcione adecuadamente.
Dependiendo de la base de datos que se esté utilizando, será necesario migrar diferentes índices de tiempo de vida y campos de documentos. Si usted tiene un gran número de documentos para migrar, los siguientes scripts podrían causar problemas de rendimiento en el servidor. Para evitar estos problemas, planifique crear índices y realizar actualizaciones de documentos durante horas no laborables.
|
Estas operaciones deben realizarse después de habilitando la purga para sus aplicaciones de flujo existentes. Esto es para asegurar que todos los nuevos datos se conservarán con la información necesaria para que la purga se realice con éxito. |
Migrando existente MongoDB datos
Los documentos existentes en las colecciones de journal y snapshots no contienen un campo BSON Date, por lo que, por defecto, los índices de la colección no tendrán efecto en los datos existentes. Se recomienda actualizar los datos existentes dentro de estas colecciones para establecer un valor deletedAt e insertedAt para Journal. Events y Snapshots, respectivamente.
Para salvaguardar contra la eliminación involuntaria de datos no terminales (ya que usted puede no saber si los datos existentes han alcanzado un estado terminal state o no), puede actualizar los campos TTL respectivos para tener una fecha BSON en el futuro. Por ejemplo, si el índice configurado expireAfterSeconds evalúa a 20 días, y usted establece un Journal Event El campo deletedAt de ’ se establecerá en 10 días en el futuro; el documento será purgado 30 días después de que se realice la actualización.
Los siguientes scripts de migración pueden ser utilizados como un inicio rápido para configurar su existente MongoDB las colecciones con purga de diarios y instantáneas pueden ejecutarse utilizando un shell de Mongo.
Creación de índices - MongoDB
Este script se utiliza para crear los dos índices a nivel de colección en el deletedAt y insertedAt campos según lo descrito en el Configuración de MongoDB. Esto requiere anular los siguientes dos parámetros de entrada (el script no hará nada y generará un error si no se anulan):
-
journalTtlInSeconds- Un entero que define elexpireAfterSecondsvalor para eljournalcoleccióndeletedAtíndice -
snapshotTtlInSeconds- Un entero que define elexpireAfterSecondsvalor para elsnapshotscoleccióninsertedAtíndiceEste script no debe ejecutarse múltiples veces con diferentes parámetros de entrada. Se lanzará una excepción por parte de la shell de Mongo si se intenta crear una clave de índice idéntica con un valor diferente de expireAfterSeconds.
Creación de índices de purga en MongoDB
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script adds ttl indexes that are needed for purging of the journal and snapshot collections for MongoDB.
//
// This script does the following...
// 1. Creates a ttl index for both the journal and snapshots collections for the fields `deletedAt` and `insertedAt` respectively
//
// Notes:
// This script should not be run multiple times with different input parameters.
// An exception will be thrown if trying to create an identical index key with a different expireAfterSeconds value
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// The dateTime at which Mongo will delete the journal documents from the database is determined by the value of the deletedAt field + journalTtlInSeconds
// e.g. deletedAt = 2024-08-13T15:24:10.651Z, journalTtlInSeconds = 31536000 (1 year)
// The document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z
// The dateTime at which Mongo will delete the snapshot documents from the database is determined by the value of the insertedAt field + snapshotTtlInSeconds
// e.g. insertedAt = 2024-08-13T15:24:10.651Z, snapshotTtlInSeconds = 31536000 (1 year)
// The document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z
// IMPORTANT - Client must override the following two values before running this script to determine when these documents will be removed by the database
// e.g. (1 year in seconds = 31536000)
const journalTtlInSeconds = null;
// e.g. (1 year in seconds = 31536000)
const snapshotTtlInSeconds = null;
const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");
function mongoPurgingIndexCreation(journalTtl, snapshotTtl) {
if (journalTtl == null || snapshotTtl == null) {
throw new Error("Encountered a null parameter. Parameter values: (journalTtlInSeconds:" + journalTtl + ", snapshotTtlInSeconds: " + snapshotTtl + ")");
}
// Step 1 - Create ttl index for journal and snapshots collections
print("1. Create ttl index deletedAt on journal collection with expireAfterSeconds: " + journalTtl);
journal.createIndex( { "deletedAt": 1 }, { expireAfterSeconds: journalTtl } );
print("2. Create ttl index insertedAt on snapshot collection with expireAfterSeconds: " + snapshotTtl);
snapshots.createIndex( { "insertedAt": 1 }, { expireAfterSeconds: snapshotTtl } );
}
mongoPurgingIndexCreation(journalTtlInSeconds, snapshotTtlInSeconds);
Actualizando documentos existentes - MongoDD
Este script actualiza los documentos de Diario y Snapshot existentes para que puedan ser elegibles para su eliminación. Esto se realiza mediante:
-
Actualizando todos los documentos del Diario, estableciendo un
deletedAtcampo que es equivalente a la marca de tiempo del documento (cuando fue insertado por el complemento de persistencia) + un período de retención dado. Esto requiere anular el siguiente parámetro (el script no hará nada y generará un error si no se anula):-
retentionPeriodInDays- Un entero que especifica el número de días después de la marca de tiempo original del documento en el cual el documento se considera eliminado. Se utiliza para protegerse contra la eliminación de entradas del Diario antes de que una transacción haya llegado a un terminal.state.
-
-
Actualizando todos los documentos de Snapshot, estableciendo un
insertedAtcampo que es equivalente a la marca de tiempo del documento (cuando fue insertado por el complemento de persistencia).Este script es indiscriminado respecto a si los documentos del Diario pertenecen a una transacción que ha alcanzado un terminal.state o no. Actuará como si todos los documentos del Diario existentes alcanzaran un terminal state después del período de retención.
Para intentar salvaguardar contra la eliminación involuntaria de datos no terminales, se recomienda establecer el período de retención en un valor lo suficientemente alto como para que los datos de transacciones existentes hayan alcanzado un estado terminal.state(por ejemplo, esperaríamos que cualquier dato de transacción existente haya llegado a un terminal state al menos 30 días desde que fue insertado en la colección del Journal).
Este script es idempotente. Si encuentra un error, es seguro volver a ejecutarlo hasta que todos los registros existentes y las instantáneas tengan un campo de índice ttl correspondiente. Todos los documentos de diario y de instantánea que fueron migrados hasta el error serán omitidos en ejecuciones posteriores.
Script de migración de MongoDB
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script sets the deletedAt and insertedAt fields for existing journal and snapshot purging of the journal and snapshot collections for MongoDB.
//
// This script does the following...
// 1. Sets a deletedAt dateTime for all journal documents that do not have a `deletedAt` field
// This field is calculated from the existing timestamp (when the document was inserted)
// 2. Sets a insertedAt dateTime for all snapshot documents that do not have a `insertedAt` field
// This field is calculated from the existing timestamp (when the document was inserted)
//
// Notes:
// This script is idempotent. If you encounter an error, it is safe to re-run until all existing journal and snapshots have a corresponding ttl index field.
// All journal and snapshot documents that were migrated up until the error will be skipped on subsequent runs.
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// The dateTime at which Mongo will delete the journal documents from the database is determined by the value of the existing timestamp + the retentionPeriodInSeconds + the value of the journal collection deletedAt ttl index
// e.g. existing timestamp = 1723559050651 (2024-08-13T15:24:10.651Z), retentionPeriodInSeconds 30 = (30 days), journal Ttl index in Seconds = 31536000 (1 year)
// The document's deletedAt field value will be set to: 2024-09-12T15:24:10.651Z, and the document will be eligible for removal by the database after 2025-09-12T15:24:10.651Z
// The dateTime at which Mongo will delete the snapshot documents from the database is determined by the value of the existing timestamp + the value of the snapshot collection insertedAt ttl index
// e.g. existing timestamp = 1723559050651 (2024-08-13T15:24:10.651Z), snapshot Ttl index in Seconds = 31536000 (1 year)
// The document's insertedAt field value will be set to: 2024-08-13T15:24:10.651Z, and the document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z
// Step 1 - For each journal document without a deletedAt field, set deletedAt to the value of the existing timestamp
// IMPORTANT - Client must override the following value before running this script to determine when Journal documents will be removed by the database
// e.g. (30 days = 30)
const retentionPeriodInDays = null;
const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");
function purgingDocumentMigration(retentionPeriodInDays) {
if (retentionPeriodInDays == null) {
throw new Error("The journal retention period parameter cannot be null");
}
// Convert days to milliseconds
const retentionPeriod = retentionPeriodInDays * 86400000;
print("1 - Set the deletedAt field on all journal documents that do not have one");
const journalsToBeUpdated = journal.countDocuments({ deletedAt: null })
print(journalsToBeUpdated + " journal documents to be updated");
journal.updateMany(
// Only match journal documents that either do not contain the deletedAt field or contain the deletedAt field whose value is null
{ deletedAt: null },
// Set deletedAt datetime of the value of the existing eventPayloads.timestamp
[ { $set: { deletedAt: { $add: [ { $toDate: { $arrayElemAt: ["$eventPayloads.timestamp", 0] } } , retentionPeriod ] } } } ]
)
const remainingJournals = journal.count({ deletedAt: null });
print('Successfully set deletedAt field on ' + (journalsToBeUpdated - remainingJournals) + ' journal documents');
print(remainingJournals + ' journal documents remain without a deletedAt field');
// Step 2 - For each snapshot document without a insertedAt field, set insertedAt to the value of the existing timestamp
print("2 - Set the insertedAt field on all snapshot documents that do not have one");
const snapshotsToBeUpdated = snapshots.countDocuments({ insertedAt: null });
print(snapshotsToBeUpdated + " snapshot documents to be updated");
snapshots.updateMany(
// Only match snapshot documents that either do not contain the insertedAt field or contain the insertedAt field whose value is null
{ insertedAt : null },
// Set insertedAt to match the value of the top-level timestamp field
[ { $set: { insertedAt: { $toDate: "$timestamp" } } } ]
)
const remainingSnapshots = snapshots.countDocuments( { insertedAt: null });
print("Successfully set insertedAt field on " + (snapshotsToBeUpdated - remainingSnapshots) + " snapshot documents");
print(remainingSnapshots + ' snapshot documents remain without a insertedAt field');
}
purgingDocumentMigration(retentionPeriodInDays);
Migrando datos existentes de CosmosDB
Por defecto, CosmosDB automáticamente poblará y actualizará el campo _ts para un documento dado, manteniendo la marca de tiempo de la última modificación del documento. Crear un índice a nivel de colección tanto para la colección de diarios como para la de instantáneas permitirá la eliminación de datos existentes un tiempo determinado después de que el documento fue actualizado por última vez.
Puede que desee actualizar el Diario existente. Events para tener un valor TTL a nivel de documento. Esto anulará el valor del índice TTL de la colección y deberá establecerse manualmente. Consulte la documentación oficial de CosmosDB para obtener más información sobre cómo establecer un valor TTL a nivel de documento.
Como ejemplo, puede que desee establecer el índice del nivel de colección de revistas a 365 días, pero desea que todas las revistas existentes Events para ser eliminado en 30 días. Por lo tanto, usted debe actualizar todos los Journal Events para establecer el campo ttl en 2592000 (30 días en segundos).
Script de migración de CosmosDB
El siguiente script de migración puede ser utilizado como un inicio rápido para configurar sus colecciones existentes de CosmosDB con purga de registros y instantáneas, puede ejecutarse utilizando el shell de Mongo de Cosmos.
Este script se utiliza para crear los dos niveles de colección.__ts índices como se detalla en el Configuración de CosmosDB. Esto requiere anular los siguientes dos parámetros de entrada (el script no hará nada y generará un error si no se anulan):
-
journalTtlInSeconds- Un entero que define elexpireAfterSecondsvalor para eljournalcolección_tsíndice -
snapshotTtlInSeconds- Un entero que define elexpireAfterSecondsvalor para elsnapshotscolección_tsíndice
Una vez que se crean los índices a nivel de colección, Cosmos gestionará automáticamente cuándo eliminar los documentos de Journal y Snapshot existentes, según lo específico de CosmosDB._ts El campo ya está poblado en cada documento, el cual contiene la marca de tiempo de la última modificación del documento. No es necesario actualizar los documentos del Diario existentes.
| Este script no debe ejecutarse múltiples veces con diferentes parámetros de entrada. Se lanzará una excepción por parte del shell de Mongo de Cosmos si se intenta crear una clave de índice idéntica con un valor diferente de expireAfterSeconds. |
Creación de índice de purga en CosmosDB
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script adds ttl indexes that are needed for purging of the journal and snapshot collections for CosmosDB.
//
// This script does the following...
// 1. Creates a ttl index for both the journal and snapshots collections for the Cosmos specific `_ts` field
//
// Notes:
// This script should not be run multiple times with different input parameters.
// An exception will be thrown if trying to create an identical index key with a different expireAfterSeconds value
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// The dateTime at which Cosmos will delete the journal documents from the database is determined by the value of the collection `_ts` index and when the snapshot document was last updated (which is managed by Cosmos)
// e.g. if the document was last updated at 2024-08-13T15:24:10.651Z, journalTtlInSeconds = 31536000 (1 year)
// Cosmos will expire the document after 2025-08-13T15:24:10.651Z
// The dateTime at which Cosmos will delete the snapshot documents from the database is determined by the value of the collection `_ts` index and when the snapshot document was last updated (which is managed by Cosmos)
// e.g. if the document was last updated at 2024-08-13T15:24:10.651Z, snapshotTtlInSeconds = 31536000 (1 year)
// Cosmos will expire the document after 2025-08-13T15:24:10.651Z
// IMPORTANT - Client must override the following two values before running this script to determine when these documents will be removed by the database
// e.g. (1 year in seconds = 31536000)
const journalTtlInSeconds = null;
// e.g. (1 year in seconds = 31536000)
const snapshotTtlInSeconds = null;
const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");
function cosmosPurgingIndexCreation(journalTtl, snapshotTtl) {
if (journalTtl == null || snapshotTtl == null) {
throw new Error("Encountered a null parameter. Parameter values: (journalTtlInSeconds:" + journalTtl + ", snapshotTtlInSeconds: " + snapshotTtl + ")");
}
// Step 1 - Create ttl index for journal and snapshots collections
print("1 - Create ttl index _ts on journal collection with expireAfterSeconds: " + journalTtl);
journal.createIndex( { "_ts": 1 }, { expireAfterSeconds: journalTtl } );
print("2 - Create ttl index _ts on snapshot collection with expireAfterSeconds: " + snapshotTtl);
snapshots.createIndex( { "_ts": 1 }, { expireAfterSeconds: snapshotTtl } );
}
cosmosPurgingIndexCreation(journalTtlInSeconds, snapshotTtlInSeconds);