Skip to content

Commit

Permalink
Immediately evict pods and delete node when cloud says node is gone.
Browse files Browse the repository at this point in the history
  • Loading branch information
cjcullen committed Mar 7, 2016
1 parent 0867e25 commit e7fc608
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 42 deletions.
114 changes: 73 additions & 41 deletions pkg/controller/node/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ type NodeController struct {
daemonSetController *framework.Controller
daemonSetStore cache.StoreToDaemonSetLister

forcefullyDeletePod func(*api.Pod) error
forcefullyDeletePod func(*api.Pod) error
nodeExistsInCloudProvider func(string) (bool, error)
}

// NewNodeController returns a new node controller to sync instances from cloudprovider.
Expand Down Expand Up @@ -149,24 +150,25 @@ func NewNodeController(
evictorLock := sync.Mutex{}

nc := &NodeController{
cloud: cloud,
knownNodeSet: make(sets.String),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
evictorLock: &evictorLock,
podEvictor: NewRateLimitedTimedQueue(deletionEvictionLimiter),
terminationEvictor: NewRateLimitedTimedQueue(terminationEvictionLimiter),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: unversioned.Now,
clusterCIDR: clusterCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
cloud: cloud,
knownNodeSet: make(sets.String),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
evictorLock: &evictorLock,
podEvictor: NewRateLimitedTimedQueue(deletionEvictionLimiter),
terminationEvictor: NewRateLimitedTimedQueue(terminationEvictionLimiter),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: unversioned.Now,
clusterCIDR: clusterCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
}

nc.podStore.Store, nc.podController = framework.NewInformer(
Expand Down Expand Up @@ -462,40 +464,70 @@ func (nc *NodeController) monitorNodeStatus() error {
}

// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node and all pods scheduled on the node.
// doesn't, delete the node immediately.
if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
instances, ok := nc.cloud.Instances()
if !ok {
glog.Errorf("%v", ErrCloudInstance)
exists, err := nc.nodeExistsInCloudProvider(node.Name)
if err != nil {
glog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err)
continue
}
if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
if !exists {
glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))

remaining, err := nc.hasPods(node.Name)
if err != nil {
glog.Errorf("Unable to determine whether node %s has pods, will retry: %v", node.Name, err)
continue
}
if remaining {
// queue eviction of the pods on the node
glog.V(2).Infof("Deleting node %s is delayed while pods are evicted", node.Name)
nc.evictPods(node.Name)
continue
}

if err := nc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
continue
}
go func(nodeName string) {
defer utilruntime.HandleCrash()
// Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace
// periods.
if err := nc.forcefullyDeleteNode(nodeName); err != nil {
glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
}
}(node.Name)
continue
}
}
}
}
return nil
}

func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
}
if _, err := instances.ExternalID(nodeName); err != nil {
if err == cloudprovider.InstanceNotFound {
return false, nil
}
return false, err
}
return true, nil
}

// forcefullyDeleteNode immediately deletes all pods on the node, and then
// deletes the node itself.
func (nc *NodeController) forcefullyDeleteNode(nodeName string) error {
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
}
for _, pod := range pods.Items {
if pod.Spec.NodeName != nodeName {
continue
}
if err := nc.forcefullyDeletePod(&pod); err != nil {
return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
}
}
if err := nc.kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
return nil
}

// reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
// if it doesn't currently have one.
func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
Expand Down
62 changes: 61 additions & 1 deletion pkg/controller/node/nodecontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)

Expand Down Expand Up @@ -59,7 +61,8 @@ type FakeNodeHandler struct {
RequestCount int

// Synchronization
createLock sync.Mutex
createLock sync.Mutex
deleteWaitChan chan struct{}
}

type FakeLegacyHandler struct {
Expand Down Expand Up @@ -125,6 +128,11 @@ func (m *FakeNodeHandler) List(opts api.ListOptions) (*api.NodeList, error) {
}

func (m *FakeNodeHandler) Delete(id string, opt *api.DeleteOptions) error {
defer func() {
if m.deleteWaitChan != nil {
m.deleteWaitChan <- struct{}{}
}
}()
m.DeletedNodes = append(m.DeletedNodes, newNode(id))
m.RequestCount++
return nil
Expand Down Expand Up @@ -451,6 +459,58 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}
}

// TestCloudProviderNoRateLimit tests that monitorNodes() immediately deletes
// pods and the node when kubelet has not reported, and the cloudprovider says
// the node is gone.
func TestCloudProviderNoRateLimit(t *testing.T) {
fnh := &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
deleteWaitChan: make(chan struct{}),
}
nodeController := NewNodeController(nil, fnh, 10*time.Minute,
util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() unversioned.Time { return unversioned.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.nodeExistsInCloudProvider = func(nodeName string) (bool, error) {
return false, nil
}
// monitorNodeStatus should allow this node to be immediately deleted
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
select {
case <-fnh.deleteWaitChan:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout)
}
if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" {
t.Errorf("Node was not deleted")
}
if nodeOnQueue := nodeController.podEvictor.Remove("node0"); nodeOnQueue {
t.Errorf("Node was queued for eviction. Should have been immediately deleted.")
}
}

func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
table := []struct {
Expand Down

0 comments on commit e7fc608

Please sign in to comment.