Skip to content

Commit

Permalink
[consensus] KUDU-1620: re-resolve consensus peers on network error
Browse files Browse the repository at this point in the history
This plumbs the work from KUDU-75 into the long-lived consensus proxy,
allowing Raft peers to re-resolve on error.

This has the knock-on effect that masters starting up also re-resolve
other masters' address when attempting to fetch UUIDs, since this
process also uses consensus proxies.

Change-Id: Ibd1b68c3c14d7d8f81168e16fe450d2ffcce840b
Reviewed-on: http://gerrit.cloudera.org:8080/17868
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
andrwng committed Oct 8, 2021
1 parent 4fe61a2 commit 3884a63
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 19 deletions.
15 changes: 4 additions & 11 deletions src/kudu/consensus/consensus_peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"
Expand Down Expand Up @@ -321,7 +319,8 @@ void Peer::StartElection() {
// TODO(adar): lack of C++14 move capture makes for ugly code.
RunLeaderElectionResponsePB* resp = resp_uniq.release();
RpcController* controller = controller_uniq.release();
proxy_->StartElectionAsync(req, resp, controller, [resp, controller, peer_uuid]() {
auto s_this = shared_from_this();
proxy_->StartElectionAsync(req, resp, controller, [resp, controller, peer_uuid, s_this]() {
unique_ptr<RunLeaderElectionResponsePB> r(resp);
unique_ptr<RpcController> c(controller);
string error_msg = Substitute("unable to start election on peer $0", peer_uuid);
Expand Down Expand Up @@ -588,14 +587,8 @@ Status CreateConsensusServiceProxyForHost(
const shared_ptr<Messenger>& messenger,
DnsResolver* dns_resolver,
unique_ptr<ConsensusServiceProxy>* new_proxy) {
vector<Sockaddr> addrs;
RETURN_NOT_OK(dns_resolver->ResolveAddresses(hostport, &addrs));
if (addrs.size() > 1) {
LOG(WARNING) << Substitute(
"Peer address '$0' resolves to $1 different addresses. "
"Using $2", hostport.ToString(), addrs.size(), addrs[0].ToString());
}
new_proxy->reset(new ConsensusServiceProxy(messenger, addrs[0], hostport.host()));
new_proxy->reset(new ConsensusServiceProxy(messenger, hostport, dns_resolver));
(*new_proxy)->Init();
return Status::OK();
}

Expand Down
178 changes: 178 additions & 0 deletions src/kudu/integration-tests/dns_alias-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
Expand All @@ -26,15 +27,19 @@

#include "kudu/client/client.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
Expand All @@ -48,6 +53,7 @@ METRIC_DECLARE_entity(server);

using kudu::client::KuduClient;
using kudu::client::KuduTabletServer;
using kudu::cluster::ExternalDaemon;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using std::string;
Expand Down Expand Up @@ -89,6 +95,73 @@ class DnsAliasITest : public KuduTest {
void TearDown() override {
NO_FATALS(cluster_->AssertNoCrashes());
}

// Get the new DNS override string when restarting the last node of the given
// daemon type with the given reserved address.
string GetNewOverridesFlag(ExternalMiniCluster::DaemonType node_type,
const Sockaddr& new_addr) {
int master_end_idx = cluster_->num_masters();
int tserver_end_idx = cluster_->num_tablet_servers();
bool is_master = node_type == ExternalMiniCluster::DaemonType::MASTER;
if (is_master) {
--master_end_idx;
} else {
--tserver_end_idx;
}
vector<string> new_overrides;
new_overrides.reserve(cluster_->num_masters() + cluster_->num_tablet_servers());
for (int i = 0; i < master_end_idx; i++) {
new_overrides.emplace_back(Substitute("$0.$1=$2", kMasterHostPrefix, i,
cluster_->master(i)->bound_rpc_addr().ToString()));
}
for (int i = 0; i < tserver_end_idx; i++) {
new_overrides.emplace_back(
Substitute("$0.$1=$2", kTServerHostPrefix, i,
cluster_->tablet_server(i)->bound_rpc_addr().ToString()));
}
new_overrides.emplace_back(
Substitute("$0.$1=$2", is_master ? kMasterHostPrefix : kTServerHostPrefix,
is_master ? master_end_idx : tserver_end_idx,
new_addr.ToString()));
return JoinStrings(new_overrides, ",");
}

// Adds the appropriate flags for the given daemon to be restarted bound to
// the given address.
void SetUpDaemonForNewAddr(const Sockaddr& new_addr, const string& new_overrides_str,
ExternalDaemon* daemon) {
HostPort new_ip_hp(new_addr.host(), new_addr.port());
daemon->SetRpcBindAddress(new_ip_hp);
daemon->mutable_flags()->emplace_back("--rpc_reuseport=true");
// TODO(awong): more plumbing is needed to allow the server to startup with
// the webserver, so just disable it.
daemon->mutable_flags()->emplace_back("--webserver_enabled=false");
daemon->mutable_flags()->emplace_back(
Substitute("--dns_addr_resolution_override=$0", new_overrides_str));
}

// Sets the flags on all nodes in the cluster, except for the last node of
// the given 'node_type', which is expected to have been restarted with the
// appropriate flags.
void SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType node_type,
const string& new_overrides_str) {
int master_end_idx = cluster_->num_masters();
int tserver_end_idx = cluster_->num_tablet_servers();
if (node_type == ExternalMiniCluster::DaemonType::MASTER) {
--master_end_idx;
} else {
--tserver_end_idx;
}
for (int i = 0; i < master_end_idx; i++) {
ASSERT_OK(cluster_->SetFlag(
cluster_->master(i), "dns_addr_resolution_override", new_overrides_str));
}
for (int i = 0; i < tserver_end_idx; i++) {
ASSERT_OK(cluster_->SetFlag(
cluster_->tablet_server(i), "dns_addr_resolution_override", new_overrides_str));
}
}

protected:
unique_ptr<ExternalMiniCluster> cluster_;
client::sp::shared_ptr<KuduClient> client_;
Expand Down Expand Up @@ -180,5 +253,110 @@ TEST_F(DnsAliasWithUnixSocketsITest, TestBasic) {
}
}

// These tests depend on restarted servers being assigned a new IP address. On
// MacOS, tservers are all assigned the same address, so don't run them there.
#if defined(__linux__)

// Regression test for KUDU-1620, wherein consensus proxies don't eventually
// succeed when the address changes but the host/ports stays the same.
TEST_F(DnsAliasITest, Kudu1620) {
TestWorkload w(cluster_.get());
w.set_num_replicas(3);
w.set_num_write_threads(1);
w.Setup();
w.Start();
while (w.rows_inserted() < 10) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
w.StopAndJoin();

// Shut down a tablet server and start one up at a different IP.
auto* tserver = cluster_->tablet_server(cluster_->num_tablet_servers() - 1);
tserver->Shutdown();
unique_ptr<Socket> reserved_socket;
ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::TSERVER, 3,
kDefaultBindMode, &reserved_socket,
tserver->bound_rpc_hostport().port()));
Sockaddr new_addr;
ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr));

// Once we start having the other servers communicate with the new tserver,
// ksck should return healthy.
auto new_overrides_str = GetNewOverridesFlag(ExternalMiniCluster::DaemonType::TSERVER, new_addr);
SetUpDaemonForNewAddr(new_addr, new_overrides_str, tserver);
ASSERT_OK(tserver->Restart());

// Running ksck should fail because the existing servers are still trying to
// communicate with the old port.
ClusterVerifier v(cluster_.get());
Status s = v.RunKsck();
ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::TSERVER, new_overrides_str);

// Our test thread still thinks the old alias is still valid, so our ksck
// should fail.
s = v.RunKsck();
ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

// Once we set the DNS aliases in the test thread, ksck should succeed.
FLAGS_dns_addr_resolution_override = new_overrides_str;
ASSERT_EVENTUALLY([&] {
ASSERT_OK(v.RunKsck());
});
}

// Master-side regression test for KUDU-1620. Masters instantiate consensus
// proxies to get the UUIDs of its peers. With KUDU-1620 resolved, the proxy
// used should be able to re-resolve and retry upon failure, rather than
// retrying at the same address.
TEST_F(DnsAliasITest, TestMasterReresolveOnStartup) {
const int last_master_idx = cluster_->num_masters() - 1;
auto* master = cluster_->master(last_master_idx);

// Shut down and prepare the node that we're going to give a new address.
master->Shutdown();
unique_ptr<Socket> reserved_socket;
ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::MASTER, 3,
kDefaultBindMode, &reserved_socket,
master->bound_rpc_hostport().port()));
Sockaddr new_addr;
ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr));
auto new_overrides_str = GetNewOverridesFlag(ExternalMiniCluster::DaemonType::MASTER, new_addr);
SetUpDaemonForNewAddr(new_addr, new_overrides_str, master);

// Shut down the other masters so we can test what happens when they come
// back up.
for (int i = 0; i < last_master_idx; i++) {
cluster_->master(i)->Shutdown();
}
for (int i = 0; i < last_master_idx; i++) {
ASSERT_OK(cluster_->master(i)->Restart());
}
// Since the rest of the cluster doesn't know about the address, ksck will
// fail.
ClusterVerifier v(cluster_.get());
Status s = v.RunKsck();
ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

// Even upon setting the DNS overrides on the rest of the nodes, since the
// master hasn't started, we should still see an error.
SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::MASTER, new_overrides_str);
s = v.RunKsck();
ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
FLAGS_dns_addr_resolution_override = new_overrides_str;
s = v.RunKsck();
ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

// Upon restarting the node, the other masters should be able to resolve and
// connect to it.
ASSERT_OK(master->Restart());
ASSERT_EVENTUALLY([&] {
ASSERT_OK(v.RunKsck());
});
}

#endif

} // namespace itest
} // namespace kudu
8 changes: 5 additions & 3 deletions src/kudu/integration-tests/tablet_replacement-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/strings/substitute.h"
Expand Down Expand Up @@ -237,8 +237,10 @@ void TabletReplacementITest::TestDontEvictIfRemainingConfigIsUnstable(
consensus::ConsensusStatePB cstate;
ASSERT_OK(GetConsensusState(leader_ts, tablet_id, kTimeout, EXCLUDE_HEALTH_REPORT, &cstate));
SCOPED_TRACE(cstate.DebugString());
ASSERT_FALSE(cstate.has_pending_config())
<< "Leader should not have issued any config change";
// It's possible the leader only registered one replica as failed when
// sending its report to the master, so the master may have requested a
// change config request to add non-voter. Regardless, there should be no
// new committed config since a majority is down.
ASSERT_EQ(cstate_initial.committed_config().opid_index(),
cstate.committed_config().opid_index())
<< "Leader should not have issued any config change";
Expand Down
6 changes: 6 additions & 0 deletions src/kudu/mini-cluster/external_mini_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,12 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
// Return the options used to create the daemon.
ExternalDaemonOptions opts() const { return opts_; }

void SetRpcBindAddress(HostPort rpc_hostport) {
DCHECK(!IsProcessAlive());
bound_rpc_ = std::move(rpc_hostport);
opts_.rpc_bind_address = bound_rpc_;
}

protected:
friend class RefCountedThreadSafe<ExternalDaemon>;
virtual ~ExternalDaemon();
Expand Down
5 changes: 3 additions & 2 deletions src/kudu/mini-cluster/mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ string MiniCluster::GetBindIpForDaemonWithType(DaemonType type,
Status MiniCluster::ReserveDaemonSocket(DaemonType type,
int index,
BindMode bind_mode,
unique_ptr<Socket>* socket) {
unique_ptr<Socket>* socket,
uint16_t port) {
string ip = GetBindIpForDaemonWithType(type, index, bind_mode);
Sockaddr sock_addr;
RETURN_NOT_OK(sock_addr.ParseString(ip, 0));
RETURN_NOT_OK(sock_addr.ParseString(ip, port));

unique_ptr<Socket> sock(new Socket());
RETURN_NOT_OK(sock->Init(sock_addr.family(), 0));
Expand Down
4 changes: 3 additions & 1 deletion src/kudu/mini-cluster/mini_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.
#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -140,7 +141,8 @@ class MiniCluster {
static Status ReserveDaemonSocket(DaemonType type,
int index,
BindMode bind_mode,
std::unique_ptr<Socket>* socket);
std::unique_ptr<Socket>* socket,
uint16_t port = 0);

protected:
// Return the IP address that the daemon with the given type and index will
Expand Down
4 changes: 4 additions & 0 deletions src/kudu/rpc/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/user_credentials.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/logging.h"
#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
Expand Down Expand Up @@ -227,6 +228,9 @@ void Proxy::AsyncRequest(const string& method,
// TODO(awong): we should be more specific here -- consider having the RPC
// layer set a flag in the controller that warrants a retry.
if (PREDICT_FALSE(!controller->status().ok())) {
KLOG_EVERY_N_SECS(WARNING, 5)
<< Substitute("Call had error, refreshing address and retrying: $0",
controller->status().ToString());
auto req_payload = controller->ReleaseRequestPayload();
controller->Reset();
RefreshDnsAndEnqueueRequest(method, std::move(req_payload), response, controller, callback);
Expand Down
8 changes: 6 additions & 2 deletions src/kudu/util/net/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,12 @@ Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const {
TRACE_EVENT1("net", "HostPort::ResolveAddresses",
"host", host_);
TRACE_COUNTER_SCOPE_LATENCY_US("dns_us");
if (PREDICT_FALSE(!FLAGS_dns_addr_resolution_override.empty())) {
vector<string> hosts_and_addrs = Split(FLAGS_dns_addr_resolution_override, ",");
// NOTE: we use this instead of the FLAGS_... variant because this flag may be
// changed at runtime in tests and thus needs to be thread-safe.
const auto dns_addr_resolution_override_flag =
google::GetCommandLineFlagInfoOrDie("dns_addr_resolution_override");
if (PREDICT_FALSE(!dns_addr_resolution_override_flag.current_value.empty())) {
vector<string> hosts_and_addrs = Split(dns_addr_resolution_override_flag.current_value, ",");
for (const auto& ha : hosts_and_addrs) {
vector<string> host_and_addr = Split(ha, "=");
if (host_and_addr.size() != 2) {
Expand Down

0 comments on commit 3884a63

Please sign in to comment.