Commit da43a94

adding druid hook and operator

mvfast authored and mistercrunch committed Jul 13, 2015
1 parent 19fef41 commit da43a94
Showing 4 changed files with 151 additions and 23 deletions.
102 changes: 89 additions & 13 deletions airflow/hooks/druid_hook.py
@@ -7,7 +7,6 @@

from airflow.hooks.base_hook import BaseHook


class DruidHook(BaseHook):
'''
Interact with druid.
@@ -20,30 +19,107 @@ def __init__(
):
self.druid_query_conn_id = druid_query_conn_id
self.druid_ingest_conn_id = druid_ingest_conn_id
self.header = {'content-type': 'application/json'}

    @property
    def domain_port(self):
        # TODO: placeholder, not implemented yet
        pass

def get_conn(self):
"""
Returns a druid connection object
        Returns a druid connection object for querying
"""
conn = self.get_connection(self.druid_query_conn_id)
client = pydruid.client.PyDruid(
"http://{conn.host}:{conn.port}".format(**locals()),
conn.extra_dejson.get('enpoint', ''))
conn.extra_dejson.get('endpoint', ''))
return client

@property
def ingest_post_url(self):
conn = self.get_connection(self.druid_ingest_conn_id)
host = conn.host
port = conn.port
endpoint = conn.extra_dejson.get('endpoint', '')
return "http://{host}:{port}/{endpoint}".format(**locals())

def get_ingest_status_url(self, task_id):
post_url = self.ingest_post_url
return "{post_url}/{task_id}/status".format(**locals())

def construct_ingest_query(self, datasource, static_path,
ts_dim, dimensions, metric_spec):
ingest_query_dict = {
"type": "index_hadoop",
"spec": {
"dataSchema": {
"metricsSpec": metric_spec,
"granularitySpec": {
"queryGranularity": "NONE",
"intervals": ["1901-01-01/2040-05-25"],
"type": "uniform",
"segmentGranularity": "DAY"
},
"parser": {
"type": "string",
"parseSpec": {
"dimensionsSpec": {
"dimensionExclusions": [],
"dimensions": dimensions, # list of names
"spatialDimensions": []
},
"timestampSpec": {
"column": ts_dim,
"format": "auto"
},
"format": "tsv"
}
},
"dataSource": datasource
},
"tuningConfig": {
"type": "hadoop"
},
"ioConfig": {
"inputSpec": {
"paths": static_path,
"type": "static"
},
"type": "hadoop"
}
}
}

return json.dumps(ingest_query_dict)


def send_ingest_query(self, datasource, static_path, ts_dim,
dimensions, metric_spec):
query = self.construct_ingest_query(datasource, static_path,
            ts_dim, dimensions, metric_spec)
r = requests.post(self.ingest_post_url, headers=self.header,
data=query)
d = json.loads(r.text)
if "task" not in d:
raise AirflowException("[Error]: Ingesting data to druid failed.")
return d["task"]


def load_from_hdfs(self, datasource, static_path, ts_dim,
dimensions, metric_spec=None):
"""
load data to druid from hdfs
:params ts_dim: The column name to use as a timestamp
:params dimensions: A list of column names to use as dimension
:params metric_spec: A list of dictionaries
"""
        task_id = self.send_ingest_query(datasource, static_path, ts_dim,
                                         dimensions, metric_spec)
        status_url = self.get_ingest_status_url(task_id)
        while True:
r = requests.get(status_url)
d = json.loads(r.text)
if d['status']['status'] == 'FAILED':
raise AirflowException(
"[Error]: Ingesting data to druid failed.")
elif d['status']['status'] == 'SUCCESS':
break
time.sleep(5)
time.sleep(30)
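
For context, here is a minimal usage sketch of the new hook (not part of the commit). It assumes an Airflow connection named druid_ingest_default that points at the Druid overlord, with its extra JSON supplying the indexing endpoint, and it assumes the constructor's other connection-id arguments keep their defaults; the datasource, HDFS path, column names, and metric definition are hypothetical.

from airflow.hooks.druid_hook import DruidHook

hook = DruidHook(druid_ingest_conn_id='druid_ingest_default')
hook.load_from_hdfs(
    datasource='events_daily',                      # hypothetical Druid datasource
    static_path='/user/hive/warehouse/events_tsv',  # hypothetical HDFS dir of TSV files
    ts_dim='ds',                                    # column used as the timestamp
    dimensions=['country', 'platform'],             # non-metric columns
    metric_spec=[
        {'type': 'doubleSum', 'name': 'revenue', 'fieldName': 'revenue'},
    ],
)

The call posts the index_hadoop spec built by construct_ingest_query to ingest_post_url, then polls the task's status URL every 30 seconds until Druid reports SUCCESS or FAILED.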

51 changes: 42 additions & 9 deletions airflow/operators/hive_to_druid.py
@@ -7,48 +7,81 @@

class HiveToDruidTransfer(BaseOperator):
"""
Moves data from Hive to Druid, note that for now the data is loaded
Moves data from Hive to Druid, [del]note that for now the data is loaded
into memory before being pushed to Druid, so this operator should
be used for smallish amount of data.
be used for smallish amount of data.[/del]
    :param sql: SQL query to execute against the Hive database
:type sql: str
:param druid_datasource: the datasource you want to ingest into in druid
:type druid_datasource: str
:param ts_dim: the timestamp dimension
:type ts_dim: str
:param metric_spec: the metrics you want to define for your data
:type metric_spec: list
:param hive_cli_conn_id: the hive connection id
:type hive_cli_conn_id: str
:param druid_ingest_conn_id: the druid ingest connection id
:type druid_ingest_conn_id: str
:param metastore_conn_id: the metastore connection id
:type metastore_conn_id: str
"""

template_fields = ('sql', 'druid_table')
template_ext = ('.sql',)
ui_color = '#a0e08c'
#template_fields = ('sql', 'druid_table')
#template_ext = ('.sql',)
#ui_color = '#a0e08c'

@apply_defaults
def __init__(
self,
sql,
druid_datasource,
ts_dim,
metric_spec=None,
hive_cli_conn_id='hiveserver2_default',
druid_ingest_conn_id='druid_ingest_default',
metastore_conn_id='metastore_default',
*args, **kwargs):
super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
self.sql = sql
self.druid_datasource = druid_datasource
self.ts_dim = ts_dim
self.metric_spec = metric_spec
self.hive_cli_conn_id = hive_cli_conn_id
self.druid_ingest_conn_id = druid_ingest_conn_id
self.metastore_conn_id = metastore_conn_id



def execute(self, context):
hive = HiveServer2Hook(hiveserver2_conn_id=self.hive_cli_conn_id)
logging.info("Extracting data from Hive")
hive_table = (
'tmp.druid__' + context['task_instance'].task_instance_key_str)
hql = hql.strip().strip(';')
sql = """\
CREATE EXTERNAL TABLE {hive_table}
        sql = self.sql.strip().strip(';')
set_output_compressed_false = "\
set mapred.output.compress=false; \
set hive.exec.compress.output=false;"
hql = """\
{set_output_compressed_false}
CREATE TABLE {hive_table}
STORED AS TEXTFILE AS
{sql};
""".format(**locals())
        hive.run(hql)

        m = HiveMetastoreHook(self.metastore_conn_id)
        t = m.get_table(hive_table)

        dimensions = [col.name for col in t.sd.cols]
        metric_spec = self.metric_spec or []
        for metric in metric_spec:
            dimensions.remove(metric['fieldName'])

        hdfs_uri = t.sd.location
        # keep only the path component of the table location (drops hdfs://host:port)
        pos = hdfs_uri.find('/user')
        static_path = hdfs_uri[pos:]

druid = DruidHook(druid_ingest_conn_id=self.druid_ingest_conn_id)
logging.info("Inserting rows into Druid")
druid.load_from_hdfs(table=self.druid_datasource, hdfs_uri=hdfs_uri)
druid.load_from_hdfs(datasource=self.druid_datasource,
static_path=static_path, ts_dim=self.ts_dim,
dimensions=dimensions, metric_spec=self.metric_spec)
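
To make the dimension bookkeeping in execute() concrete, here is a small standalone illustration (column names are hypothetical) of how every staging-table column that does not feed a metric ends up in the dimensions list passed to the hook:

cols = ['ds', 'country', 'platform', 'revenue']   # columns of the tmp.druid__... staging table
metric_spec = [{'type': 'doubleSum', 'name': 'revenue', 'fieldName': 'revenue'}]

dimensions = list(cols)
for metric in metric_spec:
    dimensions.remove(metric['fieldName'])        # drop columns consumed by metrics

print(dimensions)  # ['ds', 'country', 'platform']; 'ds' is also passed as ts_dim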
19 changes: 19 additions & 0 deletions dags/testdruid.py
@@ -0,0 +1,19 @@
from airflow.operators import HiveToDruidTransfer
from airflow import DAG
from datetime import datetime

args = {
'owner': 'qi_wang',
'start_date': datetime(2015, 4, 4),
}

dag = DAG("test_druid", default_args=args)


HiveToDruidTransfer(task_id="load_dummy_test",
sql="select * from qi.druid_test_dataset_w_platform_1 \
limit 10;",
druid_datasource="airflow_test",
ts_dim="ds",
dag=dag
)
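
The test task above omits metric_spec; a variant that also defines an aggregated metric could look like the following sketch (the value column and metric name are made up for illustration):

HiveToDruidTransfer(
    task_id="load_dummy_test_with_metric",
    sql="select * from qi.druid_test_dataset_w_platform_1 limit 10;",
    druid_datasource="airflow_test",
    ts_dim="ds",
    metric_spec=[{"type": "doubleSum", "name": "value", "fieldName": "value"}],
    dag=dag,
)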
2 changes: 1 addition & 1 deletion setup.py
@@ -18,7 +18,7 @@
postgres = ['psycopg2>=2.6']
optional = ['librabbitmq>=1.6.1']
samba = ['pysmbclient>=0.1.3']
druid = ['druid>=0.2.1']
pydruid = ['pydruid>=0.2.1']
s3 = ['boto>=2.36.0']

all_dbs = postgres + mysql + hive
