TDL-6335: Added support for returning files in sorted manner (#29)
* TDL-6335: Added support for returning files in sorted manner

* changed order of returning files

* Added integration test for verifying bookmark

* removed unused code

* updated config.yml file

* updated code for getting the last modified date

* added code for sorting file names in the summary

* TDL-13851: Best Practices (upgraded versions of dependencies)

* added negative unit test case

* removed sleep from the tests
hpatel41 authored Jul 2, 2021
1 parent 4daa8bc commit 1c4007d
Showing 9 changed files with 253 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -50,4 +50,4 @@ workflows:
- master
jobs:
- build:
context: circleci-user
context: circleci-user
4 changes: 2 additions & 2 deletions setup.py
@@ -10,10 +10,10 @@
       classifiers=["Programming Language :: Python :: 3 :: Only"],
       py_modules=["tap_sftp"],
       install_requires=[
-          "singer-python==5.9.0",
+          "singer-python==5.12.1",
           'paramiko==2.6.0',
           'backoff==1.8.0',
-          'singer-encodings==0.0.8',
+          'singer-encodings==0.1.0',
           'terminaltables==3.1.0',
       ],
       extras_require={
4 changes: 3 additions & 1 deletion tap_sftp/__init__.py
@@ -53,7 +53,9 @@ def do_sync(config, catalog, state):
     rows = []

     for table_name, table_data in STATS.items():
-        for filepath, file_data in table_data['files'].items():
+        # sort STATS data by "last_modified" for the summary, since Python 3.5 does not preserve dict order
+        sorted_data = sorted(table_data["files"].items(), key=lambda a: a[1]["last_modified"])
+        for filepath, file_data in sorted_data:
             rows.append([table_name,
                          table_data['search_prefix'],
                          table_data['search_pattern'],
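For context, a minimal standalone sketch of what the summary sort does, assuming a STATS structure shaped like the one iterated above (the sample values and the "row_count" key are hypothetical):

from datetime import datetime, timezone

# Hypothetical snapshot of the tap's STATS structure; field names mirror the
# diff above, but the values (and "row_count") are made up for illustration.
STATS = {
    "table": {
        "search_prefix": "/export",
        "search_pattern": "table.*csv",
        "files": {
            "/export/table_2_file.csv": {"last_modified": datetime(2021, 7, 2, 12, 5, tzinfo=timezone.utc), "row_count": 10},
            "/export/table_1_file.csv": {"last_modified": datetime(2021, 7, 2, 12, 1, tzinfo=timezone.utc), "row_count": 10},
        },
    },
}

rows = []
for table_name, table_data in STATS.items():
    # Same idea as the change above: order the per-file stats by "last_modified"
    # so the summary rows come out in a deterministic order.
    sorted_data = sorted(table_data["files"].items(), key=lambda item: item[1]["last_modified"])
    for filepath, file_data in sorted_data:
        rows.append([table_name, table_data["search_prefix"], filepath, file_data["row_count"]])

print(rows)  # table_1_file.csv (older) is listed before table_2_file.csv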
4 changes: 3 additions & 1 deletion tap_sftp/client.py
@@ -141,7 +141,9 @@ def get_files(self, prefix, search_pattern, modified_since=None):
         if modified_since is not None:
             matching_files = [f for f in matching_files if f["last_modified"] > modified_since]

-        return matching_files
+        # sort files in increasing order of "last_modified"
+        sorted_files = sorted(matching_files, key=lambda x: x['last_modified'].timestamp())
+        return sorted_files

     def get_file_handle(self, f):
         """ Takes a file dict {"filepath": "...", "last_modified": "..."} and returns a handle to the file. """
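Returning the matches oldest-first matters for bookmarking: a consumer that walks the list in order can advance its bookmark monotonically and, if interrupted, resume without skipping newer files. A rough sketch of that consumption pattern, under hypothetical names (sync_ordered is not the tap's actual sync code):

from datetime import datetime, timedelta, timezone

def sync_ordered(files, bookmark):
    """Consume files already sorted by "last_modified" (oldest first), as
    returned by get_files above, and move the bookmark forward after each one."""
    for f in files:
        # ... read and emit records from f["filepath"] here ...
        # Every later file is at least as new, so the bookmark only moves forward.
        bookmark = max(bookmark, f["last_modified"])
    return bookmark

now = datetime.now(timezone.utc)
files = [
    {"filepath": "/root/file1.csv", "last_modified": now - timedelta(minutes=10)},
    {"filepath": "/root/file2.csv", "last_modified": now},
]
print(sync_ordered(files, now - timedelta(days=1)))  # ends at file2's timestamp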
10 changes: 6 additions & 4 deletions tests/base.py
@@ -1,4 +1,4 @@
-from tap_tester.scenario import (SCENARIOS)
+import time
 import tap_tester.connections as connections
 import tap_tester.menagerie as menagerie
 import tap_tester.runner as runner
@@ -129,7 +129,7 @@ def get_type(self):
     def get_credentials(self):
         return {'password': os.getenv('TAP_SFTP_PASSWORD')}

-    def append_to_files(self):
+    def append_to_files(self, files_list=None):
         root_dir = os.getenv('TAP_SFTP_ROOT_DIR')

         with self.get_test_connection() as client:
@@ -139,11 +139,13 @@ def append_to_files(self):
             file_group = self.get_files()[0]
             headers = file_group['headers']
             directory = file_group['directory']
-            for filename in file_group['files']:
+            for filename in file_group['files'] if files_list is None else files_list:
                 client.chdir(directory)
                 with client.open(filename, 'a') as f:
                     writer = csv.writer(f)
-                    lines = file_group['generator'](10)
+                    # appending a greater number of rows, as it takes some time,
+                    # so we can test the modified date
+                    lines = file_group['generator'](1500 if files_list else 10)
                     writer.writerows(lines)
                 client.chdir('..')

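The new files_list parameter lets a test modify only a subset of the seeded files so that their modified dates move ahead of the rest; passing a list also switches to the larger 1500-row append. A usage sketch (the class and method names here are made up; the filenames match the integration test below):

from base import TestSFTPBase

class TestSFTPSubsetModified(TestSFTPBase):  # hypothetical test case
    def test_modified_subset(self):
        # Only these three files receive ~1500 extra rows, so their
        # "last_modified" timestamps end up newer than the untouched files.
        self.append_to_files(["table_1_file.csv", "table_3_file.csv", "table_4_file.csv"])

        # Calling with no argument keeps the old behaviour: 10 rows are
        # appended to every file in the first file group.
        self.append_to_files()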
150 changes: 150 additions & 0 deletions tests/test_sftp_sorted_files.py
@@ -0,0 +1,150 @@
import time
from base import TestSFTPBase
import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner
import os
import csv
import json
import datetime

RECORD_COUNT = {}

class TestSFTPOrderedFiles(TestSFTPBase):

def name(self):
return "tap_tester_sftp_ordered_files"

    # get file details for adding data
def get_files(self):
return [
{
"headers": ['id', 'string_col', 'integer_col'],
"directory": "folderA",
"files": ["table_1_file.csv", "table_2_file.csv", "table_3_file.csv", "table_4_file.csv", "table_5_file.csv"],
                # adding a greater number of rows, as it takes some time,
                # so we can test the modified date
"num_rows": 1500,
"generator": self.generate_simple_csv_lines_typeA
}
]

def setUp(self):
if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'),
os.getenv('TAP_SFTP_PASSWORD'),
os.getenv('TAP_SFTP_ROOT_DIR')]]):
#pylint: disable=line-too-long
raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR")

root_dir = os.getenv('TAP_SFTP_ROOT_DIR')

with self.get_test_connection() as client:
# drop all csv files in root dir
client.chdir(root_dir)
try:
TestSFTPOrderedFiles.rm('tap_tester', client)
except FileNotFoundError:
pass
client.mkdir('tap_tester')
client.chdir('tap_tester')

# Add subdirectories
file_info = self.get_files()
for entry in file_info:
client.mkdir(entry['directory'])

# Add csv files
for file_group in file_info:
headers = file_group['headers']
directory = file_group['directory']
for filename in file_group['files']:
client.chdir(directory)
with client.open(filename, 'w') as f:
writer = csv.writer(f)
lines = [headers] + file_group['generator'](file_group['num_rows'])
writer.writerows(lines)
client.chdir('..')

def expected_check_streams(self):
return {
'table'
}

def expected_pks(self):
return {
'table': {'id'}
}

def expected_sync_row_counts(self):
return {
'table': 12000
}

def expected_first_sync_streams(self):
return {
'table'
}

def get_properties(self, original: bool = True):
props = self.get_common_properties()
props['tables'] = json.dumps([
{
"table_name": "table",
"delimiter": ",",
"search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester",
"search_pattern": "table.*csv",
"key_properties": ['id']
}
])
if original:
return props

props["start_date"] = self.START_DATE
return props

    # returns the last modified dates of all the files present in the folder
def get_last_modified(self):
root_dir = os.getenv('TAP_SFTP_ROOT_DIR')

with self.get_test_connection() as client:
client.chdir(root_dir + '/tap_tester/folderA')
files = client.listdir_attr('.')
last_modified = []
for file in files:
last_modified.append(datetime.datetime.fromtimestamp(file.st_mtime))

return last_modified

def test_run(self):

# append some data to particular files to test the modified date
self.append_to_files(["table_1_file.csv", "table_3_file.csv", "table_4_file.csv"])

# sync
conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)

        self.perform_and_verify_table_and_field_selection(conn_id, found_catalogs)

record_count_by_stream = self.run_and_verify_sync(conn_id)

state = menagerie.get_state(conn_id)

# checking if we got any data from sync
self.assertGreater(sum(record_count_by_stream.values()), 0)

# checking if data after sync is as expected
for tap_stream_id in self.expected_first_sync_streams():
self.assertEqual(self.expected_sync_row_counts()[tap_stream_id],
record_count_by_stream[tap_stream_id])

        # getting the maximum of the last modified dates from all files
max_date = max(self.get_last_modified()).replace(microsecond = 0)
expected_date = max_date.timestamp()

# getting bookmark
actual_date = datetime.datetime.fromisoformat(state['bookmarks']['table']['modified_since']).timestamp()

# checking if maximum last modified date is set as bookmark
self.assertEqual(int(expected_date), int(actual_date))
4 changes: 0 additions & 4 deletions tests/test_sftp_start_date_multiple_stream.py
@@ -3,8 +3,6 @@
 import tap_tester.menagerie as menagerie
 import tap_tester.runner as runner
 import os
-import time
-from datetime import datetime as dt
 import csv
 import json

@@ -130,8 +128,6 @@ def test_run(self):
         # getting state
         state = menagerie.get_state(conn_id)

-        time.sleep(60)
-
         # creating file "table_1_fileB"
         with self.get_test_connection() as client:
             root_dir = os.getenv('TAP_SFTP_ROOT_DIR')
23 changes: 0 additions & 23 deletions tests/test_sftp_start_date_one_stream.py
@@ -3,11 +3,9 @@
 import tap_tester.menagerie as menagerie
 import tap_tester.runner as runner
 import os
-import time
 import csv
 import json
 from datetime import datetime as dt
-from datetime import timedelta

 RECORD_COUNT = {}

@@ -98,23 +96,6 @@ def get_properties(self, original: bool = True):
         props["start_date"] = self.START_DATE
         return props

-    def append_to_files(self):
-        root_dir = os.getenv('TAP_SFTP_ROOT_DIR')
-
-        with self.get_test_connection() as client:
-            client.chdir(root_dir + '/tap_tester')
-
-            # Append stuff to a subset of the files
-            file_group = self.get_files()[0]
-            directory = file_group['directory']
-            for filename in file_group['files']:
-                client.chdir(directory)
-                with client.open(filename, 'a') as f:
-                    writer = csv.writer(f)
-                    lines = file_group['generator'](10)
-                    writer.writerows(lines)
-                client.chdir('..')
-
     def test_run(self):
         self.file_modified_test()
         self.file_not_modified_test()
@@ -137,8 +118,6 @@ def file_modified_test(self):
         # changing start date to "utcnow"
         self.START_DATE = dt.strftime(dt.utcnow(), "%Y-%m-%dT00:00:00Z")

-        time.sleep(60)
-
         # adding some data to the file
         self.append_to_files()

@@ -197,8 +176,6 @@ def file_not_modified_test(self):
         # changing start date to "utcnow"
         self.START_DATE = dt.strftime(dt.utcnow(), "%Y-%m-%dT00:00:00Z")

-        time.sleep(60)
-
         # sync 2
         conn_id_2 = connections.ensure_connection(self, original_properties = False)

88 changes: 88 additions & 0 deletions tests/unittests/test_sorted_files.py
@@ -0,0 +1,88 @@
from time import time
import unittest
from unittest import mock
import tap_sftp.client as client
from datetime import datetime
import time
import pytz
import singer

@mock.patch("tap_sftp.client.SFTPConnection.get_files_by_prefix")
@mock.patch("tap_sftp.client.SFTPConnection.get_files_matching_pattern")
class TestSortedFiles(unittest.TestCase):

def test_sorted_files(self, mocked_matching_files, mocked_all_files):
conn = client.SFTPConnection("10.0.0.1", "username", port="22")

files_list = [
{
"filepath": "/root/file1.csv",
"last_modified": datetime.utcfromtimestamp(time.time() - 10).replace(tzinfo=pytz.UTC)
},
{
"filepath": "/root/file2.csv",
"last_modified": datetime.utcfromtimestamp(time.time() - 4).replace(tzinfo=pytz.UTC)
},
{
"filepath": "/root/file3.csv",
"last_modified": datetime.utcfromtimestamp(time.time() - 8).replace(tzinfo=pytz.UTC)
},
{
"filepath": "/root/file4.csv",
"last_modified": datetime.utcfromtimestamp(time.time()).replace(tzinfo=pytz.UTC)
},
{
"filepath": "/root/file.txt",
"last_modified": datetime.utcfromtimestamp(time.time() - 2).replace(tzinfo=pytz.UTC)
},
{
"filepath": "/root/file5.json",
"last_modified": datetime.utcfromtimestamp(time.time() - 3).replace(tzinfo=pytz.UTC)
}]

mocked_all_files.return_value = files_list

mocked_matching_files.return_value = files_list[:4]

files = conn.get_files("/root", "file.*.csv")

# expected files in increasing order of "last_modified"
expected_files_list = ["/root/file1.csv", "/root/file3.csv", "/root/file2.csv", "/root/file4.csv"]
actual_files_list = [f["filepath"] for f in files]

self.assertEquals(expected_files_list, actual_files_list)

def test_sorted_files_negative(self, mocked_matching_files, mocked_all_files):
conn = client.SFTPConnection("10.0.0.1", "username", port="22")

files_list = [
{
"filepath": "/root/file1.csv",
"last_modified": datetime.utcfromtimestamp(time.time() - 3).replace(tzinfo=pytz.UTC)
},
{
"filepath": "/root/file2.csv",
"last_modified": datetime.utcfromtimestamp(time.time()).replace(tzinfo=pytz.UTC)
},
{
"filepath": "/root/file.txt",
"last_modified": datetime.utcfromtimestamp(time.time() - 2).replace(tzinfo=pytz.UTC)
},
{
"filepath": "/root/file3.json",
"last_modified": datetime.utcfromtimestamp(time.time() - 5).replace(tzinfo=pytz.UTC)
}]

mocked_all_files.return_value = files_list

mocked_matching_files.return_value = files_list[:2]

# setting "modified_since" to now
modified_since = singer.utils.strptime_to_utc(datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat())
files = conn.get_files("/root", "file.*.csv", modified_since)

        # since all the modified dates are earlier than "modified_since", no files will be returned
expected_files_list = []
actual_files_list = [f["filepath"] for f in files]

self.assertEquals(expected_files_list, actual_files_list)
