Skip to content

Commit

Permalink
KUDU-2671 range-specific hash schema support in Python client
Browse files Browse the repository at this point in the history
With this patch, it's now possible to operate on ranges having custom
hash schemas in Kudu Python client applications.  In essence, the newly
added API directly maps into the Kudu C++ client API.  This patch also
contains tests to cover the newly introduced functionality.

Change-Id: I61426fadc45d70805cf99461d559f0152a79f4a0
Reviewed-on: http://gerrit.cloudera.org:8080/18771
Reviewed-by: Attila Bukor <[email protected]>
Tested-by: Kudu Jenkins
  • Loading branch information
alexeyserbin committed Jul 22, 2022
1 parent 8902ca6 commit 3946ce5
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 1 deletion.
139 changes: 139 additions & 0 deletions python/kudu/client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ cdef class Client:
PartialRow lower_bound
PartialRow upper_bound
PartialRow split_row
KuduRangePartition* range_partition

# Apply hash partitioning.
for col_names, num_buckets, seed in part._hash_partitions:
Expand All @@ -429,12 +430,36 @@ cdef class Client:
c.add_hash_partitions(v, num_buckets, seed)
else:
c.add_hash_partitions(v, num_buckets)

# Apply range partitioning
if part._range_partition_cols is not None:
v.clear()
for n in part._range_partition_cols:
v.push_back(tobytes(n))
c.set_range_partition_columns(v)
if part._range_partitions_with_custom_hash_schemas:
for p in part._range_partitions_with_custom_hash_schemas:
if not isinstance(p.lower_bound, PartialRow):
lower_bound = schema.new_row(p.lower_bound)
else:
lower_bound = p.lower_bound
lower_bound._own = 0
if not isinstance(p.upper_bound, PartialRow):
upper_bound = schema.new_row(p.upper_bound)
else:
upper_bound = p.upper_bound
upper_bound._own = 0
range_partition = new KuduRangePartition(
lower_bound.row,
upper_bound.row,
p.lower_bound_type,
p.upper_bound_type)
for col_names, num_buckets, seed in p.hash_dimensions:
v.clear()
for n in col_names:
v.push_back(tobytes(n))
range_partition.add_hash_partitions(v, num_buckets, seed if seed else 0)
c.add_custom_range_partition(range_partition)
if part._range_partitions:
for partition in part._range_partitions:
if not isinstance(partition[0], PartialRow):
Expand Down Expand Up @@ -1208,13 +1233,61 @@ cdef class Column:

return result


class RangePartition(object):
"""
Argument to Client.add_custom_range_partition(...) to contain information
on the range bounds and range-specific hash schema.
"""
def __init__(self,
lower_bound=None,
upper_bound=None,
lower_bound_type='inclusive',
upper_bound_type='exclusive'):
"""
Parameters
----------
lower_bound : PartialRow/list/tuple/dict
upper_bound : PartialRow/list/tuple/dict
lower_bound_type : {'inclusive', 'exclusive'} or constants
kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
upper_bound_type : {'inclusive', 'exclusive'} or constants
kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
"""
self.lower_bound = lower_bound
self.upper_bound = upper_bound
self.lower_bound_type = _check_convert_range_bound_type(lower_bound_type)
self.upper_bound_type = _check_convert_range_bound_type(upper_bound_type)
self.hash_dimensions = []

def add_hash_partitions(self, column_names, num_buckets, seed=None):
"""
Adds a dimension with the specified parameters to the custom hash schema
for this range partition.
Parameters
----------
column_names : list of string column names on which to compute hash function
num_buckets : the number of buckets for the hash function
seed : int - optional; the seed for the hash function mapping rows to buckets
Returns
-------
self: this object
"""
if isinstance(column_names, str):
column_names = [column_names]
self.hash_dimensions.append( (column_names, num_buckets, seed) )
return self

class Partitioning(object):
""" Argument to Client.create_table(...) to describe table partitioning. """

def __init__(self):
self._hash_partitions = []
self._range_partition_cols = None
self._range_partitions = []
self._range_partitions_with_custom_hash_schemas = []
self._range_partition_splits = []

def add_hash_partitions(self, column_names, num_buckets, seed=None):
Expand Down Expand Up @@ -1302,9 +1375,27 @@ class Partitioning(object):
else:
raise ValueError("Range Partition Columns must be set before " +
"adding a range partition.")
return self


def add_custom_range_partition(self, range_partition):
"""
Parameters
----------
range_partition : range partition with custom hash schema to add
Returns
-------
self : Partitioning
"""
if self._range_partition_cols is None:
raise ValueError("Range Partition Columns must be set before " +
"adding a range partition.")

self._range_partitions_with_custom_hash_schemas.append(range_partition)
return self


def add_range_partition_split(self, split_row):
"""
Add a range partition split at the provided row.
Expand Down Expand Up @@ -3219,6 +3310,54 @@ cdef class TableAlterer:
_check_convert_range_bound_type(upper_bound_type)
)


def add_custom_range_partition(self, range_partition):
"""
Add a range partition with custom hash schema.
Multiple range partitions may be added as part of a single alter table
transaction by calling this method multiple times on the table alterer.
This client may immediately write and scan the new tablets when Alter()
returns success, however other existing clients may have to wait for a
timeout period to elapse before the tablets become visible. This period
is configured by the master's 'table_locations_ttl_ms' flag, and
defaults to 5 minutes.
Parameters
----------
range_partition : RangePartition
Returns
-------
self : TableAlterer
"""
cdef:
vector[string] v
KuduRangePartition* p
PartialRow lower_bound
PartialRow upper_bound

if not isinstance(range_partition.lower_bound, PartialRow):
lower_bound = self._table.schema.new_row(range_partition.lower_bound)
else:
lower_bound = range_partition.lower_bound
lower_bound._own = 0
if not isinstance(range_partition.upper_bound, PartialRow):
upper_bound = self._table.schema.new_row(range_partition.upper_bound)
else:
upper_bound = range_partition.upper_bound
upper_bound._own = 0
p = new KuduRangePartition(
lower_bound.row, upper_bound.row,
range_partition.lower_bound_type, range_partition.upper_bound_type)
for col_names, num_buckets, seed in range_partition.hash_dimensions:
v.clear()
for n in col_names:
v.push_back(tobytes(n))
p.add_hash_partitions(v, num_buckets, seed if seed else 0)
self._alterer.AddRangePartition(p)

def drop_range_partition(self, lower_bound=None,
upper_bound=None,
lower_bound_type='inclusive',
Expand Down
12 changes: 12 additions & 0 deletions python/kudu/libkudu_client.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,17 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
string& hostname()
uint16_t port()

cdef cppclass KuduRangePartition:
KuduRangePartition(KuduPartialRow* lower_bound,
KuduPartialRow* upper_bound,
RangePartitionBound lower_bound_type,
RangePartitionBound upper_bound_type)
Status add_hash_partitions(vector[string]& columns,
int num_buckets,
int seed)

cdef cppclass KuduTableCreator:

KuduTableCreator& table_name(string& name)
KuduTableCreator& schema(KuduSchema* schema)
KuduTableCreator& add_hash_partitions(vector[string]& columns,
Expand All @@ -609,6 +619,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
KuduPartialRow* upper_bound,
RangePartitionBound lower_bound_type,
RangePartitionBound upper_bound_type)
KuduTableCreator& add_custom_range_partition(KuduRangePartition* p)
KuduTableCreator& add_range_partition_split(KuduPartialRow* split_row)
KuduTableCreator& split_rows(vector[const KuduPartialRow*]& split_rows)
KuduTableCreator& num_replicas(int n_replicas)
Expand All @@ -627,6 +638,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
KuduPartialRow* upper_bound,
RangePartitionBound lower_bound_type,
RangePartitionBound upper_bound_type)
KuduTableAlterer& AddRangePartition(KuduRangePartition* p)
KuduTableAlterer& DropRangePartition(KuduPartialRow* lower_bound,
KuduPartialRow* upper_bound,
RangePartitionBound lower_bound_type,
Expand Down
109 changes: 108 additions & 1 deletion python/kudu/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

from kudu.compat import unittest, long
from kudu.tests.common import KuduTestBase
from kudu.client import (Partitioning, ENCRYPTION_OPTIONAL, ENCRYPTION_REQUIRED,
from kudu.client import (Partitioning,
RangePartition,
ENCRYPTION_OPTIONAL,
ENCRYPTION_REQUIRED,
ENCRYPTION_REQUIRED_REMOTE)
import kudu
import datetime
Expand Down Expand Up @@ -155,6 +158,44 @@ def test_create_partitioned_table(self):
except:
pass

def test_create_table_with_range_specific_hash_schemas(self):
table_name = 'create_table_range_specific_hash_schemas'
try:
# define range with custom hash schema
p = RangePartition({'key': 0}, {'key': 100})
p.add_hash_partitions(['key'], 5)

self.client.create_table(
table_name, self.schema,
partitioning=Partitioning()
.set_range_partition_columns(['key'])
.add_hash_partitions(['key'], 2)
.add_range_partition(
lower_bound={'key': -100},
upper_bound={'key': 0})
.add_custom_range_partition(p)
)

# rely on 1-1 mapping between tokens and tablets for full table scan
table = self.client.table(table_name)
builder = table.scan_token_builder()
builder.set_fault_tolerant()
tokens = builder.build()
self.assertEqual(7, len(tokens))

session = self.client.new_session()
for i in range(-100, 100):
op = table.new_insert((i, i))
session.apply(op)
session.flush()

self.client.delete_table(table_name)
finally:
try:
self.client.delete_table(table_name)
except:
pass

def test_create_table_with_different_owner(self):
name = 'table_with_different_owner'
try:
Expand Down Expand Up @@ -500,6 +541,72 @@ def test_alter_table_add_drop_partition(self):
alterer.add_range_partition()
table = alterer.alter()

def test_alter_table_add_partition_with_custom_hash_schema(self):
table_name = 'add_partition_with_custom_hash_schema'
try:
# create table with [-100, 0) range having table-wide hash schema
self.client.create_table(
table_name, self.schema,
partitioning=Partitioning()
.set_range_partition_columns(['key'])
.add_hash_partitions(['key'], 3)
.add_range_partition(
lower_bound={'key': -100},
upper_bound={'key': 0})
)

# open the newly created table
table = self.client.table(table_name)

# define range with custom hash schema
p = RangePartition({'key': 0}, {'key': 100})
p.add_hash_partitions(['key'], 2, 8)

alterer = self.client.new_table_alterer(table)
alterer.add_custom_range_partition(p)
table = alterer.alter()

# rely on 1-1 mapping between tokens and tablets for full table scan
builder = table.scan_token_builder()
builder.set_fault_tolerant()
tokens = builder.build()
self.assertEqual(5, len(tokens))

session = self.client.new_session()
for i in range(-100, 100):
op = table.new_insert((i, i))
session.apply(op)
session.flush()

# drop the new custom range partition that hash just been added
alterer = self.client.new_table_alterer(table)
alterer.drop_range_partition({'key': 0}, {'key': 100})
table = alterer.alter()

# rely on 1-1 mapping between tokens and tablets for full table scan
builder = table.scan_token_builder()
builder.set_fault_tolerant()
tokens = builder.build()
self.assertEqual(3, len(tokens))

# drop the range partition that have table-wide hash schema
alterer = self.client.new_table_alterer(table)
alterer.drop_range_partition({'key': -100}, {'key': 0})
table = alterer.alter()

# rely on 1-1 mapping between tokens and tablets for full table scan
builder = table.scan_token_builder()
builder.set_fault_tolerant()
tokens = builder.build()
self.assertEqual(0, len(tokens))

self.client.delete_table(table_name)
finally:
try:
self.client.delete_table(table_name)
except:
pass

def test_require_encryption(self):
client = kudu.connect(self.master_hosts, self.master_ports,
encryption_policy=ENCRYPTION_REQUIRED)
Expand Down

0 comments on commit 3946ce5

Please sign in to comment.