Commit
Add Google cloud storage to BigQuery operator
Remember to do docs

Add a max_id_key to gcs2bq

Add allow_large_results for BigQuery. Log max ID.

Add documentation

Fix logging line
criccomini committed Feb 25, 2016
1 parent 17bdb48 commit 40834db
Showing 5 changed files with 235 additions and 7 deletions.
38 changes: 35 additions & 3 deletions airflow/contrib/hooks/bigquery_hook.py
@@ -149,7 +149,7 @@ def __init__(self, service, project_id):
self.service = service
self.project_id = project_id

def run_query(self, bql, destination_dataset_table = False, write_disposition = 'WRITE_EMPTY'):
def run_query(self, bql, destination_dataset_table = False, write_disposition = 'WRITE_EMPTY', allow_large_results=False):
"""
Executes a BigQuery SQL query. Optionally persists results in a BigQuery
table. See here:
@@ -164,10 +164,12 @@ def run_query(self, bql, destination_dataset_table = False, write_disposition =
BigQuery table to save the query results.
:param write_disposition: What to do if the table already exists in
BigQuery.
:param allow_large_results: Whether to allow large results.
:type allow_large_results: boolean
"""
configuration = {
'query': {
'query': bql
'query': bql,
}
}

@@ -176,6 +178,7 @@ def run_query(self, bql, destination_dataset_table = False, write_disposition =
'Expected destination_dataset_table in the format of <dataset>.<table>. Got: {}'.format(destination_dataset_table)
destination_dataset, destination_table = destination_dataset_table.split('.', 1)
configuration['query'].update({
'allowLargeResults': allow_large_results,
'writeDisposition': write_disposition,
'destinationTable': {
'projectId': self.project_id,
@@ -205,7 +208,7 @@ def run_extract(self, source_dataset_table, destination_cloud_storage_uris, comp
:param compression: Type of compression to use.
:type compression: string
:param export_format: File format to export.
:type field_delimiter: string
:type export_format: string
:param field_delimiter: The delimiter to use when extracting to a CSV.
:type field_delimiter: string
:param print_header: Whether to print a header for a CSV file extract.
@@ -292,6 +295,34 @@ def run_copy(self, source_dataset_tables, destination_project_dataset_table, wri
return self.run_with_configuration(configuration)

def run_load(self, destination_dataset_table, schema_fields, source_uris, source_format='CSV', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', field_delimiter=','):
"""
Executes a BigQuery load command to load data from Google Cloud Storage
to BigQuery. See here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs
For more details about these parameters.
:param destination_dataset_table: The dotted <dataset>.<table> BigQuery table to load data into.
:type destination_dataset_table: string
:param schema_fields: The schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
:type schema_fields: list
:param source_uris: The source Google Cloud
Storage URI (e.g. gs://some-bucket/some-file.txt). A single wildcard
per object name can be used.
:type source_uris: list
:param source_format: File format of the data to load.
:type source_format: string
:param create_disposition: The create disposition if the table doesn't exist.
:type create_disposition: string
:param skip_leading_rows: Number of rows to skip when loading from a CSV.
:type skip_leading_rows: int
:param write_disposition: The write disposition if the table already exists.
:type write_disposition: string
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
"""
assert '.' in destination_dataset_table, \
'Expected destination_dataset_table in the format of <dataset>.<table>. Got: {}'.format(destination_dataset_table)

@@ -311,6 +342,7 @@ def run_load(self, destination_dataset_table, schema_fields, source_uris, source
'sourceFormat': source_format,
'sourceUris': source_uris,
'writeDisposition': write_disposition,
}
}

if source_format == 'CSV':
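
A minimal usage sketch for the updated run_query, called through the hook's DB-API style cursor as the diff above does. The connection ID, query, and table names are placeholders; note that in this hook allowLargeResults is only applied when a destination table is supplied.

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

# Placeholder connection ID and table names, for illustration only.
hook = BigQueryHook(bigquery_conn_id='bigquery_default')
cursor = hook.get_conn().cursor()

# Persist a potentially large result set; allow_large_results takes effect
# only when destination_dataset_table is set (see the configuration block above).
cursor.run_query(
    'SELECT * FROM my_dataset.big_source_table',
    destination_dataset_table='my_dataset.big_result_table',
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True)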
13 changes: 12 additions & 1 deletion airflow/contrib/hooks/gcs_hook.py
@@ -69,10 +69,21 @@ def download(self, bucket, object, filename=False):
return downloaded_file_bytes

def upload(self, bucket, object, filename, mime_type='application/octet-stream'):
"""
Uploads a local file to Google Cloud Storage.
:param bucket: The bucket to upload to.
:type bucket: string
:param object: The object name to set when uploading the local file.
:type object: string
:param filename: The local file path to the file to be uploaded.
:type filename: string
:param mime_type: The MIME type to set when uploading the file.
:type mime_type: string
"""
service = self.get_conn()
media = MediaFileUpload(filename, mime_type)
response = service \
.objects() \
.insert(bucket=bucket, name=object, media_body=media) \
.execute()
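
A short sketch of the documented upload call; the connection ID, bucket, object name, and local path below are placeholders.

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

# Placeholder connection ID, bucket, and paths, for illustration only.
hook = GoogleCloudStorageHook(
    google_cloud_storage_conn_id='google_cloud_storage_default')
hook.upload(
    bucket='my-bucket',
    object='exports/events_0.json',
    filename='/tmp/events_0.json',
    mime_type='application/json')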

4 changes: 3 additions & 1 deletion airflow/contrib/operators/bigquery_operator.py
@@ -17,6 +17,7 @@ def __init__(self,
bql,
destination_dataset_table = False,
write_disposition = 'WRITE_EMPTY',
allow_large_results=False,
bigquery_conn_id='bigquery_default',
delegate_to=None,
*args,
@@ -41,6 +42,7 @@ def __init__(self,
self.bql = bql
self.destination_dataset_table = destination_dataset_table
self.write_disposition = write_disposition
self.allow_large_results = allow_large_results
self.bigquery_conn_id = bigquery_conn_id
self.delegate_to = delegate_to

@@ -49,4 +51,4 @@ def execute(self, context):
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to)
conn = hook.get_conn()
cursor = conn.cursor()
cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition)
cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition, self.allow_large_results)
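
A hedged sketch of how the new allow_large_results flag might be passed through the operator in a DAG, assuming the operator class defined in bigquery_operator.py is BigQueryOperator (the diff only shows its __init__ and execute). The DAG, task, and table names are placeholders.

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Placeholder DAG and table names, for illustration only.
dag = DAG('example_bigquery', start_date=datetime(2016, 2, 1), schedule_interval='@daily')

aggregate_events = BigQueryOperator(
    task_id='aggregate_events',
    bql='SELECT * FROM my_dataset.events',
    destination_dataset_table='my_dataset.events_rollup',
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    dag=dag)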
125 changes: 125 additions & 0 deletions airflow/contrib/operators/gcs_to_bq.py
@@ -0,0 +1,125 @@
import json
import logging

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults

class GoogleCloudStorageToBigQueryOperator(BaseOperator):
"""
Loads files from Google cloud storage into BigQuery.
"""
template_fields = ('bucket','source_objects','schema_object','destination_dataset_table')
template_ext = ('.sql',)
ui_color = '#f0eee4'

@apply_defaults
def __init__(
self,
bucket,
source_objects,
destination_dataset_table,
schema_fields=False,
schema_object=False,
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
skip_leading_rows=0,
write_disposition='WRITE_EMPTY',
field_delimiter=',',
max_id_key=False,
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_storage_default',
delegate_to=None,
*args,
**kwargs):
"""
The schema to be used for the BigQuery table may be specified in one of
two ways. You may either directly pass the schema fields in, or you may
point the operator to a Google cloud storage object name. The object in
Google cloud storage must be a JSON file with the schema fields in it.
:param bucket: The bucket to load from.
:type bucket: string
:param source_objects: List of Google cloud storage URIs to load from.
:type source_objects: list
:param destination_dataset_table: The dotted <dataset>.<table> BigQuery table to load data into.
:type destination_dataset_table: string
:param schema_fields: If set, the schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
:type schema_fields: list
:param schema_object: If set, a GCS object path pointing to a .json file that contains the schema for the table.
:type schema_object: string
:param source_format: File format of the data to load.
:type source_format: string
:param create_disposition: The create disposition if the table doesn't exist.
:type create_disposition: string
:param skip_leading_rows: Number of rows to skip when loading from a CSV.
:type skip_leading_rows: int
:param write_disposition: The write disposition if the table already exists.
:type write_disposition: string
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
:param max_id_key: If set, the name of a column in the BigQuery table
that's to be loaded. This will be used to select the MAX value from
BigQuery after the load occurs. The results will be returned by the
execute() command, which in turn gets stored in XCom for future
operators to use. This can be helpful with incremental loads--during
future executions, you can pick up from the max ID.
:type max_id_key: string
:param bigquery_conn_id: Reference to a specific BigQuery hook.
:type bigquery_conn_id: string
:param google_cloud_storage_conn_id: Reference to a specific Google
cloud storage hook.
:type google_cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any. For this to
work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
"""
super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)

# GCS config
self.bucket = bucket
self.source_objects = source_objects
self.schema_object = schema_object

# BQ config
self.destination_dataset_table = destination_dataset_table
self.schema_fields = schema_fields
self.source_format = source_format
self.create_disposition = create_disposition
self.skip_leading_rows = skip_leading_rows
self.write_disposition = write_disposition
self.field_delimiter = field_delimiter

self.max_id_key = max_id_key
self.bigquery_conn_id = bigquery_conn_id
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to

def execute(self, context):
gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)

schema_fields = self.schema_fields if self.schema_fields else json.loads(gcs_hook.download(self.bucket, self.schema_object))
source_uris = map(lambda source_object: 'gs://{}/{}'.format(self.bucket, source_object), self.source_objects)
conn = bq_hook.get_conn()
cursor = conn.cursor()
cursor.run_load(
destination_dataset_table=self.destination_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
create_disposition=self.create_disposition,
skip_leading_rows=self.skip_leading_rows,
write_disposition=self.write_disposition,
field_delimiter=self.field_delimiter)

if self.max_id_key:
cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_dataset_table))
row = cursor.fetchone()
logging.info('Loaded BQ data with max {}.{}={}'.format(self.destination_dataset_table, self.max_id_key, row[0]))
return row[0]
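
A sketch of how the new operator might be used with a schema object already stored in GCS; bucket, object, column, and table names are placeholders, and schema_fields could be passed directly instead of schema_object.

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Placeholder DAG, bucket, object, column, and table names, for illustration only.
dag = DAG('example_gcs_to_bq', start_date=datetime(2016, 2, 1), schedule_interval='@daily')

load_events = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_events',
    bucket='my-bucket',
    source_objects=['exports/events_0.json', 'exports/events_1.json'],
    destination_dataset_table='my_dataset.events',
    schema_object='exports/events_schema.json',  # or pass schema_fields=[...] directly
    source_format='NEWLINE_DELIMITED_JSON',
    write_disposition='WRITE_TRUNCATE',
    max_id_key='id',  # execute() returns MAX(id) after the load
    dag=dag)

Because max_id_key is set, the value returned by execute() is stored as the task's XCom result, so a downstream task could pull it to bound the next incremental load.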
62 changes: 60 additions & 2 deletions airflow/contrib/operators/mysql_to_gcs.py
@@ -10,6 +10,9 @@
from tempfile import NamedTemporaryFile

class MySqlToGoogleCloudStorageOperator(BaseOperator):
"""
Copy data from MySQL to Google cloud storage in JSON format.
"""
template_fields = ('sql', 'bucket', 'filename', 'schema_filename')
template_ext = ('.sql',)
ui_color = '#a0e08c'
@@ -20,12 +23,41 @@ def __init__(self,
bucket,
filename,
schema_filename=None,
approx_max_file_size_bytes=3900000000L,
approx_max_file_size_bytes=1900000000L,
mysql_conn_id='mysql_default',
google_cloud_storage_conn_id='google_cloud_storage_default',
delegate_to=None,
*args,
**kwargs):
"""
:param sql: The SQL to execute on the MySQL table.
:type sql: string
:param bucket: The bucket to upload to.
:type bucket: string
:param filename: The filename to use as the object name when uploading
to Google cloud storage. A {} should be specified in the filename
to allow the operator to inject file numbers in cases where the
file is split due to size.
:type filename: string
:param schema_filename: If set, the filename to use as the object name
when uploading a .json file containing the BigQuery schema fields
for the table that was dumped from MySQL.
:type schema_filename: string
:param approx_max_file_size_bytes: This operator supports the ability
to split large table dumps into multiple files (see notes in the
filename param docs above). Google cloud storage allows for files
to be a maximum of 4GB. This param allows developers to specify the
file size of the splits.
:type approx_max_file_size_bytes: long
:param mysql_conn_id: Reference to a specific MySQL hook.
:type mysql_conn_id: string
:param google_cloud_storage_conn_id: Reference to a specific Google
cloud storage hook.
:type google_cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any. For this to
work, the service account making the request must have domain-wide
delegation enabled.
"""
super(MySqlToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.bucket = bucket
@@ -55,13 +87,23 @@ def execute(self, context):
file_handle.close()

def _query_mysql(self):
"""
Queries mysql and returns a cursor to the results.
"""
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
conn = mysql.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
return cursor

def _write_local_data_files(self, cursor):
"""
Takes a cursor, and writes results to a local file.
:return: A dictionary where keys are filenames to be used as object
names in GCS, and values are file handles to local files that
contain the data for the GCS objects.
"""
schema = map(lambda schema_tuple: schema_tuple[0], cursor.description)
file_no = 0
tmp_file_handle = NamedTemporaryFile(delete=True)
@@ -84,6 +126,14 @@ def _write_local_data_files(self, cursor):
return tmp_file_handles

def _write_local_schema_file(self, cursor):
"""
Takes a cursor, and writes the BigQuery schema for the results to a
local file system.
:return: A dictionary where the key is a filename to be used as an object
name in GCS, and the value is a file handle to a local file that
contains the BigQuery schema fields in .json format.
"""
schema = []
for field in cursor.description:
# See PEP 249 for details about the description tuple.
@@ -96,19 +146,27 @@ def _write_local_schema_file(self, cursor):
'mode': field_mode,
})

print('Using schema for {}: {}', self.schema_filename, str(schema))
logging.info('Using schema for %s: %s', self.schema_filename, schema)
tmp_schema_file_handle = NamedTemporaryFile(delete=True)
json.dump(schema, tmp_schema_file_handle)
return {self.schema_filename: tmp_schema_file_handle}

def _upload_to_gcs(self, files_to_upload):
"""
Upload all of the file splits (and optionally the schema .json file) to
Google cloud storage.
"""
hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
for object, tmp_file_handle in files_to_upload.items():
hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')

@classmethod
def type_map(cls, mysql_type):
"""
Helper function that maps from MySQL fields to BigQuery fields. Used
when a schema_filename is set.
"""
d = {
FIELD_TYPE.BIT: 'INTEGER',
FIELD_TYPE.DATETIME: 'TIMESTAMP',
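
A sketch of how the dump operator might be configured to feed the load above; the '{}' in filename is where the operator injects a file number when the dump is split according to approx_max_file_size_bytes. All names are placeholders.

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

# Placeholder DAG, SQL, bucket, and object names, for illustration only.
dag = DAG('example_mysql_to_gcs', start_date=datetime(2016, 2, 1), schedule_interval='@daily')

dump_events = MySqlToGoogleCloudStorageOperator(
    task_id='mysql_to_gcs_events',
    sql='SELECT * FROM events',
    bucket='my-bucket',
    filename='exports/events_{}.json',             # '{}' becomes the split number
    schema_filename='exports/events_schema.json',  # BigQuery schema derived via type_map
    approx_max_file_size_bytes=1900000000,
    mysql_conn_id='mysql_default',
    google_cloud_storage_conn_id='google_cloud_storage_default',
    dag=dag)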
