-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
Copy pathapply_ops.js
211 lines (182 loc) · 7.95 KB
/
apply_ops.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/**
* Tests that a change stream will correctly unwind applyOps entries generated by a transaction.
* @tags: [
* uses_transactions,
* requires_fcv_61, # Pre-6.1 builds do not emit change stream events for applyOps.
* requires_majority_read_concern,
* requires_snapshot_read,
* ]
*/
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {assertDropAndRecreateCollection} from "jstests/libs/collection_drop_recreate.js";
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
const otherCollName = "change_stream_apply_ops_2";
const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
assertDropAndRecreateCollection(db, otherCollName);
const otherDbName = "change_stream_apply_ops_db";
const otherDbCollName = "someColl";
assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
// Insert a document that gets deleted as part of the transaction.
const kDeletedDocumentId = 0;
const insertRes = assert.commandWorked(coll.runCommand("insert", {
documents: [{_id: kDeletedDocumentId, a: "I was here before the transaction"}],
writeConcern: {w: "majority"}
}));
// Record the clusterTime of the insert, and increment it to give the test start time.
const testStartTime = insertRes.$clusterTime.clusterTime;
testStartTime.i++;
let cst = new ChangeStreamTest(db);
let changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}],
collection: coll,
doNotModifyInPassthroughs:
true // A collection drop only invalidates single-collection change streams.
});
const sessionOptions = {
causalConsistency: false
};
const txnOptions = {
readConcern: {level: "snapshot"},
writeConcern: {w: "majority"}
};
const session = db.getMongo().startSession(sessionOptions);
// Create these variables before starting the transaction. In sharded passthroughs, accessing
// db[collname] may attempt to implicitly shard the collection, which is not allowed in a txn.
const sessionDb = session.getDatabase(db.getName());
const sessionColl = sessionDb[coll.getName()];
const sessionOtherColl = sessionDb[otherCollName];
const sessionOtherDbColl = session.getDatabase(otherDbName)[otherDbCollName];
withTxnAndAutoRetryOnMongos(session, () => {
// Two inserts on the main test collection.
assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
// One insert on a collection that we're not watching. This should be skipped by the
// single-collection changestream.
assert.commandWorked(sessionOtherColl.insert({_id: 111, a: "Doc on other collection"}));
// One insert on a collection in a different database. This should be skipped by the single
// collection and single-db changestreams.
assert.commandWorked(sessionOtherDbColl.insert({_id: 222, a: "Doc on other DB"}));
assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId}));
}, txnOptions);
// Do applyOps on the collection that we care about. This is an "external" applyOps, though (not run
// as part of a transaction). This checks that although this applyOps doesn't have an 'lsid' and
// 'txnNumber', the field gets unwound and a change stream event is emitted. Skip if running in a
// sharded passthrough, since the applyOps command does not exist on mongoS.
if (!FixtureHelpers.isMongos(db)) {
assert.commandWorked(db.runCommand({
applyOps: [
{op: "i", ns: coll.getFullName(), o: {_id: 3, a: "insert from applyOps"}},
]
}));
}
// Drop the collection. This will trigger an "invalidate" event at the end of the stream.
assert.commandWorked(db.runCommand({drop: coll.getName()}));
// Define the set of changes expected for the single-collection case per the operations above.
let expectedChanges = [
{
documentKey: {_id: 1},
fullDocument: {_id: 1, a: 0},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
},
{
documentKey: {_id: 2},
fullDocument: {_id: 2, a: 0},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
},
{
documentKey: {_id: 1},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "update",
updateDescription: {removedFields: [], updatedFields: {a: 1}, truncatedArrays: []},
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
},
{
documentKey: {_id: kDeletedDocumentId},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "delete",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
}
];
if (!FixtureHelpers.isMongos(db)) {
expectedChanges.push({
documentKey: {_id: 3},
fullDocument: {_id: 3, a: "insert from applyOps"},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
});
}
expectedChanges.push({
operationType: "drop",
ns: {db: db.getName(), coll: coll.getName()},
});
//
// Test behavior of single-collection change streams with apply ops.
//
// Verify that the stream returns the expected sequence of changes.
cst.assertNextChangesEqualWithDeploymentAwareness(
{cursor: changeStream, expectedChanges: expectedChanges});
// Single collection change stream should also be invalidated by the drop.
cst.assertNextChangesEqualWithDeploymentAwareness({
cursor: changeStream,
expectedChanges: [{operationType: "invalidate"}],
expectInvalidate: true
});
//
// Test behavior of whole-db change streams with apply ops.
//
// In a sharded cluster, whole-db-or-cluster streams will see a collection drop from each shard.
for (let i = 1; i < FixtureHelpers.numberOfShardsForCollection(coll); ++i) {
expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}});
}
// Add an entry for the insert on db.otherColl into expectedChanges.
expectedChanges.splice(2, 0, {
documentKey: {_id: 111},
fullDocument: {_id: 111, a: "Doc on other collection"},
ns: {db: db.getName(), coll: otherCollName},
operationType: "insert",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
});
// Verify that a whole-db stream returns the expected sequence of changes, including the insert
// on the other collection but NOT the changes on the other DB or the manual applyOps.
changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {startAtOperationTime: testStartTime}}, {$project: {"lsid.uid": 0}}],
collection: 1
});
cst.assertNextChangesEqualWithDeploymentAwareness(
{cursor: changeStream, expectedChanges: expectedChanges});
//
// Test behavior of whole-cluster change streams with apply ops.
//
// Add an entry for the insert on otherDb.otherDbColl into expectedChanges.
expectedChanges.splice(3, 0, {
documentKey: {_id: 222},
fullDocument: {_id: 222, a: "Doc on other DB"},
ns: {db: otherDbName, coll: otherDbCollName},
operationType: "insert",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
});
// Verify that a whole-cluster stream returns the expected sequence of changes, including the
// inserts on the other collection and the other database, but NOT the manual applyOps.
cst = new ChangeStreamTest(db.getSiblingDB("admin"));
changeStream = cst.startWatchingChanges({
pipeline: [
{$changeStream: {startAtOperationTime: testStartTime, allChangesForCluster: true}},
{$project: {"lsid.uid": 0}}
],
collection: 1
});
cst.assertNextChangesEqualWithDeploymentAwareness(
{cursor: changeStream, expectedChanges: expectedChanges});
cst.cleanUp();