Documentation for a newer release is available. View Latest

¿Cómo migro una aplicación IPF existente para la purga de Journal y Snapshot?

Si tienes un sistema IPF existente y quieres implementar la purga de Journal y Snapshot, necesitas habilitar la purga en tus IPF Flow Applications existentes y migrar los documentos de Journal y Snapshot existentes para que sean eliminados correctamente por la rutina de purga de la base de datos.

Habilitar la purga de persistencia para tus aplicaciones de Flow existentes

Cada una de tus IPF Flow applications debe configurarse para habilitar la purga de persistencia y evitar el riesgo de que una base de datos quede no disponible por llenar todo el almacenamiento disponible.

La siguiente configuración debe habilitarse explícitamente en cada IPF Flow Application:

ipf.behaviour.config.persistence {
  snapshot-when-terminal=true
  delete-events-on-snapshot=true
}

Esto asegura que se creen Snapshots para transacciones que han alcanzado un estado terminal, y que el Akka Persistence Plugin marque como eliminados los documentos terminales del Journal.

Una vez que hayas reiniciado cada IPF Flow Application con la nueva configuración, todos los nuevos documentos de Journal y Snapshot contendrán los campos necesarios para determinar cuándo deben ser purgados. Puedes elegir ejecutar los siguientes scripts de migración mientras tus IPF Applications están apagadas, o después de que se hayan reiniciado correctamente con la nueva configuración.

Migrando datos existentes de Journal y Snapshot

Lo siguiente asume que has construido tus Flow Applications usando el Akka Persistence Plugin for MongoDB. Esta documentación se refiere a campos y funcionalidades que son específicos de esa implementación del plugin para que la purga funcione en consecuencia.

Dependiendo de la base de datos utilizada, se necesitarán migrar diferentes índices de time-to-live y campos de documento. Si tienes un gran número de documentos a migrar, los siguientes scripts podrían causar problemas de rendimiento en el servidor. Para evitar estos problemas, planifica crear índices y realizar actualizaciones de documentos fuera del horario laboral.

Estas operaciones deben realizarse después de habilitar la purga para tus aplicaciones de flow existentes.
Esto es para asegurar que todos los nuevos datos se persistan con la información necesaria para que la purga ocurra correctamente.

Migrando datos existentes de MongoDB

Los documentos existentes en las colecciones journal y snapshots no contienen un campo BSON Date, por lo que por defecto los índices de la colección no tendrán efecto sobre los datos existentes. Se recomienda actualizar los datos existentes dentro de estas colecciones para establecer un valor deletedAt e insertedAt para los Journal Events y los Snapshots, respectivamente.

Para proteger contra la purga no intencional de datos no terminales (ya que puede que no sepas si los datos existentes han alcanzado un estado terminal o no), puedes actualizar los respectivos campos TTL para tener una fecha BSON en el futuro. Por ejemplo, si el índice configurado expireAfterSeconds evalúa a 20 días, y estableces el campo deletedAt de un Journal Event a 10 días en el futuro, el documento será purgado realmente 30 días después de que se realice la actualización.

Los siguientes scripts de migración pueden usarse como un inicio rápido para configurar tus colecciones existentes de MongoDB con purga de journal y snapshot; pueden ejecutarse usando un shell de Mongo.

Creación de índices - MongoDB

Este script se usa para crear los dos índices a nivel de colección en los campos deletedAt e insertedAt como se describe en el MongoDB setup. Esto requiere sobrescribir los dos parámetros de entrada siguientes (el script no hará nada y lanzará un error si no se sobrescriben):

  • journalTtlInSeconds - Un entero que define el valor expireAfterSeconds para el índice deletedAt de la colección journal

  • snapshotTtlInSeconds - Un entero que define el valor expireAfterSeconds para el índice insertedAt de la colección snapshots

Este script no debe ejecutarse múltiples veces con diferentes parámetros de entrada. El shell de Mongo lanzará una excepción si se intenta crear una clave de índice idéntica con un valor de expireAfterSeconds diferente
MongoDB purging index creation
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script adds ttl indexes that are needed for purging of the journal and snapshot collections for MongoDB.
//
// This script does the following...
//   1. Creates a ttl index for both the journal and snapshots collections for the fields `deletedAt` and `insertedAt` respectively
//
// Notes:
//   This script should not be run multiple times with different input parameters.
//   An exception will be thrown if trying to create an identical index key with a different expireAfterSeconds value
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// The dateTime at which Mongo will delete the journal documents from the database is determined by the value of the deletedAt field + journalTtlInSeconds
//      e.g. deletedAt = 2024-08-13T15:24:10.651Z, journalTtlInSeconds = 31536000 (1 year)
//          The document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z
// The dateTime at which Mongo will delete the snapshot documents from the database is determined by the value of the insertedAt field + snapshotTtlInSeconds
//      e.g. insertedAt = 2024-08-13T15:24:10.651Z, snapshotTtlInSeconds = 31536000 (1 year)
//          The document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z

// IMPORTANT - Client must override the following two values before running this script to determine when these documents will be removed by the database
// e.g. (1 year in seconds = 31536000)
const journalTtlInSeconds = null;
// e.g. (1 year in seconds = 31536000)
const snapshotTtlInSeconds = null;

const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");

function mongoPurgingIndexCreation(journalTtl, snapshotTtl) {
    if (journalTtl == null || snapshotTtl == null) {
        throw new Error("Encountered a null parameter. Parameter values: (journalTtlInSeconds:" + journalTtl + ", snapshotTtlInSeconds: " + snapshotTtl + ")");
    }

    // Step 1 - Create ttl index for journal and snapshots collections
    print("1. Create ttl index deletedAt on journal collection with expireAfterSeconds: " + journalTtl);
    journal.createIndex( { "deletedAt": 1 }, { expireAfterSeconds: journalTtl } );
    print("2. Create ttl index insertedAt on snapshot collection with expireAfterSeconds: " + snapshotTtl);
    snapshots.createIndex( { "insertedAt": 1 }, { expireAfterSeconds: snapshotTtl } );
}

mongoPurgingIndexCreation(journalTtlInSeconds, snapshotTtlInSeconds);

Actualizar documentos existentes - MongoDB

Este script actualiza los documentos existentes de Journal y Snapshot para que puedan ser elegibles para la purga. Esto se hace mediante:

  • Actualizar todos los documentos de Journal, estableciendo un campo deletedAt equivalente a la marca de tiempo del documento (cuando fue insertado por el plugin de persistencia) + un período de retención dado. Esto requiere sobrescribir el siguiente parámetro (el script no hará nada y lanzará un error si no se sobrescribe):

    • retentionPeriodInDays - Un entero que especifica el número de días después de la marca de tiempo original del documento en el que el documento se considera eliminado. Se usa para proteger contra la purga de entradas de Journal antes de que una transacción haya alcanzado un estado terminal.

  • Actualizar todos los documentos de Snapshot, estableciendo un campo insertedAt equivalente a la marca de tiempo del documento (cuando fue insertado por el plugin de persistencia).

Este script es indiscriminado de si los documentos de Journal pertenecen a una transacción que ha alcanzado un estado terminal o no. Actuará como si todos los documentos existentes de Journal alcanzaran un estado terminal después del período de retención.
Para tratar de proteger contra la purga no intencional de datos no terminales, se recomienda establecer el período de retención en un valor lo suficientemente alto como para que los datos de transacciones existentes deban haber alcanzado un estado terminal (por ejemplo, esperaríamos que cualquier dato de transacción existente haya alcanzado un estado terminal al menos 30 días desde que se insertó en la colección Journal).

Este script es idempotente. Si encuentras un error, es seguro volver a ejecutarlo hasta que todos los journals y snapshots existentes tengan un campo ttl de índice correspondiente. Todos los documentos de journal y snapshot que fueron migrados hasta el error serán omitidos en ejecuciones posteriores.

MongoDB migration script
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script sets the deletedAt and insertedAt fields for existing journal and snapshot purging of the journal and snapshot collections for MongoDB.
//
// This script does the following...
//   1. Sets a deletedAt dateTime for all journal documents that do not have a `deletedAt` field
//          This field is calculated from the existing timestamp (when the document was inserted)
//   2. Sets a insertedAt dateTime for all snapshot documents that do not have a `insertedAt` field
//          This field is calculated from the existing timestamp (when the document was inserted)
//
// Notes:
//   This script is idempotent. If you encounter an error, it is safe to re-run until all existing journal and snapshots have a corresponding ttl index field.
//   All journal and snapshot documents that were migrated up until the error will be skipped on subsequent runs.
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// The dateTime at which Mongo will delete the journal documents from the database is determined by the value of the existing timestamp + the retentionPeriodInSeconds + the value of the journal collection deletedAt ttl index
//      e.g. existing timestamp = 1723559050651 (2024-08-13T15:24:10.651Z), retentionPeriodInSeconds 30 = (30 days), journal Ttl index in Seconds = 31536000 (1 year)
//          The document's deletedAt field value will be set to: 2024-09-12T15:24:10.651Z, and the document will be eligible for removal by the database after 2025-09-12T15:24:10.651Z
// The dateTime at which Mongo will delete the snapshot documents from the database is determined by the value of the existing timestamp + the value of the snapshot collection insertedAt ttl index
//      e.g. existing timestamp = 1723559050651 (2024-08-13T15:24:10.651Z), snapshot Ttl index in Seconds = 31536000 (1 year)
//          The document's insertedAt field value will be set to: 2024-08-13T15:24:10.651Z, and the document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z

// Step 1 - For each journal document without a deletedAt field, set deletedAt to the value of the existing timestamp

// IMPORTANT - Client must override the following value before running this script to determine when Journal documents will be removed by the database
// e.g. (30 days = 30)
const retentionPeriodInDays = null;

const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");

function purgingDocumentMigration(retentionPeriodInDays) {
    if (retentionPeriodInDays == null) {
        throw new Error("The journal retention period parameter cannot be null");
    }
    // Convert days to milliseconds
    const retentionPeriod = retentionPeriodInDays * 86400000;

    print("1 - Set the deletedAt field on all journal documents that do not have one");
    const journalsToBeUpdated = journal.countDocuments({ deletedAt: null })
    print(journalsToBeUpdated + " journal documents to be updated");
    journal.updateMany(
        // Only match journal documents that either do not contain the deletedAt field or contain the deletedAt field whose value is null
        { deletedAt: null },
        // Set deletedAt datetime of the value of the existing eventPayloads.timestamp
        [ { $set: { deletedAt: { $add: [ { $toDate: { $arrayElemAt: ["$eventPayloads.timestamp", 0] } } , retentionPeriod ] } } } ]
    )
    const remainingJournals = journal.count({ deletedAt: null });
    print('Successfully set deletedAt field on ' + (journalsToBeUpdated - remainingJournals) + ' journal documents');
    print(remainingJournals + ' journal documents remain without a deletedAt field');

// Step 2 - For each snapshot document without a insertedAt field, set insertedAt to the value of the existing timestamp
    print("2 - Set the insertedAt field on all snapshot documents that do not have one");
    const snapshotsToBeUpdated = snapshots.countDocuments({ insertedAt: null });
    print(snapshotsToBeUpdated + " snapshot documents to be updated");
    snapshots.updateMany(
        // Only match snapshot documents that either do not contain the insertedAt field or contain the insertedAt field whose value is null
        { insertedAt : null },
        // Set insertedAt to match the value of the top-level timestamp field
        [ { $set: { insertedAt: { $toDate: "$timestamp" } } } ]
    )
    const remainingSnapshots = snapshots.countDocuments( { insertedAt: null });
    print("Successfully set insertedAt field on " + (snapshotsToBeUpdated - remainingSnapshots) + " snapshot documents");
    print(remainingSnapshots + ' snapshot documents remain without a insertedAt field');
}

purgingDocumentMigration(retentionPeriodInDays);

Migrando datos existentes de CosmosDB

Por defecto, CosmosDB rellenará y actualizará automáticamente el campo _ts para un documento dado, manteniendo la marca de tiempo de la última modificación del documento. Crear un índice a nivel de colección tanto para la colección journal como snapshots permitirá la eliminación de datos existentes una determinada cantidad de tiempo después de que el documento fuera actualizado por última vez.

Puede que quieras actualizar los Journal Events existentes para tener un valor TTL a nivel de documento. Esto sobrescribirá el valor del índice TTL de la colección y deberá establecerse manualmente. Consulta la documentación oficial de CosmosDB para más información sobre cómo establecer un valor TTL a nivel de documento.

Como ejemplo, podrías querer establecer el índice a nivel de colección de journal a 365 días, pero que todos los Journal Events existentes se purguen en 30 días. Por lo tanto, actualizarías todos los Journal Events existentes para establecer el campo ttl en 2592000 (30 días en segundos).

Script de migración de CosmosDB

El siguiente script de migración puede usarse como inicio rápido para configurar tus colecciones existentes de CosmosDB con purga de journal y snapshot; puede ejecutarse usando el Mongo shell de Cosmos.

Este script se usa para crear los dos índices a nivel de colección __ts como se describe en el CosmosDB setup. Esto requiere sobrescribir los dos parámetros de entrada siguientes (el script no hará nada y lanzará un error si no se sobrescriben):

  • journalTtlInSeconds - Un entero que define el valor expireAfterSeconds para el índice _ts de la colección journal

  • snapshotTtlInSeconds - Un entero que define el valor expireAfterSeconds para el índice _ts de la colección snapshots

Una vez que se crean los índices a nivel de colección, Cosmos gestionará automáticamente cuándo eliminar los documentos existentes de Journal y Snapshot ya que el campo específico de CosmosDB _ts ya está poblado en cada documento y contiene la marca de tiempo de la última modificación del documento. No es necesario actualizar los documentos de Journal existentes.

Este script no debe ejecutarse múltiples veces con diferentes parámetros de entrada. El Mongo shell de Cosmos lanzará una excepción si se intenta crear una clave de índice idéntica con un valor de expireAfterSeconds diferente.
CosmosDB purging index creation
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script adds ttl indexes that are needed for purging of the journal and snapshot collections for CosmosDB.
//
// This script does the following...
//   1. Creates a ttl index for both the journal and snapshots collections for the Cosmos specific `_ts` field
//
// Notes:
//   This script should not be run multiple times with different input parameters.
//   An exception will be thrown if trying to create an identical index key with a different expireAfterSeconds value
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// The dateTime at which Cosmos will delete the journal documents from the database is determined by the value of the collection `_ts` index and when the snapshot document was last updated (which is managed by Cosmos)
//      e.g. if the document was last updated at 2024-08-13T15:24:10.651Z, journalTtlInSeconds = 31536000 (1 year)
//          Cosmos will expire the document after 2025-08-13T15:24:10.651Z
// The dateTime at which Cosmos will delete the snapshot documents from the database is determined by the value of the collection `_ts` index and when the snapshot document was last updated (which is managed by Cosmos)
//      e.g. if the document was last updated at 2024-08-13T15:24:10.651Z, snapshotTtlInSeconds = 31536000 (1 year)
//          Cosmos will expire the document after 2025-08-13T15:24:10.651Z

// IMPORTANT - Client must override the following two values before running this script to determine when these documents will be removed by the database
// e.g. (1 year in seconds = 31536000)
const journalTtlInSeconds = null;
// e.g. (1 year in seconds = 31536000)
const snapshotTtlInSeconds = null;

const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");

function cosmosPurgingIndexCreation(journalTtl, snapshotTtl) {
    if (journalTtl == null || snapshotTtl == null) {
        throw new Error("Encountered a null parameter. Parameter values: (journalTtlInSeconds:" + journalTtl + ", snapshotTtlInSeconds: " + snapshotTtl + ")");
    }

    // Step 1 - Create ttl index for journal and snapshots collections
    print("1 - Create ttl index _ts on journal collection with expireAfterSeconds: " + journalTtl);
    journal.createIndex( { "_ts": 1 }, { expireAfterSeconds: journalTtl } );
    print("2 - Create ttl index _ts on snapshot collection with expireAfterSeconds: " + snapshotTtl);
    snapshots.createIndex( { "_ts": 1 }, { expireAfterSeconds: snapshotTtl } );
}

cosmosPurgingIndexCreation(journalTtlInSeconds, snapshotTtlInSeconds);