diff --git a/kubernetes/client/api_client.py b/kubernetes/client/api_client.py index d553aef6f8..c198b15d5a 100644 --- a/kubernetes/client/api_client.py +++ b/kubernetes/client/api_client.py @@ -21,6 +21,7 @@ from __future__ import absolute_import from . import models +from . import ws_client from .rest import RESTClientObject from .rest import ApiException @@ -343,6 +344,15 @@ def request(self, method, url, query_params=None, headers=None, """ Makes the HTTP request using RESTClient. """ + # FIXME(dims) : We need a better way to figure out which + # calls end up using web sockets + if url.endswith('/exec') and method == "GET": + return ws_client.GET(self.config, + url, + query_params=query_params, + _request_timeout=_request_timeout, + headers=headers) + if method == "GET": return self.rest_client.GET(url, query_params=query_params, diff --git a/kubernetes/client/ws_client.py b/kubernetes/client/ws_client.py new file mode 100644 index 0000000000..b143400bee --- /dev/null +++ b/kubernetes/client/ws_client.py @@ -0,0 +1,114 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from .rest import ApiException + +import certifi +import collections +import websocket +import six +import ssl +from six.moves.urllib.parse import urlencode +from six.moves.urllib.parse import quote_plus + + +class WSClient: + def __init__(self, configuration, url, headers): + self.messages = [] + self.errors = [] + websocket.enableTrace(False) + header = None + + # We just need to pass the Authorization, ignore all the other + # http headers we get from the generated code + if 'Authorization' in headers: + header = "Authorization: %s" % headers['Authorization'] + + self.ws = websocket.WebSocketApp(url, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close, + header=[header] if header else None) + self.ws.on_open = self.on_open + + if url.startswith('wss://') and configuration.verify_ssl: + ssl_opts = { + 'cert_reqs': ssl.CERT_REQUIRED, + 'keyfile': configuration.key_file, + 'certfile': configuration.cert_file, + 'ca_certs': configuration.ssl_ca_cert or certifi.where(), + } + if configuration.assert_hostname is not None: + ssl_opts['check_hostname'] = configuration.assert_hostname + else: + ssl_opts = {'cert_reqs': ssl.CERT_NONE} + + self.ws.run_forever(sslopt=ssl_opts) + + def on_message(self, ws, message): + if message[0] == '\x01': + message = message[1:] + if message: + if six.PY3 and isinstance(message, six.binary_type): + message = message.decode('utf-8') + self.messages.append(message) + + def on_error(self, ws, error): + self.errors.append(error) + + def on_close(self, ws): + pass + + def on_open(self, ws): + pass + + +WSResponse = collections.namedtuple('WSResponse', ['data']) + + +def GET(configuration, url, query_params, _request_timeout, headers): + # switch protocols from http to websocket + url = url.replace('http://', 'ws://') + url = url.replace('https://', 'wss://') + + # patch extra / + url = url.replace('//api', '/api') + + # Extract the command from the list of tuples + commands = None + for key, value in query_params: + if key == 'command': + commands = value + break + + # drop command from query_params as we will be processing it separately + query_params = [(key, value) for key, value in query_params if + key != 'command'] + + # if we still have query params then encode them + if query_params: + url += '?' + urlencode(query_params) + + # tack on the actual command to execute at the end + if isinstance(commands, list): + for command in commands: + url += "&command=%s&" % quote_plus(command) + else: + url += '&command=' + quote_plus(commands) + + client = WSClient(configuration, url, headers) + if client.errors: + raise ApiException( + status=0, + reason='\n'.join([str(error) for error in client.errors]) + ) + return WSResponse('%s' % ''.join(client.messages)) diff --git a/kubernetes/e2e_test/test_client.py b/kubernetes/e2e_test/test_client.py index a6ee3d6c4e..ec2792987c 100644 --- a/kubernetes/e2e_test/test_client.py +++ b/kubernetes/e2e_test/test_client.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import time import unittest import uuid @@ -34,22 +35,49 @@ def test_pod_apis(self): client = api_client.ApiClient(self.API_URL, config=self.config) api = core_v1_api.CoreV1Api(client) - name = 'test-' + str(uuid.uuid4()) - pod_manifest = {'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': {'color': 'blue', 'name': name}, - 'spec': {'containers': [{'image': 'dockerfile/redis', - 'name': 'redis'}]}} + name = 'busybox-test-' + str(uuid.uuid4()) + pod_manifest = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': name + }, + 'spec': { + 'containers': [{ + 'image': 'busybox', + 'name': 'sleep', + "args": [ + "/bin/sh", + "-c", + "while true;do date;sleep 5; done" + ] + }] + } + } resp = api.create_namespaced_pod(body=pod_manifest, namespace='default') self.assertEqual(name, resp.metadata.name) self.assertTrue(resp.status.phase) - resp = api.read_namespaced_pod(name=name, - namespace='default') - self.assertEqual(name, resp.metadata.name) - self.assertTrue(resp.status.phase) + while True: + resp = api.read_namespaced_pod(name=name, + namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status.phase) + if resp.status.phase != 'Pending': + break + time.sleep(1) + + exec_command = ['/bin/sh', + '-c', + 'for i in $(seq 1 3); do date; sleep 1; done'] + resp = api.connect_get_namespaced_pod_exec(name, 'default', + command=exec_command, + stderr=False, stdin=False, + stdout=True, tty=False) + print('EXEC response : %s' % resp) + self.assertEqual(3, len(resp.splitlines())) number_of_pods = len(api.list_pod_for_all_namespaces().items) self.assertTrue(number_of_pods > 0) diff --git a/requirements.txt b/requirements.txt index 49d541adda..2674d78ff5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,9 @@ certifi >= 14.05.14 -six == 1.8.0 +six>=1.9.0 python_dateutil >= 2.5.3 setuptools >= 21.0.0 urllib3 >= 1.19.1 pyyaml >= 3.12 oauth2client >= 4.0.0 ipaddress >= 1.0.17 - +websocket-client>=0.32.0 diff --git a/tox.ini b/tox.ini index bfb56cf074..4091bcae61 100644 --- a/tox.ini +++ b/tox.ini @@ -6,6 +6,7 @@ passenv = TOXENV CI TRAVIS TRAVIS_* usedevelop = True install_command = pip install -U {opts} {packages} deps = -r{toxinidir}/test-requirements.txt + -r{toxinidir}/requirements.txt commands = python -V nosetests []