Skip to content

Commit

Permalink
Falls back to SPDY for gorilla/websocket https proxy error
Browse files Browse the repository at this point in the history
  • Loading branch information
seans3 committed Jul 20, 2024
1 parent a8d354b commit 9d56054
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ func IsUpgradeFailure(err error) bool {
return errors.As(err, &upgradeErr)
}

// isHTTPSProxyError returns true if error is Gorilla/Websockets HTTPS Proxy dial error;
// false otherwise (see https://github.com/kubernetes/kubernetes/issues/126134).
func IsHTTPSProxyError(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), "proxy: unknown scheme: https")
}

// IsUpgradeRequest returns true if the given request is a connection upgrade request
func IsUpgradeRequest(req *http.Request) bool {
for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,32 @@ func TestIsUpgradeFailureError(t *testing.T) {
})
}
}

func TestIsHTTPSProxyError(t *testing.T) {
testCases := map[string]struct {
err error
expected bool
}{
"nil error should return false": {
err: nil,
expected: false,
},
"Not HTTPS proxy error should return false": {
err: errors.New("this is not an upgrade error"),
expected: false,
},
"HTTPS proxy error should return true": {
err: errors.New("proxy: unknown scheme: https"),
expected: true,
},
}

for name, test := range testCases {
t.Run(name, func(t *testing.T) {
actual := IsHTTPSProxyError(test.err)
if test.expected != actual {
t.Errorf("expected HTTPS proxy error %t, got %t", test.expected, actual)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ limitations under the License.
package portforward

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/httpstream"
)

Expand All @@ -36,7 +38,7 @@ func TestFallbackDialer(t *testing.T) {
assert.True(t, primary.dialed, "no fallback; primary should have dialed")
assert.False(t, secondary.dialed, "no fallback; secondary should *not* have dialed")
assert.Equal(t, primaryProtocol, negotiated, "primary negotiated protocol returned")
assert.Nil(t, err, "error from primary dialer should be nil")
require.NoError(t, err, "error from primary dialer should be nil")
// If primary dialer error is upgrade error, then fallback returning secondary dial response.
primary = &fakeDialer{dialed: false, negotiatedProtocol: primaryProtocol, err: &httpstream.UpgradeFailureError{}}
secondary = &fakeDialer{dialed: false, negotiatedProtocol: secondaryProtocol}
Expand All @@ -45,7 +47,18 @@ func TestFallbackDialer(t *testing.T) {
assert.True(t, primary.dialed, "fallback; primary should have dialed")
assert.True(t, secondary.dialed, "fallback; secondary should have dialed")
assert.Equal(t, secondaryProtocol, negotiated, "negotiated protocol is from secondary dialer")
assert.Nil(t, err, "error from secondary dialer should be nil")
require.NoError(t, err, "error from secondary dialer should be nil")
// If primary dialer error is https proxy dialing error, then fallback returning secondary dial response.
primary = &fakeDialer{negotiatedProtocol: primaryProtocol, err: errors.New("proxy: unknown scheme: https")}
secondary = &fakeDialer{negotiatedProtocol: secondaryProtocol}
fallbackDialer = NewFallbackDialer(primary, secondary, func(err error) bool {
return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
})
_, negotiated, err = fallbackDialer.Dial(protocols...)
assert.True(t, primary.dialed, "fallback; primary should have dialed")
assert.True(t, secondary.dialed, "fallback; secondary should have dialed")
assert.Equal(t, secondaryProtocol, negotiated, "negotiated protocol is from secondary dialer")
require.NoError(t, err, "error from secondary dialer should be nil")
// If primary dialer returns non-upgrade error, then primary error is returned.
nonUpgradeErr := fmt.Errorf("This is a non-upgrade error")
primary = &fakeDialer{dialed: false, err: nonUpgradeErr}
Expand Down
176 changes: 176 additions & 0 deletions staging/src/k8s.io/client-go/tools/remotecommand/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ import (
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"io"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnettesting "k8s.io/apimachinery/pkg/util/net/testing"
"k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -165,6 +169,178 @@ func TestFallbackClient_SPDYSecondarySucceeds(t *testing.T) {
}
}

// localhostCert was generated from crypto/tls/generate_cert.go with the following command:
//
// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
var localhostCert = []byte(`-----BEGIN CERTIFICATE-----
MIIDGTCCAgGgAwIBAgIRALL5AZcefF4kkYV1SEG6YrMwDQYJKoZIhvcNAQELBQAw
EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2
MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBALQ/FHcyVwdFHxARbbD2KBtDUT7Eni+8ioNdjtGcmtXqBv45EC1C
JOqqGJTroFGJ6Q9kQIZ9FqH5IJR2fOOJD9kOTueG4Vt1JY1rj1Kbpjefu8XleZ5L
SBwIWVnN/lEsEbuKmj7N2gLt5AH3zMZiBI1mg1u9Z5ZZHYbCiTpBrwsq6cTlvR9g
dyo1YkM5hRESCzsrL0aUByoo0qRMD8ZsgANJwgsiO0/M6idbxDwv1BnGwGmRYvOE
Hxpy3v0Jg7GJYrvnpnifJTs4nw91N5X9pXxR7FFzi/6HTYDWRljvTb0w6XciKYAz
bWZ0+cJr5F7wB7ovlbm7HrQIR7z7EIIu2d8CAwEAAaNoMGYwDgYDVR0PAQH/BAQD
AgKkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEwEB/wQFMAMBAf8wLgYDVR0R
BCcwJYILZXhhbXBsZS5jb22HBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwDQYJKoZI
hvcNAQELBQADggEBAFPPWopNEJtIA2VFAQcqN6uJK+JVFOnjGRoCrM6Xgzdm0wxY
XCGjsxY5dl+V7KzdGqu858rCaq5osEBqypBpYAnS9C38VyCDA1vPS1PsN8SYv48z
DyBwj+7R2qar0ADBhnhWxvYO9M72lN/wuCqFKYMeFSnJdQLv3AsrrHe9lYqOa36s
8wxSwVTFTYXBzljPEnSaaJMPqFD8JXaZK1ryJPkO5OsCNQNGtatNiWAf3DcmwHAT
MGYMzP0u4nw47aRz9shB8w+taPKHx2BVwE1m/yp3nHVioOjXqA1fwRQVGclCJSH1
D2iq3hWVHRENgjTjANBPICLo9AZ4JfN6PH19mnU=
-----END CERTIFICATE-----`)

// localhostKey is the private key for localhostCert.
var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAtD8UdzJXB0UfEBFtsPYoG0NRPsSeL7yKg12O0Zya1eoG/jkQ
LUIk6qoYlOugUYnpD2RAhn0WofkglHZ844kP2Q5O54bhW3UljWuPUpumN5+7xeV5
nktIHAhZWc3+USwRu4qaPs3aAu3kAffMxmIEjWaDW71nllkdhsKJOkGvCyrpxOW9
H2B3KjViQzmFERILOysvRpQHKijSpEwPxmyAA0nCCyI7T8zqJ1vEPC/UGcbAaZFi
84QfGnLe/QmDsYliu+emeJ8lOzifD3U3lf2lfFHsUXOL/odNgNZGWO9NvTDpdyIp
gDNtZnT5wmvkXvAHui+VubsetAhHvPsQgi7Z3wIDAQABAoIBAGmw93IxjYCQ0ncc
kSKMJNZfsdtJdaxuNRZ0nNNirhQzR2h403iGaZlEpmdkhzxozsWcto1l+gh+SdFk
bTUK4MUZM8FlgO2dEqkLYh5BcMT7ICMZvSfJ4v21E5eqR68XVUqQKoQbNvQyxFk3
EddeEGdNrkb0GDK8DKlBlzAW5ep4gjG85wSTjR+J+muUv3R0BgLBFSuQnIDM/IMB
LWqsja/QbtB7yppe7jL5u8UCFdZG8BBKT9fcvFIu5PRLO3MO0uOI7LTc8+W1Xm23
uv+j3SY0+v+6POjK0UlJFFi/wkSPTFIfrQO1qFBkTDQHhQ6q/7GnILYYOiGbIRg2
NNuP52ECgYEAzXEoy50wSYh8xfFaBuxbm3ruuG2W49jgop7ZfoFrPWwOQKAZS441
VIwV4+e5IcA6KkuYbtGSdTYqK1SMkgnUyD/VevwAqH5TJoEIGu0pDuKGwVuwqioZ
frCIAV5GllKyUJ55VZNbRr2vY2fCsWbaCSCHETn6C16DNuTCe5C0JBECgYEA4JqY
5GpNbMG8fOt4H7hU0Fbm2yd6SHJcQ3/9iimef7xG6ajxsYrIhg1ft+3IPHMjVI0+
9brwHDnWg4bOOx/VO4VJBt6Dm/F33bndnZRkuIjfSNpLM51P+EnRdaFVHOJHwKqx
uF69kihifCAG7YATgCveeXImzBUSyZUz9UrETu8CgYARNBimdFNG1RcdvEg9rC0/
p9u1tfecvNySwZqU7WF9kz7eSonTueTdX521qAHowaAdSpdJMGODTTXaywm6cPhQ
jIfj9JZZhbqQzt1O4+08Qdvm9TamCUB5S28YLjza+bHU7nBaqixKkDfPqzCyilpX
yVGGL8SwjwmN3zop/sQXAQKBgC0JMsESQ6YcDsRpnrOVjYQc+LtW5iEitTdfsaID
iGGKihmOI7B66IxgoCHMTws39wycKdSyADVYr5e97xpR3rrJlgQHmBIrz+Iow7Q2
LiAGaec8xjl6QK/DdXmFuQBKqyKJ14rljFODP4QuE9WJid94bGqjpf3j99ltznZP
4J8HAoGAJb4eb4lu4UGwifDzqfAPzLGCoi0fE1/hSx34lfuLcc1G+LEu9YDKoOVJ
9suOh0b5K/bfEy9KrVMBBriduvdaERSD8S3pkIQaitIz0B029AbE4FLFf9lKQpP2
KR8NJEkK99Vh/tew6jAMll70xFrE7aF8VLXJVE7w4sQzuvHxl9Q=
-----END RSA PRIVATE KEY-----
`)

// See (https://github.com/kubernetes/kubernetes/issues/126134).
func TestFallbackClient_WebSocketHTTPSProxyCausesSPDYFallback(t *testing.T) {
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
if err != nil {
t.Errorf("https (valid hostname): proxy_test: %v", err)
}

var proxyCalled atomic.Int64
proxyHandler := utilnettesting.NewHTTPProxyHandler(t, func(req *http.Request) bool {
proxyCalled.Add(1)
return true
})
defer proxyHandler.Wait()

proxyServer := httptest.NewUnstartedServer(proxyHandler)
proxyServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
proxyServer.StartTLS()
defer proxyServer.Close() //nolint:errcheck

proxyLocation, err := url.Parse(proxyServer.URL)
require.NoError(t, err)

// Create fake SPDY server. Copy received STDIN data back onto STDOUT stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
var stdin, stdout bytes.Buffer
ctx, err := createHTTPStreams(w, req, &StreamOptions{
Stdin: &stdin,
Stdout: &stdout,
})
if err != nil {
w.WriteHeader(http.StatusForbidden)
return
}
defer ctx.conn.Close() //nolint:errcheck
_, err = io.Copy(ctx.stdoutStream, ctx.stdinStream)
if err != nil {
t.Fatalf("error copying STDIN to STDOUT: %v", err)
}
}))
defer spdyServer.Close() //nolint:errcheck

backendLocation, err := url.Parse(spdyServer.URL)
require.NoError(t, err)

clientConfig := &rest.Config{
Host: spdyServer.URL,
TLSClientConfig: rest.TLSClientConfig{CAData: localhostCert},
Proxy: func(req *http.Request) (*url.URL, error) {
return proxyLocation, nil
},
}

// Websocket with https proxy will fail in dialing (falling back to SPDY).
websocketExecutor, err := NewWebSocketExecutor(clientConfig, "GET", backendLocation.String())
require.NoError(t, err)
spdyExecutor, err := NewSPDYExecutor(clientConfig, "POST", backendLocation)
require.NoError(t, err)
// Fallback to spdyExecutor with websocket https proxy error; spdyExecutor succeeds against fake spdy server.
sawHTTPSProxyError := false
exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(err error) bool {
if httpstream.IsUpgradeFailure(err) {
t.Errorf("saw upgrade failure: %v", err)
return true
}
if httpstream.IsHTTPSProxyError(err) {
sawHTTPSProxyError = true
t.Logf("saw https proxy error: %v", err)
return true
}
return false
})
require.NoError(t, err)

// Generate random data, and set it up to stream on STDIN. The data will be
// returned on the STDOUT buffer.
randomSize := 1024 * 1024
randomData := make([]byte, randomSize)
if _, err := rand.Read(randomData); err != nil {
t.Errorf("unexpected error reading random data: %v", err)
}
var stdout bytes.Buffer
options := &StreamOptions{
Stdin: bytes.NewReader(randomData),
Stdout: &stdout,
}
errorChan := make(chan error)
go func() {
errorChan <- exec.StreamWithContext(context.Background(), *options)
}()

select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
if err != nil {
t.Errorf("unexpected error")
}
}

data, err := io.ReadAll(bytes.NewReader(stdout.Bytes()))
if err != nil {
t.Errorf("error reading the stream: %v", err)
return
}
// Check the random data sent on STDIN was the same returned on STDOUT.
if !bytes.Equal(randomData, data) {
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
}

// Ensure the https proxy error was observed
if !sawHTTPSProxyError {
t.Errorf("expected to see https proxy error")
}
// Ensure the proxy was called once
if e, a := int64(1), proxyCalled.Load(); e != a {
t.Errorf("expected %d proxy call, got %d", e, a)
}
}

func TestFallbackClient_PrimaryAndSecondaryFail(t *testing.T) {
// Create fake WebSocket server. Copy received STDIN data back onto STDOUT stream.
websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Expand Down
37 changes: 37 additions & 0 deletions staging/src/k8s.io/client-go/tools/remotecommand/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -814,6 +815,42 @@ func TestWebSocketClient_BadHandshake(t *testing.T) {
}
}

// See (https://github.com/kubernetes/kubernetes/issues/126134).
func TestWebSocketClient_HTTPSProxyErrorExpected(t *testing.T) {
urlStr := "http://127.0.0.1/never-used" + "?" + "stdin=true" + "&" + "stdout=true"
websocketLocation, err := url.Parse(urlStr)
if err != nil {
t.Fatalf("Unable to parse WebSocket server URL: %s", urlStr)
}
// proxy url with https scheme will trigger websocket dialing error.
httpsProxyFunc := func(req *http.Request) (*url.URL, error) { return url.Parse("https://127.0.0.1") }
exec, err := NewWebSocketExecutor(&rest.Config{Host: websocketLocation.Host, Proxy: httpsProxyFunc}, "GET", urlStr)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
var stdout bytes.Buffer
options := &StreamOptions{
Stdout: &stdout,
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
errorChan <- exec.StreamWithContext(context.Background(), *options)
}()

select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
if err == nil {
t.Errorf("expected error but received none")
}
if !httpstream.IsHTTPSProxyError(err) {
t.Errorf("expected https proxy error, got (%s)", err)
}
}
}

// TestWebSocketClient_HeartbeatTimeout tests the heartbeat by forcing a
// timeout by setting the ping period greater than the deadline.
func TestWebSocketClient_HeartbeatTimeout(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Exec
if err != nil {
return nil, err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, func(err error) bool {
return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
})
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Exec
if err != nil {
return nil, err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, func(err error) bool {
return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
})
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ func createDialer(method string, url *url.URL, opts PortForwardOptions) (httpstr
return nil, err
}
// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, httpstream.IsUpgradeFailure)
dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, func(err error) bool {
return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
})
}
return dialer, nil
}
Expand Down

0 comments on commit 9d56054

Please sign in to comment.