Skip to content

Commit

Permalink
Upgrading pydruid version and adopt 'merge' flag during refresh_druid…
Browse files Browse the repository at this point in the history
… operation (apache#1879)

* Initial

* rewrite some line to make it short and setting merge variable temporarily

* rewrite commit author

* add emitted attribute

* Fix typo

* fix test error

* fix typo

* test added
  • Loading branch information
dkhwangbo authored and mistercrunch committed Dec 27, 2016
1 parent 1673105 commit 3e6f90c
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
'markdown==2.6.6',
'pandas==0.18.1',
'parsedatetime==2.0.0',
'pydruid==0.3.0',
'pydruid==0.3.1',
'PyHive>=0.2.1',
'python-dateutil==2.5.3',
'requests==2.10.0',
Expand Down
10 changes: 8 additions & 2 deletions superset/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,19 @@ def load_examples(load_test_data):
help=(
"Specify which datasource name to load, if omitted, all "
"datasources will be refreshed"))
def refresh_druid(datasource):
@manager.option(
'-m', '--merge',
help=(
"Specify using 'merge' property during operation. "
"Default value is False "))
def refresh_druid(datasource, merge):
"""Refresh druid datasources"""
session = db.session()
from superset import models
for cluster in session.query(models.DruidCluster).all():
try:
cluster.refresh_datasources(datasource_name=datasource)
cluster.refresh_datasources(datasource_name=datasource,
merge_flag=merge)
except Exception as e:
print(
"Error while processing cluster '{}'\n{}".format(
Expand Down
13 changes: 8 additions & 5 deletions superset/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1602,7 +1602,7 @@ def get_druid_version(self):
).format(obj=self)
return json.loads(requests.get(endpoint).text)['version']

def refresh_datasources(self, datasource_name=None):
def refresh_datasources(self, datasource_name=None, merge_flag=False):
"""Refresh metadata of all datasources in the cluster
If ``datasource_name`` is specified, only that datasource is updated
Expand All @@ -1611,7 +1611,7 @@ def refresh_datasources(self, datasource_name=None):
for datasource in self.get_datasources():
if datasource not in config.get('DRUID_DATA_SOURCE_BLACKLIST'):
if not datasource_name or datasource_name == datasource:
DruidDatasource.sync_to_db(datasource, self)
DruidDatasource.sync_to_db(datasource, self, merge_flag)

@property
def perm(self):
Expand Down Expand Up @@ -1777,7 +1777,8 @@ def latest_metadata(self):
try:
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=lbound + '/' + rbound)
intervals=lbound + '/' + rbound,
merge=self.merge_flag)
except Exception as e:
logging.warning("Failed first attempt to get latest segment")
logging.exception(e)
Expand All @@ -1790,7 +1791,8 @@ def latest_metadata(self):
try:
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=lbound + '/' + rbound)
intervals=lbound + '/' + rbound,
merge=self.merge_flag)
except Exception as e:
logging.warning("Failed 2nd attempt to get latest segment")
logging.exception(e)
Expand Down Expand Up @@ -1876,7 +1878,7 @@ def sync_to_db_from_config(cls, druid_config, user, cluster):
session.commit()

@classmethod
def sync_to_db(cls, name, cluster):
def sync_to_db(cls, name, cluster, merge):
"""Fetches metadata for that datasource and merges the Superset db"""
logging.info("Syncing Druid datasource [{}]".format(name))
session = get_session()
Expand All @@ -1889,6 +1891,7 @@ def sync_to_db(cls, name, cluster):
flasher("Refreshing datasource [{}]".format(name), "info")
session.flush()
datasource.cluster = cluster
datasource.merge_flag = merge
session.flush()

cols = datasource.latest_metadata()
Expand Down
1 change: 1 addition & 0 deletions tests/druid_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def test_client(self, PyDruid):
cluster.get_datasources = Mock(return_value=['test_datasource'])
cluster.get_druid_version = Mock(return_value='0.9.1')
cluster.refresh_datasources()
cluster.refresh_datasources(merge_flag=True)
datasource_id = cluster.datasources[0].id
db.session.commit()

Expand Down

0 comments on commit 3e6f90c

Please sign in to comment.