Commit

Merge pull request apache#1080 from criccomini/mysql-to-gcs
Add MySQL->GCS, GCS->BQ operators
r39132 committed Feb 25, 2016
2 parents a8ed2bb + 40834db commit 5a2dc8f
Showing 5 changed files with 395 additions and 4 deletions.
66 changes: 63 additions & 3 deletions airflow/contrib/hooks/bigquery_hook.py
@@ -149,7 +149,7 @@ def __init__(self, service, project_id):
self.service = service
self.project_id = project_id

- def run_query(self, bql, destination_dataset_table = False, write_disposition = 'WRITE_EMPTY'):
+ def run_query(self, bql, destination_dataset_table = False, write_disposition = 'WRITE_EMPTY', allow_large_results=False):
"""
Executes a BigQuery SQL query. Optionally persists results in a BigQuery
table. See here:
@@ -164,10 +164,12 @@ def run_query(self, bql, destination_dataset_table = False, write_disposition =
BigQuery table to save the query results.
:param write_disposition: What to do if the table already exists in
BigQuery.
:param allow_large_results: Whether to allow large results.
:type allow_large_results: boolean
"""
configuration = {
'query': {
- 'query': bql
+ 'query': bql,
}
}

@@ -176,6 +178,7 @@ def run_query(self, bql, destination_dataset_table = False, write_disposition =
'Expected destination_dataset_table in the format of <dataset>.<table>. Got: {}'.format(destination_dataset_table)
destination_dataset, destination_table = destination_dataset_table.split('.', 1)
configuration['query'].update({
'allowLargeResults': allow_large_results,
'writeDisposition': write_disposition,
'destinationTable': {
'projectId': self.project_id,
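For orientation, here is a minimal sketch of how the new allow_large_results flag might be exercised through the hook's cursor. The connection id, dataset, and table names are hypothetical, and as the hunk above shows, the flag is only added to the job configuration when a destination table is supplied.

# Sketch only: 'bigquery_default' must be a configured Airflow connection;
# the dataset and table names are hypothetical.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
cursor = hook.get_conn().cursor()

# allowLargeResults is only applied when a destination table is given,
# so both arguments are passed together here.
cursor.run_query(
    bql='SELECT * FROM mydataset.events',
    destination_dataset_table='mydataset.events_copy',
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True)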
@@ -205,7 +208,7 @@ def run_extract(self, source_dataset_table, destination_cloud_storage_uris, comp
:param compression: Type of compression to use.
:type compression: string
:param export_format: File format to export.
- :type field_delimiter: string
+ :type export_format: string
:param field_delimiter: The delimiter to use when extracting to a CSV.
:type field_delimiter: string
:param print_header: Whether to print a header for a CSV file extract.
@@ -291,6 +294,63 @@ def run_copy(self, source_dataset_tables, destination_project_dataset_table, wri

return self.run_with_configuration(configuration)

def run_load(self, destination_dataset_table, schema_fields, source_uris, source_format='CSV', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', field_delimiter=','):
"""
Executes a BigQuery load command to load data from Google Cloud Storage
to BigQuery. See here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs
for more details about these parameters.
:param destination_dataset_table: The dotted <dataset>.<table> BigQuery table to load data into.
:type destination_dataset_table: string
:param schema_fields: The schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
:type schema_fields: list
:param source_uris: The source Google Cloud
Storage URIs (e.g. gs://some-bucket/some-file.txt). A single wildcard
per object name can be used.
:type source_uris: list
:param source_format: File format of the source data.
:type source_format: string
:param create_disposition: The create disposition if the table doesn't exist.
:type create_disposition: string
:param skip_leading_rows: Number of rows to skip when loading from a CSV.
:type skip_leading_rows: int
:param write_disposition: The write disposition if the table already exists.
:type write_disposition: string
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
"""
assert '.' in destination_dataset_table, \
'Expected destination_dataset_table in the format of <dataset>.<table>. Got: {}'.format(destination_dataset_table)

destination_dataset, destination_table = destination_dataset_table.split('.', 1)

configuration = {
'load': {
'createDisposition': create_disposition,
'destinationTable': {
'projectId': self.project_id,
'datasetId': destination_dataset,
'tableId': destination_table,
},
'schema': {
'fields': schema_fields
},
'sourceFormat': source_format,
'sourceUris': source_uris,
'writeDisposition': write_disposition,
}
}

if source_format == 'CSV':
configuration['load']['skipLeadingRows'] = skip_leading_rows
configuration['load']['fieldDelimiter'] = field_delimiter

return self.run_with_configuration(configuration)

def run_with_configuration(self, configuration):
"""
Executes a BigQuery SQL query. See here:
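As a rough usage sketch for the new run_load method, a CSV export sitting in GCS could be loaded as below; the connection id, bucket, objects, and schema are made up for illustration.

# Sketch only: connection id, bucket, objects and schema are hypothetical.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
cursor = hook.get_conn().cursor()

schema_fields = [
    {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
    {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
]

# Loads gs://my-bucket/exports/users_*.csv into mydataset.users,
# skipping the header row of each CSV file.
cursor.run_load(
    destination_dataset_table='mydataset.users',
    schema_fields=schema_fields,
    source_uris=['gs://my-bucket/exports/users_*.csv'],
    source_format='CSV',
    skip_leading_rows=1,
    write_disposition='WRITE_TRUNCATE')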
21 changes: 21 additions & 0 deletions airflow/contrib/hooks/gcs_hook.py
@@ -4,6 +4,7 @@
from airflow.contrib.hooks.gc_base_hook import GoogleCloudBaseHook
from airflow.hooks.base_hook import BaseHook
from apiclient.discovery import build
from apiclient.http import MediaFileUpload
from oauth2client.client import SignedJwtAssertionCredentials

logging.getLogger("google_cloud_storage").setLevel(logging.INFO)
@@ -66,3 +67,23 @@ def download(self, bucket, object, filename=False):
file_fd.write(downloaded_file_bytes)

return downloaded_file_bytes

def upload(self, bucket, object, filename, mime_type='application/octet-stream'):
"""
Uploads a local file to Google Cloud Storage.
:param bucket: The bucket to upload to.
:type bucket: string
:param object: The object name to set when uploading the local file.
:type object: string
:param filename: The local file path to the file to be uploaded.
:type filename: string
:param mime_type: The MIME type to set when uploading the file.
:type mime_type: string
"""
service = self.get_conn()
media = MediaFileUpload(filename, mime_type)
response = service \
.objects() \
.insert(bucket=bucket, name=object, media_body=media) \
.execute()
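A short, hedged sketch of calling the new upload method; the connection id, bucket, and file paths below are placeholders.

# Sketch only: connection id, bucket and paths are hypothetical.
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

gcs_hook = GoogleCloudStorageHook(
    google_cloud_storage_conn_id='google_cloud_storage_default')

# Uploads a local CSV file to gs://my-bucket/exports/users.csv.
gcs_hook.upload(
    bucket='my-bucket',
    object='exports/users.csv',
    filename='/tmp/users.csv',
    mime_type='text/csv')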
4 changes: 3 additions & 1 deletion airflow/contrib/operators/bigquery_operator.py
@@ -17,6 +17,7 @@ def __init__(self,
bql,
destination_dataset_table = False,
write_disposition = 'WRITE_EMPTY',
allow_large_results=False,
bigquery_conn_id='bigquery_default',
delegate_to=None,
*args,
@@ -41,6 +42,7 @@
self.bql = bql
self.destination_dataset_table = destination_dataset_table
self.write_disposition = write_disposition
self.allow_large_results = allow_large_results
self.bigquery_conn_id = bigquery_conn_id
self.delegate_to = delegate_to

@@ -49,4 +51,4 @@ def execute(self, context):
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to)
conn = hook.get_conn()
cursor = conn.cursor()
- cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition)
+ cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition, self.allow_large_results)
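Assuming the operator class defined in bigquery_operator.py is BigQueryOperator (the class name is not shown in this hunk), a DAG might pass the new flag as follows; the DAG id, schedule, query, and table names are hypothetical.

# Sketch only: DAG id, schedule, query and table names are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

dag = DAG('bq_example', start_date=datetime(2016, 2, 1),
          schedule_interval='@daily')

# Materializes a potentially large aggregation into a destination table.
aggregate_events = BigQueryOperator(
    task_id='aggregate_events',
    bql='SELECT user_id, COUNT(*) AS events FROM mydataset.events GROUP BY user_id',
    destination_dataset_table='mydataset.event_counts',
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    dag=dag)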
125 changes: 125 additions & 0 deletions airflow/contrib/operators/gcs_to_bq.py
@@ -0,0 +1,125 @@
import json
import logging

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults

class GoogleCloudStorageToBigQueryOperator(BaseOperator):
"""
Loads files from Google Cloud Storage into BigQuery.
"""
template_fields = ('bucket','source_objects','schema_object','destination_dataset_table')
template_ext = ('.sql',)
ui_color = '#f0eee4'

@apply_defaults
def __init__(
self,
bucket,
source_objects,
destination_dataset_table,
schema_fields=False,
schema_object=False,
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
skip_leading_rows=0,
write_disposition='WRITE_EMPTY',
field_delimiter=',',
max_id_key=False,
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_storage_default',
delegate_to=None,
*args,
**kwargs):
"""
The schema to be used for the BigQuery table may be specified in one of
two ways. You may either directly pass the schema fields in, or you may
point the operator to a Google Cloud Storage object name. The object in
Google Cloud Storage must be a JSON file with the schema fields in it.
:param bucket: The bucket to load from.
:type bucket: string
:param source_objects: List of Google Cloud Storage URIs to load from.
:type source_objects: list
:param destination_dataset_table: The dotted <dataset>.<table> BigQuery table to load data into.
:type destination_dataset_table: string
:param schema_fields: If set, the schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
:type schema_fields: list
:param schema_object: If set, a GCS object path pointing to a .json file that contains the schema for the table.
:type schema_object: string
:param source_format: File format of the source data.
:type source_format: string
:param create_disposition: The create disposition if the table doesn't exist.
:type create_disposition: string
:param skip_leading_rows: Number of rows to skip when loading from a CSV.
:type skip_leading_rows: int
:param write_disposition: The write disposition if the table already exists.
:type write_disposition: string
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
:param max_id_key: If set, the name of a column in the BigQuery table
that's to be loaded. This will be used to select the MAX value from
BigQuery after the load occurs. The results will be returned by the
execute() command, which in turn gets stored in XCom for future
operators to use. This can be helpful with incremental loads--during
future executions, you can pick up from the max ID.
:type max_id_key: string
:param bigquery_conn_id: Reference to a specific BigQuery hook.
:type bigquery_conn_id: string
:param google_cloud_storage_conn_id: Reference to a specific Google
Cloud Storage hook.
:type google_cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any. For this to
work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
"""
super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)

# GCS config
self.bucket = bucket
self.source_objects = source_objects
self.schema_object = schema_object

# BQ config
self.destination_dataset_table = destination_dataset_table
self.schema_fields = schema_fields
self.source_format = source_format
self.create_disposition = create_disposition
self.skip_leading_rows = skip_leading_rows
self.write_disposition = write_disposition
self.field_delimiter = field_delimiter

self.max_id_key = max_id_key
self.bigquery_conn_id = bigquery_conn_id
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to

def execute(self, context):
gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)

schema_fields = self.schema_fields if self.schema_fields else json.loads(gcs_hook.download(self.bucket, self.schema_object))
source_uris = map(lambda source_object: 'gs://{}/{}'.format(self.bucket, source_object), self.source_objects)
conn = bq_hook.get_conn()
cursor = conn.cursor()
cursor.run_load(
destination_dataset_table=self.destination_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
create_disposition=self.create_disposition,
skip_leading_rows=self.skip_leading_rows,
write_disposition=self.write_disposition,
field_delimiter=self.field_delimiter)

if self.max_id_key:
cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_dataset_table))
row = cursor.fetchone()
logging.info('Loaded BQ data with max {}.{}={}'.format(self.destination_dataset_table, self.max_id_key, row[0]))
return row[0]
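Tying the pieces together, here is a hedged end-to-end sketch of the new operator in a DAG; the DAG id, bucket, object paths, schema file, and id column are all hypothetical.

# Sketch only: DAG id, bucket, objects, schema file and column are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

dag = DAG('gcs_to_bq_example', start_date=datetime(2016, 2, 1),
          schedule_interval='@daily')

# Loads CSV exports from GCS into BigQuery using a schema stored as a
# JSON object in the same bucket; per the docstring above, MAX(id) is
# returned by execute() and lands in XCom for downstream incremental loads.
load_users = GoogleCloudStorageToBigQueryOperator(
    task_id='load_users',
    bucket='my-bucket',
    source_objects=['exports/users_*.csv'],
    destination_dataset_table='mydataset.users',
    schema_object='schemas/users.json',
    skip_leading_rows=1,
    write_disposition='WRITE_APPEND',
    max_id_key='id',
    dag=dag)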