@@ -72,30 +72,45 @@ namespace {
72
72
} // namespace
73
73
74
74
ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl () :
75
- _nextThreadId (0 ) {}
75
+ _startedThreads (false )
76
+ , _nextThreadId(0 ) {}
76
77
ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl () {}
77
78
78
79
void ReplicationCoordinatorExternalStateImpl::startThreads () {
80
+ boost::lock_guard<boost::mutex> lk (_threadMutex);
81
+ if (_startedThreads) {
82
+ return ;
83
+ }
84
+ log () << " Starting replication applier threads" ;
79
85
_applierThread.reset (new boost::thread (runSyncThread));
80
86
BackgroundSync* bgsync = BackgroundSync::get ();
81
87
_producerThread.reset (new boost::thread (stdx::bind (&BackgroundSync::producerThread,
82
88
bgsync)));
83
89
_syncSourceFeedbackThread.reset (new boost::thread (stdx::bind (&SyncSourceFeedback::run,
84
90
&_syncSourceFeedback)));
85
- newReplUp () ;
91
+ _startedThreads = true ;
86
92
}
87
93
88
94
void ReplicationCoordinatorExternalStateImpl::startMasterSlave (OperationContext* txn) {
89
95
repl::startMasterSlave (txn);
90
96
}
91
97
92
98
void ReplicationCoordinatorExternalStateImpl::shutdown () {
93
- _syncSourceFeedback.shutdown ();
94
- _syncSourceFeedbackThread->join ();
95
- _applierThread->join ();
96
- BackgroundSync* bgsync = BackgroundSync::get ();
97
- bgsync->shutdown ();
98
- _producerThread->join ();
99
+ boost::lock_guard<boost::mutex> lk (_threadMutex);
100
+ if (_startedThreads) {
101
+ log () << " Stopping replication applier threads" ;
102
+ _syncSourceFeedback.shutdown ();
103
+ _syncSourceFeedbackThread->join ();
104
+ _applierThread->join ();
105
+ BackgroundSync* bgsync = BackgroundSync::get ();
106
+ bgsync->shutdown ();
107
+ _producerThread->join ();
108
+ }
109
+ }
110
+
111
+ void ReplicationCoordinatorExternalStateImpl::initiateOplog (OperationContext* txn) {
112
+ createOplog (txn);
113
+ logOpInitiate (txn, BSON (" msg" << " initiating set" ));
99
114
}
100
115
101
116
void ReplicationCoordinatorExternalStateImpl::forwardSlaveHandshake () {
0 commit comments