Skip to content

Commit

Permalink
KUDU-1671 - [python] Enable predicate pushdown for additional types
Browse files Browse the repository at this point in the history
Currently, the python client does not support predicate pushdown for
boolean and unixtime_micros values. Additionally, as pointed out in
KUDU-1672, float predicates have a bug. This patch addresses both
of these issues.  Test cases have been added to validate this
functionality.  Two minor namespace issues were addressed as well
for float and boolean types.

Change-Id: If5766d24055dfba5fa371fc61c6dfd66adc54273
Reviewed-on: http://gerrit.cloudera.org:8080/4589
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <[email protected]>
  • Loading branch information
jtbirdsell authored and dralves committed Oct 6, 2016
1 parent bce1dd7 commit 4d1b1e9
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 28 deletions.
4 changes: 2 additions & 2 deletions python/kudu/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
KuduInvalidArgument)

from kudu.schema import (int8, int16, int32, int64, string_ as string, # noqa
double_ as double, float_, binary,
unixtime_micros,
double_ as double, float_, float_ as float, binary,
unixtime_micros, bool_ as bool,
KuduType,
SchemaBuilder, ColumnSpec, Schema, ColumnSchema,
COMPRESSION_DEFAULT,
Expand Down
40 changes: 20 additions & 20 deletions python/kudu/client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -766,21 +766,28 @@ cdef class Column:
cdef KuduValue* box_value(self, object obj) except NULL:
cdef:
KuduValue* val
Slice* slc
Slice slc

if isinstance(obj, unicode):
obj = obj.encode('utf8')

if isinstance(obj, bytes):
slc = new Slice(<char*> obj, len(obj))
val = KuduValue.CopyString(deref(slc))
del slc
elif isinstance(obj, int):
if (self.spec.type.name[:3] == 'int'):
val = KuduValue.FromInt(obj)
elif isinstance(obj, float):
elif (self.spec.type.name == 'string'):
if isinstance(obj, unicode):
obj = obj.encode('utf8')

slc = Slice(<char*> obj, len(obj))
val = KuduValue.CopyString(slc)
elif (self.spec.type.name == 'bool'):
val = KuduValue.FromBool(obj)
elif (self.spec.type.name == 'float'):
val = KuduValue.FromFloat(obj)
elif (self.spec.type.name == 'double'):
val = KuduValue.FromDouble(obj)
elif (self.spec.type.name == 'unixtime_micros'):
obj = to_unixtime_micros(obj)
val = KuduValue.FromInt(obj)
else:
raise TypeError(obj)
raise TypeError("Cannot add predicate for kudu type <{0}>"
.format(self.spec.type.name))

return val

Expand Down Expand Up @@ -2006,15 +2013,8 @@ cdef class PartialRow:
self.row.SetStringCopy(i, deref(slc))
del slc
elif t == KUDU_UNIXTIME_MICROS:
# String with custom format
# eg: ("2016-01-01", "%Y-%m-%d")
if type(value) is tuple:
self.row.SetUnixTimeMicros(i, <int64_t>
to_unixtime_micros(value[0], value[1]))
# datetime.datetime input or string with default format
else:
self.row.SetUnixTimeMicros(i, <int64_t>
to_unixtime_micros(value))
self.row.SetUnixTimeMicros(i, <int64_t>
to_unixtime_micros(value))
else:
raise TypeError("Cannot set kudu type <{0}>.".format(_type_names[t]))

Expand Down
37 changes: 35 additions & 2 deletions python/kudu/tests/test_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ def test_scan_invalid_predicates(self):
with self.assertRaises(TypeError):
scanner.add_predicates([sv >= None])

with self.assertRaises(kudu.KuduInvalidArgument):
with self.assertRaises(TypeError):
scanner.add_predicates([sv >= 1])

with self.assertRaises(TypeError):
scanner.add_predicates([sv.in_list(['testing',
datetime.datetime.utcnow()])])

with self.assertRaises(kudu.KuduInvalidArgument):
with self.assertRaises(TypeError):
scanner.add_predicates([sv.in_list([
'hello_20',
120
Expand Down Expand Up @@ -214,3 +214,36 @@ def test_read_mode(self):
check_tuples = sorted(scanner.read_all_tuples())
# Avoid tight looping
time.sleep(0.05)

def verify_pred_type_scans(self, preds, row_indexes, count_only=False):
# Using the incoming list of predicates, verify that the row returned
# matches the inserted tuple at the row indexes specified in a
# slice object
scanner = self.type_table.scanner()
scanner.set_fault_tolerant()
scanner.add_predicates(preds)
scanner.set_projected_column_names(self.projected_names_w_o_float)
tuples = scanner.open().read_all_tuples()

# verify rows
if count_only:
self.assertEqual(len(self.type_test_rows[row_indexes]), len(tuples))
else:
self.assertEqual(sorted(self.type_test_rows[row_indexes]), tuples)

def test_unixtime_micros_pred(self):
# Test unixtime_micros value predicate
self._test_unixtime_micros_pred()

def test_bool_pred(self):
# Test a boolean value predicate
self._test_bool_pred()

def test_double_pred(self):
# Test a double precision float predicate
self._test_double_pred()

def test_float_pred(self):
# Test a single precision float predicate
# Does a row check count only
self._test_float_pred()
40 changes: 37 additions & 3 deletions python/kudu/tests/test_scantoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def setUpClass(self):
def setUp(self):
pass

def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples):
def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples, count_only=False):
"""
Given the input serialized tokens, spawn new threads,
execute them and validate the results
Expand All @@ -55,7 +55,10 @@ def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples):
for result in results:
actual_tuples += result

self.assertEqual(sorted(expected_tuples), sorted(actual_tuples))
if count_only:
self.assertEqual(expected_tuples, actual_tuples)
else:
self.assertEqual(sorted(expected_tuples), sorted(actual_tuples))

def test_scan_token_serde_threaded_with_named_projection(self):
"""
Expand Down Expand Up @@ -113,7 +116,7 @@ def test_scan_token_invalid_predicates(self):
with self.assertRaises(TypeError):
builder.add_predicates([sv >= None])

with self.assertRaises(kudu.KuduInvalidArgument):
with self.assertRaises(TypeError):
builder.add_predicates([sv >= 1])

def test_scan_token_batch_by_batch_with_local_scanner(self):
Expand Down Expand Up @@ -209,3 +212,34 @@ def test_read_mode(self):
tuples.extend(scanner.read_all_tuples())

self.assertEqual(sorted(self.tuples), sorted(tuples))

def verify_pred_type_scans(self, preds, row_indexes, count_only=False):
# Using the incoming list of predicates, verify that the row returned
# matches the inserted tuple at the row indexes specified in a
# slice object
builder = self.type_table.scan_token_builder()
builder.set_fault_tolerant()
builder.set_projected_column_names(self.projected_names_w_o_float)
builder.add_predicates(preds)

# Verify rows
self._subtest_serialize_thread_and_verify(builder.build(),
self.type_test_rows[row_indexes],
count_only)

def test_unixtime_micros_pred(self):
# Test unixtime_micros value predicate
self._test_unixtime_micros_pred()

def test_bool_pred(self):
# Test a boolean value predicate
self._test_bool_pred()

def test_double_pred(self):
# Test a double precision float predicate
self._test_double_pred()

def test_float_pred(self):
# Test a single precision float predicate
# Does a row check count only
self._test_float_pred()
84 changes: 83 additions & 1 deletion python/kudu/tests/util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
Expand All @@ -17,6 +18,7 @@
# under the License.

from kudu.compat import unittest
from kudu.client import Partitioning
from kudu.tests.common import KuduTestBase
import kudu
import datetime
Expand Down Expand Up @@ -58,6 +60,50 @@ def setUpClass(self):
self.table = table
self.tuples = tuples

# Create table to test all types
# for various predicate tests
table_name = 'type-test'
# Create schema, partitioning and then table
builder = kudu.schema_builder()
builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
builder.add_column('unixtime_micros_val', type_=kudu.unixtime_micros, nullable=False)
builder.add_column('string_val', type_=kudu.string, compression=kudu.COMPRESSION_LZ4, encoding='prefix')
builder.add_column('bool_val', type_=kudu.bool)
builder.add_column('double_val', type_=kudu.double)
builder.add_column('int8_val', type_=kudu.int8)
builder.add_column('float_val', type_=kudu.float)
schema = builder.build()

self.projected_names_w_o_float = [
col for col in schema.names if col != 'float_val'
]

partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)

self.client.create_table(table_name, schema, partitioning)
self.type_table = self.client.table(table_name)

# Insert new rows
self.type_test_rows = [
(1, datetime.datetime(2016, 1, 1).replace(tzinfo=pytz.utc),
"Test One", True, 1.7976931348623157 * (10^308), 127, 3.402823 * (10^38)),
(2, datetime.datetime.utcnow().replace(tzinfo=pytz.utc),
"测试二", False, 200.1, -1, -150.2)
]
session = self.client.new_session()
for row in self.type_test_rows:
op = self.type_table.new_insert()
for idx, val in enumerate(row):
op[idx] = val
session.apply(op)
session.flush()

# Remove the float values from the type_test_rows tuples so we can
# compare the other vals
self.type_test_rows = [
tuple[:-1] for tuple in self.type_test_rows
]

def setUp(self):
pass

Expand Down Expand Up @@ -127,4 +173,40 @@ def delete_insert_row_for_read_test(self):
for idx, val in enumerate(row):
op[idx] = val
session.apply(op)
session.flush()
session.flush()

def _test_unixtime_micros_pred(self):
# Test unixtime_micros value predicate
self.verify_pred_type_scans(
preds=[
self.type_table['unixtime_micros_val'] == ("2016-01-01", "%Y-%m-%d")
],
row_indexes=slice(0,1)
)

def _test_bool_pred(self):
# Test a boolean value predicate
self.verify_pred_type_scans(
preds=[
self.type_table['bool_val'] == False
],
row_indexes=slice(1,2)
)

def _test_double_pred(self):
# Test a double precision float predicate
self.verify_pred_type_scans(
preds=[
self.type_table['double_val'] < 200.11
],
row_indexes=slice(1,2)
)

def _test_float_pred(self):
self.verify_pred_type_scans(
preds=[
self.type_table['float_val'] == 3.402823 * (10^38)
],
row_indexes=slice(0, 1),
count_only=True
)
5 changes: 5 additions & 0 deletions python/kudu/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def to_unixtime_micros(timestamp, format = "%Y-%m-%dT%H:%M:%S.%f"):
---------
timestamp : datetime.datetime or string
If a string is provided, a format must be provided as well.
A tuple may be provided in place of the timestamp with a
string value and a format. This is useful for predicates
and setting values where this method is indirectly called.
Timezones provided in the string are not supported at this
time. UTC unless provided in a datetime object.
format : Required if a string timestamp is provided
Expand All @@ -60,6 +63,8 @@ def to_unixtime_micros(timestamp, format = "%Y-%m-%dT%H:%M:%S.%f"):
pass
elif isinstance(timestamp, six.string_types):
timestamp = datetime.datetime.strptime(timestamp, format)
elif isinstance(timestamp, tuple):
timestamp = datetime.datetime.strptime(timestamp[0], timestamp[1])
else:
raise ValueError("Invalid timestamp type. " +
"You must provide a datetime.datetime or a string.")
Expand Down

0 comments on commit 4d1b1e9

Please sign in to comment.