
Commit

add a parameter for the number of shards in batch ingestion
Hongbo Zeng committed Jan 8, 2016
1 parent 2107dc9 commit 4b29e8a
Showing 2 changed files with 13 additions and 6 deletions.
airflow/hooks/druid_hook.py (10 additions, 5 deletions)
@@ -52,7 +52,7 @@ def get_ingest_status_url(self, task_id):

     def construct_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, hadoop_dependency_coordinates=None):
+            intervals, num_shards, hadoop_dependency_coordinates=None):
         """
         Builds an ingest query for an HDFS TSV load.

@@ -98,6 +98,11 @@ def construct_ingest_query(
"mapreduce.map.output.compress": "false",
"mapreduce.output.fileoutputformat.compress": "false",
},
"partitionsSpec" : {
"type" : "hashed",
"targetPartitionSize" : -1
"numShards" : num_shards
},
},
"ioConfig": {
"inputSpec": {
Expand All @@ -116,10 +121,10 @@ def construct_ingest_query(

     def send_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, hadoop_dependency_coordinates=None):
+            intervals, num_shards, hadoop_dependency_coordinates=None):
         query = self.construct_ingest_query(
             datasource, static_path, ts_dim, columns,
-            metric_spec, intervals, hadoop_dependency_coordinates)
+            metric_spec, intervals, num_shards, hadoop_dependency_coordinates)
         r = requests.post(
             self.ingest_post_url, headers=self.header, data=query)
         logging.info(self.ingest_post_url)

@@ -133,15 +138,15 @@ def send_ingest_query(

     def load_from_hdfs(
             self, datasource, static_path, ts_dim, columns,
-            intervals, metric_spec=None, hadoop_dependency_coordinates=None):
+            intervals, num_shards, metric_spec=None, hadoop_dependency_coordinates=None):
         """
         load data to druid from hdfs
         :params ts_dim: The column name to use as a timestamp
         :params metric_spec: A list of dictionaries
         """
         task_id = self.send_ingest_query(
             datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, hadoop_dependency_coordinates)
+            intervals, num_shards, hadoop_dependency_coordinates)
         status_url = self.get_ingest_status_url(task_id)
         while True:
             r = requests.get(status_url)
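
Side note on the new partitionsSpec: with Druid's hashed partitioning, a targetPartitionSize of -1 tells the indexer to skip row-count-based sizing and instead emit exactly numShards hash-partitioned segments per interval. A rough sketch of the fragment the hook now adds to the ingestion spec's tuningConfig (the value 4 is illustrative, not from this commit):

# Illustrative only: what construct_ingest_query() contributes to tuningConfig
# when called with num_shards=4; the surrounding spec fields are omitted here.
partitions_spec = {
    "type": "hashed",            # hash-partition rows within each interval
    "targetPartitionSize": -1,   # -1 disables row-count based segment sizing...
    "numShards": 4,              # ...so exactly 4 segments are created per interval
}
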
airflow/operators/hive_to_druid.py (3 additions, 1 deletion)
@@ -49,12 +49,14 @@ def __init__(
             metastore_conn_id='metastore_default',
             hadoop_dependency_coordinates=None,
             intervals=None,
+            num_shards=1,
             *args, **kwargs):
         super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
         self.druid_datasource = druid_datasource
         self.ts_dim = ts_dim
         self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
+        self.num_shards = num_shards
         self.metric_spec = metric_spec or [{
             "name": "count",
             "type": "count"}]

@@ -101,7 +103,7 @@ def execute(self, context):
             datasource=self.druid_datasource,
             intervals=self.intervals,
             static_path=static_path, ts_dim=self.ts_dim,
-            columns=columns, metric_spec=self.metric_spec,
+            columns=columns, num_shards=self.num_shards, metric_spec=self.metric_spec,
             hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
         logging.info("Load seems to have succeeded!")
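
For context, a minimal sketch of how a DAG could pass the new parameter through the operator. The DAG wiring, SQL, and column names are hypothetical; only num_shards is new in this commit, and every other parameter shown already existed:

# Hypothetical DAG snippet: push a daily Hive aggregate into Druid, asking the
# Hadoop indexer for 4 hash-partitioned shards per interval instead of the default 1.
from datetime import datetime

from airflow import DAG
from airflow.operators.hive_to_druid import HiveToDruidTransfer

dag = DAG('hive_to_druid_example', start_date=datetime(2016, 1, 8),
          schedule_interval='@daily')

load_druid = HiveToDruidTransfer(
    task_id='load_events_into_druid',
    sql="SELECT ts, country, COUNT(1) AS cnt FROM events GROUP BY ts, country",
    druid_datasource='events',
    ts_dim='ts',
    intervals=['{{ ds }}/{{ tomorrow_ds }}'],  # same as the operator's default
    num_shards=4,                              # new parameter; defaults to 1
    dag=dag,
)
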

