SERVER-26077 Tighten checks around migration session id
Transmits the migration session id together with the abort command and
ensures that the migration session id reported in the status matches that
of the cloner.
kaloianm committed Sep 13, 2016
1 parent 7291699 commit 0c8ffec
Showing 10 changed files with 114 additions and 125 deletions.
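
Before the file-by-file diff, a minimal, self-contained C++ sketch of the pattern this commit tightens. This is illustrative code, not the MongoDB sources: the names MigrationSessionId and matches() appear in the diff below, but the internals and the id's string format here are assumptions.

#include <iostream>
#include <string>
#include <utility>

// Stand-in for mongo::MigrationSessionId; the real class wraps a BSON field and
// is built from the donor/recipient shard ids plus a unique OID (assumed format).
class MigrationSessionId {
public:
    explicit MigrationSessionId(std::string id) : _id(std::move(id)) {}

    // Mirrors MigrationSessionId::matches as used in the diff: two session ids
    // match only if they identify the same migration.
    bool matches(const MigrationSessionId& other) const {
        return _id == other._id;
    }

    const std::string& toString() const {
        return _id;
    }

private:
    std::string _id;
};

// Recipient-side gate, analogous to MigrationDestinationManager::abort below:
// an abort request carrying a stale or foreign session id is ignored.
bool abortIfSessionMatches(const MigrationSessionId& active,
                           const MigrationSessionId& requested) {
    if (!active.matches(requested)) {
        std::cerr << "received abort request from a stale session "
                  << requested.toString() << ". Current session is "
                  << active.toString() << '\n';
        return false;
    }
    // ... transition the migration state machine to ABORT here ...
    return true;
}

int main() {
    MigrationSessionId current("shard0000_shard0001_57d7a5cc");
    MigrationSessionId stale("shard0002_shard0001_57d7ffff");
    abortIfSessionMatches(current, stale);    // rejected: different migration
    abortIfSessionMatches(current, current);  // accepted: ids match
    return 0;
}
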
21 changes: 6 additions & 15 deletions jstests/sharding/bulk_shard_insert.js
@@ -1,5 +1,7 @@
// Test bulk inserts with sharding
// Test bulk inserts running alongside the auto-balancer. Ensures that they do not conflict with each
// other.
(function() {
'use strict';

// Setup randomized test
var seed = new Date().getTime();
@@ -8,7 +10,7 @@
Random.srand(seed);
print("Seeded with " + seed);

var st = new ShardingTest({name: jsTestName(), shards: 4, chunkSize: 1});
var st = new ShardingTest({shards: 4, chunkSize: 1});

// Setup sharded collection
var mongos = st.s0;
@@ -19,8 +21,7 @@
// Insert lots of bulk documents
var numDocs = 1000000;

var bulkSize = Math.floor(Random.rand() * 1000) + 2;
bulkSize = 4000;
var bulkSize = 4000;
var docSize = 128; /* bytes */
print("\n\n\nBulk size is " + bulkSize);

@@ -62,20 +63,10 @@
// Check we inserted all the documents
st.printShardingStatus();

var count = coll.find().count();
var itcount = count; // coll.find().itcount()

print("Inserted " + docsInserted + " count : " + count + " itcount : " + itcount);

st.startBalancer();

var count = coll.find().count();
var itcount = coll.find().itcount();

print("Inserted " + docsInserted + " count : " + count + " itcount : " + itcount);

// SERVER-3645
// assert.eq( docsInserted, count )
assert.eq(docsInserted, itcount);

st.stop();
})();
9 changes: 5 additions & 4 deletions src/mongo/db/s/active_migrations_registry.cpp
@@ -56,10 +56,11 @@ StatusWith<ScopedRegisterMigration> ActiveMigrationsRegistry::registerMigration(
return {ScopedRegisterMigration(nullptr, false, _activeMoveChunkState->notification)};
}

return {ErrorCodes::ConflictingOperationInProgress,
str::stream()
<< "Unable start new migration because this shard is currently donating chunk for "
<< _activeMoveChunkState->args.getNss().ns()};
return {
ErrorCodes::ConflictingOperationInProgress,
str::stream()
<< "Unable to start new migration because this shard is currently donating chunk for "
<< _activeMoveChunkState->args.getNss().ns()};
}

boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveMigrationNss() {
30 changes: 23 additions & 7 deletions src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -73,10 +73,11 @@ bool isInRange(const BSONObj& obj,
return k.woCompare(min) >= 0 && k.woCompare(max) < 0;
}

BSONObj createRecvChunkCommitRequest(const NamespaceString& nss,
const MigrationSessionId& sessionId) {
BSONObj createRequestWithSessionId(StringData commandName,
const NamespaceString& nss,
const MigrationSessionId& sessionId) {
BSONObjBuilder builder;
builder.append(kRecvChunkCommit, nss.ns());
builder.append(commandName, nss.ns());
sessionId.append(&builder);
return builder.obj();
}
@@ -179,7 +180,13 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {

Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) {
invariant(!txn->lockState()->isLocked());
auto scopedGuard = MakeGuard([&] { cancelClone(txn); });

// TODO (Kal): This can be changed to cancelClone after 3.4 is released. The reason to only do
// internal cleanup in 3.4 is for backwards compatibility with 3.2 nodes, which cannot
// differentiate between cancellations for different migration sessions. It is thus possible
// that a second migration from a different donor, but to the same recipient, would abort an
// already running migration.
auto scopedGuard = MakeGuard([&] { _cleanup(txn); });

// Resolve the donor and recipient shards and their connection string

@@ -282,9 +289,17 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
return {ErrorCodes::OperationFailed, "Data transfer error"};
}

auto migrationSessionIdStatus = MigrationSessionId::extractFromBSON(res);
if (!migrationSessionIdStatus.isOK()) {
return {ErrorCodes::OperationIncomplete,
str::stream() << "Unable to retrieve the id of the migration session due to "
<< migrationSessionIdStatus.getStatus().toString()};
}

if (res["ns"].str() != _args.getNss().ns() || res["from"].str() != _donorCS.toString() ||
!res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 ||
!res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0) {
!res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 ||
!_sessionId.matches(migrationSessionIdStatus.getValue())) {
// This can happen when the destination aborted the migration and received another
// recvChunk before this thread sees the transition to the abort state. This is
// currently possible only if multiple migrations are happening at once. This is an
@@ -318,7 +333,8 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) {
invariant(!_cloneCompleted);
}

auto responseStatus = _callRecipient(createRecvChunkCommitRequest(_args.getNss(), _sessionId));
auto responseStatus =
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
_cleanup(txn);
return Status::OK();
@@ -337,7 +353,7 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) {
return;
}

_callRecipient(BSON(kRecvChunkAbort << _args.getNss().ns()));
_callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId));
_cleanup(txn);
}

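
As a usage note for the change above: with the new helper, the abort and commit requests share one shape. Below is a simplified sketch of what createRequestWithSessionId produces; the command string "_recvChunkAbort" and the "sessionId" field name are assumptions inferred from the constants' names, not confirmed by this diff.

#include <map>
#include <string>

// Simplified stand-in for the BSONObjBuilder sequence in the diff: append the
// command field, then let the session id append itself.
std::map<std::string, std::string> createRequestWithSessionId(
    const std::string& commandName,  // e.g. "_recvChunkAbort" or "_recvChunkCommit" (assumed)
    const std::string& ns,           // fully qualified namespace, e.g. "test.coll"
    const std::string& sessionId) {
    return {{commandName, ns}, {"sessionId", sessionId}};  // field name assumed
}

// The resulting abort request would look roughly like:
//   { _recvChunkAbort: "test.coll", sessionId: "shard0000_shard0001_<OID>" }
// whereas before this commit the abort carried only the namespace, so the
// recipient could not tell which migration session the abort targeted.
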
85 changes: 34 additions & 51 deletions src/mongo/db/s/migration_destination_manager.cpp
@@ -318,26 +318,37 @@ Status MigrationDestinationManager::start(const string& ns,
_migrateThreadHandle.join();
}

_migrateThreadHandle = stdx::thread([this,
ns,
sessionId,
min,
max,
shardKeyPattern,
fromShardConnString,
epoch,
writeConcern]() {
_migrateThread(
ns, sessionId, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
});
_migrateThreadHandle = stdx::thread(
[this, ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
_migrateThread(ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
});

return Status::OK();
}

void MigrationDestinationManager::abort() {
bool MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
stdx::lock_guard<stdx::mutex> sl(_mutex);

if (!_sessionId) {
return false;
}

if (!_sessionId->matches(sessionId)) {
warning() << "received abort request from a stale session " << sessionId.toString()
<< ". Current session is " << _sessionId->toString();
return false;
}

_state = ABORT;
_errmsg = "aborted";

return true;
}

void MigrationDestinationManager::abortWithoutSessionIdCheck() {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = ABORT;
_errmsg = "aborted without session id check";
}

bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) {
@@ -382,7 +393,6 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) {
}

void MigrationDestinationManager::_migrateThread(std::string ns,
MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
@@ -398,15 +408,8 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}

try {
_migrateDriver(opCtx.get(),
ns,
sessionId,
min,
max,
shardKeyPattern,
fromShardConnString,
epoch,
writeConcern);
_migrateDriver(
opCtx.get(), ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
} catch (std::exception& e) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -440,20 +443,20 @@ void MigrationDestinationManager::_migrateThread(std::string ns,

void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const string& ns,
const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
const ConnectionString& fromShardConnString,
const OID& epoch,
const WriteConcernOptions& writeConcern) {
invariant(isActive());
invariant(_sessionId);
invariant(!min.isEmpty());
invariant(!max.isEmpty());

log() << "starting receiving-end of migration of chunk " << redact(min) << " -> " << redact(max)
log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max)
<< " for collection " << ns << " from " << fromShardConnString << " at epoch "
<< epoch.toString();
<< epoch.toString() << " with session id " << *_sessionId;

string errmsg;
MoveTimingHelper timing(txn, "to", ns, min, max, 6 /* steps */, &errmsg, ShardId(), ShardId());
Expand All @@ -479,6 +482,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,

{
// 0. copy system.namespaces entry if collection doesn't already exist

OldClientWriteContext ctx(txn, ns);
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
errmsg = str::stream() << "Not primary during migration: " << ns
@@ -610,12 +614,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}

timing.done(1);

MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep1);
}

{
// 2. delete any data already in range
// 2. Synchronously delete any data which might have been left orphaned in the range being moved

RangeDeleterOptions deleterOptions(
KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
@@ -641,32 +644,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}

timing.done(2);

MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep2);
}

State currentState = getState();
if (currentState == FAIL || currentState == ABORT) {
string errMsg;
RangeDeleterOptions deleterOptions(
KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
deleterOptions.writeConcern = writeConcern;
// No need to wait since all existing cursors will filter out this range when
// returning the results.
deleterOptions.waitForOpenCursors = false;
deleterOptions.fromMigrate = true;
deleterOptions.onlyRemoveOrphanedDocs = true;

if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) {
warning() << "Failed to queue delete for migrate abort: " << redact(errMsg);
}
}

{
// 3. Initial bulk clone
setState(CLONE);

const BSONObj migrateCloneRequest = createMigrateCloneRequest(sessionId);
const BSONObj migrateCloneRequest = createMigrateCloneRequest(*_sessionId);

while (true) {
BSONObj res;
@@ -742,15 +727,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}

timing.done(3);

MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep3);
}

// If running on a replicated system, we'll need to flush the docs we cloned to the
// secondaries
repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();

const BSONObj xferModsRequest = createTransferModsRequest(sessionId);
const BSONObj xferModsRequest = createTransferModsRequest(*_sessionId);

{
// 4. Do bulk of mods
@@ -806,7 +790,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}

timing.done(4);

MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep4);
}

@@ -896,13 +879,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}

timing.done(5);

MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5);
}

setState(DONE);
timing.done(6);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep6);

conn.done();
}

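
A side note on the MONGO_FAIL_POINT_PAUSE_WHILE_SET calls that now sit directly after each timing.done(n): they let tests park the migrate thread at a numbered step. A rough, self-contained approximation of the pause behavior follows; the real macro lives in MongoDB's fail point utilities, and this stand-in only illustrates the assumed sleep-while-set semantics.

#include <atomic>
#include <chrono>
#include <thread>

// Stand-in for a fail point: a flag that a test can flip at runtime to make the
// migrate thread hang at a given step (migrateThreadHangAtStep1..6 above).
std::atomic<bool> migrateThreadHangAtStep3{false};

// Approximates the pause-while-set behavior: sleep in a loop until the test
// disables the fail point again.
void pauseWhileSet(const std::atomic<bool>& failPoint) {
    while (failPoint.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
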
15 changes: 12 additions & 3 deletions src/mongo/db/s/migration_destination_manager.h
@@ -98,7 +98,18 @@ class MigrationDestinationManager {
const OID& epoch,
const WriteConcernOptions& writeConcern);

void abort();
/**
* Idempotent method, which aborts the currently active migration only if its session id
* matches the specified one, and returns true in that case. Returns false if there is no
* active migration or if the session ids do not match.
*/
bool abort(const MigrationSessionId& sessionId);

/**
* Same as 'abort' above, but unconditionally aborts the current migration without checking the
* session id. Only used for backwards compatibility.
*/
void abortWithoutSessionIdCheck();

bool startCommit(const MigrationSessionId& sessionId);

@@ -107,7 +118,6 @@
* Thread which drives the migration apply process on the recipient side.
*/
void _migrateThread(std::string ns,
MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
Expand All @@ -117,7 +127,6 @@ class MigrationDestinationManager {

void _migrateDriver(OperationContext* txn,
const std::string& ns,
const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
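
The command handler that routes between the two abort entry points is in one of the changed files not shown above. Below is a sketch of the dispatch it presumably performs, with minimal stubs in place of the real classes; treat the logic as assumed, based only on the backwards-compatibility notes in the diff.

#include <optional>
#include <string>

// Minimal stubs standing in for the classes in this diff, only to make the
// dispatch concrete; they do not reproduce the real implementations.
struct MigrationSessionId {
    std::string id;
};

struct MigrationDestinationManager {
    bool abort(const MigrationSessionId&) { return true; }  // stub
    void abortWithoutSessionIdCheck() {}                    // stub
};

// Assumed shape of the recipient's _recvChunkAbort handling: a 3.4 donor sends
// its session id and gets the checked abort; a legacy 3.2 donor sends none and
// falls back to the unconditional abort kept for backwards compatibility.
bool handleRecvChunkAbort(const std::optional<MigrationSessionId>& requestSessionId,
                          MigrationDestinationManager* mdm) {
    if (requestSessionId) {
        return mdm->abort(*requestSessionId);
    }
    mdm->abortWithoutSessionIdCheck();
    return true;
}
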
