[AIRFLOW-4423] Improve date handling in mysql to gcs operator. (apache#5196)

* Handle TIME columns
* Ensure DATETIME and TIMESTAMP columns treated as UTC
jmcarp authored and Fokko committed Jun 18, 2019
1 parent d0f6a80 commit 929b8fd
Showing 3 changed files with 115 additions and 42 deletions.
9 changes: 9 additions & 0 deletions UPDATING.md
@@ -32,7 +32,16 @@ The `elasticsearch_` prefix has been removed from all config items under the `[e

Updating to `google-cloud-storage >= 1.16` changes the signature of the upstream `client.get_bucket()` method from `get_bucket(bucket_name: str)` to `get_bucket(bucket_or_name: Union[str, Bucket])`. This method is not directly exposed by the airflow hook, but any code accessing the connection directly (`GoogleCloudStorageHook().get_conn().get_bucket(...)` or similar) will need to be updated.
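
A minimal sketch of the kind of call site affected (the bucket name is a placeholder):

```python
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

client = GoogleCloudStorageHook().get_conn()

# Breaks under google-cloud-storage >= 1.16, because the keyword was renamed:
# bucket = client.get_bucket(bucket_name='my-bucket')

# Works on both sides of the upgrade; a plain str is still accepted positionally:
bucket = client.get_bucket('my-bucket')
```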

### Export MySQL timestamps as UTC

`MySqlToGoogleCloudStorageOperator` now exports TIMESTAMP columns as UTC
by default, rather than using the default timezone of the MySQL server.
This is the correct behavior for use with BigQuery, since BigQuery
assumes that TIMESTAMP columns without time zones are in UTC. To
preserve the previous behavior, set `ensure_utc` to `False`.
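
For example, a minimal sketch of opting out (every argument except `ensure_utc` is a placeholder):

```python
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

# Hypothetical task that keeps the pre-upgrade behavior of exporting
# TIMESTAMP columns in the MySQL server's default time zone.
export_events = MySqlToGoogleCloudStorageOperator(
    task_id='export_events',        # placeholder task id
    mysql_conn_id='mysql_default',  # placeholder connection id
    sql='SELECT * FROM events',     # placeholder query
    bucket='my-bucket',             # placeholder GCS bucket
    filename='events_{}.ndjson',    # placeholder object name template
    ensure_utc=False,               # keep the MySQL server's time zone
)
```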

### Removal of Mesos Executor

The Mesos Executor is removed from the code base as it was not widely used and not maintained. [Mailing List Discussion on deleting it](https://lists.apache.org/list.html?dev@airflow.apache.org:lte=1M:mesos).

### Increase standard Dataproc disk sizes
99 changes: 57 additions & 42 deletions airflow/contrib/operators/mysql_to_gcs.py
@@ -17,15 +17,15 @@
# specific language governing permissions and limitations
# under the License.

import json
import time
import base64
import calendar
import json

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import date, datetime
from datetime import date, datetime, timedelta
from decimal import Decimal
from MySQLdb.constants import FIELD_TYPE
from tempfile import NamedTemporaryFile
@@ -78,11 +78,33 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
    :type export_format: str
    :param field_delimiter: The delimiter to be used for CSV files.
    :type field_delimiter: str
    :param ensure_utc: Ensure TIMESTAMP columns are exported as UTC. If set to
        `False`, TIMESTAMP columns will be exported using the MySQL server's
        default timezone.
    :type ensure_utc: bool
    """
    template_fields = ('sql', 'bucket', 'filename', 'schema_filename', 'schema')
    template_ext = ('.sql',)
    ui_color = '#a0e08c'

    type_map = {
        FIELD_TYPE.BIT: 'INTEGER',
        FIELD_TYPE.DATETIME: 'TIMESTAMP',
        FIELD_TYPE.DATE: 'TIMESTAMP',
        FIELD_TYPE.DECIMAL: 'FLOAT',
        FIELD_TYPE.NEWDECIMAL: 'FLOAT',
        FIELD_TYPE.DOUBLE: 'FLOAT',
        FIELD_TYPE.FLOAT: 'FLOAT',
        FIELD_TYPE.INT24: 'INTEGER',
        FIELD_TYPE.LONG: 'INTEGER',
        FIELD_TYPE.LONGLONG: 'INTEGER',
        FIELD_TYPE.SHORT: 'INTEGER',
        FIELD_TYPE.TIME: 'TIME',
        FIELD_TYPE.TIMESTAMP: 'TIMESTAMP',
        FIELD_TYPE.TINY: 'INTEGER',
        FIELD_TYPE.YEAR: 'INTEGER',
    }

    @apply_defaults
    def __init__(self,
                 sql,
@@ -96,6 +118,7 @@ def __init__(self,
                 delegate_to=None,
                 export_format='json',
                 field_delimiter=',',
                 ensure_utc=False,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
@@ -110,6 +133,7 @@ def __init__(self,
        self.delegate_to = delegate_to
        self.export_format = export_format.lower()
        self.field_delimiter = field_delimiter
        self.ensure_utc = ensure_utc

    def execute(self, context):
        cursor = self._query_mysql()
@@ -138,6 +162,12 @@ def _query_mysql(self):
        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        conn = mysql.get_conn()
        cursor = conn.cursor()
        if self.ensure_utc:
            # Ensure TIMESTAMP results are in UTC
            tz_query = "SET time_zone = '+00:00'"
            self.log.info('Executing: %s', tz_query)
            cursor.execute(tz_query)
        self.log.info('Executing: %s', self.sql)
        cursor.execute(self.sql)
        return cursor
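
For intuition, here is a sketch of what the session override does in isolation, using the `MySQLdb` driver this module already depends on (connection parameters, table, and timestamps are hypothetical):

```python
import MySQLdb

# Placeholder connection parameters.
conn = MySQLdb.connect(host='localhost', user='reader', passwd='secret', db='test')
cursor = conn.cursor()

# MySQL stores TIMESTAMP values in UTC and converts them to the session
# time zone on retrieval. On a server whose default time zone is +02:00,
# a row stored at 2019-01-01 00:00:00 UTC would normally read back as 02:00.
cursor.execute("SET time_zone = '+00:00'")  # what ensure_utc=True issues first
cursor.execute("SELECT created_at FROM events")  # placeholder query
print(cursor.fetchone())  # (datetime.datetime(2019, 1, 1, 0, 0),) in UTC
```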

@@ -228,7 +258,7 @@ def _write_local_schema_file(self, cursor):
        for field in cursor.description:
            # See PEP 249 for details about the description tuple.
            field_name = field[0]
            field_type = self.type_map(field[1])
            field_type = self.type_map.get(field[1], "STRING")
            # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
            # for required fields because some MySQL timestamps can't be
            # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
@@ -265,27 +295,36 @@ def _upload_to_gcs(self, files_to_upload):
                        tmp_file.get('file_handle').name,
                        mime_type=tmp_file.get('file_mime_type'))

    @staticmethod
    def _convert_types(schema, col_type_dict, row):
    @classmethod
    def _convert_types(cls, schema, col_type_dict, row):
        return [
            cls._convert_type(value, col_type_dict.get(name))
            for name, value in zip(schema, row)
        ]

    @classmethod
    def _convert_type(cls, value, schema_type):
        """
        Takes a value from MySQLdb and converts it to a value that's safe for
        JSON/Google Cloud Storage/BigQuery. Dates are converted to UTC seconds.
        Decimals are converted to floats. Binary type fields are encoded with base64,
        as imported BYTES data must be base64-encoded according to BigQuery SQL
        date type documentation: https://cloud.google.com/bigquery/data-types
        :param value: MySQLdb column value
        :type value: Any
        :param schema_type: BigQuery data type
        :type schema_type: str
        """
        converted_row = []
        for col_name, col_val in zip(schema, row):
            if type(col_val) in (datetime, date):
                col_val = time.mktime(col_val.timetuple())
            elif isinstance(col_val, Decimal):
                col_val = float(col_val)
            elif col_type_dict.get(col_name) == "BYTES":
                col_val = base64.standard_b64encode(col_val).decode('ascii')
            else:
                col_val = col_val
            converted_row.append(col_val)
        return converted_row
        if isinstance(value, (datetime, date)):
            return calendar.timegm(value.timetuple())
        if isinstance(value, timedelta):
            return value.total_seconds()
        if isinstance(value, Decimal):
            return float(value)
        if schema_type == "BYTES":
            return base64.standard_b64encode(value).decode('ascii')
        return value

    def _get_col_type_dict(self):
        """
@@ -308,27 +347,3 @@ def _get_col_type_dict(self):
                          'refer to: https://cloud.google.com/bigquery/docs/schemas'
                          '#specifying_a_json_schema_file')
        return col_type_dict

    @classmethod
    def type_map(cls, mysql_type):
        """
        Helper function that maps from MySQL fields to BigQuery fields. Used
        when a schema_filename is set.
        """
        d = {
            FIELD_TYPE.INT24: 'INTEGER',
            FIELD_TYPE.TINY: 'INTEGER',
            FIELD_TYPE.BIT: 'INTEGER',
            FIELD_TYPE.DATETIME: 'TIMESTAMP',
            FIELD_TYPE.DATE: 'TIMESTAMP',
            FIELD_TYPE.DECIMAL: 'FLOAT',
            FIELD_TYPE.NEWDECIMAL: 'FLOAT',
            FIELD_TYPE.DOUBLE: 'FLOAT',
            FIELD_TYPE.FLOAT: 'FLOAT',
            FIELD_TYPE.LONG: 'INTEGER',
            FIELD_TYPE.LONGLONG: 'INTEGER',
            FIELD_TYPE.SHORT: 'INTEGER',
            FIELD_TYPE.TIMESTAMP: 'TIMESTAMP',
            FIELD_TYPE.YEAR: 'INTEGER',
        }
        return d[mysql_type] if mysql_type in d else 'STRING'
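
Read together, the new conversion path behaves as in the following standalone sketch (an approximation of `_convert_type`, with the expected values matching the parameterized test below):

```python
import base64
import calendar
from datetime import date, datetime, timedelta
from decimal import Decimal


def convert_type(value, schema_type):
    """Approximation of MySqlToGoogleCloudStorageOperator._convert_type."""
    if isinstance(value, (datetime, date)):
        # Interpret the time tuple as UTC (timegm), not local time (mktime).
        return calendar.timegm(value.timetuple())
    if isinstance(value, timedelta):
        # MySQL TIME columns arrive as timedelta; export as total seconds.
        return value.total_seconds()
    if isinstance(value, Decimal):
        return float(value)
    if schema_type == "BYTES":
        return base64.standard_b64encode(value).decode('ascii')
    return value


assert convert_type(date(1970, 1, 2), None) == 86400
assert convert_type(datetime(1970, 1, 1, 1, 0), None) == 3600
assert convert_type(timedelta(hours=1), None) == 3600.0
assert convert_type(Decimal(5), None) == 5.0
assert convert_type(b"bytes", "BYTES") == "Ynl0ZXM="
```

The switch from `time.mktime` to `calendar.timegm` is the heart of the fix: `mktime` interprets the time tuple in the worker's local time zone, while `timegm` treats it as UTC, which is what BigQuery assumes for TIMESTAMP values.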
49 changes: 49 additions & 0 deletions tests/contrib/operators/test_mysql_to_gcs_operator.py
@@ -17,14 +17,18 @@
# specific language governing permissions and limitations
# under the License.

import datetime
import decimal
import unittest

from airflow.contrib.operators.mysql_to_gcs import \
    MySqlToGoogleCloudStorageOperator
from parameterized import parameterized
from tests.compat import mock

TASK_ID = 'test-mysql-to-gcs'
MYSQL_CONN_ID = 'mysql_conn_test'
TZ_QUERY = "SET time_zone = '+00:00'"
SQL = 'select 1'
BUCKET = 'gs://test'
JSON_FILENAME = 'test_{}.ndjson'
@@ -77,6 +81,18 @@ def test_init(self):
        self.assertEqual(op.export_format, 'csv')
        self.assertEqual(op.field_delimiter, '|')

    @parameterized.expand([
        ("string", None, "string"),
        (datetime.date(1970, 1, 2), None, 86400),
        (datetime.datetime(1970, 1, 1, 1, 0), None, 3600),
        (decimal.Decimal(5), None, 5),
        (b"bytes", "BYTES", "Ynl0ZXM="),
    ])
    def test_convert_type(self, value, schema_type, expected):
        self.assertEqual(
            MySqlToGoogleCloudStorageOperator._convert_type(value, schema_type),
            expected)

    @mock.patch('airflow.contrib.operators.mysql_to_gcs.MySqlHook')
    @mock.patch('airflow.contrib.operators.mysql_to_gcs.GoogleCloudStorageHook')
    def test_exec_success_json(self, gcs_hook_mock_class, mysql_hook_mock_class):
@@ -140,6 +156,39 @@ def _assert_upload(bucket, obj, tmp_filename, mime_type=None):
        mysql_hook_mock_class.assert_called_once_with(mysql_conn_id=MYSQL_CONN_ID)
        mysql_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL)

    @mock.patch('airflow.contrib.operators.mysql_to_gcs.MySqlHook')
    @mock.patch('airflow.contrib.operators.mysql_to_gcs.GoogleCloudStorageHook')
    def test_exec_success_csv_ensure_utc(self, gcs_hook_mock_class, mysql_hook_mock_class):
        """Test successful run of execute function for CSV"""
        op = MySqlToGoogleCloudStorageOperator(
            task_id=TASK_ID,
            mysql_conn_id=MYSQL_CONN_ID,
            sql=SQL,
            export_format='CSV',
            bucket=BUCKET,
            filename=CSV_FILENAME,
            ensure_utc=True)

        mysql_hook_mock = mysql_hook_mock_class.return_value
        mysql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
        mysql_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

        gcs_hook_mock = gcs_hook_mock_class.return_value

        def _assert_upload(bucket, obj, tmp_filename, mime_type=None):
            self.assertEqual(BUCKET, bucket)
            self.assertEqual(CSV_FILENAME.format(0), obj)
            self.assertEqual('text/csv', mime_type)
            with open(tmp_filename, 'rb') as f:
                self.assertEqual(b''.join(CSV_LINES), f.read())

        gcs_hook_mock.upload.side_effect = _assert_upload

        op.execute(None)

        mysql_hook_mock_class.assert_called_once_with(mysql_conn_id=MYSQL_CONN_ID)
        mysql_hook_mock.get_conn().cursor().execute.assert_has_calls([mock.call(TZ_QUERY), mock.call(SQL)])

    @mock.patch('airflow.contrib.operators.mysql_to_gcs.MySqlHook')
    @mock.patch('airflow.contrib.operators.mysql_to_gcs.GoogleCloudStorageHook')
    def test_exec_success_csv_with_delimiter(self, gcs_hook_mock_class, mysql_hook_mock_class):
