Skip to content

Commit

Permalink
Working on HiveHook
Browse files Browse the repository at this point in the history
  • Loading branch information
LDAP/maxime_beauchemin committed Oct 14, 2014
1 parent e3026ff commit 1e2fa2e
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 0 deletions.
Empty file added flux/hooks/hive/__init__.py
Empty file.
75 changes: 75 additions & 0 deletions flux/hooks/hive/hive_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import logging
import sys
import os
from flux import settings

# Adding the Hive python libs to python path
sys.path.insert(0, settings.HIVE_HOME_PY)

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hive_metastore import ThriftHiveMetastore
from hive_service import ThriftHive

from flux.hooks.base_hook import BaseHook

METASTORE_THRIFT_HOST = "localhost"
METASTORE_THRIFT_PORT = 10000


class HiveHook(BaseHook):
    """
    Interact with Hive over a Thrift connection.

    Wraps a ThriftHive client bound to METASTORE_THRIFT_HOST:
    METASTORE_THRIFT_PORT and exposes helpers to run HQL statements,
    fetch query results, and check for partition existence.
    """

    def __init__(self, hive_dbid=None):
        # NOTE(review): hive_dbid is currently unused; presumably reserved
        # for looking up connection info by database id -- confirm.

        # Build (but do not open) the Thrift transport and Hive client;
        # each method opens/closes the transport around its calls.
        transport = TSocket.TSocket(
            METASTORE_THRIFT_HOST, METASTORE_THRIFT_PORT)
        self.transport = TTransport.TBufferedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.hive = ThriftHive.Client(protocol)

    def get_conn(self):
        """
        Open the transport and return the raw ThriftHive client.

        The caller is responsible for closing ``self.transport``.
        """
        self.transport.open()
        return self.hive

    def check_for_partition(self, schema, table, filter):
        """
        Return True if at least one partition of ``schema.table``
        matches the metastore partition ``filter`` expression.

        ``filter`` shadows the builtin but is kept for backward
        compatibility with existing keyword callers. On any metastore
        error the failure is logged and False is returned (fail-safe).
        """
        try:
            self.transport.open()
            try:
                partitions = self.hive.get_partitions_by_filter(
                    schema, table, filter, 1)
            finally:
                # Close even when the metastore call raises, so the
                # transport is never left open (leak in the original).
                self.transport.close()
            return bool(partitions)
        except Exception:
            # Log the full traceback instead of only a message.
            logging.exception(
                "Metastore down? Acting as if partition doesn't "
                "exist to be safe...")
            return False

    def get_records(self, hql, schema=None):
        """
        Run ``hql`` (optionally after ``USE schema``) and return the
        results as a list of rows, each row a list of tab-separated
        field strings.
        """
        self.transport.open()
        try:
            if schema:
                self.hive.execute("USE " + schema)
            self.hive.execute(hql)
            records = self.hive.fetchAll()
        finally:
            # Always release the connection, even on query failure.
            self.transport.close()
        return [row.split("\t") for row in records]

    def run(self, hql, schema=None):
        """
        Execute ``hql`` (optionally after ``USE schema``), discarding
        any results.
        """
        self.transport.open()
        try:
            if schema:
                self.hive.execute("USE " + schema)
            self.hive.execute(hql)
        finally:
            # Close the transport even when execution raises.
            self.transport.close()

if __name__ == "__main__":
    # Ad-hoc smoke test: requires a reachable Hive service.
    hook = HiveHook()
    query = "SELECT * FROM fct_nights_booked WHERE ds='2014-10-01' LIMIT 2"
    print(hook.get_records(hql=query, schema="core_data"))

    found = hook.check_for_partition(
        schema="core_data", table="fct_nights_booked", filter="ds=2014-10-01")
    print("Checking for partition:" + str(found))
1 change: 1 addition & 0 deletions flux/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
sys.path.append(BASE_FOLDER)
DAGS_FOLDER = FLUX_HOME + '/dags'
BASE_LOG_FOLDER = FLUX_HOME + "/logs"
HIVE_HOME_PY = '/usr/lib/hive/lib/py'
RUN_AS_MASTER = True
JOB_HEARTBEAT_SEC = 5
ID_LEN = 250 # Used for dag_id and task_id VARCHAR length
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ python-dateutil
sphinx
sphinx_rtd_theme
sqlalchemy
thrift

0 comments on commit 1e2fa2e

Please sign in to comment.