Skip to content

Commit

Permalink
[consensus] Add consensus op-level lag metrics
Browse files Browse the repository at this point in the history
It may be useful to see how far behind the leaders each follower is
with regard to replicating operations. As such, this patch adds a
metric facilitate this.

The new metric counts the number of operations behind the leader each
peer is. It is updated when a follower receives a request from a
leader, and after the peer message queue appends operations.

Change-Id: Ida8e992cc2397ca8d5873e62961a65f618d52c36
Reviewed-on: http://gerrit.cloudera.org:8080/6451
Reviewed-by: Mike Percy <[email protected]>
Tested-by: Kudu Jenkins
  • Loading branch information
andrwng authored and mpercy committed Mar 24, 2017
1 parent 80026a3 commit e4495d6
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 15 deletions.
6 changes: 5 additions & 1 deletion src/kudu/consensus/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ message ConsensusRequestPB {
// committed during the same request.
repeated ReplicateMsg ops = 6;

// The lowest index that is known to be replicated by all members of
// The highest index that is known to be replicated by all members of
// the configuration.
//
// NOTE: this is not necessarily monotonically increasing. For example, if a node is in
Expand All @@ -354,6 +354,10 @@ message ConsensusRequestPB {
// the "safe time" past the timestamp of the last committed message and answer snapshot scans
// in the present in the absense of writes.
optional fixed64 safe_timestamp = 10;

// The index of the most recent operation appended to the leader.
// Followers can use this to determine roughly how far behind they are from the leader.
optional int64 last_idx_appended_to_leader = 11;
}

message ConsensusResponsePB {
Expand Down
31 changes: 25 additions & 6 deletions src/kudu/consensus/consensus_queue-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,15 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));

// helper to estimate request size so that we can set the max batch size appropriately
// Note: This estimator must be precise, as it is used to set the max batch size.
// In order for the estimate to be correct, all members of the request protobuf must be set.
// If not all fields are set, this will set the batch size to be too small to hold the expected
// number of ops.
ConsensusRequestPB page_size_estimator;
page_size_estimator.set_caller_term(14);
page_size_estimator.set_committed_index(0);
page_size_estimator.set_all_replicated_index(0);
page_size_estimator.set_last_idx_appended_to_leader(0);
page_size_estimator.mutable_preceding_id()->CopyFrom(MinimumOpId());

// We're going to add 100 messages to the queue so we make each page fetch 9 of those,
Expand Down Expand Up @@ -625,7 +630,8 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetNonLeaderMode();
// Append a bunch of messages.
// Append a bunch of messages and update as if they were also appeneded to the leader.
queue_->UpdateLastIndexAppendedToLeader(10);
AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
log_->WaitUntilAllFlushed();

Expand All @@ -639,6 +645,9 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
// Wait for the operation to be in the log.
ASSERT_OK(synch.Wait());

// Having appended index 5, the follower is still 5 ops behind the leader.
ASSERT_EQ(5, queue_->metrics_.num_ops_behind_leader->value());

// Without the fix the following append would trigger a check failure
// in log cache.
synch.Reset();
Expand All @@ -650,9 +659,12 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
// Wait for the operation to be in the log.
ASSERT_OK(synch.Wait());

// Having appended index 6, the follower is still 4 ops behind the leader.
ASSERT_EQ(4, queue_->metrics_.num_ops_behind_leader->value());

// The replication watermark on a follower should not advance by virtue of appending
// entries to the log.
ASSERT_EQ(queue_->GetAllReplicatedIndex(), 0);
ASSERT_EQ(0, queue_->GetAllReplicatedIndex());
}

// Tests that we're advancing the watermarks properly and only when the peer
Expand Down Expand Up @@ -821,21 +833,28 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetNonLeaderMode();

// Emulate a follower sending a request to replicate 10 messages.
queue_->UpdateLastIndexAppendedToLeader(10);
AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
WaitForLocalPeerToAckIndex(10);

// The committed_index should be MinimumOpId() since UpdateFollowerCommittedIndex
// has not been called.
ASSERT_EQ(queue_->GetCommittedIndex(), 0);
ASSERT_EQ(0, queue_->GetCommittedIndex());

// Update the committed index. In real life, this would be done by the consensus
// implementation when it receives an updated committed index from the leader.
queue_->UpdateFollowerWatermarks(10, 10);
ASSERT_EQ(queue_->GetCommittedIndex(), 10);
ASSERT_EQ(10, queue_->GetCommittedIndex());

// Check the metrics have the right values based on the updated committed index.
ASSERT_EQ(queue_->metrics_.num_majority_done_ops->value(), 0);
ASSERT_EQ(queue_->metrics_.num_in_progress_ops->value(), 0);
ASSERT_EQ(0, queue_->metrics_.num_majority_done_ops->value());
ASSERT_EQ(0, queue_->metrics_.num_in_progress_ops->value());
ASSERT_EQ(0, queue_->metrics_.num_ops_behind_leader->value());

// Emulate the leader appending up to index 15. The num_ops_behind_leader should jump to 5.
queue_->UpdateLastIndexAppendedToLeader(15);
ASSERT_EQ(5, queue_->metrics_.num_ops_behind_leader->value());
}

} // namespace consensus
Expand Down
30 changes: 26 additions & 4 deletions src/kudu/consensus/consensus_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ METRIC_DEFINE_gauge_int64(tablet, in_progress_ops, "Operations in Progress",
MetricUnit::kOperations,
"Number of operations in the peer's queue ack'd by a minority of "
"peers.");
METRIC_DEFINE_gauge_int64(tablet, ops_behind_leader, "Operations Behind Leader",
MetricUnit::kOperations,
"Number of operations this server believes it is behind the leader.");

std::string PeerMessageQueue::TrackedPeer::ToString() const {
return Substitute("Peer: $0, Is new: $1, Last received: $2, Next index: $3, "
Expand All @@ -98,7 +101,8 @@ std::string PeerMessageQueue::TrackedPeer::ToString() const {
x.Instantiate(metric_entity, 0)
PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
: num_majority_done_ops(INSTANTIATE_METRIC(METRIC_majority_done_ops)),
num_in_progress_ops(INSTANTIATE_METRIC(METRIC_in_progress_ops)) {
num_in_progress_ops(INSTANTIATE_METRIC(METRIC_in_progress_ops)),
num_ops_behind_leader(INSTANTIATE_METRIC(METRIC_ops_behind_leader)) {
}
#undef INSTANTIATE_METRIC

Expand All @@ -119,6 +123,7 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent
queue_state_.committed_index = 0;
queue_state_.all_replicated_index = 0;
queue_state_.majority_replicated_index = 0;
queue_state_.last_idx_appended_to_leader = 0;
queue_state_.state = kQueueConstructed;
queue_state_.mode = NON_LEADER;
queue_state_.majority_size_ = -1;
Expand Down Expand Up @@ -173,6 +178,9 @@ void PeerMessageQueue::SetNonLeaderMode() {
queue_state_.active_config.reset();
queue_state_.mode = NON_LEADER;
queue_state_.majority_size_ = -1;

// Update this when stepping down, since it doesn't get tracked as LEADER.
queue_state_.last_idx_appended_to_leader = queue_state_.last_appended.index();
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
<< queue_state_.ToString();
time_manager_->SetNonLeaderMode();
Expand Down Expand Up @@ -367,6 +375,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,

request->set_committed_index(queue_state_.committed_index);
request->set_all_replicated_index(queue_state_.all_replicated_index);
request->set_last_idx_appended_to_leader(queue_state_.last_appended.index());
request->set_caller_term(current_term);
unreachable_time = MonoTime::Now() - peer.last_successful_communication_time;
}
Expand Down Expand Up @@ -590,6 +599,17 @@ void PeerMessageQueue::UpdateFollowerWatermarks(int64_t committed_index,
UpdateMetrics();
}

void PeerMessageQueue::UpdateLagMetrics() {
metrics_.num_ops_behind_leader->set_value(queue_state_.mode == LEADER ? 0 :
queue_state_.last_idx_appended_to_leader - queue_state_.last_appended.index());
}

void PeerMessageQueue::UpdateLastIndexAppendedToLeader(int64_t last_idx_appended_to_leader) {
DCHECK_EQ(queue_state_.mode, NON_LEADER);
queue_state_.last_idx_appended_to_leader = last_idx_appended_to_leader;
UpdateLagMetrics();
}

void PeerMessageQueue::NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid) {
std::lock_guard<simple_spinlock> l(queue_lock_);
TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
Expand Down Expand Up @@ -852,6 +872,8 @@ void PeerMessageQueue::UpdateMetrics() {
: 0);
metrics_.num_in_progress_ops->set_value(
queue_state_.last_appended.index() - queue_state_.committed_index);

UpdateLagMetrics();
}

void PeerMessageQueue::DumpToStrings(vector<string>* lines) const {
Expand Down Expand Up @@ -1024,10 +1046,10 @@ string PeerMessageQueue::LogPrefixUnlocked() const {

string PeerMessageQueue::QueueState::ToString() const {
return Substitute("All replicated index: $0, Majority replicated index: $1, "
"Committed index: $2, Last appended: $3, Current term: $4, Majority size: $5, "
"State: $6, Mode: $7$8",
"Committed index: $2, Last appended: $3, Last appended by leader: $4, Current term: $5, "
"Majority size: $6, State: $7, Mode: $8$9",
all_replicated_index, majority_replicated_index,
committed_index, OpIdToString(last_appended), current_term,
committed_index, OpIdToString(last_appended), last_idx_appended_to_leader, current_term,
majority_size_, state, (mode == LEADER ? "LEADER" : "NON_LEADER"),
active_config ? ", active raft config: " + SecureShortDebugString(*active_config) : "");
}
Expand Down
19 changes: 17 additions & 2 deletions src/kudu/consensus/consensus_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ extern const char kConsensusQueueParentTrackerId[];
// This also takes care of pushing requests to peers as new operations are
// added, and notifying RaftConsensus when the commit index advances.
//
// This class is used only on the LEADER side.
//
// TODO(todd): Right now this class is able to track one outstanding operation
// per peer. If we want to have more than one outstanding RPC we need to
// modify it.
Expand Down Expand Up @@ -245,6 +243,14 @@ class PeerMessageQueue {
void UpdateFollowerWatermarks(int64_t committed_index,
int64_t all_replicated_index);

// Update the metric that measures how many ops behind the leader the local
// replica believes it is (0 if leader).
void UpdateLagMetrics();

// Updates the last op appended to the leader and the corresponding lag metric.
// This should not be called by a leader.
void UpdateLastIndexAppendedToLeader(int64_t last_idx_appended_to_leader);

// Closes the queue, peers are still allowed to call UntrackPeer() and
// ResponseFromPeer() but no additional peers can be tracked or messages
// queued.
Expand Down Expand Up @@ -286,6 +292,9 @@ class PeerMessageQueue {
scoped_refptr<AtomicGauge<int64_t> > num_majority_done_ops;
// Keeps track of the number of ops. that are still in progress (IsDone() returns false).
scoped_refptr<AtomicGauge<int64_t> > num_in_progress_ops;
// Keeps track of the number of ops. behind the leader the peer is, measured as the difference
// between the latest appended op index on this peer versus on the leader (0 if leader).
scoped_refptr<AtomicGauge<int64_t> > num_ops_behind_leader;

explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity);
};
Expand All @@ -294,7 +303,9 @@ class PeerMessageQueue {

private:
FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex);
FRIEND_TEST(ConsensusQueueTest, TestQueueMovesWatermarksBackward);
FRIEND_TEST(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics);
FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);

// Mode specifies how the queue currently behaves:
// LEADER - Means the queue tracks remote peers and replicates whatever messages
Expand Down Expand Up @@ -326,6 +337,10 @@ class PeerMessageQueue {
// The index of the last operation to be considered committed.
int64_t committed_index;

// The index of the last operation appended to the leader. A follower will use this to
// determine how many ops behind the leader it is, as a soft metric for follower lag.
int64_t last_idx_appended_to_leader;

// The opid of the last operation appended to the queue.
OpId last_appended;

Expand Down
4 changes: 4 additions & 0 deletions src/kudu/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,10 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
// sanity check.
RETURN_NOT_OK(SnoozeFailureDetectorUnlocked());

// We update the lag metrics here in addition to after appending to the queue so the
// metrics get updated even when the operation is rejected.
queue_->UpdateLastIndexAppendedToLeader(request->last_idx_appended_to_leader());

// Also prohibit voting for anyone for the minimum election timeout.
withhold_votes_until_ = MonoTime::Now() + MinimumElectionTimeout();

Expand Down
1 change: 1 addition & 0 deletions src/kudu/consensus/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class RaftConsensus : public Consensus,
private:
friend class ReplicaState;
friend class RaftConsensusQuorumTest;
FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);

// Control whether printing of log messages should be done for a particular
// function call.
Expand Down
8 changes: 6 additions & 2 deletions src/kudu/consensus/raft_consensus_quorum-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -957,24 +957,28 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
scoped_refptr<RaftConsensus> follower;
CHECK_OK(peers_->GetPeerByIdx(0, &follower));


req.set_caller_uuid(leader->peer_uuid());
req.set_caller_term(last_op_id.term());
req.mutable_preceding_id()->CopyFrom(last_op_id);
req.set_committed_index(last_op_id.index());
req.set_all_replicated_index(0);

// Send a request with the next index.
ReplicateMsg* replicate = req.add_ops();
replicate->set_timestamp(clock_->Now().ToUint64());
OpId* id = replicate->mutable_id();
id->set_term(last_op_id.term());
id->set_index(last_op_id.index() + 1);
replicate->set_op_type(NO_OP);

// Since the req adds the next op, the leader must have also appended it.
req.set_last_idx_appended_to_leader(id->index());

// Appending this message to peer0 should work and update
// its 'last_received' to 'id'.
ASSERT_OK(follower->Update(&req, &resp));
ASSERT_TRUE(OpIdEquals(resp.status().last_received(), *id));
ASSERT_EQ(0, follower->queue_->metrics_.num_ops_behind_leader->value());

// Now skip one message in the same term. The replica should
// complain with the right error message.
Expand All @@ -985,7 +989,7 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
ASSERT_OK(follower->Update(&req, &resp));
ASSERT_TRUE(resp.has_status());
ASSERT_TRUE(resp.status().has_error());
ASSERT_EQ(resp.status().error().code(), ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, resp.status().error().code());
ASSERT_STR_CONTAINS(resp.status().error().status().message(),
"Log matching property violated");
}
Expand Down

0 comments on commit e4495d6

Please sign in to comment.