# Forked from facebookresearch/ParlAI
# test_build_data.py — 152 lines (127 loc), 5.73 KB
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import pytest
from parlai.core import build_data
import unittest
import unittest.mock
import requests
import parlai.utils.testing as testing_utils
import multiprocessing
from parlai.utils.io import PathManager
from parlai.core.params import ParlaiParser
@pytest.mark.nofbcode
@testing_utils.skipUnlessGPU
class TestBuildData(unittest.TestCase):
    """
    Basic tests on the build_data.py download_multiprocess.
    """

    # Destination names used by every multiprocess-download test.
    dest_filenames = ('mnist0.tar.gz', 'mnist1.tar.gz', 'mnist2.tar.gz')

    def setUp(self):
        # Resolve the configured datapath and carve out a scratch subfolder
        # dedicated to these tests.
        base = ParlaiParser().parse_args([])['datapath']
        self.datapath = os.path.join(base, 'build_data_pyt_data')
        PathManager.mkdirs(self.datapath)
        # Remove leftovers from earlier runs; otherwise the downloader would
        # see the files on disk and skip re-downloading them.
        for fname in self.dest_filenames:
            target = os.path.join(self.datapath, fname)
            try:
                PathManager.rm(target)
            except OSError:
                pass

    def test_download_multiprocess(self):
        # One good URL plus two that 403, so we can check per-file statuses.
        urls = [
            'https://parl.ai/downloads/mnist/mnist.tar.gz',
            'https://parl.ai/downloads/mnist/mnist.tar.gz.BAD',
            'https://parl.ai/downloads/mnist/mnist.tar.gz.BAD',
        ]
        results = build_data.download_multiprocess(
            urls, self.datapath, dest_filenames=self.dest_filenames
        )
        filenames, statuses, errors = zip(*results)
        self.assertEqual(filenames, self.dest_filenames, 'output filenames not correct')
        self.assertEqual(statuses, (200, 403, 403), 'output http statuses not correct')

    def test_download_multiprocess_chunks(self):
        # With chunk_size=1 the three downloads may complete in any order, so
        # only assert membership rather than exact positions.
        urls = [
            'https://parl.ai/downloads/mnist/mnist.tar.gz',
            'https://parl.ai/downloads/mnist/mnist.tar.gz.BAD',
            'https://parl.ai/downloads/mnist/mnist.tar.gz.BAD',
        ]
        results = build_data.download_multiprocess(
            urls, self.datapath, dest_filenames=self.dest_filenames, chunk_size=1
        )
        filenames, statuses, errors = zip(*results)
        for expected in ('mnist0.tar.gz', 'mnist1.tar.gz', 'mnist2.tar.gz'):
            self.assertIn(expected, filenames)
        self.assertIn(200, statuses, 'unexpected error code')
        self.assertIn(403, statuses, 'unexpected error code')

    def test_connectionerror_download(self):
        # Force every HTTP GET to time out and verify that download() retries
        # the requested number of times before surfacing a RuntimeError.
        with unittest.mock.patch('requests.Session.get') as mock_get:
            mock_get.side_effect = requests.exceptions.ConnectTimeout
            with testing_utils.tempdir() as tmpdir:
                with self.assertRaises(RuntimeError):
                    build_data.download(
                        'http://test.com/bad', tmpdir, 'foo', num_retries=3
                    )
                assert mock_get.call_count == 3
class TestUnzip(unittest.TestCase):
    """
    Tests for the archive-extraction helpers in build_data.

    Each test builds a small archive in a temp dir, extracts it, and checks
    that the contents appear on disk and the archive itself is removed.
    """

    def _check_extracted(self, workdir, expected):
        # Verify each (filename -> contents) pair was written out correctly.
        for name, contents in expected.items():
            path = os.path.join(workdir, name)
            assert os.path.exists(path)
            with open(path) as f:
                assert f.read() == contents

    def test_ungzip(self):
        with testing_utils.tempdir() as workdir:
            import gzip

            archive = os.path.join(workdir, "test.txt.gz")
            with gzip.GzipFile(archive, mode="w") as f:
                f.write("This is a test\n".encode("utf-8"))
            build_data.ungzip(workdir, "test.txt.gz")
            self._check_extracted(workdir, {"test.txt": "This is a test\n"})
            # The original .gz should be deleted after extraction.
            assert not os.path.exists(archive)

    def test_unzip(self):
        with testing_utils.tempdir() as workdir:
            import zipfile

            archive = os.path.join(workdir, "test.zip")
            payloads = {"test1.txt": b"Test1\n", "test2.txt": b"Test2\n"}
            with zipfile.ZipFile(archive, "w") as zf:
                for name, data in payloads.items():
                    with zf.open(name, "w") as f:
                        f.write(data)
            build_data._unzip(workdir, "test.zip")
            self._check_extracted(
                workdir, {"test1.txt": "Test1\n", "test2.txt": "Test2\n"}
            )
            # The zip archive should be deleted after extraction.
            assert not os.path.exists(archive)

    def test_untar(self):
        with testing_utils.tempdir() as workdir:
            import io
            import tarfile

            archive = os.path.join(workdir, "test.tar.gz")
            payloads = {"test1.txt": b"Test1\n", "test2.txt": b"Test2\n"}
            with tarfile.open(archive, "w") as tf:
                for name, data in payloads.items():
                    with io.BytesIO(data) as f:
                        info = tarfile.TarInfo(name)
                        info.size = len(data)
                        tf.addfile(info, fileobj=f)
            build_data._untar(workdir, "test.tar.gz")
            self._check_extracted(
                workdir, {"test1.txt": "Test1\n", "test2.txt": "Test2\n"}
            )
            # The tarball should be deleted after extraction.
            assert not os.path.exists(archive)
if __name__ == '__main__':
    # Use the 'spawn' start method so the multiprocess download tests behave
    # consistently across platforms; must be set before any pool is created.
    multiprocessing.set_start_method('spawn')
    unittest.main()