Skip to content

Commit

Permalink
[AIRFLOW-1188] Add max_bad_records param to GoogleCloudStorageToBigQu…
Browse files Browse the repository at this point in the history
…eryOperator

Closes apache#2286 from ckpklos/master
  • Loading branch information
ckpklos authored and criccomini committed May 10, 2017
1 parent 72cf07b commit 443e6b2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
7 changes: 7 additions & 0 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ def run_load(self,
skip_leading_rows=0,
write_disposition='WRITE_EMPTY',
field_delimiter=',',
max_bad_records=0,
schema_update_options=()):
"""
Executes a BigQuery load command to load data from Google Cloud Storage
Expand Down Expand Up @@ -400,6 +401,9 @@ def run_load(self,
:type write_disposition: string
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
:param max_bad_records: The maximum number of bad records that BigQuery can
ignore when running the job.
:type max_bad_records: int
:param schema_update_options: Allows the schema of the destination
table to be updated as a side effect of the load job.
:type schema_update_options: list
Expand Down Expand Up @@ -473,6 +477,9 @@ def run_load(self,
configuration['load']['skipLeadingRows'] = skip_leading_rows
configuration['load']['fieldDelimiter'] = field_delimiter

if max_bad_records:
configuration['load']['maxBadRecords'] = max_bad_records

return self.run_with_configuration(configuration)

def run_with_configuration(self, configuration):
Expand Down
6 changes: 6 additions & 0 deletions airflow/contrib/operators/gcs_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
skip_leading_rows=0,
write_disposition='WRITE_EMPTY',
field_delimiter=',',
max_bad_records=0,
max_id_key=None,
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_storage_default',
Expand Down Expand Up @@ -80,6 +81,9 @@ def __init__(
:type write_disposition: string
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
:param max_bad_records: The maximum number of bad records that BigQuery can
ignore when running the job.
:type max_bad_records: int
:param max_id_key: If set, the name of a column in the BigQuery table
that's to be loaded. This will be used to select the MAX value from
BigQuery after the load occurs. The results will be returned by the
Expand Down Expand Up @@ -115,6 +119,7 @@ def __init__(
self.skip_leading_rows = skip_leading_rows
self.write_disposition = write_disposition
self.field_delimiter = field_delimiter
self.max_bad_records = max_bad_records

self.max_id_key = max_id_key
self.bigquery_conn_id = bigquery_conn_id
Expand Down Expand Up @@ -150,6 +155,7 @@ def execute(self, context):
skip_leading_rows=self.skip_leading_rows,
write_disposition=self.write_disposition,
field_delimiter=self.field_delimiter,
max_bad_records=self.max_bad_records,
schema_update_options=self.schema_update_options)

if self.max_id_key:
Expand Down

0 comments on commit 443e6b2

Please sign in to comment.