Skip to content

Commit

Permalink
blacken
Browse files Browse the repository at this point in the history
  • Loading branch information
singingwolfboy authored and dashea committed Apr 25, 2020
1 parent d4ea93d commit f0a7d61
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 31 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Test
name: CI
on: [push, pull_request]

jobs:
Expand Down Expand Up @@ -27,3 +27,17 @@ jobs:
with:
file: ./coverage.xml
fail_ci_if_error: false

black:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v1
with:
python-version: "3.x"
- name: Install black
run: pip install black

- name: Run black
run: black --check .
89 changes: 59 additions & 30 deletions tests/test_requests_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import shutil
import platform


class FileRequestTestCase(unittest.TestCase):
def setUp(self):
self._session = requests.Session()
Expand All @@ -19,11 +20,11 @@ def _pathToURL(self, path):
# Split the path on the os spearator and recombine it with / as the
# separator. There probably aren't any OS's that allow / as a path
# component, but just in case, encode any remaining /'s.
urlsplit = (part.replace('/', '%2F') for part in urlpath.split(os.sep))
urlsplit = (part.replace("/", "%2F") for part in urlpath.split(os.sep))
urlpath = "/".join(urlsplit)

# Encode /'s in the drive for the imaginary case where that can be a thing
urldrive = urldrive.replace('/', '%2F')
urldrive = urldrive.replace("/", "%2F")

# Add the leading /. If there is a drive component, this needs to be
# placed before the drive.
Expand All @@ -35,10 +36,12 @@ def test_fetch_regular(self):
# Fetch this file using requests
with open(__file__, "rb") as f:
testdata = f.read()
response = self._session.get("file://%s" % self._pathToURL(os.path.abspath(__file__)))
response = self._session.get(
"file://%s" % self._pathToURL(os.path.abspath(__file__))
)

self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.headers['Content-Length'], len(testdata))
self.assertEqual(response.headers["Content-Length"], len(testdata))
self.assertEqual(response.content, testdata)

response.close()
Expand All @@ -50,92 +53,114 @@ def test_fetch_missing(self):
self.assertTrue(response.text)
response.close()

@unittest.skipIf(hasattr(os, "geteuid") and os.geteuid() == 0,
"Skipping permissions test since running as root")
@unittest.skipIf(
hasattr(os, "geteuid") and os.geteuid() == 0,
"Skipping permissions test since running as root",
)
def test_fetch_no_access(self):
# Create a file and remove read permissions, try to get a 403
# probably doesn't work on windows
with tempfile.NamedTemporaryFile() as tmp:
os.chmod(tmp.name, 0)
response = self._session.get("file://%s" % self._pathToURL(os.path.abspath(tmp.name)))
response = self._session.get(
"file://%s" % self._pathToURL(os.path.abspath(tmp.name))
)

self.assertEqual(response.status_code, requests.codes.forbidden)
self.assertTrue(response.text)

response.close()

@unittest.skipIf(platform.system() == "Windows",
"skipping locale test on windows")
@unittest.skipIf(platform.system() == "Windows", "skipping locale test on windows")
def test_fetch_missing_localized(self):
# Make sure translated error messages don't cause any problems
import locale

saved_locale = locale.setlocale(locale.LC_MESSAGES, None)
try:
locale.setlocale(locale.LC_MESSAGES, 'ru_RU.UTF-8')
locale.setlocale(locale.LC_MESSAGES, "ru_RU.UTF-8")
response = self._session.get("file:///no/such/path")
self.assertEqual(response.status_code, requests.codes.not_found)
self.assertTrue(response.text)
response.close()
except locale.Error:
unittest.SkipTest('ru_RU.UTF-8 locale not available')
unittest.SkipTest("ru_RU.UTF-8 locale not available")
finally:
locale.setlocale(locale.LC_MESSAGES, saved_locale)

def test_head(self):
# Check that HEAD returns the content-length
testlen = os.stat(__file__).st_size
response = self._session.head("file://%s" % self._pathToURL(os.path.abspath(__file__)))
response = self._session.head(
"file://%s" % self._pathToURL(os.path.abspath(__file__))
)

self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.headers['Content-Length'], testlen)
self.assertEqual(response.headers["Content-Length"], testlen)

response.close()

def test_fetch_post(self):
# Make sure that non-GET methods are rejected
self.assertRaises(ValueError, self._session.post,
("file://%s" % self._pathToURL(os.path.abspath(__file__))))
self.assertRaises(
ValueError,
self._session.post,
("file://%s" % self._pathToURL(os.path.abspath(__file__))),
)

def test_fetch_nonlocal(self):
# Make sure that network locations are rejected
self.assertRaises(ValueError, self._session.get,
("file://example.com%s" % self._pathToURL(os.path.abspath(__file__))))
self.assertRaises(ValueError, self._session.get,
("file://localhost:8080%s" % self._pathToURL(os.path.abspath(__file__))))
self.assertRaises(
ValueError,
self._session.get,
("file://example.com%s" % self._pathToURL(os.path.abspath(__file__))),
)
self.assertRaises(
ValueError,
self._session.get,
("file://localhost:8080%s" % self._pathToURL(os.path.abspath(__file__))),
)

# localhost is ok, though
with open(__file__, "rb") as f:
testdata = f.read()
response = self._session.get("file://localhost%s" % self._pathToURL(os.path.abspath(__file__)))
response = self._session.get(
"file://localhost%s" % self._pathToURL(os.path.abspath(__file__))
)
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.content, testdata)
response.close()

def test_funny_names(self):
testdata = 'yo wassup man\n'.encode('ascii')
testdata = "yo wassup man\n".encode("ascii")
tmpdir = tempfile.mkdtemp()

try:
with open(os.path.join(tmpdir, 'spa ces'), "w+b") as space_file:
with open(os.path.join(tmpdir, "spa ces"), "w+b") as space_file:
space_file.write(testdata)
space_file.flush()
response = self._session.get("file://%s/spa%%20ces" % self._pathToURL(tmpdir))
response = self._session.get(
"file://%s/spa%%20ces" % self._pathToURL(tmpdir)
)
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.content, testdata)
response.close()

with open(os.path.join(tmpdir, 'per%cent'), "w+b") as percent_file:
with open(os.path.join(tmpdir, "per%cent"), "w+b") as percent_file:
percent_file.write(testdata)
percent_file.flush()
response = self._session.get("file://%s/per%%25cent" % self._pathToURL(tmpdir))
response = self._session.get(
"file://%s/per%%25cent" % self._pathToURL(tmpdir)
)
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.content, testdata)
response.close()

# percent-encoded directory separators should be rejected
with open(os.path.join(tmpdir, 'badname'), "w+b") as bad_file:
response = self._session.get("file://%s%%%Xbadname" % (self._pathToURL(tmpdir), ord(os.sep)))
with open(os.path.join(tmpdir, "badname"), "w+b") as bad_file:
response = self._session.get(
"file://%s%%%Xbadname" % (self._pathToURL(tmpdir), ord(os.sep))
)
self.assertEqual(response.status_code, requests.codes.not_found)
response.close()

Expand All @@ -144,7 +169,9 @@ def test_funny_names(self):

def test_close(self):
# Open a request for this file
response = self._session.get("file://%s" % self._pathToURL(os.path.abspath(__file__)))
response = self._session.get(
"file://%s" % self._pathToURL(os.path.abspath(__file__))
)

# Try closing it
response.close()
Expand All @@ -161,8 +188,10 @@ def test_windows_legacy(self):
testdata = f.read()

drive, path = os.path.splitdrive(os.path.abspath(__file__))
response = self._session.get("file:///%s|%s" % (drive[:-1], path.replace(os.sep, "/")))
response = self._session.get(
"file:///%s|%s" % (drive[:-1], path.replace(os.sep, "/"))
)
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.headers['Content-Length'], len(testdata))
self.assertEqual(response.headers["Content-Length"], len(testdata))
self.assertEqual(response.content, testdata)
response.close()

0 comments on commit f0a7d61

Please sign in to comment.