Change Streams (original) (raw)

Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Starting in MongoDB 5.1, change streams are optimized, providing more efficient resource utilization and faster execution of some aggregation pipeline stages.

Change streams are available for replica sets andsharded clusters:

Change streams are included in Stable API V1. However, the showExpandedEventsoption is not included in Stable API V1.

Connections for a change stream can either use DNS seed lists with the +srv connection option or by listing the servers individually in the connection string.

If the driver loses the connection to a change stream or the connection goes down, it attempts to reestablish a connection to the change stream through another node in the cluster that has a matchingread preference. If the driver cannot find a node with the correct read preference, it throws an exception.

For more information, see Connection String URI Format.

You can open change streams against:

Target Description
A collection You can open a change stream cursor for a single collection (except system collections, or any collections in theadmin, local, and config databases).The examples on this page use the MongoDB drivers to open and work with a change stream cursor for a single collection. See also the mongosh methoddb.collection.watch().
A database You can open a change stream cursor for a single database (excludingadmin, local, and config database) to watch for changes to all its non-system collections.For the MongoDB driver method, refer to your driver documentation. See also the mongosh methoddb.watch().
A deployment You can open a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases except for admin, local, andconfig.For the MongoDB driver method, refer to your driver documentation. See also the mongosh methodMongo.watch().

Note

Change Stream Examples

The examples on this page use the MongoDB drivers to illustrate how to open a change stream cursor for a collection and work with the change stream cursor.

If the amount of active change streams opened against a database exceeds theconnection pool size, you may experience notification latency. Each change stream uses a connection and a getMoreoperation on the change stream for the period of time that it waits for the next event. To avoid any latency issues, you should ensure that the pool size is greater than the number of opened change streams. For details see the maxPoolSize setting.

When a change stream is opened on a sharded cluster:

For best performance, limit the use of $lookup queries in change streams.

To open a change stream:

The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents.[1]


➤ Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.


The C examples below assume that you have connected to a MongoDB replica set and have accessed a databasethat contains an inventory collection.


mongoc_collection_t *collection;

bson_t *pipeline = bson_new ();

bson_t opts = BSON_INITIALIZER;

mongoc_change_stream_t *stream;

const bson_t *change;

const bson_t *resume_token;

bson_error_t error;

collection = mongoc_database_get_collection (db, "inventory");

stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);

mongoc_change_stream_next (stream, &change);

if (mongoc_change_stream_error_document (stream, &error, NULL)) {

   MONGOC_ERROR ("%s\n", error.message);

}

mongoc_change_stream_destroy (stream);

The C# examples below assume that you have connected to a MongoDB replica set and have accessed a databasethat contains an inventory collection.


var cursor = inventory.Watch();

while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch

var next = cursor.Current.First();

cursor.Dispose();

The Kotlin examples below assume that you are connected to a MongoDB replica set and can access a database that contains the inventory collection. To learn more about completing these tasks, see theKotlin Coroutine Driver Databases and Collections guide.

The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a databasethat contains an inventory collection.

The following example uses stream to process the change events.


const collection = db.collection('inventory');

const changeStream = collection.watch();

changeStream

  .on('change', next => {

    // process next document

  })

  .once('error', () => {

    // handle error

  });

Alternatively, you can also use iterator to process the change events:


const collection = db.collection('inventory');

const changeStream = collection.watch();

const next = await changeStream.next();

ChangeStream extends EventEmitter.

To retrieve the data change event from the cursor, iterate the change stream cursor. For information on the change stream event, see Change Events.

The change stream cursor remains open until one of the following occurs:

Note

The lifecycle of an unclosed cursor is language-dependent.


➤ Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.


You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:


pipeline = BCON_NEW ("pipeline",

                     "[",

                     "{",

                     "$match",

                     "{",

                     "fullDocument.username",

                     BCON_UTF8 ("alice"),

                     "}",

                     "}",

                     "{",

                     "$addFields",

                     "{",

                     "newField",

                     BCON_UTF8 ("this is an added field!"),

                     "}",

                     "}",

                     "]");

stream = mongoc_collection_watch (collection, pipeline, &opts);

mongoc_change_stream_next (stream, &change);

if (mongoc_change_stream_error_document (stream, &error, NULL)) {

   MONGOC_ERROR ("%s\n", error.message);

}

mongoc_change_stream_destroy (stream);

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:


var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()

    .Match(change =>

        change.FullDocument["username"] == "alice" ||

        change.OperationType == ChangeStreamOperationType.Delete)

    .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(

        "{ $addFields : { newField : 'this is an added field!' } }");

var collection = database.GetCollection<BsonDocument>("inventory");

using (var cursor = collection.Watch(pipeline))

{

    while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch

    var next = cursor.Current.First();

}

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:


MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");

// Select the MongoDB database and collection to open the change stream against

MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");

MongoCollection<Document> collection = db.getCollection("myTargetCollection");

// Create $match pipeline stage.

List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(

   Document.parse("{'fullDocument.username': 'alice'}"),

   Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the

// collection.watch() method

MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

The pipeline list includes a single $match stage that filters for any operations that meet one or both of the following criteria:

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

The pipeline list includes a single $match stage that filters for any operations that meet one or both of the following criteria:

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

The following example uses stream to process the change events.


const pipeline = [

  { $match: { 'fullDocument.username': 'alice' } },

  { $addFields: { newField: 'this is an added field!' } }

];

const collection = db.collection('inventory');

const changeStream = collection.watch(pipeline);

changeStream

  .on('change', next => {

    // process next document

  })

  .once('error', error => {

    // handle error

  });

Alternatively, you can also use iterator to process the change events:


const changeStreamIterator = collection.watch(pipeline);

const next = await changeStreamIterator.next();

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:


$pipeline = [

    ['$match' => ['fullDocument.username' => 'alice']],

    ['$addFields' => ['newField' => 'this is an added field!']],

];
 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>c</mi><mi>h</mi><mi>a</mi><mi>n</mi><mi>g</mi><mi>e</mi><mi>S</mi><mi>t</mi><mi>r</mi><mi>e</mi><mi>a</mi><mi>m</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">changeStream = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord mathnormal">c</span><span class="mord mathnormal">han</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">e</span><span class="mord mathnormal">St</span><span class="mord mathnormal">re</span><span class="mord mathnormal">am</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>db->inventory->watch($pipeline);

$changeStream->rewind();
 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>f</mi><mi>i</mi><mi>r</mi><mi>s</mi><mi>t</mi><mi>C</mi><mi>h</mi><mi>a</mi><mi>n</mi><mi>g</mi><mi>e</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">firstChange = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord mathnormal" style="margin-right:0.10764em;">f</span><span class="mord mathnormal">i</span><span class="mord mathnormal">rs</span><span class="mord mathnormal" style="margin-right:0.07153em;">tC</span><span class="mord mathnormal">han</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">e</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>changeStream->current();

$changeStream->next();
 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>s</mi><mi>e</mi><mi>c</mi><mi>o</mi><mi>n</mi><mi>d</mi><mi>C</mi><mi>h</mi><mi>a</mi><mi>n</mi><mi>g</mi><mi>e</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">secondChange = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord mathnormal">seco</span><span class="mord mathnormal">n</span><span class="mord mathnormal">d</span><span class="mord mathnormal" style="margin-right:0.07153em;">C</span><span class="mord mathnormal">han</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">e</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>changeStream->current();

Tip

The _id field of the change stream event document act as the resume token. Do not use the pipeline to modify or remove the change stream event's _id field.

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.

See Change Events for more information on the change stream response document format.

By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.


➤ Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.


To return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to themongoc_collection_watch method.

In the example below, all update operations notifications include a fullDocument field that represents the _current_version of the document affected by the update operation.


BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");

stream = mongoc_collection_watch (collection, pipeline, &opts);

mongoc_change_stream_next (stream, &change);

if (mongoc_change_stream_error_document (stream, &error, NULL)) {

   MONGOC_ERROR ("%s\n", error.message);

}

mongoc_change_stream_destroy (stream);

To return the most current majority-committed version of the updated document, pass "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" to thedb.collection.watch() method.

In the example below, all update operations notifications include a FullDocument field that represents the _current_version of the document affected by the update operation.


var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };

var cursor = inventory.Watch(options);

while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch

var next = cursor.Current.First();

cursor.Dispose();

To return the most current majority-committed version of the updated document, SetFullDocument(options.UpdateLookup)change stream option.

To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to thedb.collection.watch.fullDocument() method.

In the example below, all update operations notifications include a FullDocument field that represents the _current_version of the document affected by the update operation.


cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();

next = cursor.next();

To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to theChangeStreamFlow.fullDocument() method.

In the example below, all update operations notifications include a FullDocument field that represents the _current_version of the document affected by the update operation.

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to thedb.collection.watch() method.

In the example below, all update operations notifications include a `full_document field that represents the _current_version of the document affected by the update operation.


cursor = db.inventory.watch(full_document="updateLookup")

document = await cursor.next()

To return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' } to thedb.collection.watch() method.

In the example below, all update operations notifications include a fullDocument field that represents the _current_version of the document affected by the update operation.

The following example uses stream to process the change events.


const collection = db.collection('inventory');

const changeStream = collection.watch([], { fullDocument: 'updateLookup' });

changeStream

  .on('change', next => {

    // process next document

  })

  .once('error', error => {

    // handle error

  });

Alternatively, you can also use iterator to process the change events:


const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });

const next = await changeStreamIterator.next();

To return the most current majority-committed version of the updated document, pass"fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"to the db.watch() method.

In the example below, all update operations notifications include a fullDocument field that represents the _current_version of the document affected by the update operation.

 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>c</mi><mi>h</mi><mi>a</mi><mi>n</mi><mi>g</mi><mi>e</mi><mi>S</mi><mi>t</mi><mi>r</mi><mi>e</mi><mi>a</mi><mi>m</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">changeStream = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord mathnormal">c</span><span class="mord mathnormal">han</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">e</span><span class="mord mathnormal">St</span><span class="mord mathnormal">re</span><span class="mord mathnormal">am</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);

$changeStream->rewind();
 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>f</mi><mi>i</mi><mi>r</mi><mi>s</mi><mi>t</mi><mi>C</mi><mi>h</mi><mi>a</mi><mi>n</mi><mi>g</mi><mi>e</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">firstChange = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord mathnormal" style="margin-right:0.10764em;">f</span><span class="mord mathnormal">i</span><span class="mord mathnormal">rs</span><span class="mord mathnormal" style="margin-right:0.07153em;">tC</span><span class="mord mathnormal">han</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">e</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>changeStream->current();

$changeStream->next();
 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>s</mi><mi>e</mi><mi>c</mi><mi>o</mi><mi>n</mi><mi>d</mi><mi>C</mi><mi>h</mi><mi>a</mi><mi>n</mi><mi>g</mi><mi>e</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">secondChange = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord mathnormal">seco</span><span class="mord mathnormal">n</span><span class="mord mathnormal">d</span><span class="mord mathnormal" style="margin-right:0.07153em;">C</span><span class="mord mathnormal">han</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">e</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>changeStream->current();

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to thedb.collection.watch() method.

In the example below, all update operations notifications include a full_document field that represents the _current_version of the document affected by the update operation.


cursor = db.inventory.watch(full_document="updateLookup")

next(cursor)

To return the most current majority-committed version of the updated document, pass full_document: 'updateLookup' to thedb.watch() method.

In the example below, all update operations notifications include a full_document field that represents the _current_version of the document affected by the update operation.


cursor = inventory.watch([], full_document: 'updateLookup').to_enum

next_change = cursor.next

To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to thewatch() method.

To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to thewatch() method.

Note

If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.

However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.

The fullDocument field for an update event may be missing if one of the following is true:

See Change Events for more information on the change stream response document format.

Change streams are resumable by specifying a resume token to eitherresumeAfter orstartAfter when opening the cursor.

You can resume a change stream after a specific event by passing a resume token to resumeAfter when opening the cursor.

See Resume Tokens for more information on the resume token.

Important

In the example below, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting after the operation specified.


stream = mongoc_collection_watch (collection, pipeline, NULL);

if (mongoc_change_stream_next (stream, &change)) {

   resume_token = mongoc_change_stream_get_resume_token (stream);

   BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);

   mongoc_change_stream_destroy (stream);

   stream = mongoc_collection_watch (collection, pipeline, &opts);

   mongoc_change_stream_next (stream, &change);

   mongoc_change_stream_destroy (stream);

} else {

   if (mongoc_change_stream_error_document (stream, &error, NULL)) {

      MONGOC_ERROR ("%s\n", error.message);

   }

   mongoc_change_stream_destroy (stream);

}

In the example below, the resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. Passing the resumeTokento the Watch() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.


  var resumeToken = previousCursor.GetResumeToken();

  var options = new ChangeStreamOptions { ResumeAfter = resumeToken };

  var cursor = inventory.Watch(options);

  cursor.MoveNext();

  var next = cursor.Current.First();

  cursor.Dispose();

You can use ChangeStreamOptions.SetResumeAfterto specify the resume token for the change stream. If the resumeAfter option is set, the change stream resumes notifications after the operation specified in the resume token. The SetResumeAfter takes a value that must resolve to a resume token, e.g.resumeToken in the example below.

You can use the resumeAfter() method to resume notifications after the operation specified in the resume token. The resumeAfter() method takes a value that must resolve to a resume token, e.g. resumeToken in the example below.


BsonDocument resumeToken = next.getResumeToken();

cursor = inventory.watch().resumeAfter(resumeToken).iterator();

next = cursor.next();

You can use the ChangeStreamFlow.resumeAfter()method to resume notifications after the operation specified in the resume token. The resumeAfter() method takes a value that must resolve to a resume token, such as the resumeToken variable in the example below.

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.


resume_token = cursor.resume_token

cursor = db.inventory.watch(resume_after=resume_token)

document = await cursor.next()

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.


const collection = db.collection('inventory');

const changeStream = collection.watch();

let newChangeStream;

changeStream

  .once('change', next => {

    const resumeToken = changeStream.resumeToken;

    changeStream.close();

    newChangeStream = collection.watch([], { resumeAfter: resumeToken });

    newChangeStream

      .on('change', next => {

        processChange(next);

      })

      .once('error', error => {

        // handle error

      });

  })

  .once('error', error => {

    // handle error

  });

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. $resumeToken in the example below.

 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>r</mi><mi>e</mi><mi>s</mi><mi>u</mi><mi>m</mi><mi>e</mi><mi>T</mi><mi>o</mi><mi>k</mi><mi>e</mi><mi>n</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">resumeToken = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.6944em;"></span><span class="mord mathnormal">res</span><span class="mord mathnormal">u</span><span class="mord mathnormal">m</span><span class="mord mathnormal">e</span><span class="mord mathnormal" style="margin-right:0.13889em;">T</span><span class="mord mathnormal">o</span><span class="mord mathnormal" style="margin-right:0.03148em;">k</span><span class="mord mathnormal">e</span><span class="mord mathnormal">n</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>changeStream->getResumeToken();

if ($resumeToken === null) {

    throw new \Exception('Resume token was not found');

}
 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>c</mi><mi>h</mi><mi>a</mi><mi>n</mi><mi>g</mi><mi>e</mi><mi>S</mi><mi>t</mi><mi>r</mi><mi>e</mi><mi>a</mi><mi>m</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">changeStream = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord mathnormal">c</span><span class="mord mathnormal">han</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">e</span><span class="mord mathnormal">St</span><span class="mord mathnormal">re</span><span class="mord mathnormal">am</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>db->inventory->watch([], ['resumeAfter' => $resumeToken]);

$changeStream->rewind();
 <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>f</mi><mi>i</mi><mi>r</mi><mi>s</mi><mi>t</mi><mi>C</mi><mi>h</mi><mi>a</mi><mi>n</mi><mi>g</mi><mi>e</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">firstChange = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord mathnormal" style="margin-right:0.10764em;">f</span><span class="mord mathnormal">i</span><span class="mord mathnormal">rs</span><span class="mord mathnormal" style="margin-right:0.07153em;">tC</span><span class="mord mathnormal">han</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">e</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>changeStream->current();

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.


resume_token = cursor.resume_token

cursor = db.inventory.watch(resume_after=resume_token)

next(cursor)

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.


  change_stream = inventory.watch

  cursor = change_stream.to_enum

  next_change = cursor.next

  resume_token = change_stream.resume_token

  new_cursor = inventory.watch([], resume_after: resume_token).to_enum

  resumed_change = new_cursor.next

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

You can start a new change stream after a specific event by passing a resume token to startAfter when opening the cursor. UnlikeresumeAfter, startAfter can resume notifications after an invalidate eventby creating a new change stream.

See Resume Tokens for more information on the resume token.

Important

The resume token is available from multiple sources:

Source Description
Change Events Each change event notification includes a resume token on the _id field.
Aggregation The $changeStream aggregation stage includes a resume token on the cursor.postBatchResumeToken field.This field only appears when using the aggregatecommand.
Get More The getMore command includes a resume token on thecursor.postBatchResumeToken field.

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.

Tip

MongoDB provides a "snippet", an extension to mongosh, that decodes hex-encoded resume tokens.

You can install and run the resumetokensnippet from mongosh:


snippet install resumetoken

decodeResumeToken('<RESUME TOKEN>')

You can also run resumetokenfrom the command line (without using mongosh) if npmis installed on your system:


npx mongodb-resumetoken-decoder <RESUME TOKEN>

See the following for more details on:

Change event notifications include a resume token on the _id field:


{

   "_id": {

      "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"

    },

    "operationType": "insert",

    "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),

    "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),

    "wallTime": ISODate("2022-10-19T15:37:04.604Z"),

    "fullDocument": {

       "_id": ObjectId("635019a078be67426d7cf4d2"'),

       "name": "Giovanni Verga"

    },

    "ns": {

       "db": "test",

       "coll": "names"

    },

    "documentKey": {

       "_id": ObjectId("635019a078be67426d7cf4d2")

    }

}

When using the aggregate command, the $changeStreamaggregation stage includes a resume token on thecursor.postBatchResumeToken field:


{

   "cursor": {

      "firstBatch": [],

      "postBatchResumeToken": {

         "_data": "8263515EAC000000022B0429296E1404"

      },

      "id": Long("4309380460777152828"),

      "ns": "test.names"

   },

   "ok": 1,

   "$clusterTime": {

      "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),

      "signature": {

         "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),

         "keyId": Long("0")

      }

   },

   "operationTime": Timestamp({ "t": 1666277036, "i": 1 })

}

The getMore command also includes a resume token on thecursor.postBatchResumeToken field:


{

   "cursor": {

      "nextBatch": [],

      "postBatchResumeToken": {

         "_data": "8263515979000000022B0429296E1404"

      },

      "id": Long("7049907285270685005"),

      "ns": "test.names"

   },

   "ok": 1,

   "$clusterTime": {

      "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),

      "signature": {

         "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),

         "keyId": Long("0")

      }

   },

   "operationTime": Timestamp({ "t": 1666275705, "i": 1 })

}

Change streams can benefit architectures with reliant business systems, informing downstream systems once data changes are durable. For example, change streams can save time for developers when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.

For deployments enforcing Authentication on Self-Managed Deployments and authorization:

{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }  
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }  
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }  

Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.

For example, consider a 3-member replica set with a change stream cursor opened against the primary. If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.

If an operation is associated with a transaction, the change event document includes thetxnNumber and the lsid.

Change streams use simple binary comparisons unless an explicit collation is provided.

Starting in MongoDB 5.3, during range migration, change streamevents are not generated for updates to orphaned documents.

Starting in MongoDB 6.0, you can use change stream events to output the version of a document before and after changes (the document pre- and post-images):

Pre- and post-images are not available for a change stream event if the images were:

Additional considerations:

Important

Backward-Incompatible Feature

Starting in MongoDB 6.0, if you are using document pre- and post-images for change streams, you must disablechangeStreamPreAndPostImages for each collection using the collMod command before you can downgrade to an earlier MongoDB version.

See also:

For complete examples with the change stream output, seeChange Streams with Document Pre- and Post-Images.