Skip to content

Commit

Permalink
[client] Fix a kudu c++ client bug when using replica_selection policy
Browse files Browse the repository at this point in the history
On the C++ client side, the replica_selection policy (LEADER_ONLY or
CLOSEST_REPLICA) does not take effect. For example, when running the
command 'kudu perf table_scan $master_list $table -columns=id,name
-num_threads=4 -nofill_cache -replica_selection="LEADER"', the
replica_selection policy actually used is CLOSEST_REPLICA.

The patch fixes the bug in client library and adds unit tests.

Change-Id: I413f99b6a0b6082c5453358b8333913e4c6264c2
Reviewed-on: http://gerrit.cloudera.org:8080/18877
Reviewed-by: Yuqi Du <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <[email protected]>
  • Loading branch information
shenxingwuying authored and zhangyifan27 committed Nov 19, 2022
1 parent c1ff2fd commit 43ba101
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/kudu/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
friend class tools::LeaderMasterProxy;
friend class tools::RemoteKsckCluster;
friend class tools::TableLister;
friend class ScanTokenTest;

FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds);
FRIEND_TEST(kudu::MetaCacheLookupStressTest, PerfSynthetic);
Expand Down
30 changes: 30 additions & 0 deletions src/kudu/client/scan_token-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,23 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
configuration->AddConjunctPredicate(std::move(*predicate));
}

// Apply the replica selection policy carried by the token message to the scan
// configuration. Each kudu::ReplicaSelection value is mapped onto the
// KuduClient::ReplicaSelection value of the same name; a failure to set it is
// logged at ERROR level and returned to the caller.
switch (message.replica_selection()) {
case kudu::ReplicaSelection::LEADER_ONLY:
RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY),
ERROR, "set replica selection LEADER_ONLY failed");
break;
case kudu::ReplicaSelection::CLOSEST_REPLICA:
RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::CLOSEST_REPLICA),
ERROR, "set replica selection CLOSEST_REPLICA failed");
break;
case kudu::ReplicaSelection::FIRST_REPLICA:
RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::FIRST_REPLICA),
ERROR, "set replica selection FIRST_REPLICA failed");
break;
// Any other value found in the message is rejected rather than silently
// replaced by some default policy.
default:
return Status::NotSupported("unsupported ReplicaSelection policy");
}

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
if (message.has_lower_bound_primary_key()) {
Expand Down Expand Up @@ -394,6 +411,19 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
}

// Record the replica selection policy of the scan configuration in the token
// message so that it is available when the token is turned back into a
// scanner. Each KuduClient::ReplicaSelection value is mapped onto the
// kudu::ReplicaSelection value of the same name.
switch (configuration_.selection_) {
case KuduClient::ReplicaSelection::LEADER_ONLY:
pb.set_replica_selection(kudu::ReplicaSelection::LEADER_ONLY);
break;
case KuduClient::ReplicaSelection::CLOSEST_REPLICA:
pb.set_replica_selection(kudu::ReplicaSelection::CLOSEST_REPLICA);
break;
case KuduClient::ReplicaSelection::FIRST_REPLICA:
pb.set_replica_selection(kudu::ReplicaSelection::FIRST_REPLICA);
break;
// A configuration holding any other value cannot be serialized.
default:
return Status::InvalidArgument("replica_selection is invalid.");
}
const KuduScanner::ReadMode read_mode = configuration_.read_mode();
switch (read_mode) {
case KuduScanner::READ_LATEST:
Expand Down
140 changes: 137 additions & 3 deletions src/kudu/client/scan_token-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>
Expand All @@ -29,9 +32,11 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/client/client-internal.h"
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/client.pb.h"
#include "kudu/client/meta_cache.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_configuration.h"
#include "kudu/client/scan_predicate.h"
Expand All @@ -43,18 +48,22 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/async_util.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
Expand All @@ -76,6 +85,7 @@ using kudu::master::TabletInfo;
using kudu::tablet::TabletReplica;
using kudu::tserver::MiniTabletServer;
using std::atomic;
using std::map;
using std::string;
using std::thread;
using std::unique_ptr;
Expand All @@ -85,6 +95,9 @@ using std::vector;
namespace kudu {
namespace client {

// Minimum number of rows the replica selection test waits for the workload to
// insert before scanning.
constexpr int32_t kRecordCount = 1000;
// Number of tablets of the replica selection test table; also the total number
// of scanners the test expects to be opened across all tablet servers.
constexpr int32_t kBucketNum = 10;

class ScanTokenTest : public KuduTest {
protected:
void SetUp() override {
Expand Down Expand Up @@ -135,13 +148,16 @@ class ScanTokenTest : public KuduTest {

// Similar to CountRows() above, but use the specified client handle
// and run all the scanners sequentially, one by one.
static Status CountRowsSeq(KuduClient* client,
const vector<KuduScanToken*>& tokens,
int64_t* row_count) {
static Status CountRowsSeq(
KuduClient* client,
const vector<KuduScanToken*>& tokens,
int64_t* row_count,
KuduClient::ReplicaSelection replica_selection = KuduClient::ReplicaSelection::LEADER_ONLY) {
int64_t count = 0;
for (auto* t : tokens) {
unique_ptr<KuduScanner> scanner;
RETURN_NOT_OK(IntoUniqueScanner(client, *t, &scanner));
RETURN_NOT_OK(scanner->SetSelection(replica_selection));
RETURN_NOT_OK(scanner->Open());
while (scanner->HasMoreRows()) {
KuduScanBatch batch;
Expand Down Expand Up @@ -246,6 +262,79 @@ class ScanTokenTest : public KuduTest {
return Status::OK();
}

// Starts a mini cluster with three tablet servers, creates and populates the
// "replica_selection" table through a TestWorkload, opens that table with a
// newly created client and collects the tablet ids of its scan tokens.
//
// 'table' receives the opened table. 'tablet_ids' is cleared and then filled
// with one tablet id per scan token built for the table.
//
// Errors are reported through ASSERT_* macros, so callers should check for
// fatal failures after this returns.
void PrepareEnvForTestReplicaSelection(shared_ptr<KuduTable>* table, vector<string>* tablet_ids) {
  constexpr const char* const kTableName = "replica_selection";
  constexpr int kReplicationFactor = 3;

  // Bring up the mini cluster.
  InternalMiniClusterOptions cluster_opts;
  cluster_opts.num_tablet_servers = 3;
  cluster_.reset(new InternalMiniCluster(env_, cluster_opts));
  ASSERT_OK(cluster_->Start());

  // Create a hash-partitioned table with kBucketNum tablets and
  // kReplicationFactor replicas, then keep writing until at least
  // kRecordCount rows have been inserted.
  {
    TestWorkload writer(cluster_.get(), TestWorkload::PartitioningType::HASH);
    writer.set_table_name(kTableName);
    writer.set_num_tablets(kBucketNum);
    writer.set_num_replicas(kReplicationFactor);
    writer.set_num_write_threads(10);
    writer.set_write_batch_size(128);
    writer.Setup();
    writer.Start();
    ASSERT_EVENTUALLY([&]() { ASSERT_GE(writer.rows_inserted(), kRecordCount); });
    writer.StopAndJoin();
  }

  // Open the freshly written table with a new client.
  ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
  ASSERT_OK(client_->OpenTable(kTableName, table));
  ASSERT_NE(nullptr, table->get());

  // Build scan tokens only to learn which tablets back the table; the tokens
  // themselves are released when 'token_cleanup' goes out of scope.
  vector<client::KuduScanToken*> scan_tokens;
  ElementDeleter token_cleanup(&scan_tokens);
  client::KuduScanTokenBuilder token_builder(table->get());
  ASSERT_OK(token_builder.Build(&scan_tokens));

  tablet_ids->clear();
  tablet_ids->reserve(scan_tokens.size());
  for (const auto* scan_token : scan_tokens) {
    tablet_ids->emplace_back(scan_token->tablet().id());
  }
}

void GetSelectedReplicaCount(const vector<string>& tablet_ids,
KuduClient::ReplicaSelection replication_selection,
map<string, int32_t>* replica_num_by_ts_uuid) {
for (const auto& tablet_id : tablet_ids) {
scoped_refptr<internal::RemoteTablet> rt;
Synchronizer sync;
client_->data_->meta_cache_->LookupTabletById(
client_.get(), tablet_id, MonoTime::Max(), &rt, sync.AsStatusCallback());
sync.Wait();
vector<internal::RemoteTabletServer*> tservers;
rt->GetRemoteTabletServers(&tservers);
ASSERT_EQ(3, tservers.size());

vector<internal::RemoteTabletServer*> candidates;
internal::RemoteTabletServer* tserver_picked;
ASSERT_OK(client_->data_->GetTabletServer(
client_.get(), rt, replication_selection, {}, &candidates, &tserver_picked));
auto& count = LookupOrInsert(replica_num_by_ts_uuid, tserver_picked->permanent_uuid(), 0);
count++;
}
}

// Clears 'scanner_count_by_ts_uuid' and refills it with one entry per tablet
// server of the mini cluster: the server's permanent UUID mapped to the number
// of scans currently listed by its scanner manager.
void GetScannerCount(map<string, int32_t>* scanner_count_by_ts_uuid) {
  scanner_count_by_ts_uuid->clear();
  for (int idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
    auto* tserver = cluster_->mini_tablet_server(idx)->server();
    const auto listed_scans =
        static_cast<int32_t>(tserver->scanner_manager()->ListScans().size());
    scanner_count_by_ts_uuid->emplace(tserver->instance_pb().permanent_uuid(), listed_scans);
  }
}

shared_ptr<KuduClient> client_;
unique_ptr<InternalMiniCluster> cluster_;
};
Expand Down Expand Up @@ -1475,5 +1564,50 @@ TEST_F(ScanTokenTest, ToggleFaultToleranceForScanConfiguration) {
ASSERT_EQ(KuduScanner::READ_YOUR_WRITES, sc.read_mode());
}

// Fixture for value-parameterized tests: it inherits everything from
// ScanTokenTest and takes a KuduClient::ReplicaSelection as its parameter.
class ReplicaSelectionTest : public ScanTokenTest,
public ::testing::WithParamInterface<KuduClient::ReplicaSelection> {};

// Run every ReplicaSelectionTest case once per replica selection policy:
// LEADER_ONLY, CLOSEST_REPLICA and FIRST_REPLICA.
INSTANTIATE_TEST_SUITE_P(PickServer,
ReplicaSelectionTest,
::testing::Values(KuduClient::ReplicaSelection::LEADER_ONLY,
KuduClient::ReplicaSelection::CLOSEST_REPLICA,
KuduClient::ReplicaSelection::FIRST_REPLICA));

// TODO(duyuqi): Use location assignment to test replica selection for scan
// tokens; for reference, see
// src/kudu/integration-tests/location_assignment-itest.cc#L76-L150
//
// This unit test checks whether the LEADER_ONLY, CLOSEST_REPLICA and
// FIRST_REPLICA replica selection policies work as expected.
// Verifies that scanning through scan tokens with a given replica selection
// policy opens scanners on exactly the tablet servers the client picks for
// that policy.
//
// The test first records, per tablet server, how many tablets the client would
// direct to it under the policy. It then scans every token with that policy
// and checks that the growth in each server's listed scan count matches the
// recorded number, and that the numbers add up to kBucketNum.
TEST_P(ReplicaSelectionTest, ReplicaSelection) {
  shared_ptr<KuduTable> table;
  map<string, int32_t> replica_num_by_ts_uuid;
  vector<string> tablet_ids;
  const auto replica_selection = GetParam();

  // Both helpers report errors with ASSERT_* macros, which only abort the
  // helper itself; stop the test here if either of them failed.
  ASSERT_NO_FATAL_FAILURE(PrepareEnvForTestReplicaSelection(&table, &tablet_ids));
  ASSERT_NO_FATAL_FAILURE(
      GetSelectedReplicaCount(tablet_ids, replica_selection, &replica_num_by_ts_uuid));

  // Snapshot the per-server scan counts before launching any scan.
  map<string, int32_t> scanner_count_by_ts_uuid;
  GetScannerCount(&scanner_count_by_ts_uuid);

  vector<KuduScanToken*> tokens;
  ElementDeleter deleter(&tokens);
  // Scan all the partitions using the replica selection under test.
  ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
  int64_t row_count = 0;
  // A failed scan must fail the test instead of being silently dropped.
  ASSERT_OK(CountRowsSeq(client_.get(), tokens, &row_count, replica_selection));

  int result = 0;
  map<string, int32_t> now_scanner_count_by_ts_uuid;
  GetScannerCount(&now_scanner_count_by_ts_uuid);
  for (const auto& ts_uuid_scanner_count : now_scanner_count_by_ts_uuid) {
    const auto& permanent_uuid = ts_uuid_scanner_count.first;
    // operator[] yields 0 for servers that were never picked.
    ASSERT_EQ(replica_num_by_ts_uuid[permanent_uuid],
              (ts_uuid_scanner_count.second - scanner_count_by_ts_uuid[permanent_uuid]));
    result += replica_num_by_ts_uuid[permanent_uuid];
  }
  ASSERT_EQ(kBucketNum, result);
}

} // namespace client
} // namespace kudu
2 changes: 2 additions & 0 deletions src/kudu/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ enum ReplicaSelection {
// - Replicas whose tablet server has the same location as the client
// - All other replicas
CLOSEST_REPLICA = 2;
// Select the first replica in the list.
FIRST_REPLICA = 3;
}

// The serialized format of a Kudu table partition schema.
Expand Down
7 changes: 6 additions & 1 deletion src/kudu/tools/table_scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,14 @@ bool ValidateWriteType(const char* flag_name,
}

constexpr const char* const kReplicaSelectionClosest = "closest";
constexpr const char* const kReplicaSelectionFirst = "first";
constexpr const char* const kReplicaSelectionLeader = "leader";

bool ValidateReplicaSelection(const char* flag_name,
const string& flag_value) {
static const vector<string> kReplicaSelections = {
kReplicaSelectionClosest,
kReplicaSelectionFirst,
kReplicaSelectionLeader,
};
return IsFlagValueAcceptable(flag_name, flag_value, kReplicaSelections);
Expand Down Expand Up @@ -637,7 +640,7 @@ void TableScanner::SetReadMode(KuduScanner::ReadMode mode) {
}

Status TableScanner::SetReplicaSelection(const string& selection_str) {
KuduClient::ReplicaSelection selection;
KuduClient::ReplicaSelection selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA;
RETURN_NOT_OK(ParseReplicaSelection(selection_str, &selection));
replica_selection_ = selection;
return Status::OK();
Expand Down Expand Up @@ -799,6 +802,8 @@ Status TableScanner::ParseReplicaSelection(
*selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA;
} else if (iequals(kReplicaSelectionLeader, selection_str)) {
*selection = KuduClient::ReplicaSelection::LEADER_ONLY;
} else if (iequals(kReplicaSelectionFirst, selection_str)) {
*selection = KuduClient::ReplicaSelection::FIRST_REPLICA;
} else {
return Status::InvalidArgument("invalid replica selection", selection_str);
}
Expand Down

0 comments on commit 43ba101

Please sign in to comment.