Skip to content

Commit

Permalink
rs de-spaghetti
Browse files Browse the repository at this point in the history
  • Loading branch information
dwight committed Jul 22, 2010
1 parent 48e9ea7 commit 6212e5c
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 84 deletions.
2 changes: 1 addition & 1 deletion db/oplog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ namespace mongo {

long long hNew;
if( theReplSet ) {
massert(13312, "replSet error : logOp() but not primary?", theReplSet->isPrimary());
massert(13312, "replSet error : logOp() but not primary?", theReplSet->box.getState().primary());
hNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId();
}
else {
Expand Down
36 changes: 18 additions & 18 deletions db/repl/health.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,26 @@ namespace mongo {
}

string ReplSetImpl::stateAsHtml(MemberState s) {
if( s == RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP");
if( s == RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY");
if( s == RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY");
if( s == RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING");
if( s == RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "RS_FATAL");
if( s == RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "RS_STARTUP2");
if( s == RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER");
if( s == RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN");
if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP");
if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY");
if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY");
if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING");
if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "RS_FATAL");
if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "RS_STARTUP2");
if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER");
if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN");
return "";
}

string ReplSetImpl::stateAsStr(MemberState s) {
if( s == RS_STARTUP ) return "STARTUP";
if( s == RS_PRIMARY ) return "PRIMARY";
if( s == RS_SECONDARY ) return "SECONDARY";
if( s == RS_RECOVERING ) return "RECOVERING";
if( s == RS_FATAL ) return "FATAL";
if( s == RS_STARTUP2 ) return "STARTUP2";
if( s == RS_ARBITER ) return "ARBITER";
if( s == RS_DOWN ) return "DOWN";
if( s.s == MemberState::RS_STARTUP ) return "STARTUP";
if( s.s == MemberState::RS_PRIMARY ) return "PRIMARY";
if( s.s == MemberState::RS_SECONDARY ) return "SECONDARY";
if( s.s == MemberState::RS_RECOVERING ) return "RECOVERING";
if( s.s == MemberState::RS_FATAL ) return "FATAL";
if( s.s == MemberState::RS_STARTUP2 ) return "STARTUP2";
if( s.s == MemberState::RS_ARBITER ) return "ARBITER";
if( s.s == MemberState::RS_DOWN ) return "DOWN";
return "";
}

Expand Down Expand Up @@ -262,7 +262,7 @@ namespace mongo {
td(ago(started)) <<
td("") << // last heartbeat
td(ToString(_self->config().votes)) <<
td(stateAsHtml(_myState));
td(stateAsHtml(box.getState()));
s << td( _hbmsg );
stringstream q;
q << "/_replSetOplog?" << _self->id();
Expand Down Expand Up @@ -320,7 +320,7 @@ namespace mongo {
}
b.append("set", name());
b.appendTimeT("date", time(0));
b.append("myState", _myState);
b.append("myState", box.getState().s);
b.append("members", v);
}

Expand Down
4 changes: 2 additions & 2 deletions db/repl/heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ namespace mongo {
return false;
}
result.append("set", theReplSet->name());
result.append("state", theReplSet->state());
result.append("state", theReplSet->state().s);
result.append("hbmsg", theReplSet->hbmsg());
result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
int v = theReplSet->config().version;
Expand Down Expand Up @@ -128,7 +128,7 @@ namespace mongo {
{
be state = info["state"];
if( state.ok() )
mem.hbstate = (MemberState) state.Int();
mem.hbstate = MemberState(state.Int());
}
if( ok ) {
if( mem.upSince == 0 ) {
Expand Down
17 changes: 7 additions & 10 deletions db/repl/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace mongo {
Member *m = rs->head();
Member *p = 0;
while( m ) {
if( m->state() == RS_PRIMARY && m->hbinfo().up() ) {
if( m->state().primary() && m->hbinfo().up() ) {
if( p ) throw "twomasters"; // our polling is asynchronous, so this is often ok.
p = m;
}
Expand All @@ -54,14 +54,10 @@ namespace mongo {
}

void Manager::noteARemoteIsPrimary(const Member *m) {
if( rs->currentPrimary() == m )
if( rs->box.getPrimary() == m )
return;
rs->_currentPrimary = m;
rs->_self->lhb() = "";
if( rs->iAmArbiterOnly() )
rs->changeState(RS_ARBITER);
else
rs->changeState(RS_RECOVERING);
rs->box.set(rs->iAmArbiterOnly() ? MemberState::RS_ARBITER : MemberState::RS_RECOVERING, m);
}

/** called as the health threads get new results */
Expand All @@ -71,17 +67,18 @@ namespace mongo {

if( busyWithElectSelf ) return;

const Member *p = rs->currentPrimary();
const Member *p = rs->box.getPrimary();
if( p && !p->hbinfo().up() ) {
assert( p != rs->_self );
p = rs->_currentPrimary = 0;
p = 0;
rs->box.setOtherPrimary(0);
}

const Member *p2;
try { p2 = findOtherPrimary(); }
catch(string s) {
/* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
log() << "replSet warning DIAG TODO 2primary" << s << rsLog;
log() << "replSet warning DIAG 2 primary" << s << rsLog;
return;
}

Expand Down
4 changes: 2 additions & 2 deletions db/repl/replset_commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace mongo {
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if( !check(errmsg, result) )
return false;
if( !theReplSet->isPrimary() ) {
if( !theReplSet->box.getState().primary() ) {
errmsg = "replSetReconfig command must be sent to the current replica set primary.";
return false;
}
Expand Down Expand Up @@ -140,7 +140,7 @@ namespace mongo {
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if( !check(errmsg, result) )
return false;
if( !theReplSet->isPrimary() ) {
if( !theReplSet->box.getState().primary() ) {
errmsg = "not primary so can't step down";
return false;
}
Expand Down
54 changes: 31 additions & 23 deletions db/repl/rs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,31 @@ namespace mongo {

void ReplSetImpl::assumePrimary() {
assert( iAmPotentiallyHot() );
writelock lk("admin."); // so we are synchronized with _logOp()
_myState = RS_PRIMARY;
_currentPrimary = _self;
writelock lk("admin."); // so we are synchronized with _logOp()
box.setSelfPrimary(_self);
log(2) << "replSet self (" << _self->id() << ") is now primary" << rsLog;
}

void ReplSetImpl::changeState(MemberState s) {
/* TODO LOCKING ? */
/* TODO call this don't touch mystate directly */
_myState = s;
// todo check if primary ptr needs settings or removing???
box.change(s);
}

void ReplSetImpl::relinquish() {
if( state() == RS_PRIMARY ) {
changeState(RS_RECOVERING);
if( box.getState().primary() ) {
changeState(MemberState::RS_RECOVERING);
log() << "replSet info relinquished primary state" << rsLog;
}
else if( state() == RS_STARTUP2 ) {
else if( box.getState().startup2() ) {
// ? add comment
changeState(RS_RECOVERING);
changeState(MemberState::RS_RECOVERING);
}
}

bool ReplSetImpl::_stepDown() {
lock lk(this);
if( isPrimary() ) {
changeState(RS_RECOVERING);
if( box.getState().primary() ) {
changeState(MemberState::RS_RECOVERING);
elect.steppedDown = time(0) + 60;
log() << "replSet info stepped down as primary" << rsLog;
return true;
Expand Down Expand Up @@ -95,9 +93,10 @@ namespace mongo {
}

void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) {
bool isp = isPrimary();
const StateBox::SP sp = box.get();
bool isp = sp.state.primary();
b.append("ismaster", isp);
b.append("secondary", isSecondary());
b.append("secondary", sp.state.secondary());
b.append("msg", "replica sets not yet fully implemented. do not use yet. v1.5.6=alpha");
{
vector<string> hosts, passives, arbiters;
Expand All @@ -119,7 +118,7 @@ namespace mongo {
}

if( !isp ) {
const Member *m = currentPrimary();
const Member *m = sp.primary;
if( m )
b.append("primary", m->h().toString());
}
Expand Down Expand Up @@ -176,8 +175,7 @@ namespace mongo {
memset(_hbmsg, 0, sizeof(_hbmsg));
*_hbmsg = '.'; // temp...just to see
lastH = 0;
_myState = RS_STARTUP;
_currentPrimary = 0;
changeState(MemberState::RS_STARTUP);

vector<HostAndPort> *seeds = new vector<HostAndPort>;
set<HostAndPort> seedSet;
Expand Down Expand Up @@ -224,7 +222,8 @@ namespace mongo {
dbexit( EXIT_REPLICATION_ERROR );
return;
}
_myState = RS_STARTUP2;

changeState(MemberState::RS_STARTUP2);
startThreads();
newReplUp(); // oplog.cpp
}
Expand Down Expand Up @@ -264,22 +263,31 @@ namespace mongo {

endOldHealthTasks();

int oldPrimaryId = currentPrimary() ? currentPrimary()->id() : -1;
_currentPrimary = 0;


int oldPrimaryId = -1;
{
const Member *p = box.getPrimary();
if( p )
oldPrimaryId = p->id();
}
box.setOtherPrimary(0);
_self = 0;
for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
Member *mi;
if( m.h.isSelf() ) {
assert( _self == 0 );
mi = _self = new Member(m.h, m._id, &m, true);
if( mi->id() == oldPrimaryId )
box.setSelfPrimary(mi);
} else {
mi = new Member(m.h, m._id, &m, false);
_members.push(mi);
startHealthTaskFor(mi);
if( mi->id() == oldPrimaryId )
box.setOtherPrimary(mi);
}
if( mi->id() == oldPrimaryId )
_currentPrimary = mi;
}
return true;
}
Expand Down Expand Up @@ -385,7 +393,7 @@ namespace mongo {
void ReplSetImpl::_fatal()
{
//lock l(this);
_myState = RS_FATAL;
box.set(MemberState::RS_FATAL, 0);
sethbmsg("fatal error");
log() << "replSet error fatal error, stopping replication" << rsLog;
}
Expand Down
53 changes: 43 additions & 10 deletions db/repl/rs.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,45 @@ namespace mongo {

class ReplSetHealthPollTask;

/* safe container for our state that keeps member pointer and state variables always aligned */
class StateBox : boost::noncopyable {
public:
struct SP { // SP is like pair<MemberState,const Member *> but nicer
SP() : state(MemberState::RS_STARTUP), primary(0) { }
MemberState state;
const Member *primary;
};
const SP get() {
scoped_lock lk(m);
return sp;
}
MemberState getState() const { return sp.state; }
const Member* getPrimary() const { return sp.primary; }
void change(MemberState s) {
scoped_lock lk(m);
sp.state = s;
// note : we don't correct primary if RS_PRIMARY was set here. that must be done upstream.
}
void set(MemberState s, const Member *p) {
scoped_lock lk(m);
sp.state = s; sp.primary = p;
}
void setSelfPrimary(const Member *self) {
scoped_lock lk(m);
sp.state = MemberState::RS_PRIMARY;
sp.primary = self;
}
void setOtherPrimary(const Member *mem) {
scoped_lock lk(m);
assert( !sp.state.primary() );
sp.primary = mem;
}
StateBox() : m("StateBox") { }
private:
mutex m;
SP sp;
};

/* information about the entire repl set, such as the various servers in the set, and their state */
/* note: We currently do not free mem when the set goes away - it is assumed the replset is a
singleton and long lived.
Expand All @@ -169,10 +208,8 @@ namespace mongo {

/* todo thread */
void msgUpdateHBInfo(HeartbeatInfo);
bool isPrimary() const { return _myState == RS_PRIMARY; }
bool isSecondary() const { return _myState == RS_SECONDARY; }

//bool initiated() const { return curOpTime.initiated(); }
StateBox box;

OpTime lastOpTimeWritten;
long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork"
Expand All @@ -183,7 +220,7 @@ namespace mongo {

private:
Consensus elect;
bool ok() const { return _myState != RS_FATAL; }
bool ok() const { return !box.getState().fatal(); }

void relinquish();
protected:
Expand Down Expand Up @@ -214,7 +251,7 @@ namespace mongo {
void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
const ReplSetConfig& config() { return *_cfg; }
string name() const { return _name; } /* @return replica set's logical name */
MemberState state() const { return _myState; }
MemberState state() const { return box.getState(); }
void _fatal();
void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const;
void _summarizeAsHtml(stringstream&) const;
Expand All @@ -230,8 +267,6 @@ namespace mongo {
void _go();

private:

MemberState _myState;
string _name;
const vector<HostAndPort> *_seeds;
ReplSetConfig *_cfg;
Expand All @@ -243,11 +278,9 @@ namespace mongo {
void loadConfig();

list<HostAndPort> memberHostnames() const;
const Member* currentPrimary() const { return _currentPrimary; }
const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); }
bool iAmArbiterOnly() const { return myConfig().arbiterOnly; }
bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); }
const Member *_currentPrimary;
Member *_self;
List1<Member> _members; /* all members of the set EXCEPT self. */

Expand Down Expand Up @@ -352,7 +385,7 @@ namespace mongo {

inline bool ReplSet::isMaster(const char *client) {
/* todo replset */
return isPrimary();
return box.getState().primary();
}

}
Loading

0 comments on commit 6212e5c

Please sign in to comment.