Skip to content

Commit

Permalink
[tool] update type of loadgen's --num_rows_per_thread flag
Browse files Browse the repository at this point in the history
This patch updates the type of the --num_rows_per_thread command-line
flag, changing it from unsigned.  With that, the value of 0 is not a
special value for this flag anymore.  Instead, any negative value
for this flag now means 'as many rows per thread as possible'.

With this change, it's now possible to use the 'kudu perf loadgen'
CLI tool to create empty tables, which is useful in various test
scenarios.

In addition, this patch contains a small clean-up on using
the --keep_auto_table loadgen's flag in some test scenarios.

Change-Id: I2712ac7678c9cfd9359629f11df3a86dd727997d
Reviewed-on: http://gerrit.cloudera.org:8080/13373
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
  • Loading branch information
alexeyserbin committed May 22, 2019
1 parent 8073749 commit 53ddc88
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 102 deletions.
48 changes: 43 additions & 5 deletions src/kudu/tools/kudu-tool-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,9 @@ class ToolTest : public KuduTest {
protected:
void RunLoadgen(int num_tservers = 1,
const vector<string>& tool_args = {},
const string& table_name = "");
const string& table_name = "",
string* tool_stdout = nullptr,
string* tool_stderr = nullptr);
void StartExternalMiniCluster(ExternalMiniClusterOptions opts = {});
void StartMiniCluster(InternalMiniClusterOptions opts = {});
unique_ptr<ExternalMiniCluster> cluster_;
Expand Down Expand Up @@ -2025,7 +2027,9 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
// and then run 'kudu perf loadgen ...' utility against it.
void ToolTest::RunLoadgen(int num_tservers,
const vector<string>& tool_args,
const string& table_name) {
const string& table_name,
string* tool_stdout,
string* tool_stderr) {
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tservers;
NO_FATALS(StartExternalMiniCluster(std::move(opts)));
Expand Down Expand Up @@ -2075,14 +2079,48 @@ void ToolTest::RunLoadgen(int num_tservers,
args.push_back(Substitute("-table_name=$0", table_name));
}
copy(tool_args.begin(), tool_args.end(), back_inserter(args));
ASSERT_OK(RunKuduTool(args));
ASSERT_OK(RunKuduTool(args, tool_stdout, tool_stderr));
}

// Run the loadgen benchmark with all optional parameters set to defaults.
TEST_F(ToolTest, TestLoadgenDefaultParameters) {
NO_FATALS(RunLoadgen());
}

// Verify it's possible to run loadgen to create a table, no records inserted.
// Also verify that --num_rows_per_thread=0 in case of existing table
// results in no rows inserted.
TEST_F(ToolTest, TestLoadgenZeroRowsPerThread) {
// Run the tool with zer rows per thread against an existing table.
// The existing table should get no rows inserted.
{
string out;
NO_FATALS(RunLoadgen(1, { "--num_rows_per_thread=0", "--run_scan" },
"an_empty_test_table", &out));
ASSERT_STR_MATCHES(out, "expected rows: 0");
ASSERT_STR_MATCHES(out, "actual rows : 0");
}

// Request to run with zero rows per thread and with various numbers
// of generator threads. The latter parameter in such a configuration is
// irrelevant, and the result table should be empty anyways.
for (auto num_threads : { 1, 2, 10, 100 }) {
SCOPED_TRACE(Substitute("num_threads=$0", num_threads));
const vector<string> args = {
"perf",
"loadgen",
cluster_->master()->bound_rpc_addr().ToString(),
Substitute("--num_threads=$0", num_threads),
"--num_rows_per_thread=0",
"--run_scan",
};
string out;
ASSERT_OK(RunKuduTool(args, &out));
ASSERT_STR_MATCHES(out, "expected rows: 0");
ASSERT_STR_MATCHES(out, "actual rows : 0");
}
}

// Run the loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, sequential values.
TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundSequential) {
NO_FATALS(RunLoadgen(3,
Expand Down Expand Up @@ -2277,14 +2315,14 @@ TEST_F(ToolTest, TestNonRandomWorkloadLoadgen) {

TEST_F(ToolTest, TestPerfTableScan) {
const string& kTableName = "perf.table_scan";
NO_FATALS(RunLoadgen(1, { "--keep_auto_table=true", "--run_scan" }, kTableName));
NO_FATALS(RunLoadgen(1, { "--run_scan" }, kTableName));
NO_FATALS(RunScanTableCheck(kTableName, "", 1, 2000, {}, "perf table_scan"));
}

TEST_F(ToolTest, TestPerfTabletScan) {
// Create a table.
const string& kTableName = "perf.tablet_scan";
NO_FATALS(RunLoadgen(1, { "--keep_auto_table=true" }, kTableName));
NO_FATALS(RunLoadgen(1, {}, kTableName));

// Get the list of tablets.
vector<string> tablet_ids;
Expand Down
155 changes: 76 additions & 79 deletions src/kudu/tools/tool_action_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,61 +119,50 @@ DEFINE_string(tables, "", "Tables to include (comma-separated list of table name
DEFINE_string(memtracker_output, "table",
"One of 'json', 'json_compact' or 'table'. Table output flattens "
"the memtracker hierarchy.");

DEFINE_int32(num_threads, 2,
"Number of threads to run. Each thread runs its own "
"KuduSession.");

namespace boost {
template <typename Signature>
class function;
} // namespace boost

namespace kudu {

namespace master {
class ListMastersRequestPB;
class ListMastersResponsePB;
class ListTabletServersRequestPB;
class ListTabletServersResponsePB;
class ReplaceTabletRequestPB;
class ReplaceTabletResponsePB;
} // namespace master

namespace tools {

using client::internal::AsyncLeaderMasterRpc;
using client::KuduClient;
using client::KuduClientBuilder;
using consensus::ConsensusServiceProxy; // NOLINT
using consensus::ReplicateMsg;
using log::LogEntryPB;
using log::LogEntryReader;
using log::ReadableLogSegment;
using master::ListMastersRequestPB;
using master::ListMastersResponsePB;
using master::ListTabletServersRequestPB;
using master::ListTabletServersResponsePB;
using master::MasterServiceProxy;
using master::ReplaceTabletRequestPB;
using master::ReplaceTabletResponsePB;
using pb_util::SecureDebugString;
using pb_util::SecureShortDebugString;
using rpc::BackoffType;
using rpc::Messenger;
using rpc::MessengerBuilder;
using rpc::RequestIdPB;
using rpc::ResponseCallback;
using rpc::RpcController;
using server::GenericServiceProxy;
using server::GetFlagsRequestPB;
using server::GetFlagsResponsePB;
using server::GetStatusRequestPB;
using server::GetStatusResponsePB;
using server::ServerClockRequestPB;
using server::ServerClockResponsePB;
using server::ServerStatusPB;
using server::SetFlagRequestPB;
using server::SetFlagResponsePB;
static bool ValidateNumThreads(const char* flag_name, int32_t flag_value) {
if (flag_value <= 0) {
LOG(ERROR) << strings::Substitute("'$0' flag should have a positive value",
flag_name);
return false;
}
return true;
}
DEFINE_validator(num_threads, &ValidateNumThreads);

using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::internal::AsyncLeaderMasterRpc;
using kudu::consensus::ConsensusServiceProxy; // NOLINT
using kudu::consensus::ReplicateMsg;
using kudu::log::LogEntryPB;
using kudu::log::LogEntryReader;
using kudu::log::ReadableLogSegment;
using kudu::master::MasterServiceProxy;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::BackoffType;
using kudu::rpc::Messenger;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::RequestIdPB;
using kudu::rpc::ResponseCallback;
using kudu::rpc::RpcController;
using kudu::server::GenericServiceProxy;
using kudu::server::GetFlagsRequestPB;
using kudu::server::GetFlagsResponsePB;
using kudu::server::GetStatusRequestPB;
using kudu::server::GetStatusResponsePB;
using kudu::server::ServerClockRequestPB;
using kudu::server::ServerClockResponsePB;
using kudu::server::ServerStatusPB;
using kudu::server::SetFlagRequestPB;
using kudu::server::SetFlagResponsePB;
using kudu::tserver::TabletServerAdminServiceProxy; // NOLINT
using kudu::tserver::TabletServerServiceProxy; // NOLINT
using kudu::tserver::WriteRequestPB;
using std::cout;
using std::endl;
using std::ostream;
Expand All @@ -185,9 +174,14 @@ using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;
using tserver::TabletServerAdminServiceProxy; // NOLINT
using tserver::TabletServerServiceProxy; // NOLINT
using tserver::WriteRequestPB;

namespace boost {
template <typename Signature>
class function;
} // namespace boost

namespace kudu {
namespace tools {

const char* const kMasterAddressesArg = "master_addresses";
const char* const kMasterAddressesArgDesc = "Comma-separated list of Kudu "
Expand Down Expand Up @@ -738,32 +732,35 @@ Status LeaderMasterProxy::SyncRpc(const Req& req,

// Explicit specializations for callers outside this compilation unit.
template
Status LeaderMasterProxy::SyncRpc(const ListTabletServersRequestPB& req,
ListTabletServersResponsePB* resp,
string func_name,
const boost::function<void(MasterServiceProxy*,
const ListTabletServersRequestPB&,
ListTabletServersResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
Status LeaderMasterProxy::SyncRpc(
const master::ListTabletServersRequestPB& req,
master::ListTabletServersResponsePB* resp,
string func_name,
const boost::function<void(MasterServiceProxy*,
const master::ListTabletServersRequestPB&,
master::ListTabletServersResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
template
Status LeaderMasterProxy::SyncRpc(const ListMastersRequestPB& req,
ListMastersResponsePB* resp,
string func_name,
const boost::function<void(MasterServiceProxy*,
const ListMastersRequestPB&,
ListMastersResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
Status LeaderMasterProxy::SyncRpc(
const master::ListMastersRequestPB& req,
master::ListMastersResponsePB* resp,
string func_name,
const boost::function<void(MasterServiceProxy*,
const master::ListMastersRequestPB&,
master::ListMastersResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
template
Status LeaderMasterProxy::SyncRpc(const ReplaceTabletRequestPB& req,
ReplaceTabletResponsePB* resp,
string func_name,
const boost::function<void(MasterServiceProxy*,
const ReplaceTabletRequestPB&,
ReplaceTabletResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
Status LeaderMasterProxy::SyncRpc(
const master::ReplaceTabletRequestPB& req,
master::ReplaceTabletResponsePB* resp,
string func_name,
const boost::function<void(MasterServiceProxy*,
const master::ReplaceTabletRequestPB&,
master::ReplaceTabletResponsePB*,
RpcController*,
const ResponseCallback&)>& func);

const int ControlShellProtocol::kMaxMessageBytes = 1024 * 1024;

Expand Down
44 changes: 26 additions & 18 deletions src/kudu/tools/tool_action_perf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,10 @@ DEFINE_bool(keep_auto_table, false,
"nor its data is ever dropped/deleted.");
DEFINE_int32(num_iters, 1,
"Number of times to run the scan.");
DEFINE_uint64(num_rows_per_thread, 1000,
"Number of rows each thread generates and inserts; "
"0 means unlimited. All rows generated by a thread are inserted "
"in the context of the same session.");
DEFINE_int64(num_rows_per_thread, 1000,
"Number of rows each thread generates and inserts; "
"-1 means unlimited. All rows generated by a thread are inserted "
"in the context of the same session.");
DECLARE_int32(num_threads);
DEFINE_bool(ordered_scan, false,
"Whether to run an ordered or unordered scan.");
Expand Down Expand Up @@ -433,8 +433,13 @@ string Generator::Next() {
// should insert across if inserting in non-random mode. In random mode, this
// is used to generate different RNG seeds per thread.
int64_t SpanPerThread(int num_columns) {
return (FLAGS_num_rows_per_thread == 0) ?
numeric_limits<int64_t>::max() / FLAGS_num_threads
CHECK_LT(0, num_columns);
CHECK_LT(0, FLAGS_num_threads);
const auto per_thread_limit = numeric_limits<int64_t>::max() /
(num_columns * FLAGS_num_threads);
return (FLAGS_num_rows_per_thread < 0 ||
FLAGS_num_rows_per_thread > per_thread_limit)
? numeric_limits<int64_t>::max() / FLAGS_num_threads
: FLAGS_num_rows_per_thread * num_columns;
}

Expand Down Expand Up @@ -512,9 +517,13 @@ void GeneratorThread(
const size_t flush_per_n_rows = FLAGS_flush_per_n_rows;
const uint64_t gen_seq_start = FLAGS_seq_start;
client::sp::shared_ptr<KuduSession> session(client->NewSession());
uint64_t idx = 0;
int64_t idx = 0;

auto generator = [&]() -> Status {
const int64_t num_rows_per_gen = FLAGS_num_rows_per_thread;
if (num_rows_per_gen == 0) {
return Status::OK();
}
RETURN_NOT_OK(session->SetMutationBufferFlushWatermark(
FLAGS_buffer_flush_watermark_pct));
RETURN_NOT_OK(session->SetMutationBufferSpace(
Expand All @@ -524,21 +533,19 @@ void GeneratorThread(
RETURN_NOT_OK(session->SetFlushMode(
flush_per_n_rows == 0 ? KuduSession::AUTO_FLUSH_BACKGROUND
: KuduSession::MANUAL_FLUSH));
const size_t num_rows_per_gen = FLAGS_num_rows_per_thread;

client::sp::shared_ptr<KuduTable> table;
RETURN_NOT_OK(client->OpenTable(table_name, &table));
const size_t num_columns = table->schema().num_columns();

// Planning for non-intersecting ranges for different generator threads
// in sequential generation mode.
const int64_t gen_span = SpanPerThread(num_columns);
const int64_t gen_span = SpanPerThread(table->schema().num_columns());
const int64_t gen_seed = gen_idx * gen_span + gen_seq_start;
Generator gen(gen_mode, gen_seed, FLAGS_string_len);
for (; num_rows_per_gen == 0 || idx < num_rows_per_gen; ++idx) {
for (; num_rows_per_gen < 0 || idx < num_rows_per_gen; ++idx) {
unique_ptr<KuduInsert> insert_op(table->NewInsert());
RETURN_NOT_OK(GenerateRowData(&gen, insert_op->mutable_row(),
FLAGS_string_fixed));
FLAGS_string_fixed));
RETURN_NOT_OK(session->Apply(insert_op.release()));
if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0) {
session->FlushAsync(nullptr);
Expand Down Expand Up @@ -642,17 +649,18 @@ Status TestLoadGenerator(const RunnerContext& context) {
RETURN_NOT_OK(b.Build(&schema));

unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
table_creator->table_name(table_name)
.schema(&schema);
table_creator->table_name(table_name).schema(&schema);
if (FLAGS_table_num_replicas > 0) {
table_creator->num_replicas(FLAGS_table_num_replicas);
}
if (FLAGS_table_num_range_partitions > 1) {
// Split the generated span for a sequential workload evenly across all
// tablets. In case we're inserting in random mode, use unbounded range
// partitioning, so the table has key coverage of the entire keyspace.
const int64_t total_inserted_span = SpanPerThread(schema.num_columns()) * FLAGS_num_threads;
const int64_t span_per_range = total_inserted_span / FLAGS_table_num_range_partitions;
const int64_t total_inserted_span =
SpanPerThread(schema.num_columns()) * FLAGS_num_threads;
const int64_t span_per_range =
total_inserted_span / FLAGS_table_num_range_partitions;
table_creator->set_range_partition_columns({ kKeyColumnName });
for (int i = 1; i < FLAGS_table_num_range_partitions; i++) {
unique_ptr<KuduPartialRow> split(schema.NewRow());
Expand All @@ -662,8 +670,8 @@ Status TestLoadGenerator(const RunnerContext& context) {
}
}
if (FLAGS_table_num_hash_partitions > 1) {
table_creator->add_hash_partitions(
vector<string>({ kKeyColumnName }), FLAGS_table_num_hash_partitions);
table_creator->add_hash_partitions({ kKeyColumnName },
FLAGS_table_num_hash_partitions);
}
RETURN_NOT_OK(table_creator->Create());
}
Expand Down

0 comments on commit 53ddc88

Please sign in to comment.