package kubectl

import (
	"sync"
	"time"

	"github.com/gruntwork-io/gruntwork-cli/collections"
	"github.com/gruntwork-io/gruntwork-cli/errors"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/gruntwork-io/kubergrunt/logging"
)

// WaitForNodesReady will continuously watch the nodes until they reach the ready state.
func WaitForNodesReady(
	kubectlOptions *KubectlOptions,
	nodeIds []string,
	maxRetries int,
	sleepBetweenRetries time.Duration,
) error {
	logger := logging.GetProjectLogger()
	logger.Infof("Waiting for %d nodes in Kubernetes to reach ready state", len(nodeIds))

	client, err := GetKubernetesClientFromOptions(kubectlOptions)
	if err != nil {
		return errors.WithStackTrace(err)
	}

	for i := 0; i < maxRetries; i++ {
		logger.Infof("Checking if nodes are ready")
		nodes, err := GetNodes(client, metav1.ListOptions{})
		if err != nil {
			return errors.WithStackTrace(err)
		}
		newNodes := filterNodesByID(nodes, nodeIds)
		logger.Debugf("Received %d nodes. Expecting %d nodes.", len(newNodes), len(nodeIds))

		allNewNodesRegistered := len(newNodes) == len(nodeIds)
		allNewNodesReady := allNodesReady(newNodes)
		if allNewNodesRegistered && allNewNodesReady {
			return nil
		}

		if !allNewNodesRegistered {
			logger.Infof("Not all nodes are registered yet")
		}
		if !allNewNodesReady {
			logger.Infof("Not all nodes are ready yet")
		}
		logger.Infof("Waiting for %s...", sleepBetweenRetries)
		time.Sleep(sleepBetweenRetries)
	}

	// Timed out without all nodes reaching the ready state
	logger.Errorf("Timed out waiting for nodes to reach ready state")
	if err := reportAllNotReadyNodes(client, nodeIds); err != nil {
		return err
	}
	return errors.WithStackTrace(NewNodeReadyTimeoutError(len(nodeIds)))
}
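
// exampleWaitForReplacementNodes is an illustrative sketch, not part of the original kubergrunt
// source: it shows how a caller that has just scaled up a worker pool might block until the
// replacement nodes register with the cluster and report Ready. The retry count and sleep
// interval are hypothetical values chosen for the sketch; kubectlOptions is assumed to already
// point at the target cluster.
func exampleWaitForReplacementNodes(kubectlOptions *KubectlOptions, newNodeIds []string) error {
	// Poll up to 40 times, sleeping 15 seconds between attempts (roughly 10 minutes in total).
	return WaitForNodesReady(kubectlOptions, newNodeIds, 40, 15*time.Second)
}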

// reportAllNotReadyNodes will log error messages for each node that is not ready
func reportAllNotReadyNodes(client *kubernetes.Clientset, nodeIds []string) error {
	logger := logging.GetProjectLogger()

	nodes, err := GetNodes(client, metav1.ListOptions{})
	if err != nil {
		return errors.WithStackTrace(err)
	}
	filteredNodes := filterNodesByID(nodes, nodeIds)
	for _, node := range filteredNodes {
		if !IsNodeReady(node) {
			logger.Errorf("Node %s is not ready", node.Name)
		}
	}
	return nil
}

// allNodesReady will return true if all the nodes in the list are ready, and false when any node is not.
func allNodesReady(nodes []corev1.Node) bool {
	logger := logging.GetProjectLogger()
	for _, node := range nodes {
		if !IsNodeReady(node) {
			logger.Debugf("Node %s is not ready", node.Name)
			return false
		}
		logger.Debugf("Node %s is ready", node.Name)
	}
	return true
}

// filterNodesByID will return the list of nodes that correspond to the given node ids
func filterNodesByID(nodes []corev1.Node, nodeIds []string) []corev1.Node {
	filteredNodes := []corev1.Node{}
	for _, node := range nodes {
		if collections.ListContainsElement(nodeIds, node.Name) {
			filteredNodes = append(filteredNodes, node)
		}
	}
	return filteredNodes
}

// DrainNodes calls `kubectl drain` on each node provided. Draining a node consists of:
// - Cordoning the node so that new pods are not scheduled onto it
// - Evicting all the existing pods gracefully
// See
// https://kubernetes.io/docs/tasks/administer-cluster/safely-drain-node/#use-kubectl-drain-to-remove-a-node-from-service
// for more information.
func DrainNodes(kubectlOptions *KubectlOptions, nodeIds []string, timeout time.Duration) error {
	// Concurrently trigger drain events for all requested nodes.
	var wg sync.WaitGroup                      // So that we can wait for all the drain calls
	errChannel := make(chan NodeDrainError, 1) // Collect all errors from each command
	for _, nodeID := range nodeIds {
		wg.Add(1)
		go drainNode(&wg, errChannel, kubectlOptions, nodeID, timeout)
	}
	go waitForAllDrains(&wg, errChannel)

	drainErrors := NewNodeDrainErrors()
	for err := range errChannel {
		if err.Error != nil {
			drainErrors.AddError(err)
		}
	}
	if !drainErrors.IsEmpty() {
		return errors.WithStackTrace(drainErrors)
	}
	return nil
}

// drainNode runs `kubectl drain` against a single node and reports the outcome on the error channel.
func drainNode(
	wg *sync.WaitGroup,
	errChannel chan<- NodeDrainError,
	kubectlOptions *KubectlOptions,
	nodeID string,
	timeout time.Duration,
) {
	defer wg.Done()
	err := RunKubectl(kubectlOptions, "drain", nodeID, "--ignore-daemonsets", "--timeout", timeout.String())
	errChannel <- NodeDrainError{NodeID: nodeID, Error: err}
}

// waitForAllDrains blocks until every drain goroutine has finished, then closes the error channel
// so the collection loop in DrainNodes terminates.
func waitForAllDrains(wg *sync.WaitGroup, errChannel chan<- NodeDrainError) {
	wg.Wait()
	close(errChannel)
}
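
// exampleDrainWithTimeout is an illustrative sketch, not part of the original kubergrunt source:
// it highlights that DrainNodes launches one concurrent `kubectl drain <node> --ignore-daemonsets`
// invocation per node, and that the timeout argument bounds each of those individual invocations
// rather than the batch as a whole. The 10 minute value is an arbitrary choice for the sketch.
func exampleDrainWithTimeout(kubectlOptions *KubectlOptions, nodeIds []string) error {
	return DrainNodes(kubectlOptions, nodeIds, 10*time.Minute)
}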

// CordonNodes calls `kubectl cordon` on each node provided. Cordoning a node makes it unschedulable, preventing new
// Pods from being scheduled on the node. Note that cordoning a node does not evict the running Pods. To evict existing
// Pods, use DrainNodes.
func CordonNodes(kubectlOptions *KubectlOptions, nodeIds []string) error {
	// Concurrently trigger cordon events for all requested nodes.
	var wg sync.WaitGroup                       // So that we can wait for all the cordon calls
	errChannel := make(chan NodeCordonError, 1) // Collect all errors from each command
	for _, nodeID := range nodeIds {
		wg.Add(1)
		go cordonNode(&wg, errChannel, kubectlOptions, nodeID)
	}
	go waitForAllCordons(&wg, errChannel)

	cordonErrors := NewNodeCordonErrors()
	for err := range errChannel {
		if err.Error != nil {
			cordonErrors.AddError(err)
		}
	}
	if !cordonErrors.IsEmpty() {
		return errors.WithStackTrace(cordonErrors)
	}
	return nil
}

// cordonNode runs `kubectl cordon` against a single node and reports the outcome on the error channel.
func cordonNode(
	wg *sync.WaitGroup,
	errChannel chan<- NodeCordonError,
	kubectlOptions *KubectlOptions,
	nodeID string,
) {
	defer wg.Done()
	err := RunKubectl(kubectlOptions, "cordon", nodeID)
	errChannel <- NodeCordonError{NodeID: nodeID, Error: err}
}

// waitForAllCordons blocks until every cordon goroutine has finished, then closes the error channel
// so the collection loop in CordonNodes terminates.
func waitForAllCordons(wg *sync.WaitGroup, errChannel chan<- NodeCordonError) {
	wg.Wait()
	close(errChannel)
}
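
// exampleCordonThenDrain is an illustrative sketch, not part of the original kubergrunt source:
// because cordoning only marks nodes unschedulable and does not touch running Pods, a common
// sequencing when retiring a set of nodes is to cordon them all up front (which is fast and
// immediately stops new scheduling) and then run the slower drains to evict the existing Pods.
// The timeout value is hypothetical.
func exampleCordonThenDrain(kubectlOptions *KubectlOptions, oldNodeIds []string) error {
	// Stop new Pods from being scheduled onto the nodes that are about to go away.
	if err := CordonNodes(kubectlOptions, oldNodeIds); err != nil {
		return err
	}
	// Evict the Pods that are still running there so they reschedule elsewhere.
	return DrainNodes(kubectlOptions, oldNodeIds, 5*time.Minute)
}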

// GetNodes queries Kubernetes for information about the worker nodes registered to the cluster, given a
// clientset.
func GetNodes(clientset *kubernetes.Clientset, options metav1.ListOptions) ([]corev1.Node, error) {
	nodes, err := clientset.CoreV1().Nodes().List(options)
	if err != nil {
		return nil, err
	}
	return nodes.Items, err
}

// IsNodeReady takes a Kubernetes Node information object and checks if the Node is in the ready state.
func IsNodeReady(node corev1.Node) bool {
	for _, condition := range node.Status.Conditions {
		if condition.Type == corev1.NodeReady {
			return condition.Status == corev1.ConditionTrue
		}
	}
	return false
}
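
// exampleListReadyNodeNames is an illustrative sketch, not part of the original kubergrunt
// source: it combines GetNodes and IsNodeReady to collect the names of all worker nodes that are
// currently reporting the Ready condition.
func exampleListReadyNodeNames(clientset *kubernetes.Clientset) ([]string, error) {
	nodes, err := GetNodes(clientset, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	readyNodeNames := []string{}
	for _, node := range nodes {
		if IsNodeReady(node) {
			readyNodeNames = append(readyNodeNames, node.Name)
		}
	}
	return readyNodeNames, nil
}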