Skip to content

Commit

Permalink
fix connection pooling with socket timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
erh committed Jul 12, 2011
1 parent db033b9 commit 68e0ade
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 21 deletions.
56 changes: 39 additions & 17 deletions client/connpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,24 @@ namespace mongo {
}
}

DBClientBase * PoolForHost::get( DBConnectionPool * pool ) {
DBClientBase * PoolForHost::get( DBConnectionPool * pool , double socketTimeout ) {

time_t now = time(0);

while ( ! _pool.empty() ) {
StoredConnection sc = _pool.top();
_pool.pop();
if ( sc.ok( now ) )
return sc.conn;
pool->onDestory( sc.conn );
delete sc.conn;

if ( ! sc.ok( now ) ) {
pool->onDestory( sc.conn );
delete sc.conn;
continue;
}

assert( sc.conn->getSoTimeout() == socketTimeout );

return sc.conn;

}

return NULL;
Expand Down Expand Up @@ -125,17 +132,17 @@ namespace mongo {
_hooks( new list<DBConnectionHook*>() ) {
}

DBClientBase* DBConnectionPool::_get(const string& ident) {
DBClientBase* DBConnectionPool::_get(const string& ident , double socketTimeout ) {
assert( ! inShutdown() );
scoped_lock L(_mutex);
PoolForHost& p = _pools[ident];
return p.get( this );
PoolForHost& p = _pools[PoolKey(ident,socketTimeout)];
return p.get( this , socketTimeout );
}

DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ) {
DBClientBase* DBConnectionPool::_finishCreate( const string& host , double socketTimeout , DBClientBase* conn ) {
{
scoped_lock L(_mutex);
PoolForHost& p = _pools[host];
PoolForHost& p = _pools[PoolKey(host,socketTimeout)];
p.createdOne( conn );
}

Expand All @@ -146,7 +153,7 @@ namespace mongo {
}

DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) {
DBClientBase * c = _get( url.toString() );
DBClientBase * c = _get( url.toString() , socketTimeout );
if ( c ) {
onHandedOut( c );
return c;
Expand All @@ -156,11 +163,11 @@ namespace mongo {
c = url.connect( errmsg, socketTimeout );
uassert( 13328 , _name + ": connect failed " + url.toString() + " : " + errmsg , c );

return _finishCreate( url.toString() , c );
return _finishCreate( url.toString() , socketTimeout , c );
}

DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) {
DBClientBase * c = _get( host );
DBClientBase * c = _get( host , socketTimeout );
if ( c ) {
onHandedOut( c );
return c;
Expand All @@ -173,7 +180,7 @@ namespace mongo {
c = cs.connect( errmsg, socketTimeout );
if ( ! c )
throw SocketException( SocketException::CONNECT_ERROR , host , 11002 , str::stream() << _name << " error: " << errmsg );
return _finishCreate( host , c );
return _finishCreate( host , socketTimeout , c );
}

void DBConnectionPool::release(const string& host, DBClientBase *c) {
Expand All @@ -183,7 +190,7 @@ namespace mongo {
return;
}
scoped_lock L(_mutex);
_pools[host].done(this,c);
_pools[PoolKey(host,c->getSoTimeout())].done(this,c);
}


Expand Down Expand Up @@ -244,7 +251,8 @@ namespace mongo {
if ( i->second.numCreated() == 0 )
continue;

string s = i->first;
string s = str::stream() << i->first.ident << "::" << i->first.timeout;

BSONObjBuilder temp( bb.subobjStart( s ) );
temp.append( "available" , i->second.numAvailable() );
temp.appendNumber( "created" , i->second.numCreated() );
Expand Down Expand Up @@ -277,6 +285,20 @@ namespace mongo {

return ap < bp;
}

bool DBConnectionPool::poolKeyCompare::operator()( const PoolKey& a , const PoolKey& b ) const {
string ap = str::before( a.ident , "/" );
string bp = str::before( b.ident , "/" );

if ( ap < bp )
return true;

if ( ap > bp )
return false;

return a.timeout < b.timeout;
}


void DBConnectionPool::taskDoWork() {
vector<DBClientBase*> toDelete;
Expand Down
19 changes: 15 additions & 4 deletions client/connpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace mongo {
/**
* gets a connection or return NULL
*/
DBClientBase * get( DBConnectionPool * pool);
DBClientBase * get( DBConnectionPool * pool , double socketTimeout );

void done( DBConnectionPool * pool , DBClientBase * c );

Expand All @@ -77,6 +77,7 @@ namespace mongo {
};

std::stack<StoredConnection> _pool;

long long _created;
ConnectionString::ConnectionType _type;

Expand Down Expand Up @@ -141,11 +142,21 @@ namespace mongo {
private:
DBConnectionPool( DBConnectionPool& p );

DBClientBase* _get( const string& ident );
DBClientBase* _get( const string& ident , double socketTimeout );

DBClientBase* _finishCreate( const string& ident , DBClientBase* conn );
DBClientBase* _finishCreate( const string& ident , double socketTimeout, DBClientBase* conn );

typedef map<string,PoolForHost,serverNameCompare> PoolMap; // servername -> pool
struct PoolKey {
PoolKey( string i , double t ) : ident( i ) , timeout( t ) {}
string ident;
double timeout;
};

struct poolKeyCompare {
bool operator()( const PoolKey& a , const PoolKey& b ) const;
};

typedef map<PoolKey,PoolForHost,poolKeyCompare> PoolMap; // servername -> pool

mongo::mutex _mutex;
string _name;
Expand Down
3 changes: 3 additions & 0 deletions client/dbclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,8 @@ namespace mongo {
// virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed

virtual ConnectionString::ConnectionType type() const = 0;

virtual double getSoTimeout() const = 0;

}; // DBClientBase

Expand Down Expand Up @@ -917,6 +919,7 @@ namespace mongo {
virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 );
virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; }
void setSoTimeout(double to) { _so_timeout = to; }
double getSoTimeout() const { return _so_timeout; }

static int getNumConnections() {
return _numConnections;
Expand Down
6 changes: 6 additions & 0 deletions client/dbclient_rs.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ namespace mongo {

// ----- informational ----

/**
* timeout not supported in DBClientReplicaSet yet
*/
double getSoTimeout() const { return 0; }

string toString() { return getServerAddress(); }

string getServerAddress() const { return _monitor->getServerAddress(); }
Expand All @@ -239,6 +244,7 @@ namespace mongo {
virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 );
virtual bool callRead( Message& toSend , Message& response ) { return checkMaster()->callRead( toSend , response ); }


protected:
virtual void sayPiggyBack( Message &toSend ) { checkMaster()->say( toSend ); }

Expand Down
2 changes: 2 additions & 0 deletions client/syncclusterconnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ namespace mongo {
virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; }

void setAllSoTimeouts( double socketTimeout );
double getSoTimeout() const { return _socketTimeout; }

virtual bool auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword);

private:
Expand Down
1 change: 1 addition & 0 deletions db/instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ namespace mongo {

virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; }

double getSoTimeout() const { return 0; }
private:
static HostAndPort _clientHost;
};
Expand Down

0 comments on commit 68e0ade

Please sign in to comment.