Revert "[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStora…
Browse files Browse the repository at this point in the history
…geOperator"

Reverting due to improper handling of binary description_flag.

This reverts commit d578b29.
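For context, the reverted change decided whether a STRING/VAR_STRING column was really binary by consulting MySQLdb's per-column description flags. A rough sketch of that check, adapted from the removed is_binary helper shown in the diff below (the flag values passed in are illustrative, not taken from a live cursor):

```python
from MySQLdb.constants import FIELD_TYPE, FLAG

def looks_binary(mysql_type, flags):
    # MySQLdb reports char/varchar and binary/varbinary with the same
    # STRING/VAR_STRING type codes, so the reverted code checked the
    # BINARY bit in the column's description flags to tell them apart.
    return mysql_type in (FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING) and \
        flags & FLAG.BINARY == FLAG.BINARY

# Illustrative values only.
print(looks_binary(FIELD_TYPE.VAR_STRING, FLAG.BINARY))  # True
print(looks_binary(FIELD_TYPE.VAR_STRING, 0))            # False
```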
criccomini committed Oct 13, 2017
1 parent 4dade6d commit 8512776
Showing 2 changed files with 14 additions and 38 deletions.
4 changes: 1 addition & 3 deletions airflow/contrib/hooks/bigquery_hook.py
@@ -971,9 +971,7 @@ def _bq_cast(string_field, bq_type):
     if string_field is None:
         return None
     elif bq_type == 'INTEGER' or bq_type == 'TIMESTAMP':
-        # convert to float first to handle cases where string_field is
-        # represented in scientific notation
-        return int(float(string_field))
+        return int(string_field)
     elif bq_type == 'FLOAT':
         return float(string_field)
     elif bq_type == 'BOOLEAN':
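The lines removed above show what this revert gives up in _bq_cast: the now-removed code cast through float() so that a string rendered in scientific notation still converts, while the restored plain int() rejects it. A minimal, standalone illustration (plain Python, values are illustrative):

```python
# The removed int(float(...)) accepted scientific notation; the restored
# int(...) raises ValueError on the same input.
print(int(float('1.0E5')))   # 100000 -- behaviour of the removed code

try:
    print(int('1.0E5'))      # behaviour of the restored code
except ValueError as exc:
    print('int() alone fails:', exc)
```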
48 changes: 13 additions & 35 deletions airflow/contrib/operators/mysql_to_gcs.py
@@ -14,15 +14,14 @@
 
 import json
 import time
-import base64
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from datetime import date, datetime
 from decimal import Decimal
-from MySQLdb.constants import FIELD_TYPE, FLAG
+from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
 
@@ -121,20 +120,15 @@ def _write_local_data_files(self, cursor):
         names in GCS, and values are file handles to local files that
         contain the data for the GCS objects.
         """
-        field_names = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
-        mysql_types = list(map(lambda schema_tuple: schema_tuple[1], cursor.description))
-        byte_fields = [self.is_binary(t, f) for t, f in zip(mysql_types, cursor.description_flags)]
-
+        schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
         file_no = 0
-        tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
+        tmp_file_handle = NamedTemporaryFile(delete=True)
         tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
 
         for row in cursor:
-            # Convert datetime objects to utc seconds, decimals to floats, and binaries
-            # to base64-encoded strings
-            row_dict = {}
-            for name, value, is_binary in zip(field_names, row, byte_fields):
-                row_dict[name] = self.convert_types(value, is_binary)
+            # Convert datetime objects to utc seconds, and decimals to floats
+            row = map(self.convert_types, row)
+            row_dict = dict(zip(schema, row))
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
             json.dump(row_dict, tmp_file_handle)
@@ -145,7 +139,7 @@ def _write_local_data_files(self, cursor):
             # Stop if the file exceeds the file size limit.
             if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
                 file_no += 1
-                tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
+                tmp_file_handle = NamedTemporaryFile(delete=True)
                 tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
 
         return tmp_file_handles
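With the revert applied, each row is converted value by value and zipped back against the column names, as in the restored lines above. A minimal sketch of that path, assuming Airflow's contrib operator is importable; the column names and row values here are made up for illustration:

```python
from decimal import Decimal

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

# Hypothetical stand-ins for the names from cursor.description and one fetched row.
schema = ['id', 'price']
row = (42, Decimal('9.99'))

# Mirrors the restored loop body: convert each value, then zip with the names.
row_dict = dict(zip(schema, map(MySqlToGoogleCloudStorageOperator.convert_types, row)))
print(row_dict)  # {'id': 42, 'price': 9.99}
```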
@@ -160,12 +154,10 @@ def _write_local_schema_file(self, cursor):
         contains the BigQuery schema fields in .json format.
         """
         schema = []
-        for field, flag in zip(cursor.description, cursor.description_flags):
+        for field in cursor.description:
             # See PEP 249 for details about the description tuple.
             field_name = field[0]
-
-            field_type = self.type_map(field[1], flag)
-
+            field_type = self.type_map(field[1])
             # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
             # for required fields because some MySQL timestamps can't be
             # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
@@ -177,7 +169,7 @@
             })
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema)
-        tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
+        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
         json.dump(schema, tmp_schema_file_handle)
         return {self.schema_filename: tmp_schema_file_handle}
 
@@ -192,24 +184,21 @@ def _upload_to_gcs(self, files_to_upload):
             hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
 
     @classmethod
-    def convert_types(cls, value, is_binary=False):
+    def convert_types(cls, value):
         """
         Takes a value from MySQLdb, and converts it to a value that's safe for
         JSON/Google cloud storage/BigQuery. Dates are converted to UTC seconds.
-        Decimals are converted to floats. Binaries are converted to base64-encoded
-        strings.
+        Decimals are converted to floats.
         """
         if type(value) in (datetime, date):
             return time.mktime(value.timetuple())
         elif isinstance(value, Decimal):
             return float(value)
-        elif is_binary:
-            return base64.b64encode(value).decode()
         else:
             return value
 
     @classmethod
-    def type_map(cls, mysql_type, flags):
+    def type_map(cls, mysql_type):
         """
         Helper function that maps from MySQL fields to BigQuery fields. Used
         when a schema_filename is set.
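The restored convert_types above keeps only two conversions: datetime/date values go through time.mktime on their timetuple, and Decimals become floats; everything else passes through unchanged. A small standalone sketch of those conversions, outside the operator, with example values:

```python
import time
from datetime import date, datetime
from decimal import Decimal

def convert_types(value):
    # Same shape as the restored classmethod, minus the class plumbing.
    if type(value) in (datetime, date):
        return time.mktime(value.timetuple())
    elif isinstance(value, Decimal):
        return float(value)
    else:
        return value

print(convert_types(datetime(2017, 10, 13, 12, 0, 0)))  # seconds since the epoch
print(convert_types(Decimal('1.5')))                    # 1.5
print(convert_types('unchanged'))                       # 'unchanged'
```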
@@ -231,15 +220,4 @@ def type_map(cls, mysql_type, flags):
             FIELD_TYPE.TIMESTAMP: 'TIMESTAMP',
             FIELD_TYPE.YEAR: 'INTEGER',
         }
-
-        if MySqlToGoogleCloudStorageOperator.is_binary(mysql_type, flags):
-            return 'BYTES'
-
         return d[mysql_type] if mysql_type in d else 'STRING'
-
-    @classmethod
-    def is_binary(cls, mysql_type, flags):
-        # MySQLdb groups both char/varchar and binary/varbinary as STRING/VAR_STRING.
-        # To work around this ambiguity, check the description flag to see if it's a binary field.
-        return mysql_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING] and \
-            flags & FLAG.BINARY == FLAG.BINARY
