Skip to content

Commit

Permalink
KUDU-1363: Add IN-list predicate type
Browse files Browse the repository at this point in the history
Adds support in the C++ client for providing a set of equalities for a
given column. Support for using IN list predicates from the Java client
will be in a follow-up commit.

Change-Id: I986cb13097f1cf4af2b752f58c4c38f412a6a598
Reviewed-on: http://gerrit.cloudera.org:8080/2986
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
Sameer Abhyankar authored and danburkert committed Sep 29, 2016
1 parent 17d1367 commit 9b50bd5
Show file tree
Hide file tree
Showing 16 changed files with 1,000 additions and 37 deletions.
32 changes: 25 additions & 7 deletions src/kudu/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
Expand Down Expand Up @@ -177,11 +178,11 @@ Status SetInternalSignalNumber(int signum) {
return SetStackTraceSignal(signum);
}

std::string GetShortVersionString() {
string GetShortVersionString() {
return VersionInfo::GetShortVersionString();
}

std::string GetAllVersionInfo() {
string GetAllVersionInfo() {
return VersionInfo::GetAllVersionInfo();
}

Expand Down Expand Up @@ -357,7 +358,7 @@ Status KuduClient::ListTables(vector<string>* tables,
}

Status KuduClient::TableExists(const string& table_name, bool* exists) {
std::vector<std::string> tables;
vector<string> tables;
RETURN_NOT_OK(ListTables(&tables, table_name));
for (const string& table : tables) {
if (table == table_name) {
Expand Down Expand Up @@ -497,12 +498,12 @@ KuduTableCreator& KuduTableCreator::schema(const KuduSchema* schema) {
return *this;
}

KuduTableCreator& KuduTableCreator::add_hash_partitions(const std::vector<std::string>& columns,
KuduTableCreator& KuduTableCreator::add_hash_partitions(const vector<string>& columns,
int32_t num_buckets) {
return add_hash_partitions(columns, num_buckets, 0);
}

KuduTableCreator& KuduTableCreator::add_hash_partitions(const std::vector<std::string>& columns,
KuduTableCreator& KuduTableCreator::add_hash_partitions(const vector<string>& columns,
int32_t num_buckets, int32_t seed) {
PartitionSchemaPB::HashBucketSchemaPB* bucket_schema =
data_->partition_schema_.add_hash_bucket_schemas();
Expand All @@ -514,8 +515,7 @@ KuduTableCreator& KuduTableCreator::add_hash_partitions(const std::vector<std::s
return *this;
}

KuduTableCreator& KuduTableCreator::set_range_partition_columns(
const std::vector<std::string>& columns) {
KuduTableCreator& KuduTableCreator::set_range_partition_columns(const vector<string>& columns) {
PartitionSchemaPB::RangeSchemaPB* range_schema =
data_->partition_schema_.mutable_range_schema();
range_schema->Clear();
Expand Down Expand Up @@ -721,6 +721,24 @@ KuduPredicate* KuduTable::NewComparisonPredicate(const Slice& col_name,
return new KuduPredicate(new ComparisonPredicateData(s->column(col_idx), op, value));
}

KuduPredicate* KuduTable::NewInListPredicate(const Slice& col_name,
vector<KuduValue*>* values) {
StringPiece name_sp(reinterpret_cast<const char*>(col_name.data()), col_name.size());
const Schema* s = data_->schema_.schema_;
int col_idx = s->find_column(name_sp);
if (col_idx == Schema::kColumnNotFound) {
// Since this function doesn't return an error, instead we create a special
// predicate that just returns the errors when we add it to the scanner.
//
// This makes the API more "fluent".
STLDeleteElements(values); // we always take ownership of 'values'.
delete values;
return new KuduPredicate(new ErrorPredicateData(
Status::NotFound("column not found", col_name)));
}
return new KuduPredicate(new InListPredicateData(s->column(col_idx), values));
}

////////////////////////////////////////////////////////////
// Error
////////////////////////////////////////////////////////////
Expand Down
26 changes: 26 additions & 0 deletions src/kudu/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,32 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
KuduPredicate::ComparisonOp op,
KuduValue* value);

/// Create a new IN list predicate which can be used for scanners on this
/// table.
///
/// The IN list predicate is used to specify a list of values that a column
/// must match. A row is filtered from the scan if the value of the column
/// does not equal any value from the list.
///
/// The type of entries in the list must correspond to the type of the column
/// to which the predicate is to be applied. For example, if the given column
/// is any type of integer, the KuduValues should also be integers, with the
/// values in the valid range for the column type. No attempt is made to cast
/// between floating point and integer values, or numeric and string values.
///
/// @param [in] col_name
/// Name of the column to which the predicate applies.
/// @param [in] values
/// Vector of values which the column will be matched against.
/// @return Raw pointer to an IN list predicate. The caller owns the predicate
/// until it is passed into KuduScanner::AddConjunctPredicate(). The
/// returned predicate takes ownership of the values vector and its
/// elements. In the case of an error (e.g. an invalid column name), a
/// non-NULL value is still returned. The error will be returned when
/// attempting to add this predicate to a KuduScanner.
KuduPredicate* NewInListPredicate(const Slice& col_name,
std::vector<KuduValue*>* values);

/// @return The KuduClient object associated with the table. The caller
/// should not free the returned pointer.
KuduClient* client() const;
Expand Down
93 changes: 93 additions & 0 deletions src/kudu/client/predicate-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ class PredicateTest : public KuduTest {
return rows;
}

template <typename T>
int CountMatchedRows(const vector<T>& values, const vector<T>& test_values) {

int count = 0;
for (const T& v : values) {
if (std::any_of(test_values.begin(), test_values.end(),
[&] (const T& t) { return t == v; })) {
count += 1;
}
}
return count;
}

// Returns a vector of ints from -50 (inclusive) to 50 (exclusive), and
// boundary values.
template <typename T>
Expand Down Expand Up @@ -289,6 +302,20 @@ class PredicateTest : public KuduTest {
}));
}
}

// IN list predicates
std::random_shuffle(test_values.begin(), test_values.end());

for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;

for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::FromInt(*itr));
}

int count = CountMatchedRows<T>(values, vector<T>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
}
}

// Check string predicates against the specified table.
Expand Down Expand Up @@ -384,6 +411,20 @@ class PredicateTest : public KuduTest {
}));
}
}

// IN list predicates
std::random_shuffle(test_values.begin(), test_values.end());

for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;

for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::CopyString(*itr));
}

int count = CountMatchedRows<string>(values, vector<string>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
}
}

shared_ptr<KuduClient> client_;
Expand Down Expand Up @@ -479,6 +520,30 @@ TEST_F(PredicateTest, TestBoolPredicates) {
KuduValue::FromBool(true));
ASSERT_EQ(1, CountRows(table, { pred }));
}

{ // value IN ()
vector<KuduValue*> values = { };
KuduPredicate* pred = table->NewInListPredicate("value", &values);
ASSERT_EQ(0, CountRows(table, { pred }));
}

{ // value IN (true)
vector<KuduValue*> values = { KuduValue::FromBool(true) };
KuduPredicate* pred = table->NewInListPredicate("value", &values);
ASSERT_EQ(1, CountRows(table, { pred }));
}

{ // value IN (false)
vector<KuduValue*> values = { KuduValue::FromBool(false) };
KuduPredicate* pred = table->NewInListPredicate("value", &values);
ASSERT_EQ(1, CountRows(table, { pred }));
}

{ // value IN (true, false)
vector<KuduValue*> values = { KuduValue::FromBool(false), KuduValue::FromBool(true) };
KuduPredicate* pred = table->NewInListPredicate("value", &values);
ASSERT_EQ(2, CountRows(table, { pred }));
}
}

TEST_F(PredicateTest, TestInt8Predicates) {
Expand Down Expand Up @@ -682,6 +747,20 @@ TEST_F(PredicateTest, TestFloatPredicates) {
}));
}
}

// IN list predicates
std::random_shuffle(test_values.begin(), test_values.end());

for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;

for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::FromFloat(*itr));
}

int count = CountMatchedRows<float>(values, vector<float>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
}
}

TEST_F(PredicateTest, TestDoublePredicates) {
Expand Down Expand Up @@ -785,6 +864,20 @@ TEST_F(PredicateTest, TestDoublePredicates) {
}));
}
}

// IN list predicates
std::random_shuffle(test_values.begin(), test_values.end());

for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;

for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::FromDouble(*itr));
}

int count = CountMatchedRows<double>(values, vector<double>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
}
}

TEST_F(PredicateTest, TestStringPredicates) {
Expand Down
33 changes: 30 additions & 3 deletions src/kudu/client/scan_predicate-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
#ifndef KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H
#define KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H

#include <vector>

#include "kudu/client/scan_predicate.h"
#include "kudu/client/value.h"
#include "kudu/client/value-internal.h"
#include "kudu/client/value.h"
#include "kudu/common/scan_spec.h"
#include "kudu/gutil/macros.h"
#include "kudu/util/memory/arena.h"
Expand All @@ -46,7 +48,7 @@ class KuduPredicate::Data {
class ErrorPredicateData : public KuduPredicate::Data {
public:
explicit ErrorPredicateData(const Status& s)
: status_(s) {
: status_(s) {
}

virtual ~ErrorPredicateData() {
Expand Down Expand Up @@ -77,7 +79,7 @@ class ComparisonPredicateData : public KuduPredicate::Data {
Status AddToScanSpec(ScanSpec* spec, Arena* arena) override;

ComparisonPredicateData* Clone() const override {
return new ComparisonPredicateData(col_, op_, val_->Clone());
return new ComparisonPredicateData(col_, op_, val_->Clone());
}

private:
Expand All @@ -88,6 +90,31 @@ class ComparisonPredicateData : public KuduPredicate::Data {
gscoped_ptr<KuduValue> val_;
};

// A list predicate for a column and a list of constant values.
class InListPredicateData : public KuduPredicate::Data {
public:
InListPredicateData(ColumnSchema col, std::vector<KuduValue*>* values);

virtual ~InListPredicateData();

Status AddToScanSpec(ScanSpec* spec, Arena* arena) override;

InListPredicateData* Clone() const override {
std::vector<KuduValue*> values(vals_.size());
for (KuduValue* val : vals_) {
values.push_back(val->Clone());
}

return new InListPredicateData(col_, &values);
}

private:
friend class KuduScanner;

ColumnSchema col_;
std::vector<KuduValue*> vals_;
};

} // namespace client
} // namespace kudu
#endif /* KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H */
35 changes: 35 additions & 0 deletions src/kudu/client/scan_predicate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@

#include <boost/optional.hpp>
#include <utility>
#include <vector>

#include "kudu/client/scan_predicate-internal.h"
#include "kudu/client/value-internal.h"
#include "kudu/client/value.h"
#include "kudu/common/scan_spec.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"

using std::move;
using std::vector;
using boost::optional;

namespace kudu {
Expand Down Expand Up @@ -104,5 +107,37 @@ Status ComparisonPredicateData::AddToScanSpec(ScanSpec* spec, Arena* arena) {
return Status::OK();
}

InListPredicateData::InListPredicateData(ColumnSchema col,
vector<KuduValue*>* values)
: col_(move(col)) {
vals_.swap(*values);
}

InListPredicateData::~InListPredicateData() {
STLDeleteElements(&vals_);
}

Status InListPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
vector<const void*> vals_list;
vals_list.reserve(vals_.size());
for (auto value : vals_) {
void* val_void;
// The local vals_ list consists of KuduValue pointers that make up the IN
// list predicate. For every value in the vals_ list a call to
// CheckTypeAndGetPointer is made to get a local pointer (void*) to the
// underlying value. The local list (vals_list) of all the void* pointers is
// passed to the ColumnPredicate::InList constructor. The constructor for
// ColumnPredicate::InList will assume ownership of the pointers via a swap.
RETURN_NOT_OK(value->data_->CheckTypeAndGetPointer(col_.name(),
col_.type_info()->physical_type(),
&val_void));
vals_list.push_back(val_void);
}

spec->AddPredicate(ColumnPredicate::InList(col_, &vals_list));

return Status::OK();
}

} // namespace client
} // namespace kudu
1 change: 1 addition & 0 deletions src/kudu/client/scan_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class KUDU_EXPORT KuduPredicate {
private:
friend class ComparisonPredicateData;
friend class ErrorPredicateData;
friend class InListPredicateData;
friend class KuduTable;
friend class ScanConfiguration;

Expand Down
1 change: 1 addition & 0 deletions src/kudu/client/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class KUDU_EXPORT KuduValue {
~KuduValue();
private:
friend class ComparisonPredicateData;
friend class InListPredicateData;
friend class KuduColumnSpec;

class KUDU_NO_EXPORT Data;
Expand Down
Loading

0 comments on commit 9b50bd5

Please sign in to comment.