Documentation for a newer release is available. View Latest

How to migrate an existing IPF application for Journal and Snapshot purging

If you have an existing IPF system and want to implement Journal and Snapshot purging, you need to enable purging on your existing IPF Flow Applications and migrate the existing Journal and Snapshot documents so that they are correctly removed by the database purging routine.

Enable persistence purging for your existing Flow applications

Each of your IPF Flow applications should be configured to enable persistence purging to avoid the risk that a database can become unavailable due to filling up all available storage.

The following configuration should be explicitly enabled in each IPF Flow Application:

ipf.behaviour.config.persistence {
  snapshot-when-terminal=true
  delete-events-on-snapshot=true
}

This ensures that Snapshots are created for transactions that have reached a terminal state, and that the Akka Persistence Plugin marks terminal Journal documents as deleted.

Once you have restarted each IPF Flow Application with the new configuration, all new Journal and Snapshot documents will contain the fields needed to determine when they should be purged. You can either choose to run the following migration scripts while your IPF Applications are down, or after they have successfully restarted with the new configuration.

Migrating Existing Journal and Snapshot Data

The following assumes you have built your Flow Applications using the Akka Persistence Plugin for MongoDB. This documentation refers to fields and functionality that are specific to that plugin implementation for purging to work accordingly.

Depending on the database being used, different time-to-live indexes and document fields will need to be migrated. If you have a large number of documents to migrate, the following scripts might cause performance issues on the server. To avoid these issues, plan to create indexes and perform document updates during off hours.

These operations should be performed after enabling purging for your existing flow applications.
This is to ensure that all new data will be persisted with the necessary information for purging to occur successfully.

Migrating existing MongoDB data

Existing documents in the journal and snapshots collections do not contain a BSON Date field, so by default the collection indexes will have no effect on existing data. It is recommended to update existing data within these collections to set a deletedAt and insertedAt value for Journal Events and Snapshots, respectively.

To safeguard against the unintentional purging of non-terminal data (as you may not know if existing data has reached a terminal state or not), you can update the respective TTL fields to have a BSON Date in the future. For example, if the configured index expireAfterSeconds evaluates to 20 days, and you set a Journal Event’s deletedAt field to 10 days in the future, the document will actually be purged 30 days after the update is made.

The following migration scripts can be used as a quickstart for setting up your existing MongoDB collections with journal and snapshot purging, they can be run using a Mongo shell.

Index creation - MongoDB

This script is used to create the two collection level indexes on the deletedAt and insertedAt fields as outlined in the MongoDB setup. This requires overriding the following two input parameters (the script will do nothing and throw an error if these are not overridden):

  • journalTtlInSeconds - A integer that defines the expireAfterSeconds value for the journal collection deletedAt index

  • snapshotTtlInSeconds - A integer that defines the expireAfterSeconds value for the snapshots collection insertedAt index

This script should not be run multiple times with different input parameters. An exception will be thrown by the Mongo shell if trying to create an identical index key with a different expireAfterSeconds value
MongoDB purging index creation
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script adds ttl indexes that are needed for purging of the journal and snapshot collections for MongoDB.
//
// This script does the following...
//   1. Creates a ttl index for both the journal and snapshots collections for the fields `deletedAt` and `insertedAt` respectively
//
// Notes:
//   This script should not be run multiple times with different input parameters.
//   An exception will be thrown if trying to create an identical index key with a different expireAfterSeconds value
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// The dateTime at which Mongo will delete the journal documents from the database is determined by the value of the deletedAt field + journalTtlInSeconds
//      e.g. deletedAt = 2024-08-13T15:24:10.651Z, journalTtlInSeconds = 31536000 (1 year)
//          The document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z
// The dateTime at which Mongo will delete the snapshot documents from the database is determined by the value of the insertedAt field + snapshotTtlInSeconds
//      e.g. insertedAt = 2024-08-13T15:24:10.651Z, snapshotTtlInSeconds = 31536000 (1 year)
//          The document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z

// IMPORTANT - Client must override the following two values before running this script to determine when these documents will be removed by the database
// e.g. (1 year in seconds = 31536000)
const journalTtlInSeconds = null;
// e.g. (1 year in seconds = 31536000)
const snapshotTtlInSeconds = null;

const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");

function mongoPurgingIndexCreation(journalTtl, snapshotTtl) {
    if (journalTtl == null || snapshotTtl == null) {
        throw new Error("Encountered a null parameter. Parameter values: (journalTtlInSeconds:" + journalTtl + ", snapshotTtlInSeconds: " + snapshotTtl + ")");
    }

    // Step 1 - Create ttl index for journal and snapshots collections
    print("1. Create ttl index deletedAt on journal collection with expireAfterSeconds: " + journalTtl);
    journal.createIndex( { "deletedAt": 1 }, { expireAfterSeconds: journalTtl } );
    print("2. Create ttl index insertedAt on snapshot collection with expireAfterSeconds: " + snapshotTtl);
    snapshots.createIndex( { "insertedAt": 1 }, { expireAfterSeconds: snapshotTtl } );
}

mongoPurgingIndexCreation(journalTtlInSeconds, snapshotTtlInSeconds);

Updating existing documents - MongoDD

This script updates existing Journal and Snapshot documents so that they can be eligible for purging. This is done by:

  • Updating all Journal documents, setting a deletedAt field that is equivalent to the document’s timestamp (when it was inserted by the persistence plugin) + a given retention period. This requires overriding the following parameter (the script will do nothing and throw an error if not overridden):

    • retentionPeriodInDays - An integer that specifies the number of days after the document’s original timestamp at which the document is considered deleted. Used to guard against purging Journal entries before a transaction has hit a terminal state.

  • Updating all Snapshot documents, setting a insertedAt field that is equivalent to the document’s timestamp (when it was inserted by the persistence plugin).

This script is indiscriminate of whether Journal documents belong to a transaction that has reached a terminal state or not. It will act as if all existing Journal documents reached a terminal state after the retention period.
To try and safeguard against the unintentional purging of non-terminal data, it is recommended to set the retention period to a value high enough that existing transaction data should have reached a terminal state (e.g. we would expect any existing transaction data to have reached a terminal state at least 30 days since it was inserted into the Journal collection).

This script is idempotent. If you encounter an error, it is safe to re-run until all existing journal and snapshots have a corresponding ttl index field. All journal and snapshot documents that were migrated up until the error will be skipped on subsequent runs.

MongoDB migration script
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script sets the deletedAt and insertedAt fields for existing journal and snapshot purging of the journal and snapshot collections for MongoDB.
//
// This script does the following...
//   1. Sets a deletedAt dateTime for all journal documents that do not have a `deletedAt` field
//          This field is calculated from the existing timestamp (when the document was inserted)
//   2. Sets a insertedAt dateTime for all snapshot documents that do not have a `insertedAt` field
//          This field is calculated from the existing timestamp (when the document was inserted)
//
// Notes:
//   This script is idempotent. If you encounter an error, it is safe to re-run until all existing journal and snapshots have a corresponding ttl index field.
//   All journal and snapshot documents that were migrated up until the error will be skipped on subsequent runs.
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// The dateTime at which Mongo will delete the journal documents from the database is determined by the value of the existing timestamp + the retentionPeriodInSeconds + the value of the journal collection deletedAt ttl index
//      e.g. existing timestamp = 1723559050651 (2024-08-13T15:24:10.651Z), retentionPeriodInSeconds 30 = (30 days), journal Ttl index in Seconds = 31536000 (1 year)
//          The document's deletedAt field value will be set to: 2024-09-12T15:24:10.651Z, and the document will be eligible for removal by the database after 2025-09-12T15:24:10.651Z
// The dateTime at which Mongo will delete the snapshot documents from the database is determined by the value of the existing timestamp + the value of the snapshot collection insertedAt ttl index
//      e.g. existing timestamp = 1723559050651 (2024-08-13T15:24:10.651Z), snapshot Ttl index in Seconds = 31536000 (1 year)
//          The document's insertedAt field value will be set to: 2024-08-13T15:24:10.651Z, and the document will be eligible for removal by the database after 2025-08-13T15:24:10.651Z

// Step 1 - For each journal document without a deletedAt field, set deletedAt to the value of the existing timestamp

// IMPORTANT - Client must override the following value before running this script to determine when Journal documents will be removed by the database
// e.g. (30 days = 30)
const retentionPeriodInDays = null;

const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");

function purgingDocumentMigration(retentionPeriodInDays) {
    if (retentionPeriodInDays == null) {
        throw new Error("The journal retention period parameter cannot be null");
    }
    // Convert days to milliseconds
    const retentionPeriod = retentionPeriodInDays * 86400000;

    print("1 - Set the deletedAt field on all journal documents that do not have one");
    const journalsToBeUpdated = journal.countDocuments({ deletedAt: null })
    print(journalsToBeUpdated + " journal documents to be updated");
    journal.updateMany(
        // Only match journal documents that either do not contain the deletedAt field or contain the deletedAt field whose value is null
        { deletedAt: null },
        // Set deletedAt datetime of the value of the existing eventPayloads.timestamp
        [ { $set: { deletedAt: { $add: [ { $toDate: { $arrayElemAt: ["$eventPayloads.timestamp", 0] } } , retentionPeriod ] } } } ]
    )
    const remainingJournals = journal.count({ deletedAt: null });
    print('Successfully set deletedAt field on ' + (journalsToBeUpdated - remainingJournals) + ' journal documents');
    print(remainingJournals + ' journal documents remain without a deletedAt field');

// Step 2 - For each snapshot document without a insertedAt field, set insertedAt to the value of the existing timestamp
    print("2 - Set the insertedAt field on all snapshot documents that do not have one");
    const snapshotsToBeUpdated = snapshots.countDocuments({ insertedAt: null });
    print(snapshotsToBeUpdated + " snapshot documents to be updated");
    snapshots.updateMany(
        // Only match snapshot documents that either do not contain the insertedAt field or contain the insertedAt field whose value is null
        { insertedAt : null },
        // Set insertedAt to match the value of the top-level timestamp field
        [ { $set: { insertedAt: { $toDate: "$timestamp" } } } ]
    )
    const remainingSnapshots = snapshots.countDocuments( { insertedAt: null });
    print("Successfully set insertedAt field on " + (snapshotsToBeUpdated - remainingSnapshots) + " snapshot documents");
    print(remainingSnapshots + ' snapshot documents remain without a insertedAt field');
}

purgingDocumentMigration(retentionPeriodInDays);

Migrating existing CosmosDB data

By default, CosmosDB will automatically populate and update the _ts field for a given document, maintaining the timestamp of the document’s last modification. Creating a collection level index for both the journal and snapshots collection will enable the deletion of existing data a given amount of time after the document was last updated.

You may want to update existing Journal Events to have a document level TTL value. This will override the collection TTL index value and will need to be set manually. See the official CosmosDB documentation for more information about how to set a document level TTL value.

As an example, you might want to set the journal collection level index to 365 days, but you want all existing Journal Events to be purged in 30 days time. So you would update all existing Journal Events to set the ttl field to 2592000 (30 days in seconds).

CosmosDB migration script

The following migration script can be used as a quickstart for setting up your existing CosmosDB collections with journal and snapshot purging, it can be run using Cosmos' Mongo shell.

This script is used to create the two collection level __ts indexes as outlined in the CosmosDB setup. This requires overriding the following two input parameters (the script will do nothing and throw an error if these are not overridden):

  • journalTtlInSeconds - A integer that defines the expireAfterSeconds value for the journal collection _ts index

  • snapshotTtlInSeconds - A integer that defines the expireAfterSeconds value for the snapshots collection _ts index

Once the collection level indexes are created, Cosmos will automatically manage when to remove existing Journal and Snapshot documents as the CosmosDB specific _ts field is already populated on each document which contains the timestamp of the document’s last modification. It is not necessary to update existing Journal documents.

This script should not be run multiple times with different input parameters. An exception will be thrown by the Mongo shell if trying to create an identical index key with a different expireAfterSeconds value
MongoDB purging index creation
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This script adds ttl indexes that are needed for purging of the journal and snapshot collections for CosmosDB.
//
// This script does the following...
//   1. Creates a ttl index for both the journal and snapshots collections for the Cosmos specific `_ts` field
//
// Notes:
//   This script should not be run multiple times with different input parameters.
//   An exception will be thrown if trying to create an identical index key with a different expireAfterSeconds value
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// The dateTime at which Cosmos will delete the journal documents from the database is determined by the value of the collection `_ts` index and when the snapshot document was last updated (which is managed by Cosmos)
//      e.g. if the document was last updated at 2024-08-13T15:24:10.651Z, journalTtlInSeconds = 31536000 (1 year)
//          Cosmos will expire the document after 2025-08-13T15:24:10.651Z
// The dateTime at which Cosmos will delete the snapshot documents from the database is determined by the value of the collection `_ts` index and when the snapshot document was last updated (which is managed by Cosmos)
//      e.g. if the document was last updated at 2024-08-13T15:24:10.651Z, snapshotTtlInSeconds = 31536000 (1 year)
//          Cosmos will expire the document after 2025-08-13T15:24:10.651Z

// IMPORTANT - Client must override the following two values before running this script to determine when these documents will be removed by the database
// e.g. (1 year in seconds = 31536000)
const journalTtlInSeconds = null;
// e.g. (1 year in seconds = 31536000)
const snapshotTtlInSeconds = null;

const journal = db.getCollection("journal");
const snapshots = db.getCollection("snapshots");

function cosmosPurgingIndexCreation(journalTtl, snapshotTtl) {
    if (journalTtl == null || snapshotTtl == null) {
        throw new Error("Encountered a null parameter. Parameter values: (journalTtlInSeconds:" + journalTtl + ", snapshotTtlInSeconds: " + snapshotTtl + ")");
    }

    // Step 1 - Create ttl index for journal and snapshots collections
    print("1 - Create ttl index _ts on journal collection with expireAfterSeconds: " + journalTtl);
    journal.createIndex( { "_ts": 1 }, { expireAfterSeconds: journalTtl } );
    print("2 - Create ttl index _ts on snapshot collection with expireAfterSeconds: " + snapshotTtl);
    snapshots.createIndex( { "_ts": 1 }, { expireAfterSeconds: snapshotTtl } );
}

cosmosPurgingIndexCreation(journalTtlInSeconds, snapshotTtlInSeconds);