diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt index d8d330641b..929fb16148 100644 --- a/src/kudu/consensus/CMakeLists.txt +++ b/src/kudu/consensus/CMakeLists.txt @@ -103,6 +103,7 @@ set(CONSENSUS_SRCS leader_election.cc log_cache.cc peer_manager.cc + pending_rounds.cc quorum_util.cc raft_consensus.cc raft_consensus_state.cc diff --git a/src/kudu/consensus/pending_rounds.cc b/src/kudu/consensus/pending_rounds.cc new file mode 100644 index 0000000000..121cb2456b --- /dev/null +++ b/src/kudu/consensus/pending_rounds.cc @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/consensus/pending_rounds.h" + +#include "kudu/consensus/time_manager.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/logging.h" +#include "kudu/util/pb_util.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace consensus { + +using std::string; +using strings::Substitute; + +//------------------------------------------------------------ +// PendingRounds +//------------------------------------------------------------ + +PendingRounds::PendingRounds(string log_prefix, scoped_refptr time_manager) + : log_prefix_(std::move(log_prefix)), + last_committed_op_id_(MinimumOpId()), + time_manager_(std::move(time_manager)) {} + +PendingRounds::~PendingRounds() { +} + +Status PendingRounds::CancelPendingTransactions() { + ThreadRestrictions::AssertWaitAllowed(); + if (pending_txns_.empty()) { + return Status::OK(); + } + + LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_txns_.size() + << " pending transactions."; + for (const auto& txn : pending_txns_) { + const scoped_refptr& round = txn.second; + // We cancel only transactions whose applies have not yet been triggered. + LOG_WITH_PREFIX(INFO) << "Aborting transaction as it isn't in flight: " + << SecureShortDebugString(*txn.second->replicate_msg()); + round->NotifyReplicationFinished(Status::Aborted("Transaction aborted")); + } + return Status::OK(); +} + +void PendingRounds::AbortOpsAfter(int64_t index) { + LOG_WITH_PREFIX(INFO) << "Aborting all transactions after (but not including) " + << index; + + DCHECK_GE(index, 0); + OpId new_preceding; + + auto iter = pending_txns_.lower_bound(index); + + // Either the new preceding id is in the pendings set or it must be equal to the + // committed index since we can't truncate already committed operations. + if (iter != pending_txns_.end() && (*iter).first == index) { + new_preceding = (*iter).second->replicate_msg()->id(); + ++iter; + } else { + CHECK_EQ(index, last_committed_op_id_.index()); + new_preceding = last_committed_op_id_; + } + + for (; iter != pending_txns_.end();) { + const scoped_refptr& round = (*iter).second; + auto op_type = round->replicate_msg()->op_type(); + LOG_WITH_PREFIX(INFO) + << "Aborting uncommitted " << OperationType_Name(op_type) + << " operation due to leader change: " << round->replicate_msg()->id(); + + round->NotifyReplicationFinished(Status::Aborted("Transaction aborted by new leader")); + // Erase the entry from pendings. + pending_txns_.erase(iter++); + } +} + +Status PendingRounds::AddPendingOperation(const scoped_refptr& round) { + InsertOrDie(&pending_txns_, round->replicate_msg()->id().index(), round); + return Status::OK(); +} + +scoped_refptr PendingRounds::GetPendingOpByIndexOrNull(int64_t index) { + return FindPtrOrNull(pending_txns_, index); +} + +bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) { + + *term_mismatch = false; + + if (op_id.index() <= GetCommittedIndex()) { + return true; + } + + scoped_refptr round = GetPendingOpByIndexOrNull(op_id.index()); + if (!round) { + return false; + } + + if (round->id().term() != op_id.term()) { + *term_mismatch = true; + return false; + } + return true; +} + +OpId PendingRounds::GetLastPendingTransactionOpId() const { + return pending_txns_.empty() + ? MinimumOpId() : (--pending_txns_.end())->second->id(); +} + +Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) { + // If we already committed up to (or past) 'id' return. + // This can happen in the case that multiple UpdateConsensus() calls end + // up in the RPC queue at the same time, and then might get interleaved out + // of order. + if (last_committed_op_id_.index() >= committed_index) { + VLOG_WITH_PREFIX(1) + << "Already marked ops through " << last_committed_op_id_ << " as committed. " + << "Now trying to mark " << committed_index << " which would be a no-op."; + return Status::OK(); + } + + if (pending_txns_.empty()) { + LOG(ERROR) << "Advancing commit index to " << committed_index + << " from " << last_committed_op_id_ + << " we have no pending txns" + << GetStackTrace(); + VLOG_WITH_PREFIX(1) << "No transactions to mark as committed up to: " + << committed_index; + return Status::OK(); + } + + // Start at the operation after the last committed one. + auto iter = pending_txns_.upper_bound(last_committed_op_id_.index()); + // Stop at the operation after the last one we must commit. + auto end_iter = pending_txns_.upper_bound(committed_index); + CHECK(iter != pending_txns_.end()); + + VLOG_WITH_PREFIX(1) << "Last triggered apply was: " + << last_committed_op_id_ + << " Starting to apply from log index: " << (*iter).first; + + while (iter != end_iter) { + scoped_refptr round = (*iter).second; // Make a copy. + DCHECK(round); + const OpId& current_id = round->id(); + + if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) { + CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id)); + } + + pending_txns_.erase(iter++); + last_committed_op_id_ = round->id(); + time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg()); + round->NotifyReplicationFinished(Status::OK()); + } + + return Status::OK(); +} + +Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) { + CHECK_EQ(last_committed_op_id_.index(), 0); + if (!pending_txns_.empty()) { + int64_t first_pending_index = pending_txns_.begin()->first; + if (committed_op.index() < first_pending_index) { + if (committed_op.index() != first_pending_index - 1) { + return Status::Corruption(Substitute( + "pending operations should start at first operation " + "after the committed operation (committed=$0, first pending=$1)", + OpIdToString(committed_op), first_pending_index)); + } + last_committed_op_id_ = committed_op; + } + + RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index())); + CHECK_EQ(SecureShortDebugString(last_committed_op_id_), + SecureShortDebugString(committed_op)); + + } else { + last_committed_op_id_ = committed_op; + } + return Status::OK(); +} + +Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) { + if (current.term() < previous.term()) { + return Status::Corruption(Substitute("New operation's term is not >= than the previous " + "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous))); + } + if (current.index() != previous.index() + 1) { + return Status::Corruption(Substitute("New operation's index does not follow the previous" + " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous))); + } + return Status::OK(); +} + +int64_t PendingRounds::GetCommittedIndex() const { + return last_committed_op_id_.index(); +} + +int64_t PendingRounds::GetTermWithLastCommittedOp() const { + return last_committed_op_id_.term(); +} + +int PendingRounds::GetNumPendingTxns() const { + return pending_txns_.size(); +} + +} // namespace consensus +} // namespace kudu diff --git a/src/kudu/consensus/pending_rounds.h b/src/kudu/consensus/pending_rounds.h new file mode 100644 index 0000000000..02a8686515 --- /dev/null +++ b/src/kudu/consensus/pending_rounds.h @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "kudu/consensus/consensus.h" +#include "kudu/consensus/opid_util.h" +#include "kudu/gutil/macros.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace consensus { + +class TimeManager; + +// Tracks the pending consensus rounds being managed by a Raft replica (either leader +// or follower). +// +// This class is not thread-safe. +// +// TODO(todd): this class inconsistently uses the term "round", "op", and "transaction". +// We should consolidate to "round". +class PendingRounds { + public: + PendingRounds(std::string log_prefix, scoped_refptr time_manager); + ~PendingRounds(); + + // Set the committed op during startup. This should be done after + // appending any of the pending transactions, and will take care + // of triggering any that are now considered committed. + Status SetInitialCommittedOpId(const OpId& committed_op); + + // Returns the the ConsensusRound with the provided index, if there is any, or NULL + // if there isn't. + scoped_refptr GetPendingOpByIndexOrNull(int64_t index); + + // Add 'round' to the set of rounds waiting to be committed. + Status AddPendingOperation(const scoped_refptr& round); + + // Advances the committed index. + // This is a no-op if the committed index has not changed. + Status AdvanceCommittedIndex(int64_t committed_index); + + // Aborts pending operations after, but not including 'index'. The OpId with 'index' + // will become our new last received id. If there are pending operations with indexes + // higher than 'index' those operations are aborted. + void AbortOpsAfter(int64_t index); + + // Returns true if an operation is in this replica's log, namely: + // - If the op's index is lower than or equal to our committed index + // - If the op id matches an inflight op. + // If an operation with the same index is in our log but the terms + // are different 'term_mismatch' is set to true, it is false otherwise. + bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch); + + // Returns the id of the latest pending transaction (i.e. the one with the + // latest index). This must be called under the lock. + OpId GetLastPendingTransactionOpId() const; + + // Used by replicas to cancel pending transactions. Pending transaction are those + // that have completed prepare/replicate but are waiting on the LEADER's commit + // to complete. This does not cancel transactions being applied. + Status CancelPendingTransactions(); + + // Returns the number of transactions that are currently in the pending state + // i.e. transactions for which Prepare() is done or under way. + int GetNumPendingTxns() const; + + // Returns the watermark below which all operations are known to + // be committed according to consensus. + // TODO(todd): these should probably be removed in favor of using the queue. + int64_t GetCommittedIndex() const; + int64_t GetTermWithLastCommittedOp() const; + + // Checks that 'current' correctly follows 'previous'. Specifically it checks + // that the term is the same or higher and that the index is sequential. + static Status CheckOpInSequence(const OpId& previous, const OpId& current); + + private: + const std::string& LogPrefix() const { return log_prefix_; } + + const std::string log_prefix_; + + // Index=>Round map that manages pending ops, i.e. operations for which we've + // received a replicate message from the leader but have yet to be committed. + // The key is the index of the replicate operation. + typedef std::map > IndexToRoundMap; + IndexToRoundMap pending_txns_; + + // The OpId of the round that was last committed. Initialized to MinimumOpId(). + OpId last_committed_op_id_; + + scoped_refptr time_manager_; + + DISALLOW_COPY_AND_ASSIGN(PendingRounds); +}; + +} // namespace consensus +} // namespace kudu diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index 0c6fa34ebd..b2186dbbdd 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -29,6 +29,7 @@ #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/consensus_meta.h" #include "kudu/consensus/consensus_queue.h" +#include "kudu/consensus/pending_rounds.h" #include "kudu/consensus/raft_consensus_state.h" #include "kudu/consensus/time_manager.h" #include "kudu/util/atomic.h" diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc index e06eea7921..c7c26dbf3d 100644 --- a/src/kudu/consensus/raft_consensus_state.cc +++ b/src/kudu/consensus/raft_consensus_state.cc @@ -15,22 +15,14 @@ // specific language governing permissions and limitations // under the License. -#include -#include +#include "kudu/consensus/raft_consensus_state.h" -#include "kudu/consensus/log_util.h" #include "kudu/consensus/quorum_util.h" -#include "kudu/consensus/raft_consensus_state.h" -#include "kudu/consensus/time_manager.h" -#include "kudu/gutil/map-util.h" -#include "kudu/gutil/strings/join.h" -#include "kudu/gutil/strings/strcat.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/debug/trace_event.h" #include "kudu/util/logging.h" #include "kudu/util/pb_util.h" #include "kudu/util/status.h" -#include "kudu/util/trace.h" namespace kudu { namespace consensus { @@ -38,7 +30,6 @@ namespace consensus { using std::string; using std::unique_ptr; using strings::Substitute; -using strings::SubstituteAndAppend; ////////////////////////////////////////////////// // ReplicaState @@ -364,201 +355,6 @@ string ReplicaState::ToStringUnlocked() const { peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked())); } -//------------------------------------------------------------ -// PendingRounds -// TODO(todd): move to its own file -//------------------------------------------------------------ - -PendingRounds::PendingRounds(string log_prefix, scoped_refptr time_manager) - : log_prefix_(std::move(log_prefix)), - last_committed_op_id_(MinimumOpId()), - time_manager_(std::move(time_manager)) {} - -PendingRounds::~PendingRounds() { -} - -Status PendingRounds::CancelPendingTransactions() { - ThreadRestrictions::AssertWaitAllowed(); - if (pending_txns_.empty()) { - return Status::OK(); - } - - LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_txns_.size() - << " pending transactions."; - for (const auto& txn : pending_txns_) { - const scoped_refptr& round = txn.second; - // We cancel only transactions whose applies have not yet been triggered. - LOG_WITH_PREFIX(INFO) << "Aborting transaction as it isn't in flight: " - << SecureShortDebugString(*txn.second->replicate_msg()); - round->NotifyReplicationFinished(Status::Aborted("Transaction aborted")); - } - return Status::OK(); -} - -void PendingRounds::AbortOpsAfter(int64_t index) { - LOG_WITH_PREFIX(INFO) << "Aborting all transactions after (but not including) " - << index; - - DCHECK_GE(index, 0); - OpId new_preceding; - - auto iter = pending_txns_.lower_bound(index); - - // Either the new preceding id is in the pendings set or it must be equal to the - // committed index since we can't truncate already committed operations. - if (iter != pending_txns_.end() && (*iter).first == index) { - new_preceding = (*iter).second->replicate_msg()->id(); - ++iter; - } else { - CHECK_EQ(index, last_committed_op_id_.index()); - new_preceding = last_committed_op_id_; - } - - for (; iter != pending_txns_.end();) { - const scoped_refptr& round = (*iter).second; - auto op_type = round->replicate_msg()->op_type(); - LOG_WITH_PREFIX(INFO) - << "Aborting uncommitted " << OperationType_Name(op_type) - << " operation due to leader change: " << round->replicate_msg()->id(); - - round->NotifyReplicationFinished(Status::Aborted("Transaction aborted by new leader")); - // Erase the entry from pendings. - pending_txns_.erase(iter++); - } -} - -Status PendingRounds::AddPendingOperation(const scoped_refptr& round) { - InsertOrDie(&pending_txns_, round->replicate_msg()->id().index(), round); - return Status::OK(); -} - -scoped_refptr PendingRounds::GetPendingOpByIndexOrNull(int64_t index) { - return FindPtrOrNull(pending_txns_, index); -} - -bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) { - - *term_mismatch = false; - - if (op_id.index() <= GetCommittedIndex()) { - return true; - } - - scoped_refptr round = GetPendingOpByIndexOrNull(op_id.index()); - if (!round) { - return false; - } - - if (round->id().term() != op_id.term()) { - *term_mismatch = true; - return false; - } - return true; -} - -OpId PendingRounds::GetLastPendingTransactionOpId() const { - return pending_txns_.empty() - ? MinimumOpId() : (--pending_txns_.end())->second->id(); -} - -Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) { - // If we already committed up to (or past) 'id' return. - // This can happen in the case that multiple UpdateConsensus() calls end - // up in the RPC queue at the same time, and then might get interleaved out - // of order. - if (last_committed_op_id_.index() >= committed_index) { - VLOG_WITH_PREFIX(1) - << "Already marked ops through " << last_committed_op_id_ << " as committed. " - << "Now trying to mark " << committed_index << " which would be a no-op."; - return Status::OK(); - } - - if (pending_txns_.empty()) { - LOG(ERROR) << "Advancing commit index to " << committed_index - << " from " << last_committed_op_id_ - << " we have no pending txns" - << GetStackTrace(); - VLOG_WITH_PREFIX(1) << "No transactions to mark as committed up to: " - << committed_index; - return Status::OK(); - } - - // Start at the operation after the last committed one. - auto iter = pending_txns_.upper_bound(last_committed_op_id_.index()); - // Stop at the operation after the last one we must commit. - auto end_iter = pending_txns_.upper_bound(committed_index); - CHECK(iter != pending_txns_.end()); - - VLOG_WITH_PREFIX(1) << "Last triggered apply was: " - << last_committed_op_id_ - << " Starting to apply from log index: " << (*iter).first; - - while (iter != end_iter) { - scoped_refptr round = (*iter).second; // Make a copy. - DCHECK(round); - const OpId& current_id = round->id(); - - if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) { - CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id)); - } - - pending_txns_.erase(iter++); - last_committed_op_id_ = round->id(); - time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg()); - round->NotifyReplicationFinished(Status::OK()); - } - - return Status::OK(); -} - -Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) { - CHECK_EQ(last_committed_op_id_.index(), 0); - if (!pending_txns_.empty()) { - int64_t first_pending_index = pending_txns_.begin()->first; - if (committed_op.index() < first_pending_index) { - if (committed_op.index() != first_pending_index - 1) { - return Status::Corruption(Substitute( - "pending operations should start at first operation " - "after the committed operation (committed=$0, first pending=$1)", - OpIdToString(committed_op), first_pending_index)); - } - last_committed_op_id_ = committed_op; - } - - RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index())); - CHECK_EQ(SecureShortDebugString(last_committed_op_id_), - SecureShortDebugString(committed_op)); - - } else { - last_committed_op_id_ = committed_op; - } - return Status::OK(); -} - -Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) { - if (current.term() < previous.term()) { - return Status::Corruption(Substitute("New operation's term is not >= than the previous " - "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous))); - } - if (current.index() != previous.index() + 1) { - return Status::Corruption(Substitute("New operation's index does not follow the previous" - " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous))); - } - return Status::OK(); -} - -int64_t PendingRounds::GetCommittedIndex() const { - return last_committed_op_id_.index(); -} - -int64_t PendingRounds::GetTermWithLastCommittedOp() const { - return last_committed_op_id_.term(); -} - -int PendingRounds::GetNumPendingTxns() const { - return pending_txns_.size(); -} - } // namespace consensus } // namespace kudu diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h index 9350ab8a84..ee53f25790 100644 --- a/src/kudu/consensus/raft_consensus_state.h +++ b/src/kudu/consensus/raft_consensus_state.h @@ -14,37 +14,21 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#ifndef KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_ -#define KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_ -#include +#pragma once + #include -#include -#include #include -#include -#include #include "kudu/consensus/consensus.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/consensus_meta.h" -#include "kudu/consensus/log_util.h" #include "kudu/consensus/opid_util.h" -#include "kudu/gutil/port.h" #include "kudu/util/locks.h" #include "kudu/util/status.h" namespace kudu { - -class HostPort; -class ThreadPool; - -namespace rpc { -class Messenger; -} - namespace consensus { -class TimeManager; // Class that coordinates access to the persistent Raft state (independently of Role). // This has a 1-1 relationship with RaftConsensus and is essentially responsible for @@ -244,87 +228,5 @@ class ReplicaState { State state_; }; -// Tracks the pending consensus rounds being managed by a Raft replica (either leader -// or follower). -// -// This class is not thread-safe. -// -// TODO(todd): this class inconsistently uses the term "round", "op", and "transaction". -// We should consolidate to "round". -class PendingRounds { - public: - PendingRounds(std::string log_prefix, scoped_refptr time_manager); - ~PendingRounds(); - - // Set the committed op during startup. This should be done after - // appending any of the pending transactions, and will take care - // of triggering any that are now considered committed. - Status SetInitialCommittedOpId(const OpId& committed_op); - - // Returns the the ConsensusRound with the provided index, if there is any, or NULL - // if there isn't. - scoped_refptr GetPendingOpByIndexOrNull(int64_t index); - - // Add 'round' to the set of rounds waiting to be committed. - Status AddPendingOperation(const scoped_refptr& round); - - // Advances the committed index. - // This is a no-op if the committed index has not changed. - Status AdvanceCommittedIndex(int64_t committed_index); - - // Aborts pending operations after, but not including 'index'. The OpId with 'index' - // will become our new last received id. If there are pending operations with indexes - // higher than 'index' those operations are aborted. - void AbortOpsAfter(int64_t index); - - // Returns true if an operation is in this replica's log, namely: - // - If the op's index is lower than or equal to our committed index - // - If the op id matches an inflight op. - // If an operation with the same index is in our log but the terms - // are different 'term_mismatch' is set to true, it is false otherwise. - bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch); - - // Returns the id of the latest pending transaction (i.e. the one with the - // latest index). This must be called under the lock. - OpId GetLastPendingTransactionOpId() const; - - // Used by replicas to cancel pending transactions. Pending transaction are those - // that have completed prepare/replicate but are waiting on the LEADER's commit - // to complete. This does not cancel transactions being applied. - Status CancelPendingTransactions(); - - // Returns the number of transactions that are currently in the pending state - // i.e. transactions for which Prepare() is done or under way. - int GetNumPendingTxns() const; - - // Returns the watermark below which all operations are known to - // be committed according to consensus. - // TODO(todd): these should probably be removed in favor of using the queue. - int64_t GetCommittedIndex() const; - int64_t GetTermWithLastCommittedOp() const; - - // Checks that 'current' correctly follows 'previous'. Specifically it checks - // that the term is the same or higher and that the index is sequential. - static Status CheckOpInSequence(const OpId& previous, const OpId& current); - - private: - const std::string& LogPrefix() const { return log_prefix_; } - - const std::string log_prefix_; - - // Index=>Round map that manages pending ops, i.e. operations for which we've - // received a replicate message from the leader but have yet to be committed. - // The key is the index of the replicate operation. - typedef std::map > IndexToRoundMap; - IndexToRoundMap pending_txns_; - - // The OpId of the round that was last committed. Initialized to MinimumOpId(). - OpId last_committed_op_id_; - - scoped_refptr time_manager_; -}; - } // namespace consensus } // namespace kudu - -#endif /* KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_ */