Skip to content

Commit

Permalink
[Tool] Fix unit test ToolTest.TableCopyLimitSpeed
Browse files Browse the repository at this point in the history
This patch refactor some code and fix an test based on the
patch: https://gerrit.cloudera.org/c/21527/

Change-Id: I8906a8c069f6133fab30b3f2da7723e98c82d869
Reviewed-on: http://gerrit.cloudera.org:8080/21609
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
xinghuayu007 authored and alexeyserbin committed Jul 27, 2024
1 parent 065acfb commit 69f57e9
Showing 3 changed files with 14 additions and 9 deletions.
8 changes: 4 additions & 4 deletions src/kudu/tools/kudu-tool-test.cc
Original file line number Diff line number Diff line change
@@ -6026,16 +6026,16 @@ TEST_F(ToolTest, TableCopyLimitSpeed) {
.add_master_server_addr(master_addr)
.Build(&client));
shared_ptr<KuduTable> table;
client->OpenTable(kNewTableName, &table);
ASSERT_OK(client->OpenTable(kNewTableName, &table));
KuduScanner scanner(table.get());
scanner.Open();
ASSERT_OK(scanner.Open());
KuduScanBatch batch;
int64_t data_size = 0;
while (scanner.HasMoreRows()) {
ASSERT_OK(scanner.NextBatch(&batch));
data_size = batch.direct_data().size() + batch.indirect_data().size();
data_size += batch.direct_data().size() + batch.indirect_data().size();
}
// Table copy speed must less than table_copy_throttler_bytes_per_sec.
// Table copy speed must be less than table_copy_throttler_bytes_per_sec.
ASSERT_LE(data_size / (end_time - start_time).ToSeconds(), table_copy_throttler_bytes_per_sec);
}

9 changes: 6 additions & 3 deletions src/kudu/tools/table_scanner.cc
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/string_case.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/throttler.h"

using kudu::client::KuduClient;
@@ -582,12 +583,14 @@ TableScanner::TableScanner(
out_(nullptr) {
CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
if (FLAGS_table_copy_throttler_bytes_per_sec > 0) {
throttler_ = std::make_shared<Throttler>(Throttler::kNoLimit,
throttler_ = std::make_unique<Throttler>(Throttler::kNoLimit,
FLAGS_table_copy_throttler_bytes_per_sec,
FLAGS_table_copy_throttler_burst_factor);
}
}

TableScanner::~TableScanner() {}

Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
const function<Status(const KuduScanBatch& batch)>& cb) {
for (const auto* token : tokens) {
@@ -608,9 +611,9 @@ Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
count += batch.NumRows();
total_count_ += batch.NumRows();
++next_batch_calls;
// Limit table copy speed.
// Limit table copying speed.
if (throttler_) {
SCOPED_LOG_SLOW_EXECUTION(WARNING, 1000, "Table copy throttler");
SCOPED_LOG_SLOW_EXECUTION(INFO, 1000, "Table copy throttler");
while (!throttler_->Take(0,
batch.direct_data().size() + batch.indirect_data().size())) {
SleepFor(MonoDelta::FromMicroseconds(Throttler::kRefillPeriodMicros / 2));
6 changes: 4 additions & 2 deletions src/kudu/tools/table_scanner.h
Original file line number Diff line number Diff line change
@@ -32,9 +32,9 @@
#include "kudu/client/write_op.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"

namespace kudu {
class ThreadPool;
class Throttler;

namespace tools {
@@ -48,6 +48,8 @@ class TableScanner {
std::nullopt,
std::optional<std::string> dst_table_name = std::nullopt);

~TableScanner();

// Set output stream of this tool, or disable output if not set.
// 'out' must remain valid for the lifetime of this class.
void SetOutput(std::ostream* out);
@@ -104,7 +106,7 @@ class TableScanner {
std::optional<std::string> dst_table_name_;
int32_t scan_batch_size_;
std::unique_ptr<ThreadPool> thread_pool_;
std::shared_ptr<Throttler> throttler_;
std::unique_ptr<Throttler> throttler_;

// Protects output to 'out_' so that rows don't get interleaved.
Mutex output_lock_;

0 comments on commit 69f57e9

Please sign in to comment.