Skip to content

Commit

Permalink
SERVER-37238 Get rid of TransactionRouter::Participant::markAsSent
Browse files Browse the repository at this point in the history
  • Loading branch information
renctan committed Sep 27, 2018
1 parent 5ce00a8 commit 4fc06cf
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 247 deletions.
14 changes: 3 additions & 11 deletions src/mongo/s/multi_statement_transaction_requests_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ std::vector<AsyncRequestsSender::Request> attachTxnDetails(
newRequests.reserve(requests.size());

for (auto request : requests) {
auto& participant = txnRouter->getOrCreateParticipant(request.shardId);
newRequests.emplace_back(request.shardId,
participant.attachTxnFieldsIfNeeded(request.cmdObj));
newRequests.emplace_back(
request.shardId, txnRouter->attachTxnFieldsIfNeeded(request.shardId, request.cmdObj));
}

return newRequests;
Expand All @@ -74,14 +73,7 @@ bool MultiStatementTransactionRequestsSender::done() {
}

AsyncRequestsSender::Response MultiStatementTransactionRequestsSender::next() {
auto result = _ars.next();

if (auto txnRouter = TransactionRouter::get(_opCtx)) {
auto& participant = txnRouter->getOrCreateParticipant(result.shardId);
participant.markAsCommandSent();
}

return result;
return _ars.next();
}

void MultiStatementTransactionRequestsSender::stopRetrying() {
Expand Down
11 changes: 2 additions & 9 deletions src/mongo/s/query/cluster_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,

if (shardId) {
if (auto txnRouter = TransactionRouter::get(opCtx)) {
auto& participant = txnRouter->getOrCreateParticipant(*shardId);
aggCmd = participant.attachTxnFieldsIfNeeded(aggCmd);
aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
}
}

Expand Down Expand Up @@ -276,8 +275,7 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
auto aggCmd = mergeCmd.freeze().toBson();

if (auto txnRouter = TransactionRouter::get(mergeCtx->opCtx)) {
auto& participant = txnRouter->getOrCreateParticipant(shardId);
aggCmd = participant.attachTxnFieldsIfNeeded(aggCmd);
aggCmd = txnRouter->attachTxnFieldsIfNeeded(shardId, aggCmd);
}

// agg creates temp collection and should handle implicit create separately.
Expand Down Expand Up @@ -1207,11 +1205,6 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
: std::move(cmdObj),
Shard::RetryPolicy::kIdempotent));

if (txnRouter) {
auto& participant = txnRouter->getOrCreateParticipant(shardId);
participant.markAsCommandSent();
}

if (ErrorCodes::isStaleShardVersionError(cmdResponse.commandStatus.code())) {
uassertStatusOK(
cmdResponse.commandStatus.withContext("command failed because of stale config"));
Expand Down
50 changes: 28 additions & 22 deletions src/mongo/s/transaction_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ TransactionRouter::Participant::Participant(bool isCoordinator,
_stmtIdCreatedAt(stmtIdCreatedAt),
_sharedOptions(sharedOptions) {}

BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(BSONObj cmd) const {
BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(BSONObj cmd,
bool isFirstStatementInThisParticipant) const {
// Perform checks first before calling std::move on cmd.
auto isTxnCmd = isTransactionCommand(cmd);

Expand All @@ -201,9 +202,9 @@ BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(BSONObj cmd) con
// TODO: SERVER-37045 assert when attaching startTransaction to killCursors command.

// The first command sent to a participant must start a transaction, unless it is a transaction
// command, which don't support the options that start transactions, i.e. starTransaction and
// command, which don't support the options that start transactions, i.e. startTransaction and
// readConcern. Otherwise the command must not have a read concern.
bool mustStartTransaction = _state == State::kMustStart && !isTxnCmd;
bool mustStartTransaction = isFirstStatementInThisParticipant && !isTxnCmd;

if (!mustStartTransaction) {
dassert(!cmd.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
Expand Down Expand Up @@ -239,16 +240,6 @@ bool TransactionRouter::Participant::isCoordinator() const {
return _isCoordinator;
}

bool TransactionRouter::Participant::mustStartTransaction() const {
return _state == State::kMustStart;
}

void TransactionRouter::Participant::markAsCommandSent() {
if (_state == State::kMustStart) {
_state = State::kStarted;
}
}

StmtId TransactionRouter::Participant::getStmtIdCreatedAt() const {
return _stmtIdCreatedAt;
}
Expand Down Expand Up @@ -281,15 +272,29 @@ boost::optional<ShardId> TransactionRouter::getCoordinatorId() const {
return _coordinatorId;
}

TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const ShardId& shard) {
BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const BSONObj& cmdObj) {
if (auto txnPart = getParticipant(shardId)) {
return txnPart->attachTxnFieldsIfNeeded(cmdObj, false);
}

auto txnPart = _createParticipant(shardId);
return txnPart.attachTxnFieldsIfNeeded(cmdObj, true);
}

boost::optional<TransactionRouter::Participant&> TransactionRouter::getParticipant(
const ShardId& shard) {
auto iter = _participants.find(shard.toString());
if (iter != _participants.end()) {
// TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to
// shards that have been successfully contacted we should be able to add an invariant here
// to ensure the atClusterTime on the participant matches that on the transaction router.
return iter->second;
if (iter == _participants.end()) {
return boost::none;
}

// TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to
// shards that have been successfully contacted we should be able to add an invariant here
// to ensure the atClusterTime on the participant matches that on the transaction router.
return iter->second;
}

TransactionRouter::Participant& TransactionRouter::_createParticipant(const ShardId& shard) {
// The first participant is chosen as the coordinator.
auto isFirstParticipant = _participants.empty();
if (isFirstParticipant) {
Expand Down Expand Up @@ -517,7 +522,8 @@ Shard::CommandResponse TransactionRouter::_commitSingleShardTransaction(Operatio
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
participant.attachTxnFieldsIfNeeded(commitCmd.toBSON(opCtx->getWriteConcern().toBSON())),
participant.attachTxnFieldsIfNeeded(commitCmd.toBSON(opCtx->getWriteConcern().toBSON()),
false),
Shard::RetryPolicy::kIdempotent));
}

Expand Down Expand Up @@ -552,7 +558,7 @@ Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(Operation
shard->runFireAndForgetCommand(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
participant.attachTxnFieldsIfNeeded(prepareCmdObj));
participant.attachTxnFieldsIfNeeded(prepareCmdObj, false));
}

auto coordinatorShard = uassertStatusOK(shardRegistry->getShard(opCtx, *_coordinatorId));
Expand All @@ -569,7 +575,7 @@ Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(Operation
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
coordinatorIter->second.attachTxnFieldsIfNeeded(
coordinateCommitCmd.toBSON(opCtx->getWriteConcern().toBSON())),
coordinateCommitCmd.toBSON(opCtx->getWriteConcern().toBSON()), false),
Shard::RetryPolicy::kIdempotent));
}

Expand Down
48 changes: 23 additions & 25 deletions src/mongo/s/transaction_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,42 +76,23 @@ class TransactionRouter {
StmtId stmtIdCreatedAt,
SharedTransactionOptions sharedOptions);

enum class State {
// Next transaction should include startTransaction.
kMustStart,
// startTransaction already sent to this participant.
kStarted,
};

/**
* Attaches necessary fields if this is participating in a multi statement transaction.
*/
BSONObj attachTxnFieldsIfNeeded(BSONObj cmd) const;
BSONObj attachTxnFieldsIfNeeded(BSONObj cmd, bool isFirstStatementInThisParticipant) const;

/**
* True if the participant has been chosen as the coordinator for its transaction.
*/
bool isCoordinator() const;

/**
* True if the represented shard has not been sent a command with startTransaction=true.
*/
bool mustStartTransaction() const;

/**
* Mark this participant as a node that has been successful sent a command with
* startTransaction=true.
*/
void markAsCommandSent();

/**
* Returns the highest statement id of the command during which this participant was
* created.
*/
StmtId getStmtIdCreatedAt() const;

private:
State _state{State::kMustStart};
const bool _isCoordinator{false};

// The highest statement id of the request during which this participant was created.
Expand All @@ -128,14 +109,21 @@ class TransactionRouter {
*/
void beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber, bool startTransaction);

/**
* Returns the participant for this transaction. Creates a new one if it doesn't exist.
*/
Participant& getOrCreateParticipant(const ShardId& shard);

void checkIn();
void checkOut();

/**
* Attaches the required transaction related fields for a request to be sent to the given
* shard.
*
* Calling this method has the following side effects:
* 1. Potentially selecting a coordinator.
* 2. Adding the shard to the list of participants.
* 3. Also append fields for first statements (ex. startTransaction, readConcern)
* if the shard was newly added to the list of participants.
*/
BSONObj attachTxnFieldsIfNeeded(const ShardId& shardId, const BSONObj& cmdObj);

/**
* Updates the transaction state to allow for a retry of the current command on a stale version
* error. Will throw if the transaction cannot be continued.
Expand Down Expand Up @@ -197,6 +185,11 @@ class TransactionRouter {
*/
static TransactionRouter* get(OperationContext* opCtx);

/**
* Returns the participant for this transaction.
*/
boost::optional<Participant&> getParticipant(const ShardId& shard);

private:
/**
* Run basic commit for transactions that touched a single shard.
Expand Down Expand Up @@ -235,6 +228,11 @@ class TransactionRouter {
*/
void _clearPendingParticipants();

/**
* Creates a new participant for the shard.
*/
Participant& _createParticipant(const ShardId& shard);

const LogicalSessionId _sessionId;
TxnNumber _txnNumber{kUninitializedTxnNumber};

Expand Down
Loading

0 comments on commit 4fc06cf

Please sign in to comment.