Skip to content

Commit

Permalink
[tools] KUDU-1945: Kudu table copy and perf loadgen
Browse files Browse the repository at this point in the history
This patch adds support to the 'table copy' CLI tool for tables
with auto-incrementing columns. While scanning the source table, we do
not scan the auto-incrementing column, and while writing the rows to
the destination table, we write all the scanned rows. The
auto-incrementing column is then populated on the server side during
each row's write.

It also adds support to the 'perf loadgen' CLI tool for inserting into
tables with auto-incrementing columns. This currently works only with
the insert and insert_ignore write operations; upsert and upsert_ignore
are not supported yet.

Change-Id: I754a7e84c16d1f3b2d52be937e1eb50b3d00d759
Reviewed-on: http://gerrit.cloudera.org:8080/19890
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Kudu Jenkins
  • Loading branch information
achennaka committed May 24, 2023
1 parent 8c546ae commit 65bf631
Showing 4 changed files with 129 additions and 9 deletions.
5 changes: 4 additions & 1 deletion src/kudu/integration-tests/data_gen_util.cc
Original file line number Diff line number Diff line change
@@ -82,10 +82,13 @@ void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
RNG* random, KuduPartialRow* row) {
for (int col_idx = 0; col_idx < schema.num_columns(); col_idx++) {
// We randomly generate the inserted data, except for the first column,
// which is always based on a monotonic "record id".
// which is always based on a monotonic "record id" and the auto-incrementing column,
// if present.
uint64_t value;
if (col_idx == 0) {
value = record_id;
} else if (col_idx == schema.GetAutoIncrementingColumnIndex()) {
continue;
} else {
value = random->Next64();
}
84 changes: 79 additions & 5 deletions src/kudu/tools/kudu-tool-test.cc
Original file line number Diff line number Diff line change
@@ -853,7 +853,8 @@ enum RunCopyTableCheckArgsType {
kTestCopyTableComplexSchema,
kTestCopyUnpartitionedTable,
kTestCopyTablePredicates,
kTestCopyTableWithStringBounds
kTestCopyTableWithStringBounds,
kTestCopyTableAutoIncrementingColumn
};
// Subclass of ToolTest that allows running individual test cases with different parameters to run
// 'kudu table copy' CLI tool.
@@ -864,9 +865,10 @@ class ToolTestCopyTableParameterized :
void SetUp() override {
test_case_ = GetParam();
ExternalMiniClusterOptions opts;
if (test_case_ == kTestCopyTableSchemaOnly) {
// In kTestCopyTableSchemaOnly case, we may create table with RF=3,
// means 3 tservers needed at least.
if (test_case_ == kTestCopyTableSchemaOnly ||
test_case_ == kTestCopyTableAutoIncrementingColumn) {
// In kTestCopyTableSchemaOnly and kTestCopyTableAutoIncrementingColumn
// case, we may create table with RF=3, means 3 tservers needed at least.
opts.num_tablet_servers = 3;
}
NO_FATALS(StartExternalMiniCluster(opts));
@@ -893,6 +895,10 @@ class ToolTestCopyTableParameterized :
ww.set_schema(schema);
ww.Setup();
return;
} else if (test_case_ == kTestCopyTableAutoIncrementingColumn) {
KuduSchema schema;
ASSERT_OK(CreateAutoIncrementingTable(&schema));
ww.set_schema(schema);
}
ww.Setup();
ww.Start();
@@ -922,6 +928,10 @@ class ToolTestCopyTableParameterized :
case kTestCopyTableDstTableNotExist:
args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
return { args };
case kTestCopyTableAutoIncrementingColumn:
args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
args.columns = kAutoIncrementingSchemaColumns;
return { args };
case kTestCopyTableInsertIgnore:
args.mode = TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE;
return { args };
@@ -1186,6 +1196,23 @@ class ToolTestCopyTableParameterized :
.Create();
}

// Creates the table used by the kTestCopyTableAutoIncrementingColumn case.
//
// The schema consists of an INT32 "key" column declared as a non-unique
// primary key, an INT32 "int_val" column and a nullable STRING "string_val"
// column. The table is named kTableName, has no range partition columns and
// is created with three replicas.
//
// On success the built schema is stored in 'schema'. Any error from creating
// the client, building the schema, or creating the table is returned as is.
Status CreateAutoIncrementingTable(KuduSchema* schema) {
  shared_ptr<KuduClient> client;
  RETURN_NOT_OK(cluster_->CreateClient(nullptr, &client));

  // Describe the columns first; 'schema' is only filled in if the build succeeds.
  KuduSchemaBuilder builder;
  builder.AddColumn("key")
      ->Type(client::KuduColumnSchema::INT32)
      ->NotNull()
      ->NonUniquePrimaryKey();
  builder.AddColumn("int_val")
      ->Type(client::KuduColumnSchema::INT32);
  builder.AddColumn("string_val")
      ->Type(client::KuduColumnSchema::STRING)
      ->Nullable();
  RETURN_NOT_OK(builder.Build(schema));

  // Configure the table creator step by step, then create the table.
  unique_ptr<KuduTableCreator> creator(client->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(schema);
  creator->set_range_partition_columns({});
  creator->num_replicas(3);
  return creator->Create();
}

void InsertOneRowWithNullCell() {
shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
@@ -1203,12 +1230,15 @@ class ToolTestCopyTableParameterized :

static const char kTableName[];
static const char kSimpleSchemaColumns[];
static const char kAutoIncrementingSchemaColumns[];
static const char kComplexSchemaColumns[];
int test_case_ = 0;
int64_t total_rows_ = 0;
};
const char ToolTestCopyTableParameterized::kTableName[] = "ToolTestCopyTableParameterized";
const char ToolTestCopyTableParameterized::kSimpleSchemaColumns[] = "key,int_val,string_val";
const char ToolTestCopyTableParameterized::kAutoIncrementingSchemaColumns[]
= "key,int_val,string_val";
const char ToolTestCopyTableParameterized::kComplexSchemaColumns[]
= "key_hash0,key_hash1,key_hash2,key_range,int8_val,int16_val,int32_val,int64_val,"
"timestamp_val,string_val,bool_val,float_val,double_val,binary_val,decimal_val";
@@ -1224,7 +1254,8 @@ INSTANTIATE_TEST_SUITE_P(CopyTableParameterized,
kTestCopyTableComplexSchema,
kTestCopyUnpartitionedTable,
kTestCopyTablePredicates,
kTestCopyTableWithStringBounds));
kTestCopyTableWithStringBounds,
kTestCopyTableAutoIncrementingColumn));

void ToolTest::StartExternalMiniCluster(ExternalMiniClusterOptions opts) {
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
@@ -3727,6 +3758,49 @@ TEST_F(ToolTest, TestLoadgenAutoGenTablePartitioning) {
ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
}

// Verifies that 'perf loadgen' can insert rows into a table that has an
// auto-incrementing column, and that the rows read back with 'table scan'
// carry the expected key and auto-incrementing values.
TEST_F(ToolTest, TestLoadgenAutoIncrementingColumn) {
  const string kTableName = "loadgen_auto_incrementing";
  constexpr int kNumRows = 100;

  NO_FATALS(StartExternalMiniCluster());
  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();

  // Create a single-tablet table with a non-unique primary key, keeping the
  // scenario as simple as possible.
  shared_ptr<KuduClient> client;
  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
  KuduSchema schema;
  {
    KuduSchemaBuilder builder;
    builder.AddColumn("key")
        ->Type(client::KuduColumnSchema::INT32)
        ->NotNull()
        ->NonUniquePrimaryKey();
    builder.AddColumn("int_val")
        ->Type(client::KuduColumnSchema::INT32);
    ASSERT_OK(builder.Build(&schema));
  }
  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
  ASSERT_OK(table_creator->table_name(kTableName)
                .schema(&schema)
                .set_range_partition_columns({})
                .num_replicas(1)
                .Create());

  // Populate the table using a single loadgen writer thread.
  // NOTE(review): if RunTool() returns a Status, NO_FATALS() does not check
  // it -- confirm against the RunTool() declaration.
  const string loadgen_cmd = Substitute(
      "perf loadgen $0 -table_name=$1 -num_threads=1 -num_rows_per_thread=$2",
      master_addr, kTableName, kNumRows);
  NO_FATALS(RunTool(loadgen_cmd));

  // Read everything back with a single scanner thread.
  const string scan_cmd = Substitute("table scan $0 $1 -num_threads=1",
                                     master_addr, kTableName);
  vector<string> lines;
  NO_FATALS(RunActionStdoutLines(scan_cmd, &lines));

  // One output line per row, plus one extra line (presumably the scan
  // summary -- confirm against the 'table scan' output format).
  ASSERT_EQ(kNumRows + 1, lines.size());
  for (int row_idx = 0; row_idx < kNumRows; row_idx++) {
    // The i-th line is expected to have key == i and auto-incrementing id == i + 1.
    const string expected_prefix = Substitute(
        "int32 key=$0, int64 auto_incrementing_id=$1, int32 int_val",
        row_idx, row_idx + 1);
    ASSERT_STR_CONTAINS(lines[row_idx], expected_prefix);
  }
}

// Run the loadgen with txn-related options.
TEST_F(ToolTest, LoadgenTxnBasics) {
SKIP_IF_SLOW_NOT_ALLOWED();
46 changes: 43 additions & 3 deletions src/kudu/tools/table_scanner.cc
Original file line number Diff line number Diff line change
@@ -750,6 +750,27 @@ Status TableScanner::StartWork(WorkType work_type) {
}
}

if (work_type == WorkType::kCopy) {
  // When copying a table, leave the auto-incrementing column out of the scan
  // projection: its values are populated on the server side when the rows are
  // written, so scanning that column of the source table would be wasted work.
  const auto& src_schema = src_table->schema();
  if (src_schema.GetAutoIncrementingColumnIndex() != -1) {
    vector<string> projected_column_names;
    projected_column_names.reserve(src_schema.num_columns());
    for (int i = 0; i < src_schema.num_columns(); i++) {
      if (src_schema.Column(i).name() == KuduSchema::GetAutoIncrementingColumnName()) {
        continue;
      }
      projected_column_names.emplace_back(src_schema.Column(i).name());
    }
    RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
  }

  // Ensure both the source and destination table schemas are identical at this
  // point. The error status must be returned: constructing it without
  // returning it silently discards the result of the check.
  client::sp::shared_ptr<KuduTable> dst_table;
  RETURN_NOT_OK(dst_client_->get()->OpenTable(*dst_table_name_, &dst_table));
  if (dst_table->schema() != src_schema) {
    return Status::InvalidArgument("source and destination tables should have the same schema");
  }
}

// Set predicates.
RETURN_NOT_OK(AddPredicates(src_table, &builder));

@@ -853,10 +874,29 @@ Status TableScanner::AddRow(KuduSession* session,
break; // unreachable
}

// If the destination table has auto-incrementing column, we do not set it
// as we skip scanning the auto-incrementing column while scanning the source table.
auto* dst_row = write_op->mutable_row();
memcpy(dst_row->row_data_, src_row.row_data_,
ContiguousRowHelper::row_size(*src_row.schema_));
BitmapChangeBits(dst_row->isset_bitmap_, 0, table->schema().num_columns(), true);
const int auto_incrementing_col_idx = table->schema().GetAutoIncrementingColumnIndex();
if (auto_incrementing_col_idx == Schema::kColumnNotFound) {
memcpy(dst_row->row_data_, src_row.row_data_,
ContiguousRowHelper::row_size(*src_row.schema_));
BitmapChangeBits(dst_row->isset_bitmap_, 0, table->schema().num_columns(), true);
} else {
int src_iterator = 0;
for (int dst_iterator = 0; dst_iterator < table->schema().num_columns(); dst_iterator++) {
if (auto_incrementing_col_idx != dst_iterator) {
if (src_row.IsNull(src_iterator)) {
RETURN_NOT_OK(dst_row->SetNull(dst_iterator));
} else {
RETURN_NOT_OK(dst_row->Set(dst_iterator, src_row.row_data_ +
src_row->schema_->column_offset(src_iterator)));
}
BitmapChange(dst_row->isset_bitmap_, dst_iterator, true);
src_iterator++;
}
}
}

return session->Apply(write_op.release());
}
3 changes: 3 additions & 0 deletions src/kudu/tools/tool_action_perf.cc
Original file line number Diff line number Diff line change
@@ -528,6 +528,9 @@ Status GenerateRowData(Generator* key_gen, Generator* value_gen, KuduPartialRow*
// when perform DELETE operations.
Generator* gen = key_gen;
for (size_t idx = 0; idx < gen_column_count; ++idx) {
if (columns[idx].is_auto_incrementing()) {
continue;
}
if (idx == row->schema()->num_key_columns()) {
gen = value_gen;
}

0 comments on commit 65bf631

Please sign in to comment.