Skip to content

Commit

Permalink
SERVER-35357 Implement a temporary map of transaction runtime states …
Browse files Browse the repository at this point in the history
…on MongoS
  • Loading branch information
renctan committed Jun 20, 2018
1 parent c0c902a commit 862bdee
Show file tree
Hide file tree
Showing 12 changed files with 596 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ selector:
- jstests/sharding/resume_change_stream.js
- jstests/sharding/change_stream_resume_from_different_mongos.js
- jstests/sharding/change_stream_shard_failover.js
- jstests/sharding/transactions_prohibited_in_sharded_cluster.js
# Requires count command to be accurate on sharded clusters, introduced in v4.0.
- jstests/sharding/accurate_count_with_predicate.js
# Enable when SERVER-33538 is backported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ selector:
- jstests/sharding/resume_change_stream.js
- jstests/sharding/change_stream_resume_from_different_mongos.js
- jstests/sharding/change_stream_shard_failover.js
- jstests/sharding/transactions_prohibited_in_sharded_cluster.js
# Requires count command to be accurate on sharded clusters, introduced in v4.0.
- jstests/sharding/accurate_count_with_predicate.js
# Disabled throughout sharding. See SERVER-31937 for details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ selector:
- jstests/sharding/change_streams_whole_db.js
- jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
- jstests/sharding/resume_change_stream.js
- jstests/sharding/transactions_prohibited_in_sharded_cluster.js
# Requires count command to be accurate on sharded clusters, introduced in v4.0.
- jstests/sharding/accurate_count_with_predicate.js
# Enable when SERVER-33538 is backported.
Expand Down
57 changes: 0 additions & 57 deletions jstests/sharding/transactions_prohibited_in_sharded_cluster.js

This file was deleted.

1 change: 1 addition & 0 deletions src/mongo/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ mongos = env.Program(
's/is_mongos',
's/sharding_egress_metadata_hook_for_mongos',
's/sharding_initialization',
's/transaction/router_session',
's/query/cluster_cursor_cleanup_job',
'transport/service_entry_point',
'transport/transport_layer_manager',
Expand Down
2 changes: 2 additions & 0 deletions src/mongo/s/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ env.SConscript(
'client',
'commands',
'query',
'transaction',
'write_ops',
],
exports=[
Expand Down Expand Up @@ -100,6 +101,7 @@ env.Library(
"$BUILD_DIR/mongo/s/client/sharding_client",
"$BUILD_DIR/mongo/s/coreshard",
'$BUILD_DIR/mongo/s/client/shard_interface',
'transaction/router_session',
],
)

Expand Down
17 changes: 16 additions & 1 deletion src/mongo/s/async_requests_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/transaction/router_session_runtime_state.h"
#include "mongo/stdx/memory.h"
#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer.h"
Expand Down Expand Up @@ -68,8 +69,16 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
_db(dbName.toString()),
_readPreference(readPreference),
_retryPolicy(retryPolicy) {
auto routerSession = RouterSessionRuntimeState::get(opCtx);

for (const auto& request : requests) {
_remotes.emplace_back(request.shardId, request.cmdObj);
auto cmdObj = request.cmdObj;
if (routerSession) {
auto& participant = routerSession->getOrCreateParticipant(request.shardId);
cmdObj = participant.attachTxnFieldsIfNeeded(request.cmdObj);
}

_remotes.emplace_back(request.shardId, cmdObj);
}

// Initialize command metadata to handle the read preference.
Expand Down Expand Up @@ -292,7 +301,13 @@ void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
// Store the response or error.
if (job->cbData.response.status.isOK()) {
remote.swResponse = std::move(job->cbData.response);

if (auto routerSession = RouterSessionRuntimeState::get(opCtx)) {
auto& participant = routerSession->getOrCreateParticipant(remote.shardId);
participant.markAsCommandSent();
}
} else {
// TODO: call participant.markAsCommandSent on "transaction already started" errors?
remote.swResponse = std::move(job->cbData.response.status);
}
}
Expand Down
25 changes: 20 additions & 5 deletions src/mongo/s/commands/strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/transaction/router_session_runtime_state.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
Expand Down Expand Up @@ -301,10 +302,6 @@ void runCommand(OperationContext* opCtx,
}

CommandHelpers::uassertShouldAttemptParse(opCtx, command, request);
// Transactions are disallowed in sharded clusters in MongoDB 4.0.
uassert(50841,
"Multi-document transactions cannot be run in a sharded cluster.",
!request.body.hasField("startTransaction") && !request.body.hasField("autocommit"));

// Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation on
// the OperationContext. Be sure to do this as soon as possible so that further processing by
Expand Down Expand Up @@ -335,7 +332,25 @@ void runCommand(OperationContext* opCtx,
// Fill out all currentOp details.
CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, opType);

initializeOperationSessionInfo(opCtx, request.body, command->requiresAuth(), true, true, true);
boost::optional<ScopedRouterSession> scopedSession;
if (auto osi = initializeOperationSessionInfo(
opCtx, request.body, command->requiresAuth(), true, true, true)) {

if (osi->getAutocommit()) {
scopedSession.emplace(opCtx);

auto routerSession = RouterSessionRuntimeState::get(opCtx);
invariant(routerSession);

auto txnNumber = opCtx->getTxnNumber();
invariant(txnNumber);

auto startTxnSetting = osi->getStartTransaction();
bool startTransaction = startTxnSetting ? *startTxnSetting : false;

routerSession->beginOrContinueTxn(*txnNumber, startTransaction);
}
}

auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
auto readConcernParseStatus = readConcernArgs.initialize(request.body);
Expand Down
26 changes: 26 additions & 0 deletions src/mongo/s/transaction/SConscript
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# -*- mode: python -*-

Import("env")

env = env.Clone()

env.Library(
target='router_session',
source=[
'router_session_runtime_state.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/logical_session_id',
'$BUILD_DIR/mongo/s/common_s',
],
)

env.CppUnitTest(
target='router_session_test',
source=[
'router_session_runtime_state_test.cpp',
],
LIBDEPS=[
'router_session',
]
)
Loading

0 comments on commit 862bdee

Please sign in to comment.