Skip to content

Commit

Permalink
Merge pull request EGA-archive#117 from EGA-archive/EE-2059
Browse files Browse the repository at this point in the history
EE-2059 Modify how/where temporary downloaded slices are stored
  • Loading branch information
jorizci authored Jul 14, 2021
2 parents 4a349ce + 7f0a079 commit 497523b
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 100 deletions.
54 changes: 35 additions & 19 deletions pyega3/data_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging.handlers
import math
import os
import shutil
import sys
import time
import urllib
Expand Down Expand Up @@ -105,9 +106,11 @@ def download_file(self, output_file, num_connections=1):

chunk_len = math.ceil(file_size / num_connections)

temporary_directory = os.path.join(os.path.dirname(output_file), ".tmp_download")

with tqdm(total=int(file_size), unit='B', unit_scale=True) as pbar:
params = [
(output_file, chunk_start_pos, min(chunk_len, file_size - chunk_start_pos), options, pbar)
(temporary_directory, chunk_start_pos, min(chunk_len, file_size - chunk_start_pos), options, pbar)
for chunk_start_pos in range(0, file_size, chunk_len)]

results = []
Expand Down Expand Up @@ -154,7 +157,9 @@ def download_file_slice(self, file_name, start_pos, length, options=None, pbar=N
if options is not None:
path += '?' + urllib.parse.urlencode(options)

file_name += f'-from-{str(start_pos)}-len-{str(length)}.slice'
final_file_name = f'{file_name}-from-{str(start_pos)}-len-{str(length)}.slice'
file_name = final_file_name + '.tmp'

self.temporary_files.add(file_name)

existing_size = os.stat(file_name).st_size if os.path.exists(file_name) else 0
Expand All @@ -166,19 +171,28 @@ def download_file_slice(self, file_name, start_pos, length, options=None, pbar=N
if existing_size == length:
return file_name

with self.data_client.get_stream(path,
{'Range': f'bytes={start_pos + existing_size}-{start_pos + length - 1}'}) as r:
with open(file_name, 'ba') as file_out:
for chunk in r.iter_content(DOWNLOAD_FILE_SLICE_CHUNK_SIZE):
file_out.write(chunk)
if pbar:
pbar.update(len(chunk))
try:
with self.data_client.get_stream(path,
{
'Range': f'bytes={start_pos + existing_size}-{start_pos + length - 1}'}) as r:
with open(file_name, 'ba') as file_out:
for chunk in r.iter_content(DOWNLOAD_FILE_SLICE_CHUNK_SIZE):
file_out.write(chunk)
if pbar:
pbar.update(len(chunk))

total_received = os.path.getsize(file_name)
if total_received != length:
raise Exception(f"Slice error: received={total_received}, requested={length}, file='{file_name}'")

except Exception:
if os.path.exists(file_name):
os.remove(file_name)
raise

total_received = os.path.getsize(file_name)
if total_received != length:
raise Exception(f"Slice error: received={total_received}, requested={length}, file='{file_name}'")
os.rename(file_name, final_file_name)

return file_name
return final_file_name

@staticmethod
def is_genomic_range(genomic_range_args):
Expand Down Expand Up @@ -228,6 +242,10 @@ def download_file_retry(self, num_connections, output_file, genomic_range_args,
if not os.path.exists(dir) and len(dir) > 0:
os.makedirs(dir)

temporary_directory = os.path.join(os.path.dirname(output_file), ".tmp_download")
if not os.path.exists(temporary_directory):
os.makedirs(temporary_directory)

hdd = psutil.disk_usage(os.getcwd())
logging.info(f"Total space : {hdd.total / (2 ** 30):.2f} GiB")
logging.info(f"Used space : {hdd.used / (2 ** 30):.2f} GiB")
Expand Down Expand Up @@ -257,17 +275,15 @@ def download_file_retry(self, num_connections, output_file, genomic_range_args,
logging.exception(e)
if num_retries == max_retries:
if DataFile.temporary_files_should_be_deleted:
self.delete_temporary_files()
self.delete_temporary_folder(temporary_directory)

raise e
time.sleep(retry_wait)
num_retries += 1
logging.info(f"retry attempt {num_retries}")

def delete_temporary_files(self):
def delete_temporary_folder(self, temporary_directory):
try:
for temporary_file in self.temporary_files:
logging.debug(f'Deleting the {temporary_file} temporary file...')
os.remove(temporary_file)
shutil.rmtree(temporary_directory)
except FileNotFoundError as ex:
logging.error(f'Could not delete the temporary file: {ex}')
logging.error(f'Could not delete the temporary folder: {ex}')
49 changes: 19 additions & 30 deletions test/test_delete_temporary_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,6 @@
expected_file_size = DOWNLOAD_FILE_SLICE_CHUNK_SIZE * 3


def test_deleting_non_existent_file_does_not_raise_exception():

dummy_file = DataFile(None, None)

non_existent_file = '/tmp/non/existent/file'
assert not os.path.exists(non_existent_file)
dummy_file.temporary_files.add(non_existent_file)

# No exception is raised:
dummy_file.delete_temporary_files()


def test_temp_files_are_deleted_automatically_if_there_are_no_exceptions(mock_server_config,
mock_auth_client,
temporary_output_file,
Expand All @@ -41,7 +29,8 @@ def test_temp_files_are_deleted_automatically_if_there_are_no_exceptions(mock_se
input_file = bytearray(os.urandom(file_size_without_iv))
mock_requests.add(responses.GET, f'{mock_server_config.url_api}/files/{test_file_id}', body=input_file, status=200)

file = DataFile(mock_data_client, test_file_id, temporary_output_file, temporary_output_file, file_size_with_iv, 'check_sum')
file = DataFile(mock_data_client, test_file_id, temporary_output_file, temporary_output_file, file_size_with_iv,
'check_sum')

file.download_file_retry(1, temporary_output_file, None, 2, 0.1)

Expand Down Expand Up @@ -82,40 +71,40 @@ def download_with_exception(mock_requests, output_file_path, mock_server_config,
assert not os.path.exists(output_file_path)


def test_temporary_files_are_deleted_if_the_user_says_so(mock_server_config,
def test_temporary_folder_is_deleted_if_the_user_says_so(mock_server_config,
mock_data_client,
temporary_output_file,
mock_requests):
# Given: a file that exist in EGA object store and the user has permissions to access to it
DataFile.temporary_files_should_be_deleted = True

file = DataFile(mock_data_client, test_file_id, temporary_output_file, temporary_output_file, expected_file_size, 'check_sum')
file = DataFile(mock_data_client, test_file_id, temporary_output_file, temporary_output_file, expected_file_size,
'check_sum')

temporary_folder_name = os.path.join(os.path.dirname(temporary_output_file), '.tmp_download')

# When: the user completes downloading a file
download_with_exception(mock_requests, temporary_output_file, mock_server_config, file)

# The temporary file should not exist because the pyega3.TEMPORARY_FILES_SHOULD_BE_DELETED
# variable was set to True previously:
assert not os.path.exists(file.temporary_files.pop())
# Then: the temporary folder and the temporary files are deleted
assert not os.path.exists(temporary_folder_name)


def test_temporary_files_are_not_deleted_if_the_user_says_so(mock_server_config,
def test_temporary_folder_is_not_deleted_if_the_user_says_so(mock_server_config,
mock_data_client,
temporary_output_file,
mock_requests):
# The user asks for keeping the temporary files:

# Given: a file that exist in EGA object store and the user has permissions to access to it
DataFile.temporary_files_should_be_deleted = False

file = DataFile(mock_data_client, test_file_id, temporary_output_file, temporary_output_file, expected_file_size,
'check_sum')

download_with_exception(mock_requests, temporary_output_file, mock_server_config, file)
temporary_folder_name = os.path.join(os.path.dirname(temporary_output_file), '.tmp_download')

temp_file = file.temporary_files.pop()

# The temporary file should exist because the pyega3.TEMPORARY_FILES_SHOULD_BE_DELETED
# variable was set to False previously:
assert os.path.exists(temp_file)

# The download client should have been able to download the whole file:
assert os.stat(temp_file).st_size == expected_file_size - 3 * 1000
# When: he user completes downloading a file
download_with_exception(mock_requests, temporary_output_file, mock_server_config, file)

os.remove(temp_file)
# Then: the temporary folder and the temporary files are NOT deleted
assert os.path.exists(temporary_folder_name)
39 changes: 38 additions & 1 deletion test/test_download_file.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import hashlib
import os
import tempfile
from collections import namedtuple
from unittest import mock

Expand Down Expand Up @@ -41,7 +42,8 @@ def os_rename_mock(s, d):
with mock.patch('os.path.exists', lambda path: os.path.basename(path) in files):
with mock.patch('os.stat', os_stat_mock):
with mock.patch('os.rename', os_rename_mock):
yield files
with mock.patch('shutil.rmtree'):
yield files


def test_download_file(mock_data_server, random_binary_file, mock_writing_files, mock_server_config, mock_data_client):
Expand Down Expand Up @@ -127,3 +129,38 @@ def test_gpg_files_not_supported(mock_data_client):
file = DataFile(mock_data_client, "", "test.gz", "test.gz.gpg", 0, "")

file.download_file_retry(1, output_file=None, genomic_range_args=None, max_retries=5, retry_wait=5)


def test_temporary_chunk_files_stored_in_temp_folder_with_suffix_tmp(mock_data_server, random_binary_file,
mock_server_config,
mock_data_client):
# Given: a file that exist in EGA object store and the user has permissions to access to it
file_id = "EGAF00000000001"
file_name = "resulting.file"
file_md5 = hashlib.md5(random_binary_file).hexdigest()

mock_data_server.file_content[file_id] = random_binary_file

file = DataFile(mock_data_client, file_id, file_name, file_name + ".cip", len(random_binary_file) + 16, file_md5)

# When: the user starts to download a file
output_file = os.path.join(tempfile.gettempdir(), "pyega-download-test", "output_file")
md5_file = output_file + ".md5"
if os.path.exists(output_file):
os.remove(output_file)
if os.path.exists(md5_file):
os.remove(md5_file)

with mock.patch('builtins.open', wraps=open) as wrapped_open:
file.download_file_retry(1, output_file=output_file, genomic_range_args=None, max_retries=5, retry_wait=0)

# Then: The temporary files for the chunks are in the temporary folder and has .tmp as a suffix
temporary_folder = os.path.join(tempfile.gettempdir(), "pyega-download-test", ".tmp_download")
slices_opened = set([call.args[0] for call in wrapped_open.mock_calls if len(call.args) == 2])
slices_opened.remove(output_file)
slices_opened.remove(md5_file)

for slice_file in slices_opened:
assert slice_file.startswith(temporary_folder)
assert slice_file.endswith(".tmp")

52 changes: 48 additions & 4 deletions test/test_download_file_slice.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest
import requests
import os

import test.conftest as common
from pyega3.data_file import DataFile
Expand All @@ -25,16 +26,17 @@ def mock_write(buf):
written_bytes += buf_len

file_name = common.rand_str()
file_name_for_slice = file_name + '-from-' + str(slice_start) + '-len-' + str(slice_length) + '.slice'
file_name_for_slice = file_name + '-from-' + str(slice_start) + '-len-' + str(slice_length) + '.slice.tmp'

file = DataFile(mock_data_client, file_id)

m_open = mock.mock_open()
with mock.patch("builtins.open", m_open, create=True):
with mock.patch("os.path.getsize", lambda path: written_bytes if path == file_name_for_slice else 0):
m_open().write.side_effect = mock_write
file.download_file_slice(file_name, slice_start, slice_length)
assert slice_length == written_bytes
with mock.patch("os.rename"):
m_open().write.side_effect = mock_write
file.download_file_slice(file_name, slice_start, slice_length)
assert slice_length == written_bytes

m_open.assert_called_with(file_name_for_slice, 'ba')

Expand Down Expand Up @@ -64,3 +66,45 @@ def test_error_when_end_is_negative(mock_data_client):
file = DataFile(mock_data_client, common.rand_str())
with pytest.raises(ValueError):
file.download_file_slice(common.rand_str(), 0, -1)


def test_slice_file_name_removes_tmp_suffix_when_successful(mock_data_server, mock_data_client, random_binary_file):
# Given: a file that exist in EGA object store and the user has permissions to access to it
file_id = "EGAF1234"
mock_data_server.file_content[file_id] = random_binary_file

slice_start = random.randint(0, len(random_binary_file))
slice_length = random.randint(0, len(random_binary_file) - slice_start)

# When: the user successfully downloads a chunk
file_name = common.rand_str()
file = DataFile(mock_data_client, file_id)
file.download_file_slice(file_name, slice_start, slice_length)

# Then: the suffix .tmp is removed from file for the successful chunk
file_name_for_slice = file_name + '-from-' + str(slice_start) + '-len-' + str(slice_length) + '.slice'
assert os.path.exists(file_name_for_slice)
assert not os.path.exists(file_name_for_slice + '.tmp')


def test_chunk_fails_to_download(mock_data_server, mock_data_client, random_binary_file):
# Given: a file that exist in EGA object store and the user has permissions to access to it
file_id = "EGAF1234"
mock_data_server.file_content[file_id] = random_binary_file

slice_start = random.randint(0, len(random_binary_file))
slice_length = len(random_binary_file) + 10

# When: the user unsuccessfully downloads a chunk
file_name = common.rand_str()
file = DataFile(mock_data_client, file_id)
try:
file.download_file_slice(file_name, slice_start, slice_length)
except:
# For the purpose of this test the download should fail
pass

# Then: file for the failed chunk is removed
file_name_for_slice = file_name + '-from-' + str(slice_start) + '-len-' + str(slice_length) + '.slice'
assert not os.path.exists(file_name_for_slice)
assert not os.path.exists(file_name_for_slice + '.tmp')
46 changes: 0 additions & 46 deletions test/test_retry_download.py

This file was deleted.

0 comments on commit 497523b

Please sign in to comment.