Skip to content

Commit

Permalink
SERVER-13115 Create DeleteExecutor to facilitate unlocked query parsi…
Browse files Browse the repository at this point in the history
…ng in delete operations.

Follows the pattern of the UpdateExecutor.  Applies the optimization to legacy
and command form deletes.
  • Loading branch information
amschwerin committed Mar 12, 2014
1 parent 4ddba59 commit 4b96cf1
Show file tree
Hide file tree
Showing 7 changed files with 450 additions and 169 deletions.
1 change: 1 addition & 0 deletions src/mongo/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ serverOnlyFiles = [ "db/curop.cpp",
"db/geo/s2common.cpp",
"db/ops/count.cpp",
"db/ops/delete.cpp",
"db/ops/delete_executor.cpp",
"db/ops/insert.cpp",
"db/ops/update.cpp",
"db/ops/update_executor.cpp",
Expand Down
111 changes: 55 additions & 56 deletions src/mongo/db/commands/write_commands/batch_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
#include "mongo/db/introspect.h"
#include "mongo/db/kill_current_op.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/delete_executor.h"
#include "mongo/db/ops/delete_request.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/ops/update_executor.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
Expand Down Expand Up @@ -775,11 +776,11 @@ namespace mongo {
}
}
catch ( const DBException& ex ) {
Status stat(ex.toStatus());
if (ErrorCodes::isInterruption(stat.code())) {
Status status(ex.toStatus());
if (ErrorCodes::isInterruption(status.code())) {
throw;
}
currResult.error = toWriteError(stat);
currResult.error = toWriteError(status);
}

//
Expand Down Expand Up @@ -925,44 +926,18 @@ namespace mongo {

// Removes are similar to updates, but page faults are handled externally

const BatchedCommandRequest& request = *removeItem.getRequest();
const NamespaceString nss( removeItem.getRequest()->getNS() );

// BEGIN CURRENT OP
scoped_ptr<CurOp> currentOp( beginCurrentOp( _client, removeItem ) );
incOpStats( removeItem );

WriteOpResult result;

while ( true ) {
multiRemove( removeItem, &result );

{
// NOTE: Deletes will not fault outside the lock once any data has been written
PageFaultRetryableSection pFaultSection;

///////////////////////////////////////////
Lock::DBWrite writeLock( nss.ns() );
///////////////////////////////////////////

// Check version once we're locked

if ( !checkShardVersion( &shardingState, request, &result.error ) ) {
// Version error
break;
}

// Context once we're locked, to set more details in currentOp()
// TODO: better constructor?
Client::Context writeContext( nss.ns(),
storageGlobalParams.dbpath,
false /* don't check version */);

multiRemove( removeItem, &result );

if ( !result.fault ) {
incWriteStats( removeItem, result.stats, result.error, currentOp.get() );
break;
}
if ( !result.fault ) {
incWriteStats( removeItem, result.stats, result.error, currentOp.get() );
break;
}

//
Expand Down Expand Up @@ -1021,11 +996,11 @@ namespace mongo {
result->fault = new PageFaultException( ex );
}
catch ( const DBException& ex ) {
Status stat(ex.toStatus());
if (ErrorCodes::isInterruption(stat.code())) {
Status status(ex.toStatus());
if (ErrorCodes::isInterruption(status.code())) {
throw;
}
result->error = toWriteError(stat);
result->error = toWriteError(status);
}

}
Expand Down Expand Up @@ -1065,18 +1040,18 @@ namespace mongo {
result->fault = new PageFaultException( ex );
}
catch ( const DBException& ex ) {
Status stat(ex.toStatus());
if (ErrorCodes::isInterruption(stat.code())) {
Status status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
throw;
}
result->error = toWriteError(stat);
result->error = toWriteError(status);
}
}

static void multiUpdate( const BatchItemRef& updateItem,
WriteOpResult* result ) {

NamespaceString nsString(updateItem.getRequest()->getNS());
const NamespaceString nsString(updateItem.getRequest()->getNS());
UpdateRequest request(nsString);
request.setQuery(updateItem.getUpdate()->getQuery());
request.setUpdates(updateItem.getUpdate()->getUpdateExpr());
Expand Down Expand Up @@ -1119,11 +1094,11 @@ namespace mongo {
result->stats.upsertedID = resUpsertedID;
}
catch (const DBException& ex) {
Status stat(ex.toStatus());
if (ErrorCodes::isInterruption(stat.code())) {
status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
throw;
}
result->error = toWriteError(stat);
result->error = toWriteError(status);
}
}

Expand All @@ -1136,28 +1111,52 @@ namespace mongo {
static void multiRemove( const BatchItemRef& removeItem,
WriteOpResult* result ) {

Lock::assertWriteLocked( removeItem.getRequest()->getNS() );
const NamespaceString nss( removeItem.getRequest()->getNS() );
DeleteRequest request( nss );
request.setQuery( removeItem.getDelete()->getQuery() );
request.setMulti( removeItem.getDelete()->getLimit() != 1 );
request.setUpdateOpLog(true);
request.setGod( false );
DeleteExecutor executor( &request );
Status status = executor.prepare();
if ( !status.isOK() ) {
result->error = toWriteError( status );
return;
}

// NOTE: Deletes will not fault outside the lock once any data has been written
PageFaultRetryableSection pFaultSection;

///////////////////////////////////////////
Lock::DBWrite writeLock( nss.ns() );
///////////////////////////////////////////

// Check version once we're locked

if ( !checkShardVersion( &shardingState, *removeItem.getRequest(), &result->error ) ) {
// Version error
return;
}

// Context once we're locked, to set more details in currentOp()
// TODO: better constructor?
Client::Context writeContext( nss.ns(),
storageGlobalParams.dbpath,
false /* don't check version */);

try {
long long n = deleteObjects( removeItem.getRequest()->getNS(),
removeItem.getDelete()->getQuery(),
removeItem.getDelete()->getLimit() == 1, // justOne
true, // logOp
false // god
);

result->stats.n = n;
result->stats.n = executor.execute();
}
catch ( const PageFaultException& ex ) {
// TODO: An actual data structure that's not an exception for this
result->fault = new PageFaultException( ex );
}
catch ( const DBException& ex ) {
Status stat(ex.toStatus());
if (ErrorCodes::isInterruption(stat.code())) {
status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
throw;
}
result->error = toWriteError(stat);
result->error = toWriteError(status);
}
}

Expand Down
15 changes: 11 additions & 4 deletions src/mongo/db/instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
#include "mongo/db/mongod_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/count.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/delete_executor.h"
#include "mongo/db/ops/delete_request.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_driver.h"
Expand Down Expand Up @@ -647,15 +648,21 @@ namespace mongo {
PageFaultRetryableSection s;
while ( 1 ) {
try {
DeleteRequest request(ns);
request.setQuery(pattern);
request.setMulti(!justOne);
request.setUpdateOpLog(true);
DeleteExecutor executor(&request);
uassertStatusOK(executor.prepare());
Lock::DBWrite lk(ns.ns());

// if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit
if ( ! broadcast && handlePossibleShardedMessage( m , 0 ) )
return;

Client::Context ctx(ns);
long long n = deleteObjects(ns.ns(), pattern, justOne, true);

long long n = executor.execute();
lastError.getSafe()->recordDelete( n );
op.debug().ndeleted = n;
break;
Expand Down
119 changes: 10 additions & 109 deletions src/mongo/db/ops/delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,8 @@

#include "mongo/db/ops/delete.h"

#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/curop.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/structure/catalog/namespace_details.h"
#include "mongo/db/query/get_runner.h"
#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/repl/is_master.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/catalog/collection.h"

#include "mongo/db/ops/delete_executor.h"
#include "mongo/db/ops/delete_request.h"

namespace mongo {

Expand All @@ -48,104 +39,14 @@ namespace mongo {
god: allow access to system namespaces, and don't yield
*/
long long deleteObjects(const StringData& ns, BSONObj pattern, bool justOne, bool logop, bool god) {
if (!god) {
if (ns.find( ".system.") != string::npos) {
// note a delete from system.indexes would corrupt the db if done here, as there are
// pointers into those objects in NamespaceDetails.
uassert(12050, "cannot delete from system namespace", legalClientSystemNS( ns, true ) );
}

if (ns.find('$') != string::npos) {
log() << "cannot delete from collection with reserved $ in name: " << ns << endl;
uasserted( 10100, "cannot delete from collection with reserved $ in name" );
}
}

Collection* collection = currentClient.get()->database()->getCollection(ns);
if (NULL == collection) {
return 0;
}

uassert(10101,
str::stream() << "can't remove from a capped collection: " << ns,
!collection->isCapped());

string nsForLogOp = ns.toString(); // XXX-ERH

uassert(ErrorCodes::NotMaster,
str::stream() << "Not primary while removing from " << ns,
!logop || isMasterNs(nsForLogOp.c_str()));

long long nDeleted = 0;

CanonicalQuery* cq;
if (!CanonicalQuery::canonicalize(ns.toString(), pattern, &cq).isOK()) {
uasserted(17218, "Can't canonicalize query " + pattern.toString());
return 0;
}

bool canYield = !god && !QueryPlannerCommon::hasNode(cq->root(), MatchExpression::ATOMIC);

Runner* rawRunner;
if (!getRunner(cq, &rawRunner).isOK()) {
uasserted(17219, "Can't get runner for query " + pattern.toString());
return 0;
}

auto_ptr<Runner> runner(rawRunner);
auto_ptr<ScopedRunnerRegistration> safety;

if (canYield) {
safety.reset(new ScopedRunnerRegistration(runner.get()));
runner->setYieldPolicy(Runner::YIELD_AUTO);
}

DiskLoc rloc;
Runner::RunnerState state;
CurOp* curOp = cc().curop();
int oldYieldCount = curOp->numYields();
while (Runner::RUNNER_ADVANCED == (state = runner->getNext(NULL, &rloc))) {
if (oldYieldCount != curOp->numYields()) {
uassert(ErrorCodes::NotMaster,
str::stream() << "No longer primary while removing from " << ns,
!logop || isMasterNs(nsForLogOp.c_str()));
oldYieldCount = curOp->numYields();
}
BSONObj toDelete;

// TODO: do we want to buffer docs and delete them in a group rather than
// saving/restoring state repeatedly?
runner->saveState();
collection->deleteDocument(rloc, false, false, logop ? &toDelete : NULL );
runner->restoreState();

nDeleted++;

if (logop) {
if ( toDelete.isEmpty() ) {
problem() << "deleted object without id, not logging" << endl;
}
else {
bool replJustOne = true;
logOp("d", nsForLogOp.c_str(), toDelete, 0, &replJustOne);
}
}

if (justOne) {
break;
}

if (!god) {
getDur().commitIfNeeded();
}

if (debug && god && nDeleted == 100) {
log() << "warning high number of deletes with god=true "
<< " which could use significant memory b/c we don't commit journal";
}
}

return nDeleted;
NamespaceString nsString(ns);
DeleteRequest request(nsString);
request.setQuery(pattern);
request.setMulti(!justOne);
request.setUpdateOpLog(logop);
request.setGod(god);
DeleteExecutor executor(&request);
return executor.execute();
}

} // namespace mongo
Loading

0 comments on commit 4b96cf1

Please sign in to comment.