Akka Persistence Plugin for MongoDB
Akka Persistence comes with three plugins developed by Lightbend to provide a storage backend for persistent actors:
- Apache Cassandra
- JDBC (for RDBMSes)
- Couchbase
Other storage backends are available as community plugins.
This is a plugin which implements MongoDB as a storage backend for Akka Persistence. Storage backends typically need to implement the following three pieces of functionality:
- Write journal: Persisting events by an actor
- Read journal: Providing a CQRS read-side view of a persistent actor
- Snapshot store: Creating and retrieving an aggregate of all events up to a certain point for an actor
Using Akka Persistence Plugin for MongoDB
Here’s how to get started using MongoDB as a storage backend for Akka Persistence using our plugin:
1. Declare a Dependency
The plugin is available under the following Maven coordinates:
<dependency>
    <groupId>com.iconsolutions.ipf.core.persistence</groupId>
    <artifactId>akka-persistence-mongodb-icon-plugin</artifactId>
</dependency>
2. Set Akka Persistence Plugin for MongoDB as the Provider
In addition to the three pieces of functionality described earlier, the Akka Persistence Plugin for MongoDB also implements durable state, giving four pieces in total:
- write journal
- read journal
- snapshot store
- durable state
To configure the Akka Persistence Plugin for MongoDB as the provider for the write journal, snapshot store and durable state, add the following configuration:
akka {
  persistence.journal.plugin = "iconsolutions.akka.persistence.mongodb"
  persistence.snapshot-store.plugin = "iconsolutions.akka.persistence.mongodb.snapshot"
  persistence.state.plugin = "iconsolutions.akka.persistence.mongodb.durable-state"
}
3. Configure MongoDB
The plugin uses a single MongoClient to connect to MongoDB.
You can specify the connection settings using HOCON.
The configuration properties with their defaults are shown below:
| Config key | Description | Default value |
|---|---|---|
| | The MongoDB URI to use to connect to the database. Set the value of this field if not using the plugin from within IPF or if not intending to use the same database as IPF. Honours the corresponding global IPF setting. | |
| | The name of the collection that will hold the domain events. | |
| | The name of the collection that will hold the snapshots of actor state. | |
| | The name of the collection that will hold the actor state. | |
| | Whether to bypass the MongoDB schema validation rules. | |
| database-mode | The database mode. The plugin supports two flavours: MongoDB (mongo) and a mode for MongoDB-compatible databases (see Database Mode below). Honours the corresponding global IPF setting. | mongo |
| | Whether the plugin should create the required indexes automatically. Disable this if the database user is not granted the privileges needed to create indexes. Honours the corresponding global IPF setting. | |
| | The commit quorum to use when creating the indexes for Akka Persistence MongoDB. Honours the corresponding global IPF setting. | |
| | Whether to enable SSL support. Honours the corresponding global IPF setting. | |
| | Type of the key store. Honours the corresponding global IPF setting. | |
| | Path to the key store that holds the SSL certificate (typically a jks file). Honours the corresponding global IPF setting. | |
| | Password used to access the key store. Honours the corresponding global IPF setting. | |
| | Password used to access the key in the key store. Honours the corresponding global IPF setting. | |
| | Type of the trust store. Honours the corresponding global IPF setting. | |
| | Path to the trust store that holds the SSL certificate. Honours the corresponding global IPF setting. | |
| | Password used to access the trust store. Honours the corresponding global IPF setting. | |
| | The maximum number of times a failed operation will be attempted (including the initial attempt). Honours the corresponding global IPF setting. | |
| | A list of MongoDB error codes which should be retried. Honours the corresponding global IPF setting. | |
| | The delay to use between attempts if the error itself does not indicate a backoff duration. Honours the corresponding global IPF setting. | |
See Connection String Options for all options that can be specified using the MongoDB Connection String.
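For illustration, a connection string combining several such options might look like the following; the hostnames and database name are placeholders, and the options shown (replicaSet, retryWrites, w) are standard MongoDB connection string options:
mongodb://mongo1.example.com:27017,mongo2.example.com:27017/ipf?replicaSet=rs0&retryWrites=true&w=majority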
4. Enable TTL Purging (Optional)
If required, Time-to-live (TTL) purging can be configured for your database. See the Journal and Snapshot Purging documentation for more details.
Using the Read Journal
Use the read journal to subscribe to the domain events.
It is usually used to create a CQRS "read-side view" based on the events persisted so far by a persistent actor.
The configuration properties with their defaults are shown below:
| Config key | Description | Default value |
|---|---|---|
| use-change-streams | Determines whether the Change Streams (MongoDB CDC) implementation of the read journal is used instead of the periodic query-based one. | |
| allowed-time-drift | A buffer period to help prevent data loss caused by timing delays or race conditions when fetching events. Only events within the [largest_seen_object_id, (current_object_id - allowed_time_drift)] range will be returned by the event stream query. | |
| | Defines how much data MongoDB should return in a single batch. Larger values reduce the number of round trips to the database. | |
The allowed-time-drift setting shifts the event read window backwards (by the provided duration) to accommodate differences in node system clocks and possible replication delays.
This is essential in multi-node environments, since MongoDB’s ObjectId uses epoch time as an offset, and a slow or out-of-sync node could otherwise result in events not being consumed if they were persisted near the previous query boundary.
Setting allowed-time-drift to a value greater than the maximum observed akka_event_persistence_time_ns metric ensures that any events which are delayed in being fully persisted (due to processing or replication lag) are still included in the next read window. This helps prevent race conditions and ensures no events are missed, even if there’s a delay in their availability.
If use-change-streams is set to true, the plugin will switch from a polling approach, where queries are repeatedly executed to fetch the latest events, to a Change Streams based approach, where data changes in the journal collection are subscribed to and change events are received in real time via a change stream cursor.
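As a sketch, these settings would be supplied via HOCON. The read-journal configuration path below is an assumption modelled on the plugin's other keys (such as write-journal), so consult the plugin's reference configuration for the authoritative paths and defaults:
iconsolutions.akka.persistence.mongodb.read-journal {
  # assumed path; values are illustrative only
  use-change-streams = true
  allowed-time-drift = 10s
}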
Additional configuration properties specific to RU-based CosmosDB deployments are available when use-change-streams is enabled. These properties and their defaults are shown below:
| Config key | Description | Default value |
|---|---|---|
| | The initial backoff duration before the change stream is restarted. | |
| | The maximum backoff duration before the change stream is restarted. | |
| | A random factor that adds jitter to the backoff duration. A zero random factor disables jitter. | |
Note: The Change Streams approach is experimental. If used, switching between Change Streams and query-based implementations is not recommended.
The Change Streams approach can be less intensive on the MongoDB server and achieve better throughput, but duplicates are more likely. If the plugin is unable to keep up with the rate at which events are produced, the cursor may overwrite an event that hasn’t been handled yet. When this happens, the plugin will fall back to querying to catch up. Once caught up, it will then switch back to using Change Streams.
Like other Akka Persistence plugins, this plugin provides a Scala and Java DSL variant for interacting with the read side. You can retrieve them like this:
// Java DSL
IpfMongoReadJournal javaDslReadJournal = PersistenceQuery
    .get(actorSystem)
    .getReadJournalFor(IpfMongoReadJournal.class, IpfMongoReadJournal.identifier());
// Scala DSL (but you will probably use this from Scala, not Java!)
val scalaDslReadJournal: ReadJournal =
  PersistenceQuery(actorSystem).readJournalFor(IpfMongoReadJournal.identifier)
These will return a ReadJournal (Java API, Scala API) which you can use to run three types of queries:
- eventsByTag: Returns a stream of all events by tag, given an offset. If use-change-streams is enabled, continuously streams events in real time using Change Streams; otherwise queries the journal for events from the offset up to the current timestamp and completes. Use Offset.noOffset() to stream from the beginning.
- currentEventsByTag: Always queries the journal (regardless of the use-change-streams setting) and completes once all events up to the current timestamp have been consumed.
- currentEventsByPersistenceId: Returns a stream of all events by persistenceId, within the provided sequence number range (inclusive).
All the above queries have a return type of: Source<EventEnvelope, NotUsed>
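For example, a minimal consumer of the eventsByTag query might look like the following sketch; the "payment" tag is illustrative, and javaDslReadJournal and actorSystem come from the snippet above:
javaDslReadJournal
    .eventsByTag("payment", Offset.noOffset())
    .runForeach(envelope ->
        // each envelope carries the offset, persistence id,
        // sequence number and the event payload itself
        System.out.println(envelope.persistenceId() + " seqNr=" + envelope.sequenceNr()
            + " event=" + envelope.event()),
        actorSystem);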
Tips for Using the Read Journal
Here are some tips that may be useful when writing a read journal using the Akka Persistence Plugin for MongoDB:
- It is the consumer's responsibility to store the offset that was last provided by the plugin.
- Wrap the eventsByTag operations in a RestartSource so that when/if the stream crashes due to some error, it can automatically restart from the last offset that was persisted, instead of requiring a restart of the application (see the sketch after this list).
- Batching offset saves is recommended to reduce performance overhead.
- It is, of course, possible to persist the offset that your application last saw into another MongoDB collection.
- If the consumer application crashes after some events have been processed but before the offset was saved, the plugin will send duplicate events between the last saved offset and the crash. The downstream application should be able to handle such duplicates.
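Putting the first three tips together, here is a sketch of a restartable eventsByTag consumer. The loadOffset, saveOffset and handleEvent functions are hypothetical application-provided helpers (returning CompletionStage values), and the backoff and batching values are illustrative:
RestartSettings restartSettings =
    RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(30), 0.2);

RestartSource
    .withBackoff(restartSettings, () ->
        // Re-read the last saved offset on every (re)start of the stream
        Source.completionStageSource(
            loadOffset().thenApply(offset ->
                javaDslReadJournal.eventsByTag("payment", offset))))
    // Process each event, then carry its offset downstream
    .mapAsync(1, envelope ->
        handleEvent(envelope).thenApply(done -> envelope.offset()))
    // Batch offset saves to reduce write overhead, persisting only the latest offset
    .groupedWithin(100, Duration.ofSeconds(1))
    .mapAsync(1, offsets -> saveOffset(offsets.get(offsets.size() - 1)))
    .runWith(Sink.ignore(), actorSystem);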
Turning MongoDB Document Validation Off to Ensure High Performance
By default, MongoDB enables strict schema validation for all inserted or updated documents.
Even though the collection used for the event journal doesn’t require (or include) any schema validations, it is advisable to explicitly turn the validation off, which can be accomplished by specifying a validationLevel: 'off' option when creating the collection, e.g.
db.createCollection("journal",
{
validationLevel: "off"
})
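If the collection already exists, the validation level can be changed with MongoDB's collMod command instead:
db.runCommand({ collMod: "journal", validationLevel: "off" })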
Database Mode
The plugin is compatible with other databases that implement the MongoDB wire protocol, such as Microsoft's Azure CosmosDB or Amazon DocumentDB.
Some of these databases have limitations on the types of indexes that can be created, specifically whether the database supports compound indexes that include a nested field.
The plugin creates such an index for the eventsByTag query as part of Persistence Query.
If using the plugin with a MongoDB-compatible database (not MongoDB itself), and the target database doesn't support nested fields as part of compound indexes, set the iconsolutions.akka.persistence.mongodb.database-mode configuration key to something other than mongo.
This will create a slightly less efficient index that is compatible with such third-party databases.
Read Concern
The journal and snapshot collections may need a more reliable read concern than what is used as the database default, in which case the readConcernLevel connection string option should be used to set it.
See the official causal consistency docs for details on why this setting matters, and the read concern documentation for more details on each of the available read concern options.
For how the read and write concern settings relate to CosmosDB’s MongoDB API, please refer to the official Azure docs.
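For example, a majority read concern can be requested directly in the connection string; the host and database below are placeholders, and readConcernLevel is a standard MongoDB connection string option:
mongodb://mongo1.example.com:27017/ipf?readConcernLevel=majority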
Journal and Snapshot Purging
Database purging can be enabled to help ensure consistent database performance and avoid the risk that a database can become unavailable due to filling up all available storage. For the journal and snapshot collections, purging can be managed using Time-to-live (TTL) functionality. TTL functionality allows the database to automatically expire data from the journal and snapshots collections.
Note: When building a domain using IPF Flo-Lang, the Akka EventSourcedBehavior commands the Akka Persistence Plugin to create snapshots and delete journal events. There are a number of configurable options to determine when snapshots should be created and whether events should be deleted; more information about this configuration can be found in the Flo-Lang Purging documentation. This should be configured alongside the steps below to successfully purge data.
To enable TTL purging, you need to configure the database in use and, if using CosmosDB, the Akka Persistence MongoDB Plugin itself.
MongoDB setup
When using MongoDB, the TTL index is a special single-field index configured for a BSON Date field. For more information, see the official MongoDB TTL documentation.
Journal Collection Index
For the journal collection, a TTL index should be created for the deletedAt field.
An example MongoDB shell command to create this index is:
db.getCollection("journal").createIndex( { "deletedAt": 1 }, {
expireAfterSeconds: 31536000 } )
This command means that journal entries are eligible for removal from the database 31536000 seconds (365 days) after the date value of the deletedAt field.
The plugin Write Journal is responsible for setting the deletedAt field when marking Journal Events for deletion.
Snapshots Collection Index
For the snapshot collection, a TTL index should be created for the insertedAt field. An example MongoDB shell command to create this index is:
db.getCollection("snapshots").createIndex( { "insertedAt": 1 }, {
expireAfterSeconds: 31536000 } )
This command means that snapshot entries are eligible for removal from the database 31536000 seconds (365 days) after the date value of the insertedAt field.
Migrating Existing Data
Existing documents in the journal and snapshots collections do not contain a BSON Date field, so by default the collection indexes will have no effect on existing data.
It is recommended to update existing data within these collections to set a deletedAt and insertedAt value for Journal Events and Snapshots, respectively.
To safeguard against the unintentional purging of non-terminal data (as you may not know if existing data has reached a terminal state or not), you can update the respective TTL fields to have a BSON Date in the future.
For example, if the configured index expireAfterSeconds evaluates to 20 days, and you set a Journal Event’s deletedAt field to 10 days in the future, the document will actually be purged 30 days after the update is made.
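Continuing that example, a hypothetical mongosh update that sets deletedAt to 10 days in the future for all journal entries that lack one (the filter and duration are illustrative, not prescriptive) might look like:
db.getCollection("journal").updateMany(
    { deletedAt: { $exists: false } },
    // Aggregation-pipeline update (MongoDB 4.2+): $$NOW plus 10 days in milliseconds
    [ { $set: { deletedAt: { $add: [ "$$NOW", 10 * 24 * 60 * 60 * 1000 ] } } } ]
)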
See the Purging migration documentation for more information about setting up an existing IPF system for Journal and Snapshot purging.
Things to consider
MongoDB TTL functionality requires a BSON date field to be set, and so if that field does not exist on the document, it will not be purged. Therefore, if a Journal Event is not marked to be deleted, it will not be purged by the MongoDB TTL index.
As specified in the MongoDB TTL documentation, deleting a large number of documents at once creates a large workload and might cause performance issues. This is mainly a consideration when migrating existing data, and it's recommended that you do not try to purge all existing data at once. We instead advise that purging of existing data is done during non-business hours or in batches.
For example, if the configured index expireAfterSeconds evaluates to 20 days, you may want to update all existing Journal Events to have a deletedAt value of 2024-07-17T03:00:00.000Z.
In this scenario, the document would be scheduled to be purged at 3am on 06/08/2024.
CosmosDB setup
When using TTL indexes with CosmosDB, configuration needs to be enabled for the Akka Persistence Plugin and for each CosmosDB collection.
There are two supported types of CosmosDB TTL indexes: setting a default TTL value on the whole collection, and setting individual TTL values for each document. For more information, see the official CosmosDB TTL documentation.
The collection level index is set on the CosmosDB specific _ts field.
Once the index is created, the database will automatically delete any documents in that collection that have not been modified for the configured number of seconds.
The document level TTL is not configured as an index, but is a value that exists on each document.
The document(s) must contain a root-level property ttl.
The ttl value must be an int32 (or int64 that fits an int32), and a collection level TTL index described above must have been created for that collection.
TTL values set on a document will override the collection’s TTL value.
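For illustration, a journal document carrying a per-document TTL might look like the sketch below; every field other than ttl is a placeholder, as the plugin's actual document layout is not shown here:
{
    "persistenceId": "payment-123",   // placeholder field
    "ttl": NumberInt(2592000)         // purged ~30 days after the document's _ts
}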
Journal Collection Index
For the journal collection, a collection level TTL index should be created for the _ts field.
An example MongoDB shell command to create this index is:
db.getCollection("journal").createIndex( { "_ts": 1 }, {
expireAfterSeconds: 31536000 } )
This command means that journal entries are eligible for removal from the database 31536000 seconds (365 days) after they have last been modified.
In addition to this, the journal collection should also utilise document level TTL properties.
When the Akka Persistence plugin is configured correctly, the Journal Entry ttl property will be set upon deletion through the Akka Persistence Plugin Write Journal.
Akka Persistence Plugin Configuration
Additional configuration needs to be set for the Akka Persistence Plugin to enable a document level TTL for Journal Events. The following Duration-based HOCON setting should be configured:
iconsolutions.akka.persistence.mongodb.write-journal.cosmos-ttl = 365d
This enables the Plugin to set the ttl property for Journal Events when they are deleted by the Write Journal.
Using the above example value, Journal Events are updated to set the ttl property to 31536000 (365 days in seconds).
This enables the document level TTL property for CosmosDB that will override the collection level TTL index.
If the iconsolutions.akka.persistence.mongodb.write-journal.cosmos-ttl configuration is not explicitly overridden, then the document level ttl property will not be set upon Journal Event deletion.
If you want to utilise CosmosDB purging, it is recommended that you override this configuration.
Snapshots Collection Index
For the snapshot collection, a collection level TTL index should be created for the _ts field.
An example MongoDB shell command to create this index is:
db.getCollection("snapshots").createIndex( { "_ts": 1 }, {
expireAfterSeconds: 31536000 } )
This command means that snapshot entries are eligible for removal from the database 31536000 seconds (365 days) after they have last been modified.
Migrating Existing Data
By default, CosmosDB will automatically populate and update the _ts field for a given document, maintaining the timestamp of the document’s last modification.
Creating a collection level index for both the journal and snapshots collection will have the same effect as outlined above.
You may want to update existing Journal Events to have a document level TTL value. This will override the collection TTL index value and will need to be set manually. See the official CosmosDB documentation for more information about how to set a document level TTL value.
As an example, you might want to set the journal collection level index to 365 days, but you want all existing Journal Events to be purged in 30 days time.
So you would update all existing Journal Events to set the ttl field to 2592000 (30 days in seconds).
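A hypothetical mongosh command for that update (the filter is illustrative) might be:
db.getCollection("journal").updateMany(
    { ttl: { $exists: false } },
    // NumberInt keeps the value an int32, as required for document level TTLs
    { $set: { ttl: NumberInt(2592000) } }
)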
See the Purging migration documentation for more information about setting up an existing IPF system for Journal and Snapshot purging.
Things to consider
Unfortunately, due to CosmosDB requiring a collection level TTL index to be configured, there is a chance that some Journal Events could be purged before a Snapshot is created for the given Persistence ID. Depending on your configured TTL value, this is unlikely to happen, but still a possibility to be aware of.
In CosmosDB, deletion of expired items is a background task that consumes left-over Request Units; if there are not enough RUs available, the data deletion is delayed. Therefore, you do not need to take into consideration the same performance impact of deleting many documents at once as you might when using MongoDB TTL based purging.