Skip to content

Commit

Permalink
Added SalesforceHook missing method to return only dataframe (apache#…
Browse files Browse the repository at this point in the history
…8565) (apache#8644)

* add feature for skipping writing to file

* add SalesforceHook missing method to return dataframe only

The function write_object_to_file is split in two: the new object_to_df builds and returns the dataframe, and write_object_to_file now calls object_to_df as its first step before exporting to a file.

* fixed exception message

* fix review comments - removed filename check for None
  • Loading branch information
pranjalmittal authored May 17, 2020
1 parent 8985df0 commit ff342fc
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 24 deletions.
76 changes: 52 additions & 24 deletions airflow/providers/salesforce/hooks/salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,58 @@ def write_object_to_file(self,
if fmt not in ['csv', 'json', 'ndjson']:
raise ValueError("Format value is not recognized: {}".format(fmt))

df = self.object_to_df(query_results=query_results, coerce_to_timestamp=coerce_to_timestamp,
record_time_added=record_time_added)

# write the CSV or JSON file depending on the option
# NOTE:
# datetimes here are an issue.
# There is no good way to manage the difference
# for to_json, the options are an epoch or a ISO string
# but for to_csv, it will be a string output by datetime
# For JSON we decided to output the epoch timestamp in seconds
# (as is fairly standard for JavaScript)
# And for csv, we do a string
if fmt == "csv":
# there are also a ton of newline objects that mess up our ability to write to csv
# we remove these newlines so that the output is a valid CSV format
self.log.info("Cleaning data and writing to CSV")
possible_strings = df.columns[df.dtypes == "object"]
df[possible_strings] = df[possible_strings].astype(str).apply(
lambda x: x.str.replace("\r\n", "").str.replace("\n", "")
)
# write the dataframe
df.to_csv(filename, index=False)
elif fmt == "json":
df.to_json(filename, "records", date_unit="s")
elif fmt == "ndjson":
df.to_json(filename, "records", lines=True, date_unit="s")

return df

def object_to_df(self, query_results, coerce_to_timestamp=False,
record_time_added=False):
"""
Export query results to dataframe.
By default, this function will try and leave all values as they are represented in Salesforce.
You use the `coerce_to_timestamp` flag to force all datetimes to become Unix timestamps (UTC).
This can be greatly beneficial as it will make all of your datetime fields look the same,
and makes it easier to work with in other database environments
:param query_results: the results from a SQL query
:type query_results: list of dict
:param coerce_to_timestamp: True if you want all datetime fields to be converted into Unix timestamps.
False if you want them to be left in the same format as they were in Salesforce.
Leaving the value as False will result in datetimes being strings. Default: False
:type coerce_to_timestamp: bool
:param record_time_added: True if you want to add a Unix timestamp field
to the resulting data that marks when the data was fetched from Salesforce. Default: False
:type record_time_added: bool
:return: the dataframe.
:rtype: pd.DataFrame
"""

# this line right here will convert all integers to floats
# if there are any None/np.nan values in the column
# that's because None/np.nan cannot exist in an integer column
Expand Down Expand Up @@ -272,28 +324,4 @@ def write_object_to_file(self,
fetched_time = time.time()
df["time_fetched_from_salesforce"] = fetched_time

# write the CSV or JSON file depending on the option
# NOTE:
# datetimes here are an issue.
# There is no good way to manage the difference
# for to_json, the options are an epoch or a ISO string
# but for to_csv, it will be a string output by datetime
# For JSON we decided to output the epoch timestamp in seconds
# (as is fairly standard for JavaScript)
# And for csv, we do a string
if fmt == "csv":
# there are also a ton of newline objects that mess up our ability to write to csv
# we remove these newlines so that the output is a valid CSV format
self.log.info("Cleaning data and writing to CSV")
possible_strings = df.columns[df.dtypes == "object"]
df[possible_strings] = df[possible_strings].astype(str).apply(
lambda x: x.str.replace("\r\n", "").str.replace("\n", "")
)
# write the dataframe
df.to_csv(filename, index=False)
elif fmt == "json":
df.to_json(filename, "records", date_unit="s")
elif fmt == "ndjson":
df.to_json(filename, "records", lines=True, date_unit="s")

return df
45 changes: 45 additions & 0 deletions tests/providers/salesforce/hooks/test_salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,48 @@ def test_write_object_to_file_ndjson_with_record_time(self, mock_data_frame, moc
}
),
)

@patch(
    "airflow.providers.salesforce.hooks.salesforce.SalesforceHook.describe_object",
    return_value={"fields": [{"name": "field_1", "type": "date"}]},
)
@patch(
    "airflow.providers.salesforce.hooks.salesforce.pd.DataFrame.from_records",
    return_value=pd.DataFrame(
        {
            "test": [1, 2, 3],
            "field_1": ["2019-01-01", "2019-01-02", "2019-01-03"],
        }
    ),
)
def test_obect_to_df_with_timestamp_conversion(self, mock_data_frame, mock_describe_object):
    """
    Check ``object_to_df`` with ``coerce_to_timestamp=True``.

    ``DataFrame.from_records`` is patched to yield a frame whose ``field_1`` column
    holds date strings, and ``describe_object`` is patched to report ``field_1`` as a
    ``date`` field. The returned frame is expected to carry numeric timestamps in
    ``field_1`` while leaving the ``test`` column as it was.
    """
    sobject_type = "obj_name"
    records = [{"attributes": {"type": sobject_type}}]

    result = self.salesforce_hook.object_to_df(query_results=records, coerce_to_timestamp=True)

    # The object type is read from the first record's attributes and described once.
    mock_describe_object.assert_called_once_with(sobject_type)

    expected = pd.DataFrame(
        {
            "test": [1, 2, 3],
            "field_1": [1.546301e09, 1.546387e09, 1.546474e09],
        }
    )
    pd.testing.assert_frame_equal(result, expected)

@patch("airflow.providers.salesforce.hooks.salesforce.time.time", return_value=1.23)
@patch(
    "airflow.providers.salesforce.hooks.salesforce.pd.DataFrame.from_records",
    return_value=pd.DataFrame({"test": [1, 2, 3]}),
)
def test_object_to_df_with_record_time(self, mock_data_frame, mock_time):
    """
    Check ``object_to_df`` with ``record_time_added=True``.

    ``time.time`` is patched to a fixed value and ``DataFrame.from_records`` to a
    three-row frame; the result is expected to gain a
    ``time_fetched_from_salesforce`` column filled with that fixed value.
    """
    result = self.salesforce_hook.object_to_df(query_results=[], record_time_added=True)

    # Every row should carry the same (mocked) fetch time.
    fetched_at = mock_time.return_value
    expected = pd.DataFrame(
        {
            "test": [1, 2, 3],
            "time_fetched_from_salesforce": [fetched_at, fetched_at, fetched_at],
        }
    )
    pd.testing.assert_frame_equal(result, expected)

0 comments on commit ff342fc

Please sign in to comment.