Skip to content

Commit

Permalink
Copy util directory for use in cri/remote package
Browse files Browse the repository at this point in the history
Signed-off-by: Davanum Srinivas <[email protected]>
  • Loading branch information
dims committed May 20, 2020
1 parent 0608e8b commit e049e59
Show file tree
Hide file tree
Showing 11 changed files with 682 additions and 14 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion pkg/kubelet/cri/remote/fake/fake_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"google.golang.org/grpc"
kubeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
apitest "k8s.io/cri-api/pkg/apis/testing"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/cri/remote/util"
utilexec "k8s.io/utils/exec"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cri/remote/remote_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/cri/remote/util"
)

// RemoteImageService is a gRPC implementation of internalapi.ImageManagerService.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cri/remote/remote_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"k8s.io/component-base/logs/logreduction"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/cri/remote/util"
utilexec "k8s.io/utils/exec"
)

Expand Down
145 changes: 145 additions & 0 deletions pkg/kubelet/cri/remote/util/util_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// +build freebsd linux darwin

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"context"
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"path/filepath"

"golang.org/x/sys/unix"
"k8s.io/klog"
)

const (
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
)

// CreateListener creates a listener on the specified endpoint.
func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return nil, err
}
if protocol != unixProtocol {
return nil, fmt.Errorf("only support unix socket endpoint")
}

// Unlink to cleanup the previous socket file.
err = unix.Unlink(addr)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to unlink socket file %q: %v", addr, err)
}

if err := os.MkdirAll(filepath.Dir(addr), 0750); err != nil {
return nil, fmt.Errorf("error creating socket directory %q: %v", filepath.Dir(addr), err)
}

// Create the socket on a tempfile and move it to the destination socket to handle improprer cleanup
file, err := ioutil.TempFile(filepath.Dir(addr), "")
if err != nil {
return nil, fmt.Errorf("failed to create temporary file: %v", err)
}

if err := os.Remove(file.Name()); err != nil {
return nil, fmt.Errorf("failed to remove temporary file: %v", err)
}

l, err := net.Listen(protocol, file.Name())
if err != nil {
return nil, err
}

if err = os.Rename(file.Name(), addr); err != nil {
return nil, fmt.Errorf("failed to move temporary file to addr %q: %v", addr, err)
}

return l, nil
}

// GetAddressAndDialer returns the address parsed from the given endpoint and a context dialer.
func GetAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return "", nil, err
}
if protocol != unixProtocol {
return "", nil, fmt.Errorf("only support unix socket endpoint")
}

return addr, dial, nil
}

func dial(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr)
}

func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) {
if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" {
fallbackEndpoint := fallbackProtocol + "://" + endpoint
protocol, addr, err = parseEndpoint(fallbackEndpoint)
if err == nil {
klog.Warningf("Using %q as endpoint is deprecated, please consider using full url format %q.", endpoint, fallbackEndpoint)
}
}
return
}

func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}

switch u.Scheme {
case "tcp":
return "tcp", u.Host, nil

case "unix":
return "unix", u.Path, nil

case "":
return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint)

default:
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}

// IsUnixDomainSocket returns whether a given file is a AF_UNIX socket file
func IsUnixDomainSocket(filePath string) (bool, error) {
fi, err := os.Stat(filePath)
if err != nil {
return false, fmt.Errorf("stat file %s failed: %v", filePath, err)
}
if fi.Mode()&os.ModeSocket == 0 {
return false, nil
}
return true, nil
}

// NormalizePath is a no-op for Linux for now
func NormalizePath(path string) string {
return path
}
135 changes: 135 additions & 0 deletions pkg/kubelet/cri/remote/util/util_unix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// +build freebsd linux darwin

/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"io/ioutil"
"net"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParseEndpoint(t *testing.T) {
tests := []struct {
endpoint string
expectError bool
expectedProtocol string
expectedAddr string
}{
{
endpoint: "unix:///tmp/s1.sock",
expectedProtocol: "unix",
expectedAddr: "/tmp/s1.sock",
},
{
endpoint: "tcp://localhost:15880",
expectedProtocol: "tcp",
expectedAddr: "localhost:15880",
},
{
endpoint: "npipe://./pipe/mypipe",
expectedProtocol: "npipe",
expectError: true,
},
{
endpoint: "tcp1://abc",
expectedProtocol: "tcp1",
expectError: true,
},
{
endpoint: "a b c",
expectError: true,
},
}

for _, test := range tests {
protocol, addr, err := parseEndpoint(test.endpoint)
assert.Equal(t, test.expectedProtocol, protocol)
if test.expectError {
assert.NotNil(t, err, "Expect error during parsing %q", test.endpoint)
continue
}
assert.Nil(t, err, "Expect no error during parsing %q", test.endpoint)
assert.Equal(t, test.expectedAddr, addr)
}

}

func TestIsUnixDomainSocket(t *testing.T) {
tests := []struct {
label string
listenOnSocket bool
expectSocket bool
expectError bool
invalidFile bool
}{
{
label: "Domain Socket file",
listenOnSocket: true,
expectSocket: true,
expectError: false,
},
{
label: "Non Existent file",
invalidFile: true,
expectError: true,
},
{
label: "Regular file",
listenOnSocket: false,
expectSocket: false,
expectError: false,
},
}
for _, test := range tests {
f, err := ioutil.TempFile("", "test-domain-socket")
require.NoErrorf(t, err, "Failed to create file for test purposes: %v while setting up: %s", err, test.label)
addr := f.Name()
f.Close()
var ln *net.UnixListener
if test.listenOnSocket {
os.Remove(addr)
ta, err := net.ResolveUnixAddr("unix", addr)
require.NoErrorf(t, err, "Failed to ResolveUnixAddr: %v while setting up: %s", err, test.label)
ln, err = net.ListenUnix("unix", ta)
require.NoErrorf(t, err, "Failed to ListenUnix: %v while setting up: %s", err, test.label)
}
fileToTest := addr
if test.invalidFile {
fileToTest = fileToTest + ".invalid"
}
result, err := IsUnixDomainSocket(fileToTest)
if test.listenOnSocket {
// this takes care of removing the file associated with the domain socket
ln.Close()
} else {
// explicitly remove regular file
os.Remove(addr)
}
if test.expectError {
assert.NotNil(t, err, "Unexpected nil error from IsUnixDomainSocket for %s", test.label)
} else {
assert.Nil(t, err, "Unexpected error invoking IsUnixDomainSocket for %s", test.label)
}
assert.Equal(t, result, test.expectSocket, "Unexpected result from IsUnixDomainSocket: %v for %s", result, test.label)
}
}
55 changes: 55 additions & 0 deletions pkg/kubelet/cri/remote/util/util_unsupported.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// +build !freebsd,!linux,!windows,!darwin

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"context"
"fmt"
"net"
"time"
)

// CreateListener creates a listener on the specified endpoint.
func CreateListener(endpoint string) (net.Listener, error) {
return nil, fmt.Errorf("CreateListener is unsupported in this build")
}

// GetAddressAndDialer returns the address parsed from the given endpoint and a context dialer.
func GetAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) {
return "", nil, fmt.Errorf("GetAddressAndDialer is unsupported in this build")
}

// LockAndCheckSubPath empty implementation
func LockAndCheckSubPath(volumePath, subPath string) ([]uintptr, error) {
return []uintptr{}, nil
}

// UnlockPath empty implementation
func UnlockPath(fileHandles []uintptr) {
}

// LocalEndpoint empty implementation
func LocalEndpoint(path, file string) (string, error) {
return "", fmt.Errorf("LocalEndpoints are unsupported in this build")
}

// GetBootTime empty implementation
func GetBootTime() (time.Time, error) {
return time.Time{}, fmt.Errorf("GetBootTime is unsupported in this build")
}
Loading

0 comments on commit e049e59

Please sign in to comment.