Skip to content

Commit

Permalink
Merge pull request EGA-archive#114 from EGA-archive/EE-1423
Browse files Browse the repository at this point in the history
EE-1423 Refactor pyega3 to make it OO
  • Loading branch information
jorizci authored Jul 5, 2021
2 parents d9a3146 + 1f62d42 commit 2431d63
Show file tree
Hide file tree
Showing 21 changed files with 1,053 additions and 911 deletions.
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ responses==0.9.0
tqdm==4.19.6
htsget==0.2.5
urllib3==1.24.2

pytest~=6.0.1
setuptools~=54.2.0
113 changes: 113 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import contextlib
import os
import random
import string
import tempfile
from collections import namedtuple
from unittest import mock

import pytest
import responses
from psutil import virtual_memory

import pyega3.pyega3 as pyega3
from test.mock_data_server import MockDataServer


def rand_str():
length = random.randint(1, 127)
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


@pytest.fixture(autouse=True)
def reset_pyega_api_url():
pyega3.URL_API = ''


@pytest.fixture(autouse=True)
def reset_pyega_global_variables():
pyega3.URL_AUTH = ''
pyega3.URL_API_TICKET = ''
pyega3.CLIENT_SECRET = ''
pyega3.TEMPORARY_FILES = set()
pyega3.TEMPORARY_FILES_SHOULD_BE_DELETED = False


@pytest.fixture
def mock_requests():
with responses.RequestsMock() as rsps:
yield rsps


@pytest.fixture
def random_binary_file():
mem = virtual_memory().available
file_length = random.randint(1, mem // 512)
return os.urandom(file_length)


@pytest.fixture
def temporary_output_file():
"""Returns a file-path to a random, temporary file-name."""
_, output_file_path = tempfile.mkstemp()
os.remove(output_file_path)
return output_file_path


@pytest.fixture
def mock_data_server(mock_requests, reset_pyega_api_url):
server = MockDataServer(mock_requests)
pyega3.URL_API = server.url
return server


@pytest.fixture
def mock_input_file():
@contextlib.contextmanager
def make_mock_input_file(contents):
file_name = rand_str()
with mock.patch('os.path.exists', lambda p: p == file_name):
with mock.patch('builtins.open', mock.mock_open(read_data=contents)):
yield file_name

return make_mock_input_file


@pytest.fixture
def empty_dataset(mock_data_server):
file_id = "EGAD00000000001"
mock_data_server.dataset_files[file_id] = None
Dataset = namedtuple('Dataset', ['id'])
return Dataset(file_id)


@pytest.fixture
def dataset_with_files(mock_data_server, empty_dataset):
files = [
{
"unencryptedChecksum": "3b89b96387db5199fef6ba613f70e27c",
"datasetId": empty_dataset.id,
"fileStatus": "available",
"fileId": "EGAF00000000001",
"checksumType": "MD5",
"fileSize": 4804928,
"fileName": "EGAZ00000000001/ENCFF000001.bam",
"displayFileName": "ENCFF000001.bam"
},
{
"unencryptedChecksum": "b8ae14d5d1f717ab17d45e8fc36946a0",
"datasetId": empty_dataset.id,
"fileStatus": "available",
"fileId": "EGAF00000000002",
"checksumType": "MD5",
"fileSize": 5991400,
"fileName": "EGAZ00000000002/ENCFF000002.bam",
"displayFileName": "ENCFF000002.bam"
}]

for file in files:
mock_data_server.files[file.get("fileId")] = file
mock_data_server.dataset_files = {empty_dataset.id: [file.get("fileId") for file in files]}

Dataset = namedtuple("DatasetWithFiles", ["id", "files"])
return Dataset(empty_dataset.id, files)
101 changes: 101 additions & 0 deletions test/mock_data_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import json
import random
import re
import string
import urllib

import responses


class MockDataServer:
url = "https://mock.metadata.server"
token = ''.join(random.choices(string.ascii_letters, k=64))

dataset_files = {}
files = {}
file_content = {}

def __init__(self, mock_requests):
mock_requests.add_callback(
responses.GET,
self.url + "/metadata/datasets",
callback=self.list_datasets_callback,
content_type='application/json',
)

mock_requests.add_callback(
responses.GET,
re.compile(self.url + "/metadata/datasets/.*/files"),
callback=self.list_files_callback,
content_type='application/json'
)

mock_requests.add_callback(
responses.GET,
re.compile(self.url + "/metadata/files/.*"),
callback=self.get_file_metadata_callback,
content_type='application/json'
)

mock_requests.add_callback(
responses.GET,
re.compile(self.url + "/files/.*"),
callback=self.download_file_callback
)

mock_requests.assert_all_requests_are_fired = False

def check_auth_header(self, request):
auth_hdr = request.headers['Authorization']
return auth_hdr is not None and auth_hdr == 'Bearer ' + self.token

def list_datasets_callback(self, request):
if not self.check_auth_header(request):
return 400, {}, json.dumps({"error_description": "invalid token"})

return 200, {}, json.dumps(self.all_datasets)

def list_files_callback(self, request):
if not self.check_auth_header(request):
return 400, {}, json.dumps({"error_description": "invalid token"})

dataset_id = request.path_url.split('/')[-2]
files = None
if self.dataset_files[dataset_id] is not None:
files = [self.files[file_id] for file_id in sorted(self.dataset_files[dataset_id])]

return 200, {}, json.dumps(files)

def get_file_metadata_callback(self, request):
if not self.check_auth_header(request):
return 400, {}, json.dumps({"error_description": "invalid token"})

try:
file_id = urllib.parse.urlsplit(request.url).path.split('/')[-1]
return 200, {}, json.dumps(self.files[file_id])
except KeyError:
return 404, {}, None

@staticmethod
def parse_ranges(s):
return tuple(map(int, re.match(r'^bytes=(\d+)-(\d+)$', s).groups()))

def download_file_callback(self, request):
if not self.check_auth_header(request):
return 400, {}, json.dumps({"error_description": "invalid token"})

file_id = urllib.parse.urlsplit(request.url).path.split('/')[-1]
file_content = self.file_content[file_id]

if request.headers['Range'] is not None:
start, end = self.parse_ranges(request.headers['Range'])
assert start < end
return 200, {}, file_content[start:end + 1]

return 200, {}, file_content

@property
def all_datasets(self):
if self.dataset_files is None:
return None
return list(sorted(self.dataset_files.keys()))
12 changes: 12 additions & 0 deletions test/test_client_ip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import responses

import pyega3.pyega3 as pyega3


def test_when_ipinfo_is_blocked_return_unknown(mock_requests):
endpoint = 'https://ipinfo.io/json'
mock_requests.add(responses.GET, endpoint, status=403)

resp_ip = pyega3.get_client_ip()

assert resp_ip == 'Unknown'
55 changes: 55 additions & 0 deletions test/test_credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import json
from unittest import mock

import pytest

import pyega3.pyega3 as pyega3
import test.conftest as common


def test_load_username_and_password_from_credentials_file(mock_input_file):
username = common.rand_str()
password = common.rand_str()

with mock_input_file(json.dumps({"username": username, "password": password})) as credentials_file:
result = pyega3.load_credential(credentials_file)
assert len(result) == 3
assert result[0] == username
assert result[1] == password


def test_when_credentials_file_has_no_password_ask_user_for_it(mock_input_file):
username = common.rand_str()
password = common.rand_str()

with mock_input_file(json.dumps({"username": username})) as credentials_file, mock.patch("getpass.getpass",
return_value=password):
result = pyega3.load_credential(credentials_file)
assert len(result) == 3
assert result[0] == username
assert result[1] == password


def test_error_when_credentials_file_is_bad_json(mock_input_file):
with mock_input_file("bad json") as credentials_file:
with pytest.raises(SystemExit):
pyega3.load_credential(credentials_file)


def test_get_credential_prompts_user_for_username_and_password():
username = common.rand_str()
password = common.rand_str()

with mock.patch('builtins.input', return_value=username):
with mock.patch('getpass.getpass', return_value=password):
assert pyega3.get_credential() == (username, password, None)


def test_load_credential_prompts_user_for_credentials_if_credentials_file_does_not_exist():
username = common.rand_str()
password = common.rand_str()

with mock.patch('builtins.input', return_value=username):
with mock.patch('getpass.getpass', return_value=password):
with mock.patch('os.path.exists', return_value=False):
assert pyega3.load_credential("unknownfile.txt") == (username, password, None)
Loading

0 comments on commit 2431d63

Please sign in to comment.