Skip to content

Commit

Permalink
[AIRFLOW-1439] Add max billing tier for the BQ Hook and Operator
Browse files Browse the repository at this point in the history
Closes apache#2437 from aviDms/master
  • Loading branch information
avram-dames authored and criccomini committed Jul 21, 2017
1 parent b87903d commit f1f022c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
8 changes: 6 additions & 2 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ def run_query(
write_disposition = 'WRITE_EMPTY',
allow_large_results=False,
udf_config = False,
use_legacy_sql=True):
use_legacy_sql=True,
maximum_billing_tier=None):
"""
Executes a BigQuery SQL query. Optionally persists results in a BigQuery
table. See here:
Expand All @@ -216,11 +217,14 @@ def run_query(
:type udf_config: list
:param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
:type use_legacy_sql: boolean
:param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price.
:type maximum_billing_tier: integer
"""
configuration = {
'query': {
'query': bql,
'useLegacySql': use_legacy_sql
'useLegacySql': use_legacy_sql,
'maximumBillingTier': maximum_billing_tier
}
}

Expand Down
8 changes: 7 additions & 1 deletion airflow/contrib/operators/bigquery_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class BigQueryOperator(BaseOperator):
:type udf_config: list
:param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
:type use_legacy_sql: boolean
:param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price.
Defaults to None, in which case it uses the value set in the project.
:type maximum_billing_tier: integer
"""
template_fields = ('bql', 'destination_dataset_table')
template_ext = ('.sql',)
Expand All @@ -57,6 +60,7 @@ def __init__(self,
delegate_to=None,
udf_config=False,
use_legacy_sql=True,
maximum_billing_tier=None,
*args,
**kwargs):
super(BigQueryOperator, self).__init__(*args, **kwargs)
Expand All @@ -68,6 +72,7 @@ def __init__(self,
self.delegate_to = delegate_to
self.udf_config = udf_config
self.use_legacy_sql = use_legacy_sql
self.maximum_billing_tier = maximum_billing_tier

def execute(self, context):
logging.info('Executing: %s', self.bql)
Expand All @@ -76,4 +81,5 @@ def execute(self, context):
conn = hook.get_conn()
cursor = conn.cursor()
cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
self.allow_large_results, self.udf_config, self.use_legacy_sql)
self.allow_large_results, self.udf_config, self.use_legacy_sql,
self.maximum_billing_tier)

0 comments on commit f1f022c

Please sign in to comment.