Skip to content

Commit

Permalink
Implementation for /exec using websocket
Browse files Browse the repository at this point in the history
inspired by the POC from @chekolyn

* Adds a new requirement on websocket-client
* Add a new class WSClient that uses WebSocketApp from
  the websocket-client.
* Make sure we pass Authorization header
* Make sure we honor the SSL settings in configuration
* Some of the code will get overwritten when we generate
  fresh classes from swagger definition. To remind us
  added a e2e test so we don't lose the changes
* Added a new configuration option to enable/disable failures
  when hostnames in certificates don't match

Fixes kubernetes-client#58
  • Loading branch information
dims committed Feb 7, 2017
1 parent 192b67c commit 066bba1
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 12 deletions.
10 changes: 10 additions & 0 deletions kubernetes/client/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from __future__ import absolute_import

from . import models
from . import ws_client
from .rest import RESTClientObject
from .rest import ApiException

Expand Down Expand Up @@ -343,6 +344,15 @@ def request(self, method, url, query_params=None, headers=None,
"""
Makes the HTTP request using RESTClient.
"""
# FIXME(dims) : We need a better way to figure out which
# calls end up using web sockets
if url.endswith('/exec') and method == "GET":
return ws_client.GET(self.config,
url,
query_params=query_params,
_request_timeout=_request_timeout,
headers=headers)

if method == "GET":
return self.rest_client.GET(url,
query_params=query_params,
Expand Down
114 changes: 114 additions & 0 deletions kubernetes/client/ws_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from .rest import ApiException

import certifi
import collections
import websocket
import six
import ssl
from six.moves.urllib.parse import urlencode
from six.moves.urllib.parse import quote_plus


class WSClient:
def __init__(self, configuration, url, headers):
self.messages = []
self.errors = []
websocket.enableTrace(False)
header = None

# We just need to pass the Authorization, ignore all the other
# http headers we get from the generated code
if 'Authorization' in headers:
header = "Authorization: %s" % headers['Authorization']

self.ws = websocket.WebSocketApp(url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
header=[header] if header else None)
self.ws.on_open = self.on_open

if url.startswith('wss://') and configuration.verify_ssl:
ssl_opts = {
'cert_reqs': ssl.CERT_REQUIRED,
'keyfile': configuration.key_file,
'certfile': configuration.cert_file,
'ca_certs': configuration.ssl_ca_cert or certifi.where(),
}
if configuration.assert_hostname is not None:
ssl_opts['check_hostname'] = configuration.assert_hostname
else:
ssl_opts = {'cert_reqs': ssl.CERT_NONE}

self.ws.run_forever(sslopt=ssl_opts)

def on_message(self, ws, message):
if message[0] == '\x01':
message = message[1:]
if message:
if six.PY3 and isinstance(message, six.binary_type):
message = message.decode('utf-8')
self.messages.append(message)

def on_error(self, ws, error):
self.errors.append(error)

def on_close(self, ws):
pass

def on_open(self, ws):
pass


WSResponse = collections.namedtuple('WSResponse', ['data'])


def GET(configuration, url, query_params, _request_timeout, headers):
# switch protocols from http to websocket
url = url.replace('http://', 'ws://')
url = url.replace('https://', 'wss://')

# patch extra /
url = url.replace('//api', '/api')

# Extract the command from the list of tuples
commands = None
for key, value in query_params:
if key == 'command':
commands = value
break

# drop command from query_params as we will be processing it separately
query_params = [(key, value) for key, value in query_params if
key != 'command']

# if we still have query params then encode them
if query_params:
url += '?' + urlencode(query_params)

# tack on the actual command to execute at the end
if isinstance(commands, list):
for command in commands:
url += "&command=%s&" % quote_plus(command)
else:
url += '&command=' + quote_plus(commands)

client = WSClient(configuration, url, headers)
if client.errors:
raise ApiException(
status=0,
reason='\n'.join([str(error) for error in client.errors])
)
return WSResponse('%s' % ''.join(client.messages))
48 changes: 38 additions & 10 deletions kubernetes/e2e_test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.

import time
import unittest
import uuid

Expand All @@ -34,22 +35,49 @@ def test_pod_apis(self):
client = api_client.ApiClient(self.API_URL, config=self.config)
api = core_v1_api.CoreV1Api(client)

name = 'test-' + str(uuid.uuid4())
pod_manifest = {'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'color': 'blue', 'name': name},
'spec': {'containers': [{'image': 'dockerfile/redis',
'name': 'redis'}]}}
name = 'busybox-test-' + str(uuid.uuid4())
pod_manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'name': name
},
'spec': {
'containers': [{
'image': 'busybox',
'name': 'sleep',
"args": [
"/bin/sh",
"-c",
"while true;do date;sleep 5; done"
]
}]
}
}

resp = api.create_namespaced_pod(body=pod_manifest,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)

resp = api.read_namespaced_pod(name=name,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)
while True:
resp = api.read_namespaced_pod(name=name,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)
if resp.status.phase != 'Pending':
break
time.sleep(1)

exec_command = ['/bin/sh',
'-c',
'for i in $(seq 1 3); do date; sleep 1; done']
resp = api.connect_get_namespaced_pod_exec(name, 'default',
command=exec_command,
stderr=False, stdin=False,
stdout=True, tty=False)
print('EXEC response : %s' % resp)
self.assertEqual(3, len(resp.splitlines()))

number_of_pods = len(api.list_pod_for_all_namespaces().items)
self.assertTrue(number_of_pods > 0)
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
certifi >= 14.05.14
six == 1.8.0
six>=1.9.0
python_dateutil >= 2.5.3
setuptools >= 21.0.0
urllib3 >= 1.19.1
pyyaml >= 3.12
oauth2client >= 4.0.0
ipaddress >= 1.0.17

websocket-client>=0.32.0
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ passenv = TOXENV CI TRAVIS TRAVIS_*
usedevelop = True
install_command = pip install -U {opts} {packages}
deps = -r{toxinidir}/test-requirements.txt
-r{toxinidir}/requirements.txt
commands =
python -V
nosetests []
Expand Down

0 comments on commit 066bba1

Please sign in to comment.