Skip to content

Commit

Permalink
KUDU-687: add replication factor to KuduTable
Browse files Browse the repository at this point in the history
This is generally useful, and necessary if ksck is to use the C++ client.

While I was at it, I reduced the use of a second table in client-test to
only those tests that actually needed it.

Change-Id: If76ec6c3001e78a31517991d7432f79d4645fccc
Reviewed-on: http://gerrit.cloudera.org:8080/4123
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <[email protected]>
  • Loading branch information
adembo committed Aug 25, 2016
1 parent 86afadc commit ebe4d78
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 26 deletions.
21 changes: 15 additions & 6 deletions src/kudu/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,8 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
const MonoTime& deadline,
KuduSchema* schema,
PartitionSchema* partition_schema,
string* table_id) {
string* table_id,
int* num_replicas) {
GetTableSchemaRequestPB req;
GetTableSchemaResponsePB resp;

Expand All @@ -565,11 +566,19 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
*new_schema.get(),
&new_partition_schema));

// Parsing was successful; release the schemas to the user.
delete schema->schema_;
schema->schema_ = new_schema.release();
*partition_schema = std::move(new_partition_schema);
*table_id = resp.table_id();
if (schema) {
delete schema->schema_;
schema->schema_ = new_schema.release();
}
if (partition_schema) {
*partition_schema = std::move(new_partition_schema);
}
if (table_id) {
*table_id = resp.table_id();
}
if (num_replicas) {
*num_replicas = resp.num_replicas();
}
return Status::OK();
}

Expand Down
3 changes: 2 additions & 1 deletion src/kudu/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class KuduClient::Data {
const MonoTime& deadline,
KuduSchema* schema,
PartitionSchema* partition_schema,
std::string* table_id);
std::string* table_id,
int* num_replicas);

Status InitLocalHostNames();

Expand Down
29 changes: 21 additions & 8 deletions src/kudu/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ using std::string;
using std::unique_ptr;
using std::vector;
using std::map;
using strings::Substitute;

namespace kudu {
namespace client {
Expand Down Expand Up @@ -138,7 +139,6 @@ class ClientTest : public KuduTest {
.Build(&client_));

ASSERT_NO_FATAL_FAILURE(CreateTable(kTableName, 1, GenerateSplitRows(), {}, &client_table_));
ASSERT_NO_FATAL_FAILURE(CreateTable(kTable2Name, 1, {}, {}, &client_table2_));
}

// Looks up the remote tablet entry for a given partition key in the meta cache.
Expand Down Expand Up @@ -179,7 +179,6 @@ class ClientTest : public KuduTest {
protected:

static const char *kTableName;
static const char *kTable2Name;
static const int32_t kNoBound;

string GetFirstTabletId(KuduTable* table) {
Expand Down Expand Up @@ -491,7 +490,7 @@ class ClientTest : public KuduTest {
}
}
if (!ts_found) {
return Status::InvalidArgument(strings::Substitute("Could not find tablet server $1", uuid));
return Status::InvalidArgument(Substitute("Could not find tablet server $1", uuid));
}

return Status::OK();
Expand Down Expand Up @@ -522,14 +521,16 @@ class ClientTest : public KuduTest {
gscoped_ptr<MiniCluster> cluster_;
shared_ptr<KuduClient> client_;
shared_ptr<KuduTable> client_table_;
shared_ptr<KuduTable> client_table2_;
};

const char *ClientTest::kTableName = "client-testtb";
const char *ClientTest::kTable2Name = "client-testtb2";
const int32_t ClientTest::kNoBound = kint32max;

TEST_F(ClientTest, TestListTables) {
const char* kTable2Name = "client-testtb2";
shared_ptr<KuduTable> second_table;
NO_FATALS(CreateTable(kTable2Name, 1, {}, {}, &second_table));

vector<string> tables;
ASSERT_OK(client_->ListTables(&tables));
std::sort(tables.begin(), tables.end());
Expand Down Expand Up @@ -1916,6 +1917,9 @@ TEST_F(ClientTest, TestSessionClose) {
// Test which sends multiple batches through the same session, each of which
// contains multiple rows spread across multiple tablets.
TEST_F(ClientTest, TestMultipleMultiRowManualBatches) {
shared_ptr<KuduTable> second_table;
NO_FATALS(CreateTable("second table", 1, {}, {}, &second_table));

shared_ptr<KuduSession> session = client_->NewSession();
ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));

Expand All @@ -1928,7 +1932,7 @@ TEST_F(ClientTest, TestMultipleMultiRowManualBatches) {
for (int i = 0; i < kRowsPerBatch; i++) {
ASSERT_OK(ApplyInsertToSession(
session.get(),
(row_key % 2 == 0) ? client_table_ : client_table2_,
(row_key % 2 == 0) ? client_table_ : second_table,
row_key, row_key * 10, "hello world"));
row_key++;
}
Expand All @@ -1939,7 +1943,7 @@ TEST_F(ClientTest, TestMultipleMultiRowManualBatches) {

const int kNumRowsPerTablet = kNumBatches * kRowsPerBatch / 2;
ASSERT_EQ(kNumRowsPerTablet, CountRowsFromClient(client_table_.get()));
ASSERT_EQ(kNumRowsPerTablet, CountRowsFromClient(client_table2_.get()));
ASSERT_EQ(kNumRowsPerTablet, CountRowsFromClient(second_table.get()));

// Verify the data looks right.
vector<string> rows;
Expand Down Expand Up @@ -3102,7 +3106,7 @@ TEST_F(ClientTest, TestServerTooBusyRetry) {
int t = 0;
while (!stop) {
scoped_refptr<kudu::Thread> thread;
ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", t++),
ASSERT_OK(kudu::Thread::Create("test", Substitute("t$0", t++),
&ClientTest::CheckRowCount, this, client_table_.get(), kNumRows,
&thread));
threads.push_back(thread);
Expand Down Expand Up @@ -3254,5 +3258,14 @@ TEST_F(ClientTest, TestBatchScanConstIterator) {
}
}

TEST_F(ClientTest, TestTableNumReplicas) {
for (int i : { 1, 3, 5, 7, 9 }) {
shared_ptr<KuduTable> table;
NO_FATALS(CreateTable(Substitute("table_with_$0_replicas", i),
i, {}, {}, &table));
ASSERT_EQ(i, table->num_replicas());
}
}

} // namespace client
} // namespace kudu
24 changes: 16 additions & 8 deletions src/kudu/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,13 @@ Status KuduClient::IsAlterTableInProgress(const string& table_name,
Status KuduClient::GetTableSchema(const string& table_name,
KuduSchema* schema) {
MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout();
string table_id_ignored;
PartitionSchema partition_schema;
return data_->GetTableSchema(this,
table_name,
deadline,
schema,
&partition_schema,
&table_id_ignored);
nullptr, // partition schema
nullptr, // table id
nullptr); // number of replicas
}

Status KuduClient::ListTabletServers(vector<KuduTabletServer*>* tablet_servers) {
Expand Down Expand Up @@ -367,18 +366,21 @@ Status KuduClient::OpenTable(const string& table_name,
shared_ptr<KuduTable>* table) {
KuduSchema schema;
string table_id;
int num_replicas;
PartitionSchema partition_schema;
MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout();
RETURN_NOT_OK(data_->GetTableSchema(this,
table_name,
deadline,
&schema,
&partition_schema,
&table_id));
&table_id,
&num_replicas));

// TODO: in the future, probably will look up the table in some map to reuse
// KuduTable instances.
table->reset(new KuduTable(shared_from_this(), table_name, table_id,
table->reset(new KuduTable(shared_from_this(),
table_name, table_id, num_replicas,
schema, partition_schema));
return Status::OK();
}
Expand Down Expand Up @@ -586,10 +588,12 @@ Status KuduTableCreator::Create() {

KuduTable::KuduTable(const shared_ptr<KuduClient>& client,
const string& name,
const string& table_id,
const string& id,
int num_replicas,
const KuduSchema& schema,
const PartitionSchema& partition_schema)
: data_(new KuduTable::Data(client, name, table_id, schema, partition_schema)) {
: data_(new KuduTable::Data(client, name, id, num_replicas,
schema, partition_schema)) {
}

KuduTable::~KuduTable() {
Expand All @@ -608,6 +612,10 @@ const KuduSchema& KuduTable::schema() const {
return data_->schema_;
}

int KuduTable::num_replicas() const {
return data_->num_replicas_;
}

KuduInsert* KuduTable::NewInsert() {
return new KuduInsert(shared_from_this());
}
Expand Down
6 changes: 5 additions & 1 deletion src/kudu/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
/// @return Reference to the table's schema object.
const KuduSchema& schema() const;

/// @return Replication factor of the table.
int num_replicas() const;

/// @return New @c INSERT operation for this table. It is the caller's
/// responsibility to free the result, unless it is passed to
/// KuduSession::Apply().
Expand Down Expand Up @@ -725,7 +728,8 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {

KuduTable(const sp::shared_ptr<KuduClient>& client,
const std::string& name,
const std::string& table_id,
const std::string& id,
int num_replicas,
const KuduSchema& schema,
const PartitionSchema& partition_schema);

Expand Down
2 changes: 2 additions & 0 deletions src/kudu/client/table-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ using sp::shared_ptr;
KuduTable::Data::Data(shared_ptr<KuduClient> client,
string name,
string id,
int num_replicas,
const KuduSchema& schema,
PartitionSchema partition_schema)
: client_(std::move(client)),
name_(std::move(name)),
id_(std::move(id)),
num_replicas_(num_replicas),
schema_(schema),
partition_schema_(std::move(partition_schema)) {
}
Expand Down
6 changes: 4 additions & 2 deletions src/kudu/client/table-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ class KuduTable::Data {
public:
Data(sp::shared_ptr<KuduClient> client,
std::string name,
std::string table_id,
std::string id,
int num_replicas,
const KuduSchema& schema,
PartitionSchema partition_schema);
~Data();

sp::shared_ptr<KuduClient> client_;

std::string name_;
const std::string name_;
const std::string id_;
const int num_replicas_;

// TODO: figure out how we deal with a schema change from the client perspective.
// Do we make them call a RefreshSchema() method? Or maybe reopen the table and get
Expand Down

0 comments on commit ebe4d78

Please sign in to comment.