@@ -541,7 +541,7 @@ namespace {
541
541
}
542
542
543
543
void ReplicationCoordinatorImpl::signalUpstreamUpdater () {
544
- _externalState->forwardSlaveHandshake ();
544
+ _externalState->forwardSlaveProgress ();
545
545
}
546
546
547
547
ReplicationCoordinatorImpl::SlaveInfo*
@@ -717,16 +717,19 @@ namespace {
717
717
return _slaveInfo[_getMyIndexInSlaveInfo_inlock ()].opTime ;
718
718
}
719
719
720
- Status ReplicationCoordinatorImpl::setLastOptime_forTest (const OID& rid, const OpTime& ts) {
720
+ Status ReplicationCoordinatorImpl::setLastOptime_forTest (long long cfgVer,
721
+ long long memberId,
722
+ const OpTime& ts) {
721
723
boost::lock_guard<boost::mutex> lock (_mutex);
722
724
invariant (_getReplicationMode_inlock () == modeReplSet);
723
725
724
- const UpdatePositionArgs::UpdateInfo update (rid, ts, -1 , -1 );
725
- return _setLastOptime_inlock (update);
726
+ const UpdatePositionArgs::UpdateInfo update (OID (), ts, cfgVer, memberId);
727
+ long long configVersion;
728
+ return _setLastOptime_inlock (update, &configVersion);
726
729
}
727
730
728
731
Status ReplicationCoordinatorImpl::_setLastOptime_inlock (
729
- const UpdatePositionArgs::UpdateInfo& args) {
732
+ const UpdatePositionArgs::UpdateInfo& args, long long * configVersion ) {
730
733
731
734
if (_selfIndex == -1 ) {
732
735
// Ignore updates when we're in state REMOVED
@@ -735,57 +738,50 @@ namespace {
735
738
}
736
739
invariant (_getReplicationMode_inlock () == modeReplSet);
737
740
741
+ if (args.memberId < 0 ) {
742
+ std::string errmsg = str::stream ()
743
+ << " Received replSetUpdatePosition for node with memberId "
744
+ << args.memberId << " which is negative and therefore invalid" ;
745
+ LOG (1 ) << errmsg;
746
+ return Status (ErrorCodes::NodeNotFound, errmsg);
747
+ }
748
+
738
749
if (args.rid == _getMyRID_inlock () ||
739
750
args.memberId == _rsConfig.getMemberAt (_selfIndex).getId ()) {
740
751
// Do not let remote nodes tell us what our optime is.
741
752
return Status::OK ();
742
753
}
743
754
744
- LOG (2 ) << " received notification that node with RID " << args.rid <<
745
- " has reached optime: " << args.ts ;
755
+ LOG (2 ) << " received notification that node with memberID " << args.memberId <<
756
+ " in config with version " << args. cfgver << " has reached optime: " << args.ts ;
746
757
747
758
SlaveInfo* slaveInfo = NULL ;
748
- if (args.memberId >= 0 ) {
749
- if (args. cfgver != _rsConfig. getConfigVersion ()) {
750
- std::string errmsg = str::stream ()
751
- << " Received replSetUpdatePosition for node with memberId "
752
- << args. memberId << " whose config version of " << args. cfgver
753
- << " doesn't match our config version of "
754
- << _rsConfig. getConfigVersion () ;
755
- LOG ( 1 ) << errmsg ;
756
- return Status (ErrorCodes::InvalidReplicaSetConfig, errmsg);
757
- }
759
+ if (args.cfgver != _rsConfig. getConfigVersion () ) {
760
+ std::string errmsg = str::stream ()
761
+ << " Received replSetUpdatePosition for node with memberId "
762
+ << args. memberId << " whose config version of " << args. cfgver
763
+ << " doesn't match our config version of "
764
+ << _rsConfig. getConfigVersion ();
765
+ LOG ( 1 ) << errmsg ;
766
+ *configVersion = _rsConfig. getConfigVersion () ;
767
+ return Status (ErrorCodes::InvalidReplicaSetConfig, errmsg);
768
+ }
758
769
759
- slaveInfo = _findSlaveInfoByMemberID_inlock (args.memberId );
760
- if (!slaveInfo) {
761
- invariant (!_rsConfig.findMemberByID (args.memberId ));
770
+ slaveInfo = _findSlaveInfoByMemberID_inlock (args.memberId );
771
+ if (!slaveInfo) {
772
+ invariant (!_rsConfig.findMemberByID (args.memberId ));
762
773
763
- std::string errmsg = str::stream ()
764
- << " Received replSetUpdatePosition for node with memberId "
765
- << args.memberId << " which doesn't exist in our config" ;
766
- LOG (1 ) << errmsg;
767
- return Status (ErrorCodes::NodeNotFound, errmsg);
768
- }
769
- }
770
- else {
771
- // The command we received didn't contain a memberId, most likely this is because it
772
- // came from a member running something prior to 3.0.
773
- // Fall back to finding the node by RID.
774
- slaveInfo = _findSlaveInfoByRID_inlock (args.rid );
775
- if (!slaveInfo) {
776
- std::string errmsg = str::stream ()
777
- << " Received replSetUpdatePosition for node with RID " << args.rid
778
- << " , but we haven't yet received a handshake for that node." ;
779
- LOG (1 ) << errmsg;
780
- return Status (ErrorCodes::NodeNotFound, errmsg);
781
- }
782
- invariant (slaveInfo->memberId >= 0 );
774
+ std::string errmsg = str::stream ()
775
+ << " Received replSetUpdatePosition for node with memberId "
776
+ << args.memberId << " which doesn't exist in our config" ;
777
+ LOG (1 ) << errmsg;
778
+ return Status (ErrorCodes::NodeNotFound, errmsg);
783
779
}
784
- invariant (slaveInfo);
785
- invariant (args.memberId < 0 || args.memberId == slaveInfo->memberId );
786
780
787
- LOG (3 ) << " Node with RID " << args.rid << " and memberId " << slaveInfo->memberId
788
- << " currently has optime " << slaveInfo->opTime << " ; updating to " << args.ts ;
781
+ invariant (args.memberId == slaveInfo->memberId );
782
+
783
+ LOG (3 ) << " Node with memberID " << args.memberId << " currently has optime " <<
784
+ slaveInfo->opTime << " ; updating to " << args.ts ;
789
785
790
786
// Only update remote optimes if they increase.
791
787
if (slaveInfo->opTime < args.ts ) {
@@ -1345,41 +1341,13 @@ namespace {
1345
1341
// we need to keep sending it for 2.6 compatibility.
1346
1342
// TODO(spencer): Remove this after 3.0 is released.
1347
1343
const MemberConfig* member = _rsConfig.findMemberByID (itr->memberId );
1348
- fassert (18651 , member); // We ensured the member existed in processHandshake.
1344
+ fassert (18651 , member);
1349
1345
entry.append (" config" , member->toBSON (_rsConfig.getTagConfig ()));
1350
1346
}
1351
1347
}
1352
1348
return true ;
1353
1349
}
1354
1350
1355
- void ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommandHandshakes (
1356
- std::vector<BSONObj>* handshakes) {
1357
- boost::lock_guard<boost::mutex> lock (_mutex);
1358
- // handshake objs for ourself and all chained members
1359
- for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin ();
1360
- itr != _slaveInfo.end (); ++itr) {
1361
- if (!itr->rid .isSet ()) {
1362
- // Don't include info on members we haven't heard from yet.
1363
- continue ;
1364
- }
1365
-
1366
- BSONObjBuilder cmd;
1367
- cmd.append (" replSetUpdatePosition" , 1 );
1368
- {
1369
- BSONObjBuilder subCmd (cmd.subobjStart (" handshake" ));
1370
- subCmd.append (" handshake" , itr->rid );
1371
- subCmd.append (" member" , itr->memberId );
1372
- // SERVER-14550 Even though the "config" field isn't used on the other end in 3.0,
1373
- // we need to keep sending it for 2.6 compatibility.
1374
- // TODO(spencer): Remove this after 3.0 is released.
1375
- const MemberConfig* member = _rsConfig.findMemberByID (itr->memberId );
1376
- fassert (18650 , member); // We ensured the member existed in processHandshake.
1377
- subCmd.append (" config" , member->toBSON (_rsConfig.getTagConfig ()));
1378
- }
1379
- handshakes->push_back (cmd.obj ());
1380
- }
1381
- }
1382
-
1383
1351
Status ReplicationCoordinatorImpl::processReplSetGetStatus (BSONObjBuilder* response) {
1384
1352
Status result (ErrorCodes::InternalError, " didn't set status in prepareStatusResponse" );
1385
1353
CBHStatus cbh = _replExecutor.scheduleWork (
@@ -1452,6 +1420,11 @@ namespace {
1452
1420
}
1453
1421
}
1454
1422
1423
+ ReplicaSetConfig ReplicationCoordinatorImpl::getConfig () const {
1424
+ boost::lock_guard<boost::mutex> lock (_mutex);
1425
+ return _rsConfig;
1426
+ }
1427
+
1455
1428
void ReplicationCoordinatorImpl::processReplSetGetConfig (BSONObjBuilder* result) {
1456
1429
boost::lock_guard<boost::mutex> lock (_mutex);
1457
1430
result->append (" config" , _rsConfig.toBSON ());
@@ -2109,15 +2082,15 @@ namespace {
2109
2082
}
2110
2083
2111
2084
Status ReplicationCoordinatorImpl::processReplSetUpdatePosition (
2112
- const UpdatePositionArgs& updates) {
2085
+ const UpdatePositionArgs& updates, long long * configVersion ) {
2113
2086
2114
2087
boost::unique_lock<boost::mutex> lock (_mutex);
2115
2088
Status status = Status::OK ();
2116
2089
bool somethingChanged = false ;
2117
2090
for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin ();
2118
2091
update != updates.updatesEnd ();
2119
2092
++update) {
2120
- status = _setLastOptime_inlock (*update);
2093
+ status = _setLastOptime_inlock (*update, configVersion );
2121
2094
if (!status.isOK ()) {
2122
2095
break ;
2123
2096
}
@@ -2136,36 +2109,12 @@ namespace {
2136
2109
LOG (2 ) << " Received handshake " << handshake.toBSON ();
2137
2110
2138
2111
boost::unique_lock<boost::mutex> lock (_mutex);
2139
- if (_getReplicationMode_inlock () == modeReplSet) {
2140
- if (_selfIndex == -1 ) {
2141
- // Ignore updates when we're in state REMOVED
2142
- return Status (ErrorCodes::NotMasterOrSecondaryCode,
2143
- " Received replSetUpdatePosition command but we are in state REMOVED" );
2144
- }
2145
2112
2146
- int memberId = handshake.getMemberId ();
2147
- const MemberConfig* member = _rsConfig.findMemberByID (memberId);
2148
- if (!member) {
2149
- return Status (ErrorCodes::NodeNotFound,
2150
- str::stream () << " Node with replica set memberId " << memberId <<
2151
- " could not be found in replica set config while attempting"
2152
- " to associate it with RID " << handshake.getRid () <<
2153
- " in replication handshake. ReplSet Config: " <<
2154
- _rsConfig.toBSON ().toString ());
2155
- }
2156
- SlaveInfo* slaveInfo = _findSlaveInfoByMemberID_inlock (handshake.getMemberId ());
2157
- invariant (slaveInfo); // If it's in the config it must be in _slaveInfo
2158
- slaveInfo->rid = handshake.getRid ();
2159
- slaveInfo->hostAndPort = member->getHostAndPort ();
2160
-
2161
- if (!_getMemberState_inlock ().primary ()) {
2162
- lock.unlock ();
2163
- _externalState->forwardSlaveHandshake (); // must do outside _mutex
2164
- }
2165
- return Status::OK ();
2113
+ if (_getReplicationMode_inlock () != modeMasterSlave) {
2114
+ return Status (ErrorCodes::IllegalOperation,
2115
+ " The handshake command is only used for master/slave replication" );
2166
2116
}
2167
2117
2168
- // master-slave from here down
2169
2118
SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock (handshake.getRid ());
2170
2119
if (slaveInfo) {
2171
2120
return Status::OK (); // nothing to do
0 commit comments