Skip to content

Commit

Permalink
Implement volumes as plugins.
Browse files Browse the repository at this point in the history
Break up the monolithic volumes code in kubelet into very small individual
modules with a well-defined interface.  Move them all into their own packages
and beef up testing along the way.
  • Loading branch information
thockin committed Jan 20, 2015
1 parent f90ad57 commit 6cb2758
Show file tree
Hide file tree
Showing 31 changed files with 2,059 additions and 985 deletions.
5 changes: 3 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
Expand Down Expand Up @@ -191,13 +192,13 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
standalone.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault)
standalone.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins())
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
standalone.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault)
standalone.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins())

return apiServer.URL
}
Expand Down
43 changes: 43 additions & 0 deletions cmd/kubelet/app/plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

// This file exists to force the desired plugin implementations to be linked.
import (
// Credential providers
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider/gcp"
// Volume plugins
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/gce_pd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/git_repo"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
)

func ProbeVolumePlugins() []volume.Plugin {
allPlugins := []volume.Plugin{}

// The list of plugins to probe is decided by the kubelet binary, not
// by dynamic linking or other "magic". Plugins will be analyzed and
// initialized later.
allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, host_path.ProbeVolumePlugins()...)

return allPlugins
}
2 changes: 2 additions & 0 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net"
"time"

"github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
Expand Down Expand Up @@ -152,6 +153,7 @@ func main() {
KubeClient: client,
EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile),
MasterServiceNamespace: *masterServiceNamespace,
VolumePlugins: app.ProbeVolumePlugins(),
}

standalone.RunKubelet(&kcfg)
Expand Down
24 changes: 0 additions & 24 deletions cmd/kubelet/plugins.go

This file was deleted.

3 changes: 2 additions & 1 deletion cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"time"

kubeletapp "github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
Expand Down Expand Up @@ -54,7 +55,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)

dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint)
standalone.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace)
standalone.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins())
}

func newApiClient(addr string, port int) *client.Client {
Expand Down
124 changes: 76 additions & 48 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
)
Expand Down Expand Up @@ -81,7 +81,8 @@ func NewMainKubelet(
sourceReady SourceReadyFn,
clusterDomain string,
clusterDNS net.IP,
masterServiceNamespace string) (*Kubelet, error) {
masterServiceNamespace string,
volumePlugins []volume.Plugin) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
Expand Down Expand Up @@ -124,6 +125,9 @@ func NewMainKubelet(
if err := klet.setupDataDirs(); err != nil {
return nil, err
}
if err := klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {
return nil, err
}

return klet, nil
}
Expand Down Expand Up @@ -191,34 +195,52 @@ type Kubelet struct {

masterServiceNamespace string
serviceLister serviceLister

// Volume plugins.
volumePluginMgr volume.PluginMgr
}

// GetRootDir returns the full path to the directory under which kubelet can
// getRootDir returns the full path to the directory under which kubelet can
// store data. These functions are useful to pass interfaces to other modules
// that may need to know where to write data without getting a whole kubelet
// instance.
func (kl *Kubelet) GetRootDir() string {
func (kl *Kubelet) getRootDir() string {
return kl.rootDirectory
}

// GetPodsDir returns the full path to the directory under which pod
// getPodsDir returns the full path to the directory under which pod
// directories are created.
func (kl *Kubelet) GetPodsDir() string {
return path.Join(kl.GetRootDir(), "pods")
func (kl *Kubelet) getPodsDir() string {
return path.Join(kl.getRootDir(), "pods")
}

// getPluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
func (kl *Kubelet) getPluginsDir() string {
return path.Join(kl.getRootDir(), "plugins")
}

// getPluginDir returns a data directory name for a given plugin name.
// Plugins can use these directories to store data that they need to persist.
// For per-pod plugin data, see getPodPluginDir.
func (kl *Kubelet) getPluginDir(pluginName string) string {
return path.Join(kl.getPluginsDir(), pluginName)
}

// GetPodDir returns the full path to the per-pod data directory for the
// getPodDir returns the full path to the per-pod data directory for the
// specified pod. This directory may not exist if the pod does not exist.
func (kl *Kubelet) GetPodDir(podUID types.UID) string {
func (kl *Kubelet) getPodDir(podUID types.UID) string {
// Backwards compat. The "old" stuff should be removed before 1.0
// release. The thinking here is this:
// !old && !new = use new
// !old && new = use new
// old && !new = use old
// old && new = use new (but warn)
oldPath := path.Join(kl.GetRootDir(), string(podUID))
oldPath := path.Join(kl.getRootDir(), string(podUID))
oldExists := dirExists(oldPath)
newPath := path.Join(kl.GetPodsDir(), string(podUID))
newPath := path.Join(kl.getPodsDir(), string(podUID))
newExists := dirExists(newPath)
if oldExists && !newExists {
return oldPath
Expand All @@ -229,26 +251,47 @@ func (kl *Kubelet) GetPodDir(podUID types.UID) string {
return newPath
}

// GetPodVolumesDir returns the full path to the per-pod data directory under
// getPodVolumesDir returns the full path to the per-pod data directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) GetPodVolumesDir(podUID types.UID) string {
return path.Join(kl.GetPodDir(podUID), "volumes")
func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string {
return path.Join(kl.getPodDir(podUID), "volumes")
}

// GetPodContainerDir returns the full path to the per-pod data directory under
// getPodVolumeDir returns the full path to the directory which represents the
// named volume under the named plugin for specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
return path.Join(kl.getPodVolumesDir(podUID), pluginName, volumeName)
}

// getPodPluginsDir returns the full path to the per-pod data directory under
// which plugins may store data for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
return path.Join(kl.getPodDir(podUID), "plugins")
}

// getPodPluginDir returns a data directory name for a given plugin name for a
// given pod UID. Plugins can use these directories to store data that they
// need to persist. For non-per-pod plugin data, see getPluginDir.
func (kl *Kubelet) getPodPluginDir(podUID types.UID, pluginName string) string {
return path.Join(kl.getPodPluginsDir(podUID), pluginName)
}

// getPodContainerDir returns the full path to the per-pod data directory under
// which container data is held for the specified pod. This directory may not
// exist if the pod or container does not exist.
func (kl *Kubelet) GetPodContainerDir(podUID types.UID, ctrName string) string {
func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
// Backwards compat. The "old" stuff should be removed before 1.0
// release. The thinking here is this:
// !old && !new = use new
// !old && new = use new
// old && !new = use old
// old && new = use new (but warn)
oldPath := path.Join(kl.GetPodDir(podUID), ctrName)
oldPath := path.Join(kl.getPodDir(podUID), ctrName)
oldExists := dirExists(oldPath)
newPath := path.Join(kl.GetPodDir(podUID), "containers", ctrName)
newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName)
newExists := dirExists(newPath)
if oldExists && !newExists {
return oldPath
Expand All @@ -269,18 +312,21 @@ func dirExists(path string) bool {

func (kl *Kubelet) setupDataDirs() error {
kl.rootDirectory = path.Clean(kl.rootDirectory)
if err := os.MkdirAll(kl.GetRootDir(), 0750); err != nil {
if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
return fmt.Errorf("error creating root directory: %v", err)
}
if err := os.MkdirAll(kl.GetPodsDir(), 0750); err != nil {
if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
return fmt.Errorf("error creating pods directory: %v", err)
}
if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
return fmt.Errorf("error creating plugins directory: %v", err)
}
return nil
}

// Get a list of pods that have data directories.
func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
podInfos, err := ioutil.ReadDir(kl.GetPodsDir())
podInfos, err := ioutil.ReadDir(kl.getPodsDir())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -508,27 +554,6 @@ func milliCPUToShares(milliCPU int64) int64 {
return shares
}

func (kl *Kubelet) mountExternalVolumes(pod *api.BoundPod) (volumeMap, error) {
podVolumes := make(volumeMap)
for _, vol := range pod.Spec.Volumes {
extVolume, err := volume.CreateVolumeBuilder(&vol, pod.Name, kl.rootDirectory)
if err != nil {
return nil, err
}
// TODO(jonesdl) When the default volume behavior is no longer supported, this case
// should never occur and an error should be thrown instead.
if extVolume == nil {
continue
}
podVolumes[vol.Name] = extVolume
err = extVolume.SetUp()
if err != nil {
return nil, err
}
}
return podVolumes, nil
}

// A basic interface that knows how to execute handlers
type actionHandler interface {
Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error
Expand Down Expand Up @@ -657,7 +682,7 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
}

if len(container.TerminationMessagePath) != 0 {
p := kl.GetPodContainerDir(pod.UID, container.Name)
p := kl.getPodContainerDir(pod.UID, container.Name)
if err := os.MkdirAll(p, 0750); err != nil {
glog.Errorf("Error on creating %q: %v", p, err)
} else {
Expand Down Expand Up @@ -974,10 +999,13 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
glog.V(4).Infof("Syncing Pod, podFullName: %q, uid: %q", podFullName, uid)

// Make data dirs.
if err := os.Mkdir(kl.GetPodDir(uid), 0750); err != nil && !os.IsExist(err) {
if err := os.Mkdir(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(kl.GetPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
if err := os.Mkdir(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}

Expand Down Expand Up @@ -1148,7 +1176,7 @@ func getDesiredVolumes(pods []api.BoundPod) map[string]api.Volume {
desiredVolumes := make(map[string]api.Volume)
for _, pod := range pods {
for _, volume := range pod.Spec.Volumes {
identifier := path.Join(pod.Name, volume.Name)
identifier := path.Join(string(pod.UID), volume.Name)
desiredVolumes[identifier] = volume
}
}
Expand All @@ -1168,7 +1196,7 @@ func (kl *Kubelet) cleanupOrphanedPods(pods []api.BoundPod) error {
for i := range found {
if !desired.Has(string(found[i])) {
glog.V(3).Infof("Orphaned pod %q found, removing", found[i])
if err := os.RemoveAll(kl.GetPodDir(found[i])); err != nil {
if err := os.RemoveAll(kl.getPodDir(found[i])); err != nil {
errlist = append(errlist, err)
}
}
Expand All @@ -1180,7 +1208,7 @@ func (kl *Kubelet) cleanupOrphanedPods(pods []api.BoundPod) error {
// If an active volume does not have a respective desired volume, clean it up.
func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod) error {
desiredVolumes := getDesiredVolumes(pods)
currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory)
currentVolumes := kl.getPodVolumesFromDisk()
for name, vol := range currentVolumes {
if _, ok := desiredVolumes[name]; !ok {
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
Expand Down
Loading

0 comments on commit 6cb2758

Please sign in to comment.