package eks

import (
	"math"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/service/autoscaling"
	"github.com/aws/aws-sdk-go/service/ec2"
	"github.com/aws/aws-sdk-go/service/elb"
	"github.com/gruntwork-io/gruntwork-cli/errors"

	"github.com/gruntwork-io/kubergrunt/kubectl"
	"github.com/gruntwork-io/kubergrunt/logging"
)

// RollOutDeployment will perform a zero downtime roll out of the current launch configuration associated with the
// provided ASG in the provided EKS cluster. This is accomplished by:
// 1. Double the desired capacity of the Auto Scaling Group that powers the EKS Cluster. This will launch new EKS
//    workers with the new launch configuration.
// 2. Wait for the new nodes to be ready for Pod scheduling in Kubernetes.
// 3. Cordon the old nodes so that no new Pods will be scheduled there.
// 4. Drain the Pods scheduled on the old EKS workers (using the equivalent of "kubectl drain"), so that they will be
//    rescheduled on the new EKS workers.
// 5. Wait for all the Pods to migrate off of the old EKS workers.
// 6. Set the desired capacity down to the original value and remove the old EKS workers from the ASG.
// TODO feature request: Break up into stages/checkpoints, and store state along the way so that the command can pick
// up from a stage if something bad happens.
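//
// A minimal invocation sketch (illustrative values only: the region, ASG name, kubectlOptions variable, and timing
// values below are assumptions made for the example, not requirements of this function):
//
//	err := RollOutDeployment(
//		"us-east-1",       // AWS region of the worker ASG (example value)
//		"eks-workers-asg", // name of the Auto Scaling Group backing the worker group (example value)
//		kubectlOptions,    // a *kubectl.KubectlOptions already configured for the target EKS cluster
//		15*time.Minute,    // drainTimeout
//		0,                 // maxRetries; 0 falls back to getDefaultMaxRetries below
//		30*time.Second,    // sleepBetweenRetries
//	)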
func RollOutDeployment(
	region string,
	eksAsgName string,
	kubectlOptions *kubectl.KubectlOptions,
	drainTimeout time.Duration,
	maxRetries int,
	sleepBetweenRetries time.Duration,
) error {
	logger := logging.GetProjectLogger()
	logger.Infof("Beginning roll out for EKS cluster worker group %s in %s", eksAsgName, region)

	// Construct clients for AWS
	sess, err := NewAuthenticatedSession(region)
	if err != nil {
		return errors.WithStackTrace(err)
	}
	asgSvc := autoscaling.New(sess)
	ec2Svc := ec2.New(sess)
	elbSvc := elb.New(sess)
	logger.Infof("Successfully authenticated with AWS")

	// Retrieve the ASG object and gather required info we will need later
	originalCapacity, currentInstanceIds, err := getAsgInfo(asgSvc, eksAsgName)
	if err != nil {
		return err
	}

	// Calculate default max retries
	if maxRetries == 0 {
		maxRetries = getDefaultMaxRetries(originalCapacity, sleepBetweenRetries)
		logger.Infof(
			"No max retries set. Defaulted to %d based on sleep between retries duration of %s and scale up count %d.",
			maxRetries,
			sleepBetweenRetries,
			originalCapacity,
		)
	}

	// Make sure ASG is in steady state
	if originalCapacity != int64(len(currentInstanceIds)) {
		logger.Infof("Ensuring ASG is in steady state (current capacity = desired capacity)")
		err = waitForCapacity(asgSvc, eksAsgName, maxRetries, sleepBetweenRetries)
		if err != nil {
			logger.Error("Error waiting for ASG to reach steady state. Try again after the ASG is in a steady state.")
			return err
		}
		logger.Infof("Verified ASG is in steady state (current capacity = desired capacity)")
		originalCapacity, currentInstanceIds, err = getAsgInfo(asgSvc, eksAsgName)
		if err != nil {
			return err
		}
	}

	logger.Infof("Starting with the following list of instances in ASG:")
	logger.Infof("%s", strings.Join(currentInstanceIds, ","))

	logger.Infof("Launching new nodes with new launch config on ASG %s", eksAsgName)
	err = scaleUp(
		asgSvc,
		ec2Svc,
		elbSvc,
		kubectlOptions,
		eksAsgName,
		originalCapacity*2,
		currentInstanceIds,
		maxRetries,
		sleepBetweenRetries,
	)
	if err != nil {
		return err
	}
	logger.Infof("Successfully launched new nodes with new launch config on ASG %s", eksAsgName)

	logger.Infof("Cordoning old instances in cluster ASG %s to prevent Pod scheduling", eksAsgName)
	err = cordonNodesInAsg(ec2Svc, kubectlOptions, currentInstanceIds)
	if err != nil {
		logger.Errorf("Error while cordoning nodes.")
		logger.Errorf("Manually cordon the nodes that failed, and then terminate the underlying instances to complete the rollout.")
		return err
	}
	logger.Infof("Successfully cordoned old instances in cluster ASG %s", eksAsgName)

	logger.Infof("Draining Pods on old instances in cluster ASG %s", eksAsgName)
	err = drainNodesInAsg(ec2Svc, kubectlOptions, currentInstanceIds, drainTimeout)
	if err != nil {
		logger.Errorf("Error while draining nodes.")
		logger.Errorf("Manually drain the nodes that failed, and then terminate the underlying instances to complete the rollout.")
		return err
	}
	logger.Infof("Successfully drained all scheduled Pods on old instances in cluster ASG %s", eksAsgName)

	logger.Infof("Removing old nodes from ASG %s", eksAsgName)
	err = detachInstances(asgSvc, eksAsgName, currentInstanceIds)
	if err != nil {
		logger.Errorf("Error while detaching the old instances.")
		logger.Errorf("Manually detach the old instances and then terminate the underlying instances to complete the rollout.")
		return err
	}
	err = terminateInstances(ec2Svc, currentInstanceIds)
	if err != nil {
		logger.Errorf("Error while terminating the old instances.")
		logger.Errorf("Manually terminate the underlying instances to complete the rollout.")
		return err
	}
	logger.Infof("Successfully removed old nodes from ASG %s", eksAsgName)

	logger.Infof("Successfully finished roll out for EKS cluster worker group %s in %s", eksAsgName, region)
	return nil
}

// Retrieves the current state of the ASG and returns the original desired capacity and the IDs of the instances
// currently associated with it.
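// For example (illustrative values, not real instance IDs): an ASG with a desired capacity of 3 and instances
// i-aaa, i-bbb, and i-ccc attached would yield (3, []string{"i-aaa", "i-bbb", "i-ccc"}, nil).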
func getAsgInfo(asgSvc *autoscaling.AutoScaling, asgName string) (int64, []string, error) {
	logger := logging.GetProjectLogger()
	logger.Infof("Retrieving current ASG info")
	asg, err := GetAsgByName(asgSvc, asgName)
	if err != nil {
		return -1, nil, err
	}
	originalCapacity := *asg.DesiredCapacity
	currentInstances := asg.Instances
	currentInstanceIds := idsFromAsgInstances(currentInstances)
	logger.Infof("Successfully retrieved current ASG info.")
	logger.Infof("\tCurrent desired capacity: %d", originalCapacity)
	logger.Infof("\tCurrent capacity: %d", len(currentInstances))
	return originalCapacity, currentInstanceIds, nil
}

// Calculates the default max retries based on a heuristic of 10 minutes per wave. This assumes that the ASG scales up
// in waves of 10 instances, so the number of retries is:
//
//	ceil(scaleUpCount / 10) * 10 minutes / sleepBetweenRetries
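//
// For example (illustrative numbers): scaling up by 25 instances with a 30 second sleep between retries gives
// ceil(25 / 10) = 3 waves, so the default is 3 * 600s / 30s = 60 retries.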
func getDefaultMaxRetries(scaleUpCount int64, sleepBetweenRetries time.Duration) int {
	logger := logging.GetProjectLogger()

	numWaves := int(math.Ceil(float64(scaleUpCount) / float64(10)))
	logger.Debugf("Calculated number of waves as %d (scaleUpCount %d)", numWaves, scaleUpCount)

	sleepBetweenRetriesSeconds := int(math.Trunc(sleepBetweenRetries.Seconds()))
	defaultMaxRetries := numWaves * 600 / sleepBetweenRetriesSeconds
	logger.Debugf(
		"Calculated default max retries as %d (scaleUpCount %d, num waves %d, duration (s) %d)",
		defaultMaxRetries,
		scaleUpCount,
		numWaves,
		sleepBetweenRetriesSeconds,
	)
	return defaultMaxRetries
}