From ff342fc230982dc5d88acfd5e5eab75187256b58 Mon Sep 17 00:00:00 2001
From: Pranjal Mittal
Date: Sun, 17 May 2020 20:39:04 +0530
Subject: [PATCH] Added SalesforceHook missing method to return only dataframe (#8565) (#8644)

* add feature for skipping writing to file

* add SalesforceHook missing method to return dataframe only
  write_object_to_file is split into object_to_df, which returns the df,
  and write_object_to_file, which uses object_to_df as the first step
  before exporting to a file

* fixed exception message

* fix review comments - removed filename check for None
---
 .../providers/salesforce/hooks/salesforce.py  | 76 +++++++++++++------
 .../salesforce/hooks/test_salesforce.py       | 45 +++++++++++
 2 files changed, 97 insertions(+), 24 deletions(-)

diff --git a/airflow/providers/salesforce/hooks/salesforce.py b/airflow/providers/salesforce/hooks/salesforce.py
index 9f224c6a31f68..66aa0dd2e2c11 100644
--- a/airflow/providers/salesforce/hooks/salesforce.py
+++ b/airflow/providers/salesforce/hooks/salesforce.py
@@ -236,6 +236,58 @@ def write_object_to_file(self,
         if fmt not in ['csv', 'json', 'ndjson']:
             raise ValueError("Format value is not recognized: {}".format(fmt))
 
+        df = self.object_to_df(query_results=query_results, coerce_to_timestamp=coerce_to_timestamp,
+                               record_time_added=record_time_added)
+
+        # write the CSV or JSON file depending on the option
+        # NOTE:
+        #   datetimes here are an issue.
+        #   There is no good way to manage the difference
+        #   for to_json, the options are an epoch or a ISO string
+        #   but for to_csv, it will be a string output by datetime
+        #   For JSON we decided to output the epoch timestamp in seconds
+        #   (as is fairly standard for JavaScript)
+        #   And for csv, we do a string
+        if fmt == "csv":
+            # there are also a ton of newline objects that mess up our ability to write to csv
+            # we remove these newlines so that the output is a valid CSV format
+            self.log.info("Cleaning data and writing to CSV")
+            possible_strings = df.columns[df.dtypes == "object"]
+            df[possible_strings] = df[possible_strings].astype(str).apply(
+                lambda x: x.str.replace("\r\n", "").str.replace("\n", "")
+            )
+            # write the dataframe
+            df.to_csv(filename, index=False)
+        elif fmt == "json":
+            df.to_json(filename, "records", date_unit="s")
+        elif fmt == "ndjson":
+            df.to_json(filename, "records", lines=True, date_unit="s")
+
+        return df
+
+    def object_to_df(self, query_results, coerce_to_timestamp=False,
+                     record_time_added=False):
+        """
+        Export query results to dataframe.
+
+        By default, this function will try and leave all values as they are represented in Salesforce.
+        You can use the `coerce_to_timestamp` flag to force all datetimes to become Unix timestamps (UTC).
+        This can be greatly beneficial, as it will make all of your datetime fields look the same
+        and makes it easier to work with in other database environments.
+
+        :param query_results: the results from a SQL query
+        :type query_results: list of dict
+        :param coerce_to_timestamp: True if you want all datetime fields to be converted into Unix timestamps.
+            False if you want them to be left in the same format as they were in Salesforce.
+            Leaving the value as False will result in datetimes being strings. Default: False
+        :type coerce_to_timestamp: bool
+        :param record_time_added: True if you want to add a Unix timestamp field
+            to the resulting data that marks when the data was fetched from Salesforce. Default: False
+        :type record_time_added: bool
+        :return: the dataframe.
+        :rtype: pd.DataFrame
+        """
         # this line right here will convert all integers to floats
         # if there are any None/np.nan values in the column
         # that's because None/np.nan cannot exist in an integer column
@@ -272,28 +324,4 @@ def write_object_to_file(self,
             fetched_time = time.time()
             df["time_fetched_from_salesforce"] = fetched_time
 
-        # write the CSV or JSON file depending on the option
-        # NOTE:
-        #   datetimes here are an issue.
-        #   There is no good way to manage the difference
-        #   for to_json, the options are an epoch or a ISO string
-        #   but for to_csv, it will be a string output by datetime
-        #   For JSON we decided to output the epoch timestamp in seconds
-        #   (as is fairly standard for JavaScript)
-        #   And for csv, we do a string
-        if fmt == "csv":
-            # there are also a ton of newline objects that mess up our ability to write to csv
-            # we remove these newlines so that the output is a valid CSV format
-            self.log.info("Cleaning data and writing to CSV")
-            possible_strings = df.columns[df.dtypes == "object"]
-            df[possible_strings] = df[possible_strings].astype(str).apply(
-                lambda x: x.str.replace("\r\n", "").str.replace("\n", "")
-            )
-            # write the dataframe
-            df.to_csv(filename, index=False)
-        elif fmt == "json":
-            df.to_json(filename, "records", date_unit="s")
-        elif fmt == "ndjson":
-            df.to_json(filename, "records", lines=True, date_unit="s")
-
         return df
diff --git a/tests/providers/salesforce/hooks/test_salesforce.py b/tests/providers/salesforce/hooks/test_salesforce.py
index b6828aebcbfd5..1c9dfdcc0ffd2 100644
--- a/tests/providers/salesforce/hooks/test_salesforce.py
+++ b/tests/providers/salesforce/hooks/test_salesforce.py
@@ -176,3 +176,48 @@ def test_write_object_to_file_ndjson_with_record_time(self, mock_data_frame, moc
                 }
             ),
         )
+
+    @patch(
+        "airflow.providers.salesforce.hooks.salesforce.SalesforceHook.describe_object",
+        return_value={"fields": [{"name": "field_1", "type": "date"}]},
+    )
+    @patch(
+        "airflow.providers.salesforce.hooks.salesforce.pd.DataFrame.from_records",
+        return_value=pd.DataFrame({"test": [1, 2, 3], "field_1": ["2019-01-01", "2019-01-02", "2019-01-03"]}),
+    )
+    def test_object_to_df_with_timestamp_conversion(self, mock_data_frame, mock_describe_object):
+        obj_name = "obj_name"
+
+        data_frame = self.salesforce_hook.object_to_df(
+            query_results=[{"attributes": {"type": obj_name}}],
+            coerce_to_timestamp=True,
+        )
+
+        mock_describe_object.assert_called_once_with(obj_name)
+        pd.testing.assert_frame_equal(
+            data_frame, pd.DataFrame({"test": [1, 2, 3], "field_1": [1.546301e09, 1.546387e09, 1.546474e09]})
+        )
+
+    @patch("airflow.providers.salesforce.hooks.salesforce.time.time", return_value=1.23)
+    @patch(
+        "airflow.providers.salesforce.hooks.salesforce.pd.DataFrame.from_records",
+        return_value=pd.DataFrame({"test": [1, 2, 3]}),
+    )
+    def test_object_to_df_with_record_time(self, mock_data_frame, mock_time):
+        data_frame = self.salesforce_hook.object_to_df(
+            query_results=[], record_time_added=True
+        )
+
+        pd.testing.assert_frame_equal(
+            data_frame,
+            pd.DataFrame(
+                {
+                    "test": [1, 2, 3],
+                    "time_fetched_from_salesforce": [
+                        mock_time.return_value,
+                        mock_time.return_value,
+                        mock_time.return_value,
+                    ],
+                }
+            ),
+        )
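
For reference, a minimal usage sketch of the new object_to_df method. This is
not part of the patch: the "salesforce_default" connection id, the Lead SOQL
query, and the conn_id keyword are illustrative assumptions; make_query is the
hook's existing query helper, which returns the simple-salesforce query_all()
response dict.

    from airflow.providers.salesforce.hooks.salesforce import SalesforceHook

    # assumes an Airflow connection configured under the id "salesforce_default"
    hook = SalesforceHook(conn_id="salesforce_default")

    # run a SOQL query; the "records" key of the response holds a list of dicts,
    # which is the shape object_to_df expects for query_results
    results = hook.make_query("SELECT Id, CreatedDate FROM Lead")

    # new in this patch: build a DataFrame without writing anything to disk;
    # coerce_to_timestamp=True converts datetime fields to Unix timestamps (UTC),
    # record_time_added=True adds a "time_fetched_from_salesforce" column
    df = hook.object_to_df(
        query_results=results["records"],
        coerce_to_timestamp=True,
        record_time_added=True,
    )

Previously the only way to get this DataFrame was write_object_to_file, which
forced a round-trip through a file on disk; after this patch that method is a
thin wrapper that calls object_to_df and then serializes the result.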