Skip to content

Commit

Permalink
a bunch of refactoring for mongos linking
Browse files Browse the repository at this point in the history
  • Loading branch information
erh committed Apr 13, 2010
1 parent 552fe0f commit 61ae5fa
Show file tree
Hide file tree
Showing 12 changed files with 573 additions and 408 deletions.
12 changes: 7 additions & 5 deletions SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,11 @@ else:
commonFiles += [ "util/processinfo_none.cpp" ]

coreDbFiles = [ "db/commands.cpp" ]
coreServerFiles = [ "util/message_server_port.cpp" , "util/message_server_asio.cpp" ]
coreServerFiles = [ "util/message_server_port.cpp" , "util/message_server_asio.cpp" ,
"db/matcher.cpp" , "db/indexkey.cpp" ]

serverOnlyFiles = Split( "db/query.cpp db/update.cpp db/introspect.cpp db/btree.cpp db/clientcursor.cpp db/tests.cpp db/repl.cpp db/oplog.cpp db/repl_block.cpp db/btreecursor.cpp db/cloner.cpp db/namespace.cpp db/matcher_covered.cpp db/dbeval.cpp db/dbwebserver.cpp db/dbhelpers.cpp db/instance.cpp db/client.cpp db/database.cpp db/pdfile.cpp db/cursor.cpp db/security_commands.cpp db/security.cpp util/miniwebserver.cpp db/storage.cpp db/reccache.cpp db/queryoptimizer.cpp db/extsort.cpp db/mr.cpp s/d_util.cpp db/cmdline.cpp" )

serverOnlyFiles = Split( "db/query.cpp db/update.cpp db/introspect.cpp db/btree.cpp db/clientcursor.cpp db/tests.cpp db/repl.cpp db/oplog.cpp db/repl_block.cpp db/btreecursor.cpp db/cloner.cpp db/namespace.cpp db/matcher.cpp db/dbeval.cpp db/dbwebserver.cpp db/dbhelpers.cpp db/instance.cpp db/database.cpp db/pdfile.cpp db/cursor.cpp db/security_commands.cpp db/client.cpp db/security.cpp util/miniwebserver.cpp db/storage.cpp db/reccache.cpp db/queryoptimizer.cpp db/extsort.cpp db/mr.cpp s/d_util.cpp db/cmdline.cpp" )
serverOnlyFiles += [ "db/index.cpp" ] + Glob( "db/index_*.cpp" )

serverOnlyFiles += Glob( "db/dbcommands*.cpp" )
Expand Down Expand Up @@ -1079,11 +1081,11 @@ def checkErrorCodes():
checkErrorCodes()

# main db target
mongod = env.Program( "mongod" , commonFiles + coreDbFiles + serverOnlyFiles + [ "db/db.cpp" ] )
mongod = env.Program( "mongod" , commonFiles + coreDbFiles + coreServerFiles + serverOnlyFiles + [ "db/db.cpp" ] )
Default( mongod )

# tools
allToolFiles = commonFiles + coreDbFiles + serverOnlyFiles + [ "client/gridfs.cpp", "tools/tool.cpp" ]
allToolFiles = commonFiles + coreDbFiles + coreServerFiles + serverOnlyFiles + [ "client/gridfs.cpp", "tools/tool.cpp" ]
normalTools = [ "dump" , "restore" , "export" , "import" , "files" , "stat" ]
env.Alias( "tools" , [ "mongo" + x for x in normalTools ] )
for x in normalTools:
Expand All @@ -1098,7 +1100,7 @@ mongos = env.Program( "mongos" , commonFiles + coreDbFiles + coreServerFiles + s
clientLibName = str( env.Library( "mongoclient" , allClientFiles )[0] )
if GetOption( "sharedclient" ):
sharedClientLibName = str( env.SharedLibrary( "mongoclient" , allClientFiles )[0] )
env.Library( "mongotestfiles" , commonFiles + coreDbFiles + serverOnlyFiles + ["client/gridfs.cpp"])
env.Library( "mongotestfiles" , commonFiles + coreDbFiles + coreServerFiles + serverOnlyFiles + ["client/gridfs.cpp"])

clientTests = []

Expand Down
64 changes: 55 additions & 9 deletions client/parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,52 @@ namespace mongo {
}


// -------- FilteringClientCursor -----------
FilteringClientCursor::FilteringClientCursor( const BSONObj filter )
: _matcher( filter ){
}

FilteringClientCursor::FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter )
: _matcher( filter ) , _cursor( cursor ){
}

FilteringClientCursor::~FilteringClientCursor(){
}

void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ){
_cursor = cursor;
_next = BSONObj();
}

bool FilteringClientCursor::more(){
if ( ! _next.isEmpty() )
return true;

_advance();
return ! _next.isEmpty();
}

BSONObj FilteringClientCursor::next(){
assert( ! _next.isEmpty() );
BSONObj ret = _next;
_next = BSONObj();
_advance();
return ret;
}

void FilteringClientCursor::_advance(){
assert( _next.isEmpty() );
if ( ! _cursor.get() )
return;

while ( _cursor->more() ){
_next = _cursor->next();
if ( _matcher.matches( _next ) )
return;
_next = BSONObj();
}
}

// -------- SerialServerClusteredCursor -----------

SerialServerClusteredCursor::SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ){
Expand All @@ -110,7 +156,7 @@ namespace mongo {
}

bool SerialServerClusteredCursor::more(){
if ( _current.get() && _current->more() )
if ( _current.more() )
return true;

if ( _serverIndex >= _servers.size() ){
Expand All @@ -119,8 +165,8 @@ namespace mongo {

ServerAndQuery& sq = _servers[_serverIndex++];

_current = query( sq._server , 0 , sq._extra );
if ( _current->more() )
_current.reset( query( sq._server , 0 , sq._extra ) );
if ( _current.more() )
return true;

// this sq has nothing, so keep looking
Expand All @@ -129,7 +175,7 @@ namespace mongo {

BSONObj SerialServerClusteredCursor::next(){
uassert( 10018 , "no more items" , more() );
return _current->next();
return _current.next();
}

// -------- ParallelSortClusteredCursor -----------
Expand All @@ -151,14 +197,14 @@ namespace mongo {

void ParallelSortClusteredCursor::_init(){
_numServers = _servers.size();
_cursors = new auto_ptr<DBClientCursor>[_numServers];
_cursors = new FilteringClientCursor[_numServers];
_nexts = new BSONObj[_numServers];

// TODO: parellize
int num = 0;
for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); i++ ){
const ServerAndQuery& sq = *i;
_cursors[num++] = query( sq._server , 0 , sq._extra );
_cursors[num++].reset( query( sq._server , 0 , sq._extra ) );
}

}
Expand All @@ -173,7 +219,7 @@ namespace mongo {
if ( ! _nexts[i].isEmpty() )
return true;

if ( _cursors[i].get() && _cursors[i]->more() )
if ( _cursors[i].more() )
return true;
}
return false;
Expand Down Expand Up @@ -217,12 +263,12 @@ namespace mongo {
continue;
}

if ( ! _cursors[i]->more() ){
if ( ! _cursors[i].more() ){
// cursor is dead, oh well
continue;
}

_nexts[i] = _cursors[i]->next();
_nexts[i] = _cursors[i].next();
}

}
Expand Down
26 changes: 23 additions & 3 deletions client/parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "../stdafx.h"
#include "dbclient.h"
#include "../db/dbmessage.h"
#include "../db/matcher.h"

namespace mongo {

Expand Down Expand Up @@ -91,6 +92,25 @@ namespace mongo {
BSONObj _orderObject;
};

class FilteringClientCursor {
public:
FilteringClientCursor( const BSONObj filter = BSONObj() );
FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() );
~FilteringClientCursor();

void reset( auto_ptr<DBClientCursor> cursor );

bool more();
BSONObj next();

private:
void _advance();

Matcher _matcher;
auto_ptr<DBClientCursor> _cursor;

BSONObj _next;
};

/**
* runs a query in serial across any number of servers
Expand All @@ -106,7 +126,7 @@ namespace mongo {
vector<ServerAndQuery> _servers;
unsigned _serverIndex;

auto_ptr<DBClientCursor> _current;
FilteringClientCursor _current;
};


Expand All @@ -131,8 +151,8 @@ namespace mongo {
int _numServers;
set<ServerAndQuery> _servers;
BSONObj _sortKey;

auto_ptr<DBClientCursor> * _cursors;
FilteringClientCursor * _cursors;
BSONObj * _nexts;
};

Expand Down
Loading

0 comments on commit 61ae5fa

Please sign in to comment.