[IMPYLA-65] Add Kerberos support for WebHDFS
Fixes cloudera#65

Switched from PyWebHDFS to the hdfs package for Kerberos support

Corrected directory listing in response to mtth/hdfs#14

Added Kerberos support to the tests

Added sasl install to the Jenkins script
laserson committed Apr 7, 2015
1 parent f22a5b2 commit 97d4860
Showing 11 changed files with 74 additions and 42 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -60,7 +60,7 @@ Required for `BigDataFrame`:

 Required for utilizing automated shipping/registering of code/UDFs/BDFs/etc:

-* `pywebhdfs`
+* `hdfs[kerberos]` (a Python client that wraps WebHDFS; kerberos is optional)

 For manipulating results as pandas `DataFrame`s, we recommend installing pandas
 regardless.
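For reference, a minimal sketch of the new dependency in use — the namenode host, port, path, and user below are placeholders, not values from this commit:

    # Non-kerberized WebHDFS access via the hdfs package (placeholder values).
    from hdfs.client import InsecureClient

    client = InsecureClient('http://namenode:50070', user='impala')
    client.write('/tmp/example.txt', 'hello, webhdfs', overwrite=True)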
9 changes: 5 additions & 4 deletions bin/register-impala-udfs.py
@@ -17,7 +17,7 @@
 import argparse

 import llvm.core as lc
-from pywebhdfs.webhdfs import PyWebHdfsClient
+from hdfs.client import InsecureClient

 import impala.dbapi

@@ -98,9 +98,10 @@ def log(msg):
         pass

     # transfer the LLVM module to HDFS
-    hdfs_client = PyWebHdfsClient(host=args.nn_host, port=args.webhdfs_port,
-                                  user_name=args.user)
-    hdfs_client.create_file(args.hdfs_path.lstrip('/'), bc, overwrite=args.force)
+    url = 'http://{nn_host}:{webhdfs_port}'.format(
+        nn_host=args.nn_host, webhdfs_port=args.webhdfs_port)
+    hdfs_client = InsecureClient(url, user=args.user)
+    hdfs_client.write(args.hdfs_path, bc, overwrite=args.force)
     log("Transferred LLVM IR to HDFS at %s" % args.hdfs_path)

     # register the functions with impala
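An aside on the write() call above: in current releases of the hdfs package it accepts a str/bytes payload directly, or acts as a context manager when called without data. A hedged sketch with placeholder hosts and paths:

    from hdfs.client import InsecureClient

    client = InsecureClient('http://namenode:50070', user='impala')  # placeholders
    ir = b'; ModuleID = example'  # stand-in for LLVM IR bytes

    # One-shot upload, as in the script above.
    client.write('/user/impala/udfs/f.ll', ir, overwrite=True)

    # Streaming variant: write() without a payload returns a context manager.
    with client.write('/user/impala/udfs/f2.ll', overwrite=True) as writer:
        writer.write(ir)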
4 changes: 2 additions & 2 deletions impala/bdf.py
@@ -146,8 +146,8 @@ def from_pandas(ic, df, table=None, path=None, method='in_query',
             df.to_csv(raw_data, sep=field_terminator,
                       line_terminator=line_terminator, quoting=csv.QUOTE_NONE,
                       escapechar=escape_char, header=False, index=False)
-            hdfs_client.create_file(
-                os.path.join(path, 'data.txt').lstrip('/'), raw_data.getvalue(),
+            hdfs_client.write(
+                os.path.join(path, 'data.txt'), raw_data.getvalue(),
                 overwrite=overwrite)
             raw_data.close()
         else:
32 changes: 22 additions & 10 deletions impala/context.py
@@ -32,10 +32,11 @@ def __init__(self, temp_dir=None, temp_db=None, nn_host=None,
                          suffix if temp_db is None else temp_db)
         self._conn = connect(*args, **kwargs)
         self._cursor = self._conn.cursor()
-        # used for pywebhdfs cleanup of temp dir; not required
+        # used for webhdfs cleanup of temp dir; not required
         self._nn_host = nn_host
         self._webhdfs_port = webhdfs_port
         self._hdfs_user = hdfs_user
+        self._kerberized = self._conn.kerberized()
         if temp_db is None:
             self._cursor.execute("CREATE DATABASE %s LOCATION '%s'" %
                                  (self._temp_db, self._temp_dir))
@@ -69,25 +70,36 @@ def close(self):
                                  self._temp_db, uda))
         self._cursor.execute('USE default')
         self._cursor.execute('DROP DATABASE IF EXISTS %s' % self._temp_db)
-        # drop the temp dir in HDFS
+        # DROP DATABASE removes the associated directory only if Impala
+        # created it; if the directory already existed, delete it here
+        # via WebHDFS
         try:
             from requests.exceptions import ConnectionError
+            from hdfs.util import HdfsError
             hdfs_client = self.hdfs_client()
-            hdfs_client.delete_file_dir(self._temp_dir.lstrip('/'),
-                                        recursive=True)
+            try:
+                # status() raises HdfsError if the path doesn't exist
+                _ = hdfs_client.status(self._temp_dir)
+                hdfs_client.delete(self._temp_dir, recursive=True)
+            except HdfsError:
+                pass
         except ImportError:
             import sys
-            sys.stderr.write("Could not import requests or pywebhdfs. "
+            sys.stderr.write("Could not import requests or hdfs. "
                              "You must delete the temporary directory "
                              "manually: %s" % self._temp_dir)
         except ConnectionError:
             import sys
-            sys.stderr.write("Could not connect via pywebhdfs. "
+            sys.stderr.write("Could not connect via webhdfs. "
                              "You must delete the temporary directory "
                              "manually: %s" % self._temp_dir)

     def hdfs_client(self):
-        from pywebhdfs.webhdfs import PyWebHdfsClient
-        return PyWebHdfsClient(
-            host=self._nn_host, port=self._webhdfs_port,
-            user_name=self._hdfs_user)
+        url = 'http://{nn_host}:{webhdfs_port}'.format(
+            nn_host=self._nn_host, webhdfs_port=self._webhdfs_port)
+        if self._kerberized:
+            from hdfs.ext.kerberos import KerberosClient
+            client = KerberosClient(url, mutual_auth='REQUIRED')
+        else:
+            from hdfs.client import InsecureClient
+            client = InsecureClient(url, user=self._hdfs_user)
+        return client
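The Kerberos branch depends on the package's optional kerberos extra (pip install hdfs[kerberos], which pulls in requests-kerberos) and on a valid ticket in the caller's credential cache. A hedged standalone sketch — the URL is a placeholder:

    # Hypothetical kerberized WebHDFS access; run kinit first.
    from hdfs.ext.kerberos import KerberosClient

    client = KerberosClient('http://namenode:50070', mutual_auth='REQUIRED')
    print(client.status('/user'))  # authenticates via SPNEGO; no user= needed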
8 changes: 8 additions & 0 deletions impala/dbapi/interface.py
@@ -45,6 +45,14 @@ def cursor(self, session_handle=None, user=None, configuration=None):
     def reconnect(self):
         raise NotImplementedError

+    def kerberized(self):
+        # returns True if the underlying service is kerberized (SASL/GSSAPI)
+        from impala.thrift_sasl import TSaslClientTransport
+        if isinstance(self.service._iprot.trans, TSaslClientTransport):
+            if self.service._iprot.trans.mechanism == 'GSSAPI':
+                return True
+        return False
+
     def __enter__(self):
         return self

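A hedged usage sketch of the new method, assuming the use_kerberos flag this commit threads through connect(); the host is a placeholder:

    import impala.dbapi

    conn = impala.dbapi.connect(host='impala.example.com', port=21050,
                                use_kerberos=True)
    if conn.kerberized():
        # The Thrift transport negotiated SASL/GSSAPI, so pair the
        # connection with hdfs.ext.kerberos.KerberosClient for WebHDFS.
        print('kerberized connection')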
24 changes: 15 additions & 9 deletions impala/tests/conftest.py
@@ -52,6 +52,15 @@ def protocol():
     return 'hiveserver2'


+@fixture(scope='session')
+def use_kerberos():
+    if 'USE_KERBEROS' in os.environ:
+        return os.environ['USE_KERBEROS'].lower() == 'true'
+    else:
+        sys.stderr.write("USE_KERBEROS not set; using default 'False'")
+        return False
+
+
 @fixture(scope='session')
 def nn_host():
     if 'NAMENODE_HOST' in os.environ:
@@ -104,15 +113,14 @@ def temp_db():

 @fixture(scope='session')
 def ic(request, temp_hdfs_dir, temp_db, nn_host, webhdfs_port, hdfs_user, host,
-       port, protocol):
+       port, protocol, use_kerberos):
     """Provides an ImpalaContext"""
     from impala.context import ImpalaContext

     ctx = ImpalaContext(temp_dir=temp_hdfs_dir, temp_db=temp_db,
-                        nn_host=nn_host,
-                        webhdfs_port=webhdfs_port, hdfs_user=hdfs_user,
-                        host=host,
-                        port=port, protocol=protocol)
+                        nn_host=nn_host, webhdfs_port=webhdfs_port,
+                        hdfs_user=hdfs_user, host=host, port=port,
+                        protocol=protocol, use_kerberos=use_kerberos)

     def fin():
         ctx.close()
@@ -123,12 +131,10 @@ def fin():

 @fixture(scope='session')
 def hdfs_client(ic):
-    pywebhdfs = importorskip('pywebhdfs')
+    hdfs = importorskip('hdfs')
     if ic._nn_host is None:
         skip("NAMENODE_HOST not set; skipping...")
-
-    hdfs_client = ic.hdfs_client()
-    return hdfs_client
+    return ic.hdfs_client()
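The fixtures read their configuration from the environment; a sketch of the variables involved (host values are placeholders):

    import os

    os.environ['USE_KERBEROS'] = 'true'             # enables the Kerberos code paths
    os.environ['NAMENODE_HOST'] = 'nn.example.com'  # WebHDFS namenode
    os.environ['IMPALA_HOST'] = 'impala.example.com'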
2 changes: 1 addition & 1 deletion impala/tests/test_bdf.py
@@ -49,7 +49,7 @@ def test_from_hdfs(ic, hdfs_client):
     raw_data = pkgutil.get_data('impala.tests', 'data/iris.data')
     dir_ = os.path.join(ic._temp_dir, 'test_small_data_dir')
     file_ = os.path.join(dir_, 'iris.data')
-    hdfs_client.create_file(file_.lstrip('/'), raw_data)
+    hdfs_client.write(file_, raw_data)
     schema = [('a', 'DOUBLE'), ('b', 'DOUBLE'), ('c', 'DOUBLE'),
               ('d', 'DOUBLE'), ('e', 'STRING')]
     bdf = from_hdfs(ic, dir_, schema)
22 changes: 11 additions & 11 deletions impala/tests/test_context.py
@@ -24,11 +24,12 @@
 from impala.bdf import from_pandas


-def test_context_cleanup(host, port, protocol, ic, hdfs_client):
+def test_context_cleanup(host, port, protocol, use_kerberos, ic, hdfs_client):
     # create a *new* ImpalaContext
     ctx = ImpalaContext(temp_dir=None, temp_db=None, nn_host=ic._nn_host,
                         webhdfs_port=ic._webhdfs_port, hdfs_user=ic._hdfs_user,
-                        host=host, port=port, protocol=protocol)
+                        host=host, port=port, protocol=protocol,
+                        use_kerberos=use_kerberos)

     # check that the database was created
     ctx._cursor.execute('SHOW DATABASES')
@@ -49,13 +50,12 @@ def test_context_cleanup(host, port, protocol, use_kerberos, ic, hdfs_client):

     # check that the temporary directory was created
     # (raises HdfsError on failure)
-    assert hdfs_client.get_file_dir_status(ctx._temp_dir.lstrip('/'))
+    assert hdfs_client.status(ctx._temp_dir)

     # check that the corresponding data file has the correct size
-    listing = hdfs_client.list_dir(
-        os.path.join(ctx._temp_dir, table_name).lstrip('/'))
-    statuses = listing['FileStatuses']['FileStatus']
-    sizes = [s['length'] for s in statuses if s['type'] == 'FILE']
+    table_path = os.path.join(ctx._temp_dir, table_name)
+    sizes = [s['length'] for (_, s) in hdfs_client.list(table_path)
+             if s['type'] == 'FILE']
     assert len(sizes) == 1
     assert sizes[0] == 20

@@ -68,7 +68,7 @@ def test_context_cleanup(host, port, protocol, use_kerberos, ic, hdfs_client):

     # check that the temp dir was deleted
     # I know this is importable because this test depends on hdfs_client, which
-    # skips if pywebhdfs is not available
-    from pywebhdfs.errors import FileNotFound
-    with pytest.raises(FileNotFound):
-        assert hdfs_client.get_file_dir_status(ctx._temp_dir.lstrip('/'))
+    # skips if hdfs is not available
+    from hdfs.util import HdfsError
+    with pytest.raises(HdfsError):
+        assert hdfs_client.status(ctx._temp_dir)
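As the commit message notes, the listing code was adjusted for mtth/hdfs#14, which concerns the shape of list() results: the new code consumes (name, status) pairs rather than the raw WebHDFS FileStatuses JSON. A hedged sketch — note that in current hdfs releases the status dicts must be requested explicitly, and the host and path are placeholders:

    from hdfs.client import InsecureClient

    client = InsecureClient('http://namenode:50070', user='impala')
    # status=True yields (name, status_dict) pairs; the dict mirrors the
    # WebHDFS FileStatus fields ('type', 'length', ...).
    for name, status in client.list('/user/impala/tmp', status=True):
        if status['type'] == 'FILE':
            print(name, status['length'])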
6 changes: 5 additions & 1 deletion impala/tests/test_dbapi_compliance.py
@@ -36,13 +36,17 @@
 if 'IMPALA_PROTOCOL' not in os.environ:
     print >>sys.stderr, ("Using default Impala protocol of 'hiveserver2', or "
                          "set IMPALA_PROTOCOL env variable")
+if 'USE_KERBEROS' not in os.environ:
+    print >>sys.stderr, ("Set USE_KERBEROS=True if you want to use Kerberos")
 host = os.environ['IMPALA_HOST']
 port = int(os.environ.get('IMPALA_PORT', 21050))
 protocol = os.environ.get('IMPALA_PROTOCOL', 'hiveserver2')
+use_kerberos = os.environ.get('USE_KERBEROS', 'False').lower() == 'true'

 connect_kw_args = {'host': host,
                    'port': port,
-                   'protocol': protocol}
+                   'protocol': protocol,
+                   'use_kerberos': use_kerberos}

 @pytest.mark.dbapi_compliance
 class ImpalaDBAPI20Test(_dbapi20_tests.DatabaseAPI20Test):
4 changes: 2 additions & 2 deletions impala/udf/__init__.py
@@ -99,7 +99,7 @@ def ship_udf(ic, function, hdfs_path=None, udf_name=None, database=None,
             hdfs_path = os.path.join(ic._temp_dir, udf_name + '.ll')
         if not hdfs_path.endswith('.ll'):
             raise ValueError("The HDFS file name must end with .ll")
-        hdfs_client.create_file(hdfs_path.lstrip('/'), ir, overwrite=overwrite)
+        hdfs_client.write(hdfs_path, ir, overwrite=overwrite)

         # register the function in Impala
         if database is None:
@@ -114,5 +114,5 @@ def ship_udf(ic, function, hdfs_path=None, udf_name=None, database=None,
         ic._cursor.execute(register_query)

 except ImportError:
-    print ("Failed to import pywebhdfs; you must ship your "
+    print ("Failed to import hdfs; you must ship your "
           "Python UDFs manually.")
3 changes: 2 additions & 1 deletion jenkins/run-all.sh
@@ -27,10 +27,11 @@ VENV_NAME=$JOB_NAME-pyvenv-$BUILD_NUMBER
 cd /tmp && virtualenv $VENV_NAME && source $VENV_NAME/bin/activate
 pip install pytest
 pip install thrift
+pip install sasl
 pip install unittest2
 pip install numpy
 pip install pandas
-pip install pywebhdfs
+pip install hdfs[kerberos]
 if [ "$NUMBA_VERSION" == "master" ]; then
     pip install git+https://github.com/llvmpy/llvmpy.git@master
     pip install git+https://github.com/numba/numba.git@master
