From f3d9d297bbb81db4d916737e1809c9f8c97131f6 Mon Sep 17 00:00:00 2001 From: Sheng Date: Sat, 25 Aug 2018 06:53:21 +0800 Subject: [PATCH] Updated tests --- tests/test_app.py | 168 +++++++++++++++++++++++++++------------------- tests/utils.py | 5 +- 2 files changed, 102 insertions(+), 71 deletions(-) diff --git a/tests/test_app.py b/tests/test_app.py index 0a5cb9ff..50713155 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -14,6 +14,11 @@ from webssh.settings import get_app_settings, max_body_size from webssh.utils import to_str +try: + from urllib.parse import urlencode +except ImportError: + from urllib import urlencode + handler.DELAY = 0.1 @@ -123,10 +128,102 @@ def test_app_with_correct_credentials_timeout(self): ws = yield tornado.websocket.websocket_connect(ws_url) msg = yield ws.read_message() self.assertIsNone(msg) + self.assertEqual(ws.close_reason, 'Websocket authentication failed.') + + @tornado.testing.gen_test + def test_app_with_correct_credentials_user_robey(self): + url = self.get_url('/') + client = self.get_http_client() + response = yield client.fetch(url) + self.assertEqual(response.code, 200) + + response = yield client.fetch(url, method='POST', body=self.body) + data = json.loads(to_str(response.body)) + self.assertIsNone(data['status']) + self.assertIsNotNone(data['id']) + self.assertIsNotNone(data['encoding']) + + url = url.replace('http', 'ws') + ws_url = url + 'ws?id=' + data['id'] + ws = yield tornado.websocket.websocket_connect(ws_url) + msg = yield ws.read_message() + self.assertEqual(to_str(msg, data['encoding']), banner) + ws.close() + + @tornado.testing.gen_test + def test_app_with_correct_credentials_user_bar(self): + url = self.get_url('/') + client = self.get_http_client() + response = yield client.fetch(url) + self.assertEqual(response.code, 200) + + body = self.body.replace('robey', 'bar') + response = yield client.fetch(url, method='POST', body=body) + data = json.loads(to_str(response.body)) + self.assertIsNone(data['status']) + self.assertIsNotNone(data['id']) + self.assertIsNotNone(data['encoding']) + + url = url.replace('http', 'ws') + ws_url = url + 'ws?id=' + data['id'] + ws = yield tornado.websocket.websocket_connect(ws_url) + msg = yield ws.read_message() + self.assertEqual(to_str(msg, data['encoding']), banner) + + # messages below will be ignored silently + yield ws.write_message('hello') + yield ws.write_message('"hello"') + yield ws.write_message('[hello]') + yield ws.write_message(json.dumps({'resize': []})) + yield ws.write_message(json.dumps({'resize': {}})) + yield ws.write_message(json.dumps({'resize': 'ab'})) + yield ws.write_message(json.dumps({'resize': ['a', 'b']})) + yield ws.write_message(json.dumps({'resize': {'a': 1, 'b': 2}})) + yield ws.write_message(json.dumps({'resize': [100]})) + yield ws.write_message(json.dumps({'resize': [100]*10})) + yield ws.write_message(json.dumps({'resize': [-1, -1]})) + yield ws.write_message(json.dumps({'data': [1]})) + yield ws.write_message(json.dumps({'data': (1,)})) + yield ws.write_message(json.dumps({'data': {'a': 2}})) + yield ws.write_message(json.dumps({'data': 1})) + yield ws.write_message(json.dumps({'data': 2.1})) + yield ws.write_message(json.dumps({'key-non-existed': 'hello'})) + # end - those just for testing webssh websocket stablity + + yield ws.write_message(json.dumps({'resize': [79, 23]})) + msg = yield ws.read_message() + self.assertEqual(b'resized', msg) + + yield ws.write_message(json.dumps({'data': 'bye'})) + msg = yield ws.read_message() + self.assertEqual(b'bye', msg) + ws.close() + + @tornado.testing.gen_test + def test_app_auth_with_valid_pubkey_by_urlencoded_form(self): + url = self.get_url('/') + client = self.get_http_client() + response = yield client.fetch(url) + self.assertEqual(response.code, 200) + + privatekey = read_file(make_tests_data_path('user_rsa_key')) + self.body_dict.update(privatekey=privatekey) + body = urlencode(self.body_dict) + response = yield client.fetch(url, method='POST', body=body) + data = json.loads(to_str(response.body)) + self.assertIsNone(data['status']) + self.assertIsNotNone(data['id']) + self.assertIsNotNone(data['encoding']) + + url = url.replace('http', 'ws') + ws_url = url + 'ws?id=' + data['id'] + ws = yield tornado.websocket.websocket_connect(ws_url) + msg = yield ws.read_message() + self.assertEqual(to_str(msg, data['encoding']), banner) ws.close() @tornado.testing.gen_test - def test_app_auth_with_valid_pubkey_for_user_robey(self): + def test_app_auth_with_valid_pubkey_by_multipart_form(self): url = self.get_url('/') client = self.get_http_client() response = yield client.fetch(url) @@ -237,72 +334,3 @@ def test_app_post_form_with_large_body_size(self): with self.assertRaises(HTTPError): yield client.fetch(url, method='POST', headers=headers, body=body) - - @tornado.testing.gen_test - def test_app_with_correct_credentials_user_robey(self): - url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - - response = yield client.fetch(url, method='POST', body=self.body) - data = json.loads(to_str(response.body)) - self.assertIsNone(data['status']) - self.assertIsNotNone(data['id']) - self.assertIsNotNone(data['encoding']) - - url = url.replace('http', 'ws') - ws_url = url + 'ws?id=' + data['id'] - ws = yield tornado.websocket.websocket_connect(ws_url) - msg = yield ws.read_message() - self.assertEqual(to_str(msg, data['encoding']), banner) - ws.close() - - @tornado.testing.gen_test - def test_app_with_correct_credentials_user_bar(self): - url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - - body = self.body.replace('robey', 'bar') - response = yield client.fetch(url, method='POST', body=body) - data = json.loads(to_str(response.body)) - self.assertIsNone(data['status']) - self.assertIsNotNone(data['id']) - self.assertIsNotNone(data['encoding']) - - url = url.replace('http', 'ws') - ws_url = url + 'ws?id=' + data['id'] - ws = yield tornado.websocket.websocket_connect(ws_url) - msg = yield ws.read_message() - self.assertEqual(to_str(msg, data['encoding']), banner) - - # messages below will be ignored silently - yield ws.write_message('hello') - yield ws.write_message('"hello"') - yield ws.write_message('[hello]') - yield ws.write_message(json.dumps({'resize': []})) - yield ws.write_message(json.dumps({'resize': {}})) - yield ws.write_message(json.dumps({'resize': 'ab'})) - yield ws.write_message(json.dumps({'resize': ['a', 'b']})) - yield ws.write_message(json.dumps({'resize': {'a': 1, 'b': 2}})) - yield ws.write_message(json.dumps({'resize': [100]})) - yield ws.write_message(json.dumps({'resize': [100]*10})) - yield ws.write_message(json.dumps({'resize': [-1, -1]})) - yield ws.write_message(json.dumps({'data': [1]})) - yield ws.write_message(json.dumps({'data': (1,)})) - yield ws.write_message(json.dumps({'data': {'a': 2}})) - yield ws.write_message(json.dumps({'data': 1})) - yield ws.write_message(json.dumps({'data': 2.1})) - yield ws.write_message(json.dumps({'key-non-existed': 'hello'})) - # end - those just for testing webssh websocket stablity - - yield ws.write_message(json.dumps({'resize': [79, 23]})) - msg = yield ws.read_message() - self.assertEqual(b'resized', msg) - - yield ws.write_message(json.dumps({'data': 'bye'})) - msg = yield ws.read_message() - self.assertEqual(b'bye', msg) - ws.close() diff --git a/tests/utils.py b/tests/utils.py index 1436fa7a..5b384a2b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -41,7 +41,10 @@ def get_content_type(filename): def read_file(path, encoding='utf-8'): - return open(path, 'rb').read().decode(encoding) + data = open(path, 'rb').read() + if encoding is None: + return data + return data.decode(encoding) def make_tests_data_path(filename):