Skip to content

Commit

Permalink
KUDU-1648 - [python] Expose Setting of Range Partitions
Browse files Browse the repository at this point in the history
Currently, the Python client does not allow developers
to set range partitions. This patch adds that capability
and includes updates to existing tests.

Change-Id: Ib1e2c9a49196c6dd6644388d08014acd7593d4aa
Reviewed-on: http://gerrit.cloudera.org:8080/4795
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <[email protected]>
  • Loading branch information
jtbirdsell authored and jdcryans committed Nov 7, 2016
1 parent 13ffec6 commit 0f87b04
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 11 deletions.
4 changes: 3 additions & 1 deletion python/kudu/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
FLUSH_AUTO_SYNC,
FLUSH_MANUAL,
READ_LATEST,
READ_AT_SNAPSHOT)
READ_AT_SNAPSHOT,
EXCLUSIVE_BOUND,
INCLUSIVE_BOUND)

from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound, # noqa
KuduNotSupported,
Expand Down
129 changes: 122 additions & 7 deletions python/kudu/client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,33 @@ cdef dict _type_names = {
KUDU_UNIXTIME_MICROS : "KUDU_UNIXTIME_MICROS"
}

# Range Partition Bound Type enums
EXCLUSIVE_BOUND = PartitionType_Exclusive
INCLUSIVE_BOUND = PartitionType_Inclusive

cdef dict _partition_bound_types = {
'exclusive': PartitionType_Exclusive,
'inclusive': PartitionType_Inclusive
}

def _check_convert_range_bound_type(bound):
    """
    Validate a range partition bound type and convert it to its constant.

    Parameters
    ----------
    bound : int or str
      Either an integer bound type constant, or (case-insensitively) one of
      the names used as keys of _partition_bound_types.

    Returns
    -------
    bound : int
      The integer that was passed in, or the constant mapped to the name.

    Raises
    ------
    ValueError
      If bound is an out-of-range integer, an unknown name, or an object
      that is neither an integer nor a string (for example None).
    """
    if isinstance(bound, int):
        # Integer constants are passed through when they are in range.
        if 0 <= bound < len(_partition_bound_types):
            return bound
    else:
        try:
            return _partition_bound_types[bound.lower()]
        except (AttributeError, KeyError):
            # KeyError: the name is not a known bound type.
            # AttributeError: the object has no lower() method, so it is not
            # a string at all; report it the same way as any other invalid
            # bound type instead of leaking the AttributeError.
            pass

    raise ValueError('Invalid range partition bound type: {0}'
                     .format(bound))


cdef class TimeDelta:
"""
Expand Down Expand Up @@ -304,18 +331,21 @@ cdef class Client:
try:
c.table_name(tobytes(table_name))
c.schema(schema.schema)
self._apply_partitioning(c, partitioning)
self._apply_partitioning(c, partitioning, schema)
if n_replicas:
c.num_replicas(n_replicas)
s = c.Create()
check_status(s)
finally:
del c

cdef _apply_partitioning(self, KuduTableCreator* c, part):
cdef _apply_partitioning(self, KuduTableCreator* c, part, Schema schema):
cdef:
vector[string] v
PartialRow py_row
PartialRow lower_bound
PartialRow upper_bound
PartialRow split_row

# Apply hash partitioning.
for col_names, num_buckets, seed in part._hash_partitions:
v.clear()
Expand All @@ -331,6 +361,32 @@ cdef class Client:
for n in part._range_partition_cols:
v.push_back(tobytes(n))
c.set_range_partition_columns(v)
if part._range_partitions:
for partition in part._range_partitions:
if not isinstance(partition[0], PartialRow):
lower_bound = schema.new_row(partition[0])
else:
lower_bound = partition[0]
lower_bound._own = 0
if not isinstance(partition[1], PartialRow):
upper_bound = schema.new_row(partition[1])
else:
upper_bound = partition[1]
upper_bound._own = 0
c.add_range_partition(
lower_bound.row,
upper_bound.row,
_check_convert_range_bound_type(partition[2]),
_check_convert_range_bound_type(partition[3])
)
if part._range_partition_splits:
for split in part._range_partition_splits:
if not isinstance(split, PartialRow):
split_row = schema.new_row(split)
else:
split_row = split
split_row._own = 0
c.add_range_partition_split(split_row.row)

def delete_table(self, table_name):
"""
Expand Down Expand Up @@ -944,6 +1000,8 @@ class Partitioning(object):
def __init__(self):
    # Range partitioning state: the column list starts out unset, while
    # the partition bounds and split rows start out as empty lists.
    self._range_partition_cols = None
    self._range_partitions = []
    self._range_partition_splits = []
    # Hash partitioning specifications start out empty as well.
    self._hash_partitions = []

def add_hash_partitions(self, column_names, num_buckets, seed=None):
"""
Expand Down Expand Up @@ -994,9 +1052,62 @@ class Partitioning(object):
self._range_partition_cols = column_names
return self

# TODO: implement split_rows.
# This is slightly tricky since currently the PartialRow constructor requires a
# Table object, which doesn't exist yet. Should we use tuples instead?
def add_range_partition(self, lower_bound=None,
                        upper_bound=None,
                        lower_bound_type='inclusive',
                        upper_bound_type='exclusive'):
    """
    Record a range partition to be created with the table.

    Several range partitions may be recorded, provided they do not
    overlap. Every range split given through add_range_partition_split
    must fall inside one of the range partitions, and the lower bound
    must be less than or equal to the upper bound. When this method is
    never called, the table's range is unbounded.

    Parameters
    ----------
    lower_bound : PartialRow/list/tuple/dict
    upper_bound : PartialRow/list/tuple/dict
    lower_bound_type : {'inclusive', 'exclusive'} or constants
      kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
    upper_bound_type : {'inclusive', 'exclusive'} or constants
      kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND

    Returns
    -------
    self : Partitioning

    Raises
    ------
    ValueError
      If the range partition columns have not been set yet.
    """
    # Guard clause: bounds are meaningless without range partition columns.
    if not self._range_partition_cols:
        raise ValueError("Range Partition Columns must be set before " +
                         "adding a range partition.")

    bounds = (lower_bound, upper_bound, lower_bound_type, upper_bound_type)
    self._range_partitions.append(bounds)
    return self

def add_range_partition_split(self, split_row):
    """
    Record a range partition split at the given row.

    Parameters
    ----------
    split_row : PartialRow/list/tuple/dict

    Returns
    -------
    self : Partitioning

    Raises
    ------
    ValueError
      If the range partition columns have not been set yet.
    """
    # Guard clause: a split needs range partition columns to refer to.
    if not self._range_partition_cols:
        raise ValueError("Range Partition Columns must be set before " +
                         "adding a range partition split.")

    self._range_partition_splits.append(split_row)
    return self


cdef class Predicate:
Expand Down Expand Up @@ -2244,7 +2355,11 @@ cdef class PartialRow:
if isinstance(key, basestring):
self.set_field(key, value)
else:
self.set_loc(key, value)
if 0 <= key < len(self.schema):
self.set_loc(key, value)
else:
raise IndexError("Column index {0} is out of bounds."
.format(key))

def from_record(self, record):
"""
Expand Down
9 changes: 9 additions & 0 deletions python/kudu/libkudu_client.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
ReadMode_Latest " kudu::client::KuduScanner::READ_LATEST"
ReadMode_Snapshot " kudu::client::KuduScanner::READ_AT_SNAPSHOT"

enum RangePartitionBound" kudu::client::KuduTableCreator::RangePartitionBound":
PartitionType_Exclusive " kudu::client::KuduTableCreator::EXCLUSIVE_BOUND"
PartitionType_Inclusive " kudu::client::KuduTableCreator::INCLUSIVE_BOUND"

cdef cppclass KuduClient:

Status DeleteTable(const string& table_name)
Expand Down Expand Up @@ -518,6 +522,11 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
int num_buckets,
int seed)
KuduTableCreator& set_range_partition_columns(vector[string]& columns)
KuduTableCreator& add_range_partition(KuduPartialRow* lower_bound,
KuduPartialRow* upper_bound,
RangePartitionBound lower_bound_type,
RangePartitionBound upper_bound_type)
KuduTableCreator& add_range_partition_split(KuduPartialRow* split_row)
KuduTableCreator& split_rows(vector[const KuduPartialRow*]& split_rows)
KuduTableCreator& num_replicas(int n_replicas)
KuduTableCreator& wait(c_bool wait)
Expand Down
26 changes: 25 additions & 1 deletion python/kudu/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ def test_create_partitioned_table(self):

self.client.create_table(
name, self.schema,
partitioning=Partitioning().set_range_partition_columns([]))
partitioning=Partitioning()
.set_range_partition_columns(['key'])
.add_range_partition_split({'key': 10})
.add_range_partition_split([20])
.add_range_partition_split((30,))
)
self.client.delete_table(name)

self.client.create_table(
Expand Down Expand Up @@ -246,6 +251,25 @@ def test_list_tablet_server(self):
assert tserver.hostname() is not None
assert tserver.port() is not None

def test_bad_partialrow(self):
    """Assigning through bad keys, bad indexes or bad data must raise."""
    table = self.client.table(self.ex_table)
    insert_op = table.new_insert()

    # Each case pairs an invalid key or index with the error it must raise
    bad_keys = [
        ('not-there', KeyError),
        (len(self.schema) + 1, IndexError),
        (-1, IndexError)
    ]
    for bad_key, expected_error in bad_keys:
        with self.assertRaises(expected_error):
            insert_op[bad_key] = 'test'

    # A value of the wrong type for the column must be rejected too
    with self.assertRaises(TypeError):
        insert_op['int_val'] = 'incorrect'


class TestMonoDelta(unittest.TestCase):

def test_empty_ctor(self):
Expand Down
17 changes: 15 additions & 2 deletions python/kudu/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,34 @@ def setUpClass(self):
table_name = 'type-test'
# Create schema, partitioning and then table
builder = kudu.schema_builder()
builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
builder.add_column('key').type(kudu.int64).nullable(False)
builder.add_column('unixtime_micros_val', type_=kudu.unixtime_micros, nullable=False)
builder.add_column('string_val', type_=kudu.string, compression=kudu.COMPRESSION_LZ4, encoding='prefix')
builder.add_column('bool_val', type_=kudu.bool)
builder.add_column('double_val', type_=kudu.double)
builder.add_column('int8_val', type_=kudu.int8)
builder.add_column('binary_val', type_='binary', compression=kudu.COMPRESSION_SNAPPY, encoding='prefix')
builder.add_column('float_val', type_=kudu.float)
builder.set_primary_keys(['key', 'unixtime_micros_val'])
schema = builder.build()

self.projected_names_w_o_float = [
col for col in schema.names if col != 'float_val'
]

partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)
partitioning = Partitioning() \
.add_hash_partitions(column_names=['key'], num_buckets=3)\
.set_range_partition_columns(['unixtime_micros_val'])\
.add_range_partition(
upper_bound={'unixtime_micros_val': ("2016-01-01", "%Y-%m-%d")},
upper_bound_type=kudu.EXCLUSIVE_BOUND
)\
.add_range_partition(
lower_bound={'unixtime_micros_val': datetime.datetime(2016, 1, 1)},
lower_bound_type='INCLUSIVE',
upper_bound={'unixtime_micros_val': datetime.datetime(9999, 12, 31)}
)


self.client.create_table(table_name, schema, partitioning)
self.type_table = self.client.table(table_name)
Expand Down

0 comments on commit 0f87b04

Please sign in to comment.