Akka Persistence Plugin for MongoDB
Akka Persistence comes with three plugins developed by Lightbend to provide a storage backend for persistent actors:
- Apache Cassandra
- JDBC (for RDBMSes)
- Couchbase
Other storage backends are available as community plugins, which can be found here.
This is a plugin which implements MongoDB as a storage backend for Akka Persistence. Storage backends typically need to implement the following three pieces of functionality:
- Write journal: Persisting events by an actor
- Read journal: Providing a CQRS read-side view of a persistent actor
- Snapshot store: Creating and retrieving an aggregate of all events up to a certain point for an actor
Using Akka Persistence Plugin for MongoDB
Here’s how to get started using MongoDB as a storage backend for Akka Persistence using our plugin:
1. Declare a Dependency
It’s available under the following Maven coordinates:
<dependency>
    <groupId>com.iconsolutions.ipf.core.persistence</groupId>
    <artifactId>akka-persistence-mongodb-icon-plugin</artifactId>
</dependency>
2. Set Akka Persistence Plugin for MongoDB as the Provider
In addition to the three pieces of functionality described above, the Akka Persistence Plugin for MongoDB also supports durable state, so it implements four pieces of functionality in total:
- write journal
- read journal
- snapshot store
- durable state
To configure the Akka Persistence Plugin for MongoDB as the provider for the write journal, snapshot store and durable state, add the following:
akka {
  persistence.journal.plugin = "iconsolutions.akka.persistence.mongodb"
  persistence.snapshot-store.plugin = "iconsolutions.akka.persistence.mongodb.snapshot"
  persistence.state.plugin = "iconsolutions.akka.persistence.mongodb.durable-state"
}
3. Configure MongoDB
The plugin uses a single MongoClient to connect to MongoDB.
Specify the connection settings using HOCON.
The configuration properties with their defaults are shown below:
| Config key | Description | Default value |
|---|---|---|
| | The MongoDB URI to use to connect to the database. Set the value of this field if not using the plugin from within IPF or if not intending to use the same database as IPF. Honours the corresponding global IPF setting. | |
| | The name of the collection that will hold the domain events. | |
| | The name of the collection that will hold the state snapshots. | |
| | The name of the collection that will hold the domain events. | |
| | Whether to bypass the MongoDB schema validation rules. | |
| | The database flavour. The plugin supports two flavours: MongoDB and MongoDB-compatible databases (see the Database Mode section below). Honours the corresponding global IPF setting. | |
| | Used to provide a read concern to use when reading from the journal and snapshot collections. See the official docs for more information. | |
| | Should the plugin create the indexes it requires? Disable this if the database user is not granted the necessary privileges. Honours the corresponding global IPF setting. | |
| | The commit quorum to use when creating the indexes for Akka Persistence MongoDB. Honours the corresponding global IPF setting. | |
| | Whether to enable SSL support. Honours the corresponding global IPF setting. | |
| | Type of the key store. Honours the corresponding global IPF setting. | |
| | Path to the key store that holds the SSL certificate (typically a jks file). Honours the corresponding global IPF setting. | |
| | Password used to access the key store. Honours the corresponding global IPF setting. | |
| | Password used to access the key in the key store. Honours the corresponding global IPF setting. | |
| | Type of the trust store. Honours the corresponding global IPF setting. | |
| | Path to the trust store that holds the SSL certificate. Honours the corresponding global IPF setting. | |
| | Password used to access the trust store. Honours the corresponding global IPF setting. | |
| | The maximum number of times a failed operation will be attempted. Honours the corresponding global IPF setting. | |
| | A list of MongoDB error codes which should be retried. Honours the corresponding global IPF setting. | |
| | The delay to use between attempts if the error itself does not indicate a backoff duration. Honours the corresponding global IPF setting. | |
See Connection String Options for all options that can be specified using the MongoDB Connection String.
4. Enable TTL Purging (Optional)
If required, Time-to-live (TTL) purging can be configured for your database. See the Journal and Snapshot Purging documentation for more details.
Using the Read Journal
Use the read journal to subscribe to the domain events.
Usually used to create a CQRS "read-side view" based on the events persisted so far by a persistent actor.
The configuration properties with their defaults are shown below:
| Config key | Description | Default value |
|---|---|---|
| use-change-streams | Whether to use the change stream (MongoDB CDC) implementation of the read journal instead of the periodic query-based one. | |
| allowed-time-drift | The look-back period for fetching events, applicable only to the query-based read journal. Only events within the (largest_seen_object_id, current_object_id - time-drift) range will be returned by the event stream query. | |
| | How much data MongoDB should return in a single batch. Larger values reduce the number of round trips to the database. | |
The important setting here is allowed-time-drift: this duration protects the read side from pulling events created in the last X seconds.
It matters when using multiple nodes, as clock times can drift, which can affect the offset (the time epoch in the ObjectId): a higher epoch time on node A could cause some of node B's events to be skipped.
This delay simply ensures that all events have been in the database for the configured duration before the read side picks them up, avoiding the race condition.
Also ensure that the configured value is larger than the maximum observed akka_event_persistence_time_ns metric, as a lower value may result in events being missed.
If use-change-streams is set to true, the plugin will switch from a polling approach - where queries are repeatedly executed to fetch the latest events - to a change streams-based approach, where the journal collection is watched for inserts, and events arrive via a tailable cursor.
| The change-stream approach is experimental, and if using it, switching between change-streams and query-based should be avoided for now. |
The change-streams approach can be less intensive on the MongoDB server and achieves better throughput, but duplicates are more likely. If the plugin is unable to keep up with the rate at which events are produced, and the cursor overwrites an event that hasn’t been handled yet, it will fall back to querying to catch up, before switching to the change stream again.
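As a sketch of how these settings might be supplied (the read-journal block name below is an assumption and the values are illustrative only; consult the plugin’s reference configuration for the exact path and defaults):
iconsolutions.akka.persistence.mongodb.read-journal {
  # Illustrative values only - the block name and the values shown here are assumptions
  use-change-streams = false   # stay on the query-based read journal
  allowed-time-drift = 10s     # keep this larger than the max observed akka_event_persistence_time_ns
}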
Like other Akka Persistence plugins, this plugin provides Scala and Java DSL variants for interacting with the read side. You can retrieve them like this:
// Java DSL
IpfMongoReadJournal javaDslReadJournal = PersistenceQuery
        .get(actorSystem)
        .getReadJournalFor(IpfMongoReadJournal.class, IpfMongoReadJournal.identifier());

// Scala DSL (but you will probably use this from Scala not Java!)
ReadJournal scalaDslReadJournal = PersistenceQuery
        .get(actorSystem)
        .readJournalFor(IpfMongoReadJournal.identifier());
These will return a ReadJournal (Java API, Scala API) which you can use to run three types of queries:
- eventsByTag: Returns a Source<EventEnvelope, NotUsed> of all events by tag, given an offset. If no offset is available, use Offset.noOffset(), which will stream all events for a tag from the beginning.
- currentEventsByTag: Same as the above, except that the stream completes once it reaches the events that are current at the time of the query, instead of continuing to emit new events.
- currentEventsByPersistenceId: Returns a Source<EventEnvelope, NotUsed> of the events persisted so far by a given persistence ID (see the sketch after this list).
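For example, using the javaDslReadJournal and actorSystem obtained above, the events persisted so far by a single actor can be replayed as follows (the persistence ID is a hypothetical placeholder):
// Minimal sketch: print every event persisted so far for one persistence id
javaDslReadJournal
        .currentEventsByPersistenceId("payment-123", 0L, Long.MAX_VALUE)
        .runForeach(envelope -> System.out.println(envelope.event()), actorSystem);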
For the two eventsByTag operations, the plugin uses MongoDB Change Streams to stream updates by tag and offset.
Tips for Using the Read Journal
Here are some tips that may be useful when building a read-side view with the Akka Persistence Plugin for MongoDB:
- It is the consumer’s responsibility to store the offset that was last provided by the plugin.
- Wrap the eventsByTag operations in a RestartSource so that when/if the stream crashes due to some error, it can automatically restart from the last offset that was persisted, instead of requiring a restart of the application (see the sketch after this list).
- It is recommended to batch offset saves so as not to affect performance.
- It is of course possible to persist the offset that your application last saw into another MongoDB collection.
- If the consumer application crashes after some events have been processed but before the offset was saved, the plugin will send duplicate events between the last saved offset and the crash. The downstream application should be able to handle such duplicates.
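Putting the first two tips together, here is a sketch using the standard Akka Streams RestartSource API. It reuses the javaDslReadJournal and actorSystem from earlier; the tag name, restart settings, and the loadLastSavedOffset, saveOffset and handleEvent helpers are hypothetical placeholders for your own offset store and projection logic:
import java.time.Duration;
import akka.persistence.query.Offset;
import akka.stream.RestartSettings;
import akka.stream.javadsl.RestartSource;

// Minimal sketch: restart the tagged event stream with backoff, resuming from the last saved offset
RestartSettings restartSettings =
        RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(30), 0.2);

RestartSource
        .withBackoff(restartSettings, () ->
                // loadLastSavedOffset() is a hypothetical lookup into your own offset store
                javaDslReadJournal.eventsByTag("payment", loadLastSavedOffset().orElse(Offset.noOffset())))
        .runForeach(envelope -> {
            handleEvent(envelope);          // hypothetical read-side projection logic
            saveOffset(envelope.offset());  // ideally batched, as recommended above
        }, actorSystem);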
Turning MongoDB Document Validation Off to Ensure High Performance
When schema validation rules are defined for a collection, MongoDB applies strict validation to all inserted or updated documents by default.
Even though the collection used for the event journal doesn’t require (or include) any schema validations, it is advisable to explicitly turn the validation off, which can be accomplished by specifying a validationLevel: 'off' option when creating the collection, e.g.
db.createCollection("journal", {
  validationLevel: "off"
})
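If the collection already exists, validation can be relaxed without recreating it by using collMod (the collection name matches the example above):
// Relax schema validation on an existing journal collection
db.runCommand({ collMod: "journal", validationLevel: "off" })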
Database Mode
The plugin is compatible with other databases that implement the MongoDB wire protocol, such as Azure CosmosDB by Microsoft, or AWS DocumentDB.
Some of these databases have limitations on the types of indexes that can be created, namely whether the database supports compound indexes that include a nested field.
The plugin creates such an index for the eventsByTag query as part of Persistence Query.
If using the plugin with a MongoDB-compatible database (not MongoDB itself), and the target database doesn’t support nested fields as part of compound indexes, set the iconsolutions.akka.persistence.mongodb.database-mode
configuration key to something other than mongo.
This will create a slightly less efficient index that is compatible with such third-party databases.
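For example (the value below is a hypothetical placeholder; the only documented requirement is that it be something other than mongo, so consult the plugin’s reference configuration for the accepted values):
# Hypothetical value - any supported mode other than "mongo" avoids nested-field compound indexes
iconsolutions.akka.persistence.mongodb.database-mode = "cosmos"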
Read Concern
The journal and snapshot collections may need a more reliable read concern than what is used as the database default, in which case the readConcernLevel connection string option should be used to set it.
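For example, majority reads can be requested directly in the connection string; the host, port and database name below are placeholders:
mongodb://mongo-host:27017/ipf?readConcernLevel=majority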
See the official causal consistency docs for details on why this setting matters, and this link for more details on each of the available read concern options.
For how the read and write concern settings relate to CosmosDB’s MongoDB API, please refer to the official Azure docs.
Journal and Snapshot Purging
Database purging can be enabled to help ensure consistent database performance and avoid the risk that a database can become unavailable due to filling up all available storage. For the journal and snapshot collections, purging can be managed using Time-to-live (TTL) functionality. TTL functionality allows the database to automatically expire data from the journal and snapshots collections.
| When building a domain using IPF Flo-Lang, the Akka EventSourcedBehavior commands the Akka Persistence Plugin to create snapshots and delete journal events. There are a number of configurable options to determine when snapshots should be created and whether events should be deleted; more information about this configuration can be found in the Flo-Lang Purging documentation. This should be configured alongside the steps below to successfully purge data. |
To enable TTL purging, steps need to be taken to configure the utilised database and, if using CosmosDB, configure the Akka Persistence MongoDB Plugin.
MongoDB setup
When using MongoDB, the TTL index is a special single-field index configured for a BSON Date field. For more information, see the official MongoDB TTL documentation.
Journal Collection Index
For the journal collection, a TTL index should be created for the deletedAt field.
An example MongoDB shell command to create this index is:
db.getCollection("journal").createIndex( { "deletedAt": 1 }, {
expireAfterSeconds: 31536000 } )
This command means that journal entries are eligible for removal from the database 31536000 seconds (365 days) after the date value of the deletedAt field.
The plugin Write Journal is responsible for setting the deletedAt field when marking Journal Events for deletion.
Snapshots Collection Index
For the snapshot collection, a TTL index should be created for the insertedAt field. An example MongoDB shell command to create this index is:
db.getCollection("snapshots").createIndex( { "insertedAt": 1 }, {
expireAfterSeconds: 31536000 } )
This command means that snapshot entries are eligible for removal from the database 31536000 seconds (365 days) after the date value of the insertedAt field.
Migrating Existing Data
Existing documents in the journal and snapshots collections do not contain a BSON Date field, so by default the collection indexes will have no effect on existing data.
It is recommended to update existing data within these collections to set a deletedAt and insertedAt value for Journal Events and Snapshots, respectively.
To safeguard against the unintentional purging of non-terminal data (as you may not know if existing data has reached a terminal state or not), you can update the respective TTL fields to have a BSON Date in the future.
For example, if the configured index expireAfterSeconds evaluates to 20 days, and you set a Journal Event’s deletedAt field to 10 days in the future, the document will actually be purged 30 days after the update is made.
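As an illustration, such a backfill can be run from the MongoDB shell; the collection name, filter and 10-day window below are placeholders to adapt to your own data (and, as noted below, consider running it in batches):
// Minimal sketch: backfill deletedAt 10 days in the future for journal documents that do not have it yet.
// With a 20-day TTL index this means the documents become eligible for purging roughly 30 days from now.
const tenDaysFromNow = new Date(Date.now() + 10 * 24 * 60 * 60 * 1000);
db.getCollection("journal").updateMany(
  { deletedAt: { $exists: false } },
  { $set: { deletedAt: tenDaysFromNow } }
);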
See the Purging migration documentation for more information about setting up an existing IPF system for Journal and Snapshot purging.
Things to consider
MongoDB TTL functionality requires a BSON date field to be set, and so if that field does not exist on the document, it will not be purged. Therefore, if a Journal Event is not marked to be deleted, it will not be purged by the MongoDB TTL index.
As specified in the MongoDB TTL documentation, deleting a large number of documents at once creates a large workload and might cause performance issues. This is mainly something to consider when migrating existing data; it is recommended that you do not try to purge all existing data at once, but instead purge existing data during non-business hours or in batches.
For example, if the configured index expireAfterSeconds evaluates to 20 days, you may want to update all existing Journal Events to have a deletedAt value of 2024-07-17T03:00:00.000Z.
In this scenario, the document would be scheduled to be purged at 3am on 06/08/2024.
CosmosDB setup
When using TTL indexes with CosmosDB, configuration needs to be enabled for the Akka Persistence Plugin and for each CosmosDB collection.
There are two supported types of CosmosDB TTL indexes: setting a default TTL value on the whole collection, and setting individual TTL values for each document. For more information, see the official CosmosDB TTL documentation.
The collection level index is set on the CosmosDB specific _ts field.
Once the index is created, the database will automatically delete any documents in that collection that have not been modified for the configured number of seconds.
The document level TTL is not configured as an index, but is a value that exists on each document.
The document(s) must contain a root-level property ttl.
The ttl value must be an int32 (or an int64 that fits into an int32), and a collection level TTL index as described above must have been created for that collection.
TTL values set on a document will override the collection’s TTL value.
Journal Collection Index
For the journal collection, a collection level TTL index should be created for the _ts field.
An example MongoDB shell command to create this index is:
db.getCollection("journal").createIndex( { "_ts": 1 }, {
expireAfterSeconds: 31536000 } )
This command means that journal entries are eligible for removal from the database 31536000 seconds (365 days) after they have last been modified.
In addition to this, the journal collection should also utilise document level TTL properties.
When the Akka Persistence plugin is configured correctly, the Journal Entry ttl property will be set upon deletion through the Akka Persistence Plugin Write Journal.
Akka Persistence Plugin Configuration
Additional configuration needs to be set for the Akka Persistence Plugin to enable a document level TTL for Journal Events. The following duration-based HOCON setting should be configured:
iconsolutions.akka.persistence.mongodb.write-journal.cosmos-ttl = 365d
This enables the Plugin to set the ttl property for Journal Events when they are deleted by the Write Journal.
Using the above example value, Journal Events are updated to set the ttl property to 31536000 (365 days in seconds).
This enables the document level TTL property for CosmosDB that will override the collection level TTL index.
| If the iconsolutions.akka.persistence.mongodb.write-journal.cosmos-ttl configuration is not explicitly overridden, then the document level ttl property will not be set upon Journal Event deletion. If you want to utilise CosmosDB purging, it is recommended that you override this configuration. |
Snapshots Collection Index
For the snapshot collection, a collection level TTL index should be created for the _ts field.
An example MongoDB shell command to create this index is:
db.getCollection("snapshots").createIndex( { "_ts": 1 }, {
expireAfterSeconds: 31536000 } )
This command means that snapshot entries are eligible for removal from the database 31536000 seconds (365 days) after they have last been modified.
Migrating Existing Data
By default, CosmosDB will automatically populate and update the _ts field for a given document, maintaining the timestamp of the document’s last modification.
Creating a collection level index for both the journal and snapshots collection will have the same effect as outlined above.
You may want to update existing Journal Events to have a document level TTL value. This will override the collection TTL index value and will need to be set manually. See the official CosmosDB documentation for more information about how to set a document level TTL value.
As an example, you might want to set the journal collection level index to 365 days, but you want all existing Journal Events to be purged in 30 days time.
So you would update all existing Journal Events to set the ttl field to 2592000 (30 days in seconds).
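A sketch of that update from the MongoDB shell (the collection name and empty filter are placeholders; note that the value must be stored as an int32):
// Minimal sketch: give every existing journal document a 30-day document-level TTL (2592000 seconds)
// NumberInt ensures the value is stored as an int32, as required for the document level TTL
db.getCollection("journal").updateMany(
  {},
  { $set: { ttl: NumberInt(2592000) } }
);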
See the Purging migration documentation for more information about setting up an existing IPF system for Journal and Snapshot purging.
Things to consider
Unfortunately, due to CosmosDB requiring a collection level TTL index to be configured, there is a chance that some Journal Events could be purged before a Snapshot is created for the given Persistence ID. Depending on your configured TTL value, this is unlikely to happen, but still a possibility to be aware of.
In CosmosDB, deletion of expired items is a background task that consumes left-over Request Units (RUs); if there are not enough RUs available, the deletion is delayed. Therefore, you do not need to consider the same performance impact of deleting many documents at once as you might when using MongoDB TTL-based purging.