Skip to content

Commit

Permalink
SERVER-8070 Make sure buffer is drained before choosing sync target
Browse files Browse the repository at this point in the history
  • Loading branch information
kchodorow committed Jan 15, 2013
1 parent db62658 commit f380d40
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 2 deletions.
142 changes: 142 additions & 0 deletions jstests/replsets/server8070.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Test for SERVER-8070: Flush buffer before changing sync targets to prevent unnecessary rollbacks
// This test writes 50 ops to one secondary's data (member2) and 25 ops to the other secondary's
// data (member3), then puts 50 more ops in member3's buffer and makes sure that member3 doesn't try
// to sync from member2.

var replSet = new ReplSetTest({name: 'testSet', nodes: 3});
replSet.startSet();
replSet.initiate(
{
_id:'testSet',
members:
[
{_id: 0, host: getHostName()+":"+replSet.ports[0]},
{_id: 1, host: getHostName()+":"+replSet.ports[1], priority: 0},
{_id: 2, host: getHostName()+":"+replSet.ports[2], priority: 0}
]
}
);

var checkRepl = function(db1, db2) {
assert.soon(
function() {
var last1 = db1.getSisterDB("local").oplog.rs.find().sort({$natural:-1}).limit(1)
.next();
var last2 = db2.getSisterDB("local").oplog.rs.find().sort({$natural:-1}).limit(1)
.next();
print(tojson(last1)+" "+tojson(last2));

return last2.ts.t == last1.ts.t;
}
);
};

// Do an initial write
var master = replSet.getMaster();
master.getDB("foo").bar.insert({x:1});
replSet.awaitReplication();

var primary = master.getDB("foo");
replSet.nodes[1].setSlaveOk();
replSet.nodes[2].setSlaveOk();
var member2 = replSet.nodes[1].getDB("admin");
var member3 = replSet.nodes[2].getDB("admin");

print("Make sure 2 & 3 are syncing from the primary");
member2.adminCommand({replSetSyncFrom : getHostName()+":"+replSet.ports[0]});
member3.adminCommand({replSetSyncFrom : getHostName()+":"+replSet.ports[0]});

print("Stop 2's replication");
member2.runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'});

print("Do a few writes");
for (var i = 0; i < 25; i++) {
primary.bar.insert({x: i});
}

print("Make sure 3 is at write #25");
checkRepl(primary, member3);
// This means 3's buffer is empty

print("Stop 3's replication");
member3.runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'});

print("Start 2's replication");
member2.runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'});

print("Do some writes");
for (var i = 25; i < 50; i++) {
primary.bar.insert({x: i});
}

print("Make sure 2 is at write #50");
checkRepl(primary, member2);
// This means 2's buffer is empty

print("Stop 2's replication");
member2.runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'});

print("Do some writes - 2 & 3 should have up to write #75 in their buffers, but unapplied");
for (var i = 50; i < 75; i++) {
primary.bar.insert({x: i});
}
var last = primary.getSisterDB("local").oplog.rs.find().sort({$natural:-1}).limit(1).next();

print("waiting a bit for the secondaries to get the write");
sleep(10000);

print("Shut down the primary");
replSet.stop(0);

// This was used before the server was fixed to prove that the code was broken
var unfixed = function() {
print("3 should attempt to sync from 2, as 2 is 'ahead'");
assert.soon(
function() {
var syncingTo = member3.adminCommand({replSetGetStatus:1}).syncingTo;
return syncingTo == getHostName()+":"+replSet.ports[1];
}
);
};

var fixed = function() {
print("3 should not attempt to sync from 2, as it cannot clear its buffer");
assert.throws(
function() {
assert.soon(
function() {
var syncingTo = member3.adminCommand({replSetGetStatus:1}).syncingTo;
return syncingTo == getHostName()+":"+replSet.ports[1];
}
);
}
);
};

//unfixed();
fixed();

print(" --- pause 3's bgsync thread ---");
member3.runCommand({configureFailPoint: 'rsBgSyncProduce', mode: 'alwaysOn'});

print("Allow 3 to apply ops 25-75");
member3.runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'});
assert.soon(
function() {
var last3 = member3.getSisterDB("local").oplog.rs.find().sort({$natural:-1}).limit(1)
.next();
return last.ts.t == last3.ts.t;
}
);

print(" --- start 3's bgsync thread ---");
member3.runCommand({configureFailPoint: 'rsBgSyncProduce', mode: 'off'});

print("Shouldn't hit rollback");
var end = (new Date()).getTime()+30000;
while ((new Date()).getTime() < end) {
assert('ROLLBACK' != member3.runCommand({replSetGetStatus:1}).members[2].stateStr);
sleep(1000);
}

replSet.stopSet();
17 changes: 15 additions & 2 deletions src/mongo/db/repl/bgsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/util/fail_point_service.h"

namespace mongo {
namespace replset {
MONGO_FP_DECLARE(rsBgSyncProduce);

BackgroundSync* BackgroundSync::s_instance = 0;
boost::mutex BackgroundSync::s_mutex;

Expand Down Expand Up @@ -294,6 +297,10 @@ namespace replset {
return;
}

while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
sleepmillis(0);
}

uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() );

if (isRollbackRequired(r)) {
Expand Down Expand Up @@ -434,13 +441,18 @@ namespace replset {
const Member *target = NULL, *stale = NULL;
BSONObj oldest;

// then we're initial syncing and we're still waiting for this to be set
{
boost::unique_lock<boost::mutex> lock(_mutex);
if (_lastOpTimeFetched.isNull()) {
// then we're initial syncing and we're still waiting for this to be set
_currentSyncTarget = NULL;
return;
}

// Wait until we've applied the ops we have before we choose a sync target
while (!_appliedBuffer) {
_condvar.wait(lock);
}
}

verify(r.conn() == NULL);
Expand Down Expand Up @@ -496,7 +508,8 @@ namespace replset {
}
OpTime theirTS = theirLastOp["ts"]._opTime();
if (theirTS < _lastOpTimeFetched) {
log() << "replSet we are ahead of the primary, will try to roll back" << rsLog;
log() << "replSet we are ahead of the sync source, will try to roll back"
<< rsLog;
theReplSet->syncRollback(r);
return true;
}
Expand Down
8 changes: 8 additions & 0 deletions src/mongo/db/repl/rs_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/util/fail_point_service.h"

namespace mongo {

Expand All @@ -38,6 +39,8 @@ namespace mongo {

namespace replset {

MONGO_FP_DECLARE(rsSyncApplyStop);

SyncTail::SyncTail(BackgroundSyncInterface *q) :
Sync(""), oplogVersion(0), _networkQueue(q)
{}
Expand Down Expand Up @@ -409,6 +412,11 @@ namespace replset {
}
}

// For pausing replication in tests
while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
sleepmillis(0);
}

const BSONObj& lastOp = ops.getDeque().back();
setOplogVersion(lastOp);
handleSlaveDelay(lastOp);
Expand Down

0 comments on commit f380d40

Please sign in to comment.