Skip to content

Commit

Permalink
Fix UDF regressions. Fixes cloudera#29. Add UDF exec tests
Browse files Browse the repository at this point in the history
* Pulled in a newer `udf.h`

* Impala UDF named structs now pulled out of precompiled module with dummy fn

* UDFs now link a fresh precompiled module into each UDF

* Split C++ code into string implementations and type spec; updated `Makefile` and `setup.py`

* ImpalaContext now cleans up UDFs and UDAs

* Switched UDF modules to absolute imports
  • Loading branch information
laserson committed Oct 8, 2014
1 parent b238d4b commit 7120e8a
Show file tree
Hide file tree
Showing 17 changed files with 396 additions and 63 deletions.
8 changes: 8 additions & 0 deletions DEVELOP.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ checking out the repo to develop on it do NOT need to run the codegen. Codegen
performed with Thrift 0.9.x.


#### UDF maintenance

Copy a fresh copy of the `udf.h` header file

```bash
cp $IMPALA_REPO/be/src/udf/udf.h $IMPYLA_REPO/impala/udf/precompiled
```

#### Release

1. Generate a summary of all the commits since the last release
Expand Down
21 changes: 11 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
# limitations under the License.

PREFIX=impala/udf/precompiled
INCLUDE_DIR=$(PREFIX)
SRC_FILE=$(PREFIX)/impala-precompiled.cc
OUTPUT_FILE=$(PREFIX)/impala-precompiled.bc
OUTPUT_FILE=$(PREFIX)/impyla.bc

# if LLVM_CONFIG unset, try the default
if [ -z $(LLVM_CONFIG) ]; then LLVM_CONFIG=$(which llvm-config); fi;
ifndef LLVM_CONFIG
$(error llvm-config not in PATH -- please set LLVM_CONFIG=path/to/llvm-config)
# if LLVM_CONFIG_PATH unset, try the default
if [ -z $(LLVM_CONFIG_PATH) ]; then LLVM_CONFIG_PATH=$(which llvm-config); fi;
ifndef LLVM_CONFIG_PATH
$(error llvm-config not in PATH -- please set LLVM_CONFIG_PATH=path/to/llvm-config)
endif
CLANG=$(shell $(LLVM_CONFIG) --bindir)/clang++
CLANG=$(shell $(LLVM_CONFIG_PATH) --bindir)/clang++
LINK=$(shell $(LLVM_CONFIG_PATH) --bindir)/llvm-link

all:
$(CLANG) -emit-llvm -O0 -I $(INCLUDE_DIR) -c $(SRC_FILE) -o $(OUTPUT_FILE)
$(CLANG) -c -emit-llvm -O0 -o $(PREFIX)/impala-types.bc $(PREFIX)/impala-types.cc
$(CLANG) -c -emit-llvm -O0 -o $(PREFIX)/string-impl.bc $(PREFIX)/string-impl.cc
$(LINK) -o $(OUTPUT_FILE) $(PREFIX)/*.bc

clean:
rm -f $(OUTPUT_FILE)
rm -f $(PREFIX)/*.bc
8 changes: 8 additions & 0 deletions impala/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ def close(self):
temp_tables = [x[0] for x in self._cursor.fetchall()]
for table in temp_tables:
self._cursor.execute('DROP TABLE IF EXISTS %s.%s' % (self._temp_db, table))
self._cursor.execute('SHOW FUNCTIONS')
temp_udfs = [x[1] for x in self._cursor.fetchall()]
for udf in temp_udfs:
self._cursor.execute('DROP FUNCTION IF EXISTS %s.%s' % (self._temp_db, udf))
self._cursor.execute('SHOW AGGREGATE FUNCTIONS')
temp_udas = [x[1] for x in self._cursor.fetchall()]
for uda in temp_udas:
self._cursor.execute('DROP AGGREGATE FUNCTION IF EXISTS %s.%s' % (self._temp_db, uda))
self._cursor.execute('USE default')
self._cursor.execute('DROP DATABASE IF EXISTS %s' % self._temp_db)
# drop the temp dir in HDFS
Expand Down
199 changes: 199 additions & 0 deletions impala/tests/test_udf_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
# Copyright 2014 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import string

from impala.udf import udf, ship_udf
from impala.udf.types import (FunctionContext, BooleanVal, SmallIntVal, IntVal,
BigIntVal, StringVal)

def test_return_null(ic):
@udf(IntVal(FunctionContext, IntVal))
def return_null(context, a):
return None
ship_udf(ic, return_null, overwrite=True)
ic._cursor.execute('SELECT %s.return_null(10)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] is None

def test_int_predicates(ic):
@udf(BooleanVal(FunctionContext, IntVal))
def int_predicate(context, a):
if a > 10:
return True
else:
return False
ship_udf(ic, int_predicate, overwrite=True)
ic._cursor.execute('SELECT %s.int_predicate(10)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == False
ic._cursor.execute('SELECT %s.int_predicate(11)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == True

def test_numeric_literals(ic):
@udf(BigIntVal(FunctionContext, SmallIntVal))
def numeric_literals(context, a):
if a is None:
return 1729
elif a < 0:
return None
elif a < 10:
return a + 5
else:
return a * 2
ship_udf(ic, numeric_literals, overwrite=True)
ic._cursor.execute('SELECT %s.numeric_literals(NULL)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 1729
ic._cursor.execute('SELECT %s.numeric_literals(-5)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] is None
ic._cursor.execute('SELECT %s.numeric_literals(2)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 7
ic._cursor.execute('SELECT %s.numeric_literals(12)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 24

def test_int_promotion(ic):
@udf(BigIntVal(FunctionContext, IntVal))
def int_promotion(context, x):
return x + 1
ship_udf(ic, int_promotion, overwrite=True)
ic._cursor.execute('SELECT %s.int_promotion(2)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 3
assert ic._cursor.description[0][1] == 'BIGINT'


# test StringVal fns

def test_return_string_literal(ic):
@udf(StringVal(FunctionContext, StringVal))
def return_string_literal(context, a):
return "bar"
ship_udf(ic, return_string_literal, overwrite=True)
ic._cursor.execute('SELECT %s.return_string_literal("foo")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 'bar'

def test_return_two_str_literals(ic):
@udf(StringVal(FunctionContext, IntVal))
def return_two_str_literals(context, a):
if a > 5:
return "foo"
else:
return "bar"
ship_udf(ic, return_two_str_literals, overwrite=True)
ic._cursor.execute('SELECT %s.return_two_str_literals(2)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 'bar'
ic._cursor.execute('SELECT %s.return_two_str_literals(20)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 'foo'

def test_return_empty_string(ic):
@udf(StringVal(FunctionContext, StringVal))
def return_empty_string(context, a):
return ""
ship_udf(ic, return_empty_string, overwrite=True)
ic._cursor.execute('SELECT %s.return_empty_string("blah")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == ''

def test_string_eq(ic):
@udf(BooleanVal(FunctionContext, StringVal))
def string_eq(context, a):
if a == "foo":
return True
elif a == "bar":
return False
else:
return None
ship_udf(ic, string_eq, overwrite=True)
ic._cursor.execute('SELECT %s.string_eq("foo")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == True
ic._cursor.execute('SELECT %s.string_eq("bar")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == False
ic._cursor.execute('SELECT %s.string_eq("baz")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] is None

def test_string_len(ic):
@udf(IntVal(FunctionContext, StringVal))
def string_len(context, a):
return len(a)
ship_udf(ic, string_len, overwrite=True)
ic._cursor.execute('SELECT %s.string_len("australopithecus")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 16

def test_string_split_comma(ic):
@udf(StringVal(FunctionContext, StringVal))
def string_split_comma(context, a):
return string.split(a, ",")[1]
ship_udf(ic, string_split_comma, overwrite=True)
ic._cursor.execute('SELECT %s.string_split_comma("foo,bar")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 'bar'

def test_string_indexing(ic):
@udf(StringVal(FunctionContext, StringVal, IntVal))
def string_indexing(context, a, b):
return a[b]
ship_udf(ic, string_indexing, overwrite=True)
ic._cursor.execute('SELECT %s.string_indexing("foo", 1)' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 'o'

def test_string_index_concat(ic):
@udf(StringVal(FunctionContext, StringVal))
def string_index_concat(context, a):
return a[0] + a[3]
ship_udf(ic, string_index_concat, overwrite=True)
ic._cursor.execute('SELECT %s.string_index_concat("money")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 'me'

def test_string_concat(ic):
@udf(StringVal(FunctionContext, StringVal, StringVal))
def string_concat(context, a, b):
return a + b
ship_udf(ic, string_concat, overwrite=True)
ic._cursor.execute('SELECT %s.string_concat("howdy ", "doody")' % ic._temp_db)
results = ic._cursor.fetchall()
assert results[0][0] == 'howdy doody'




















14 changes: 8 additions & 6 deletions impala/udf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import absolute_import

import os
import pkgutil

import llvm.core as lc
from numba import sigutils
Expand Down Expand Up @@ -58,10 +59,13 @@ def __init__(self, pyfunc, signature):
llvm_func = impala_targets.finalize(self._cres.llvm_func, return_type,
args)
self.llvm_func = llvm_func
numba_module = llvm_func.module
self.llvm_module = lc.Module.new(self.name)
self.llvm_module.link_in(numba_module)
self.llvm_module.link_in(impala_targets.precompiled_module)
# numba_module = llvm_func.module
self.llvm_module = llvm_func.module
# link in the precompiled module
# bc it's destructive, load a fresh version
precompiled = lc.Module.from_bitcode(
pkgutil.get_data("impala.udf", "precompiled/impyla.bc"))
self.llvm_module.link_in(precompiled)


# functionality to ship code to Impala cluster
Expand All @@ -77,8 +81,6 @@ def __init__(self, pyfunc, signature):
'StringVal': 'STRING',
'TimestampVal': 'TIMESTAMP'}

# TODO: in the future, consider taking an "ImpalaContext" if there is some info I
# want to store. But this could also all be in the cursor object potentially
try:
from pywebhdfs.webhdfs import PyWebHdfsClient

Expand Down
11 changes: 5 additions & 6 deletions impala/udf/abi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@

import llvm.core as lc

from .types import (AnyVal, BooleanVal, TinyIntVal, SmallIntVal, IntVal,
BigIntVal, FloatVal, DoubleVal, StringVal)
from .impl_utils import (BooleanValStruct, TinyIntValStruct, SmallIntValStruct,
IntValStruct, BigIntValStruct, FloatValStruct,
DoubleValStruct, StringValStruct)
from .impl_utils import _get_is_null, _set_is_null
from impala.udf.types import (AnyVal, BooleanVal, TinyIntVal, SmallIntVal,
IntVal, BigIntVal, FloatVal, DoubleVal, StringVal)
from impala.udf.impl_utils import (BooleanValStruct, TinyIntValStruct,
SmallIntValStruct, IntValStruct, BigIntValStruct, FloatValStruct,
DoubleValStruct, StringValStruct, _get_is_null, _set_is_null)


class ABIHandling(object):
Expand Down
18 changes: 17 additions & 1 deletion impala/udf/impl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import pkgutil

import llvm.core as lc
from numba import types as ntypes
from numba import cgutils

from .types import AnyVal
from impala.udf.types import AnyVal

# load the Impala UDF types from the precompiled module
precompiled = lc.Module.from_bitcode(
pkgutil.get_data("impala.udf", "precompiled/impyla.bc"))

# struct access utils

Expand All @@ -42,45 +49,54 @@ def _set_is_null(builder, val, is_null):
# Impala *Val struct impls

class AnyValStruct(cgutils.Structure):
_name = "struct.impala_udf::AnyVal"
_fields = [('is_null', ntypes.boolean)]


class BooleanValStruct(cgutils.Structure):
_name = "struct.impala_udf::BooleanVal"
_fields = [('parent', AnyVal),
('val', ntypes.int8),]


class TinyIntValStruct(cgutils.Structure):
_name = "struct.impala_udf::TinyIntVal"
_fields = [('parent', AnyVal),
('val', ntypes.int8),]


class SmallIntValStruct(cgutils.Structure):
_name = "struct.impala_udf::SmallIntVal"
_fields = [('parent', AnyVal),
('val', ntypes.int16),]


class IntValStruct(cgutils.Structure):
_name = "struct.impala_udf::IntVal"
_fields = [('parent', AnyVal),
('val', ntypes.int32),]


class BigIntValStruct(cgutils.Structure):
_name = "struct.impala_udf::BigIntVal"
_fields = [('parent', AnyVal),
('val', ntypes.int64),]


class FloatValStruct(cgutils.Structure):
_name = "struct.impala_udf::FloatVal"
_fields = [('parent', AnyVal),
('val', ntypes.float32),]


class DoubleValStruct(cgutils.Structure):
_name = "struct.impala_udf::DoubleVal"
_fields = [('parent', AnyVal),
('val', ntypes.float64),]


class StringValStruct(cgutils.Structure):
_name = "struct.impala_udf::StringVal"
_fields = [('parent', AnyVal),
('len', ntypes.int32),
('ptr', ntypes.CPointer(ntypes.uint8))]
Expand Down
Loading

0 comments on commit 7120e8a

Please sign in to comment.