Skip to content

Commit

Permalink
SERVER-31030 Use full OpTime instead of just Timestamps to refer to oplog entries
Browse files Browse the repository at this point in the history
  • Loading branch information
renctan committed Oct 4, 2017
1 parent d6267ee commit 0ab7000
Show file tree
Hide file tree
Showing 32 changed files with 655 additions and 539 deletions.
2 changes: 1 addition & 1 deletion jstests/replsets/transaction_table_oplog_replay.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
let res = table.findOne({"_id.id": lsid.id});

assert.eq(res.txnNum, txnNumber);
assert.eq(res.lastWriteOpTimeTs, ts);
assert.eq(res.lastWriteOpTime.ts, ts);
}

/**
Expand Down
98 changes: 65 additions & 33 deletions jstests/sharding/session_info_in_oplog.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,31 @@
(function() {
"use strict";

var checkOplog = function(oplog, lsid, uid, txnNum, stmtId, prevTs) {
var checkOplog = function(oplog, lsid, uid, txnNum, stmtId, prevTs, prevTerm) {
assert(oplog != null);
assert(oplog.lsid != null);
assert.eq(lsid, oplog.lsid.id);
assert.eq(uid, oplog.lsid.uid);
assert.eq(txnNum, oplog.txnNumber);
assert.eq(stmtId, oplog.stmtId);
assert.eq(prevTs.getTime(), oplog.prevTs.getTime());
assert.eq(prevTs.getInc(), oplog.prevTs.getInc());

var oplogPrevTs = oplog.prevOpTime.ts;
assert.eq(prevTs.getTime(), oplogPrevTs.getTime());
assert.eq(prevTs.getInc(), oplogPrevTs.getInc());
assert.eq(prevTerm, oplog.prevOpTime.t);
};

var checkSessionCatalog = function(conn, sessionId, uid, txnNum, expectedTs) {
var checkSessionCatalog = function(conn, sessionId, uid, txnNum, expectedTs, expectedTerm) {
var coll = conn.getDB('config').transactions;
var sessionDoc = coll.findOne({'_id': {id: sessionId, uid: uid}});

assert.eq(txnNum, sessionDoc.txnNum);
assert.eq(expectedTs.getTime(), sessionDoc.lastWriteOpTimeTs.getTime());
assert.eq(expectedTs.getInc(), sessionDoc.lastWriteOpTimeTs.getInc());

var oplogTs = sessionDoc.lastWriteOpTime.ts;
assert.eq(expectedTs.getTime(), oplogTs.getTime());
assert.eq(expectedTs.getInc(), oplogTs.getInc());

assert.eq(expectedTerm, sessionDoc.lastWriteOpTime.t);
};

var runTests = function(mainConn, priConn) {
Expand Down Expand Up @@ -61,12 +68,12 @@
var oplog = priConn.getDB('local').oplog.rs;

var firstDoc = oplog.findOne({ns: 'test.user', 'o._id': 10});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

var secondDoc = oplog.findOne({ns: 'test.user', 'o._id': 30});
checkOplog(secondDoc, lsid, uid, txnNumber, 1, firstDoc.ts);
checkOplog(secondDoc, lsid, uid, txnNumber, 1, firstDoc.ts, firstDoc.t);

checkSessionCatalog(priConn, lsid, uid, txnNumber, secondDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, secondDoc.ts, secondDoc.t);

////////////////////////////////////////////////////////////////////////
// Test update command
Expand All @@ -87,15 +94,15 @@
assert.commandWorked(mainConn.getDB('test').runCommand(cmd));

firstDoc = oplog.findOne({ns: 'test.user', op: 'u', 'o2._id': 10});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

secondDoc = oplog.findOne({ns: 'test.user', op: 'i', 'o._id': 20});
checkOplog(secondDoc, lsid, uid, txnNumber, 1, firstDoc.ts);
checkOplog(secondDoc, lsid, uid, txnNumber, 1, firstDoc.ts, firstDoc.t);

var thirdDoc = oplog.findOne({ns: 'test.user', op: 'u', 'o2._id': 30});
checkOplog(thirdDoc, lsid, uid, txnNumber, 2, secondDoc.ts);
checkOplog(thirdDoc, lsid, uid, txnNumber, 2, secondDoc.ts, secondDoc.t);

checkSessionCatalog(priConn, lsid, uid, txnNumber, thirdDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, thirdDoc.ts, thirdDoc.t);

////////////////////////////////////////////////////////////////////////
// Test delete command
Expand All @@ -112,12 +119,12 @@
assert.commandWorked(mainConn.getDB('test').runCommand(cmd));

firstDoc = oplog.findOne({ns: 'test.user', op: 'd', 'o._id': 10});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

secondDoc = oplog.findOne({ns: 'test.user', op: 'd', 'o._id': 20});
checkOplog(secondDoc, lsid, uid, txnNumber, 1, firstDoc.ts);
checkOplog(secondDoc, lsid, uid, txnNumber, 1, firstDoc.ts, firstDoc.t);

checkSessionCatalog(priConn, lsid, uid, txnNumber, secondDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, secondDoc.ts, secondDoc.t);

////////////////////////////////////////////////////////////////////////
// Test findAndModify command (upsert)
Expand All @@ -136,12 +143,12 @@
assert.commandWorked(mainConn.getDB('test').runCommand(cmd));

firstDoc = oplog.findOne({ns: 'test.user', op: 'i', 'o._id': 40});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

assert.eq(null, firstDoc.preImageTs);
assert.eq(null, firstDoc.postImageTs);

checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts, firstDoc.t);
var lastTs = firstDoc.ts;

////////////////////////////////////////////////////////////////////////
Expand All @@ -162,14 +169,19 @@
var res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));

firstDoc = oplog.findOne({ns: 'test.user', op: 'u', 'o2._id': 40, ts: {$gt: lastTs}});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

assert.eq(null, firstDoc.postImageTs);

var savedDoc = oplog.findOne({ns: 'test.user', op: 'n', ts: firstDoc.preImageTs});
var savedDoc = oplog.findOne({
ns: 'test.user',
op: 'n',
ts: firstDoc.preImageOpTime.ts,
t: firstDoc.preImageOpTime.t
});
assert.eq(beforeDoc, savedDoc.o);

checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts, firstDoc.t);
lastTs = firstDoc.ts;

////////////////////////////////////////////////////////////////////////
Expand All @@ -190,14 +202,19 @@
var afterDoc = mainConn.getDB('test').user.findOne({_id: 40});

firstDoc = oplog.findOne({ns: 'test.user', op: 'u', 'o2._id': 40, ts: {$gt: lastTs}});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

assert.eq(null, firstDoc.preImageTs);

savedDoc = oplog.findOne({ns: 'test.user', op: 'n', ts: firstDoc.postImageTs});
savedDoc = oplog.findOne({
ns: 'test.user',
op: 'n',
ts: firstDoc.postImageOpTime.ts,
t: firstDoc.postImageOpTime.t
});
assert.eq(afterDoc, savedDoc.o);

checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts, firstDoc.t);
lastTs = firstDoc.ts;

////////////////////////////////////////////////////////////////////////
Expand All @@ -218,14 +235,19 @@
res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));

firstDoc = oplog.findOne({ns: 'test.user', op: 'u', 'o2._id': 40, ts: {$gt: lastTs}});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

assert.eq(null, firstDoc.postImageTs);

savedDoc = oplog.findOne({ns: 'test.user', op: 'n', ts: firstDoc.preImageTs});
savedDoc = oplog.findOne({
ns: 'test.user',
op: 'n',
ts: firstDoc.preImageOpTime.ts,
t: firstDoc.preImageOpTime.t
});
assert.eq(beforeDoc, savedDoc.o);

checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts, firstDoc.t);
lastTs = firstDoc.ts;

////////////////////////////////////////////////////////////////////////
Expand All @@ -246,14 +268,19 @@
afterDoc = mainConn.getDB('test').user.findOne({_id: 40});

firstDoc = oplog.findOne({ns: 'test.user', op: 'u', 'o2._id': 40, ts: {$gt: lastTs}});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

assert.eq(null, firstDoc.preImageTs);

savedDoc = oplog.findOne({ns: 'test.user', op: 'n', ts: firstDoc.postImageTs});
savedDoc = oplog.findOne({
ns: 'test.user',
op: 'n',
ts: firstDoc.postImageOpTime.ts,
t: firstDoc.postImageOpTime.t
});
assert.eq(afterDoc, savedDoc.o);

checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts, firstDoc.t);
lastTs = firstDoc.ts;

////////////////////////////////////////////////////////////////////////
Expand All @@ -273,14 +300,19 @@
res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));

firstDoc = oplog.findOne({ns: 'test.user', op: 'd', 'o._id': 40, ts: {$gt: lastTs}});
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0));
checkOplog(firstDoc, lsid, uid, txnNumber, 0, Timestamp(0, 0), -1);

assert.eq(null, firstDoc.postImageTs);

savedDoc = oplog.findOne({ns: 'test.user', op: 'n', ts: firstDoc.preImageTs});
savedDoc = oplog.findOne({
ns: 'test.user',
op: 'n',
ts: firstDoc.preImageOpTime.ts,
t: firstDoc.preImageOpTime.t
});
assert.eq(beforeDoc, savedDoc.o);

checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts);
checkSessionCatalog(priConn, lsid, uid, txnNumber, firstDoc.ts, firstDoc.t);
lastTs = firstDoc.ts;
};

Expand Down
34 changes: 14 additions & 20 deletions src/mongo/db/op_observer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,12 @@ void onWriteOpCompleted(OperationContext* opCtx,
Session* session,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime) {
const auto lastStmtIdWriteTs = lastStmtIdWriteOpTime.getTimestamp();
if (lastStmtIdWriteTs.isNull())
if (lastStmtIdWriteOpTime.isNull())
return;

if (session) {
session->onWriteOpCompletedOnPrimary(
opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteTs);
opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
}

Expand Down Expand Up @@ -140,7 +139,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
if (session) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
}

OpTimeBundle opTimes;
Expand All @@ -160,9 +159,9 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
opTimes.prePostImageOpTime = noteUpdateOpTime;

if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
oplogLink.preImageTs = noteUpdateOpTime.getTimestamp();
oplogLink.preImageOpTime = noteUpdateOpTime;
} else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
oplogLink.postImageTs = noteUpdateOpTime.getTimestamp();
oplogLink.postImageOpTime = noteUpdateOpTime;
}
}

Expand Down Expand Up @@ -197,7 +196,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
if (session) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
}

OpTimeBundle opTimes;
Expand All @@ -206,7 +205,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
auto noteOplog = repl::logOp(
opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {});
opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageTs = noteOplog.getTimestamp();
oplogLink.preImageOpTime = noteOplog;
}

opTimes.writeOpTime = repl::logOp(opCtx,
Expand Down Expand Up @@ -280,10 +279,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;

const size_t count = end - begin;
auto timestamps = stdx::make_unique<Timestamp[]>(count);
const auto lastOpTime =
repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate);
const auto opTimeList = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate);

auto css = CollectionShardingState::get(opCtx, nss.ns());

Expand All @@ -292,10 +288,12 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (!fromMigrate) {
css->onInsertOp(opCtx, it->doc, timestamps[index]);
auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index];
css->onInsertOp(opCtx, it->doc, opTime);
}
}

auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back();
if (nss.coll() == "system.js") {
Scope::storedFuncMod(opCtx);
} else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
Expand Down Expand Up @@ -336,8 +334,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
args.criteria,
args.update,
args.updatedDoc,
opTime.writeOpTime.getTimestamp(),
opTime.prePostImageOpTime.getTimestamp());
opTime.writeOpTime,
opTime.prePostImageOpTime);
}

if (args.nss.coll() == "system.js") {
Expand All @@ -351,7 +349,6 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}


onWriteOpCompleted(
opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime);
}
Expand Down Expand Up @@ -383,10 +380,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,

auto css = CollectionShardingState::get(opCtx, nss.ns());
if (!fromMigrate) {
css->onDeleteOp(opCtx,
deleteState,
opTime.writeOpTime.getTimestamp(),
opTime.prePostImageOpTime.getTimestamp());
css->onDeleteOp(opCtx, deleteState, opTime.writeOpTime, opTime.prePostImageOpTime);
}

if (nss.coll() == "system.js") {
Expand Down
20 changes: 10 additions & 10 deletions src/mongo/db/ops/write_ops_retryability.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
uassert(40607,
str::stream() << "No pre-image available for findAndModify retry request:"
<< redact(request.toBSON()),
oplogWithCorrectLinks.getPreImageTs());
oplogWithCorrectLinks.getPreImageOpTime());
} else if (opType == repl::OpTypeEnum::kInsert) {
uassert(
40608,
Expand Down Expand Up @@ -99,7 +99,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ts.toString()
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
oplogWithCorrectLinks.getPostImageTs());
oplogWithCorrectLinks.getPostImageOpTime());
} else {
uassert(40612,
str::stream() << "findAndModify retry request: " << redact(request.toBSON())
Expand All @@ -108,7 +108,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ts.toString()
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
oplogWithCorrectLinks.getPreImageTs());
oplogWithCorrectLinks.getPreImageOpTime());
}
}
}
Expand All @@ -118,21 +118,21 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
* oplog.
*/
BSONObj extractPreOrPostImage(OperationContext* opCtx, const repl::OplogEntry& oplog) {
invariant(oplog.getPreImageTs() || oplog.getPostImageTs());
auto ts =
oplog.getPreImageTs() ? oplog.getPreImageTs().value() : oplog.getPostImageTs().value();
invariant(oplog.getPreImageOpTime() || oplog.getPostImageOpTime());
auto opTime = oplog.getPreImageOpTime() ? oplog.getPreImageOpTime().value()
: oplog.getPostImageOpTime().value();

DBDirectClient client(opCtx);
auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), BSON("ts" << ts));
auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery());

uassert(40613,
str::stream() << "oplog no longer contains the complete write history of this "
"transaction, log with ts "
<< ts.toString()
"transaction, log with opTime "
<< opTime.toString()
<< " cannot be found",
!oplogDoc.isEmpty());
auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogDoc));

auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogDoc));
return oplogEntry.getObject().getOwned();
}

Expand Down
Loading

0 comments on commit 0ab7000

Please sign in to comment.