Copy EC2 tags to spot instances as early as possible.
Sometimes the user_data script relies on EC2 tags, and also on the fact that
new instances are already tagged by the time they are booting, identically to
all the other instances from the AutoScaling group.

Unfortunately EC2 tags are not part of the launch configuration, so they
need to be manually copied to the new spot instances.

Initially the EC2 tags were copied late in the replacement process, right
before the instance was added to the group. This may have been too late for
the user_data script, so the instances were added before they were set up,
or, in the worst (but most common) scenario, they were never set up at all,
since the user_data script typically doesn't keep polling the instance tags
for the values it expects.

The instance replacement logic was changed so that the new spot instances
are tagged as early as possible, by waiting for the spot request to be
fulfilled and applying the tags immediately.

Change-Id: If63dff0dbf2a6ea31944b32af8d530aa595a1fb5
cristim committed Jul 16, 2016
1 parent f0f110d commit 884d0c6
Showing 2 changed files with 175 additions and 58 deletions.
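
As a minimal standalone sketch of the new flow (not the project's actual
code): wait for the spot request to be fulfilled, then tag the instance
straight away, so the tags are already visible to user_data at boot. The
client setup, region, request ID and tag values below are hypothetical
placeholders; the real implementation is in waitForSpotInstance and
tagInstance in the diff that follows.

```go
package main

import (
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

// waitAndTag polls a spot request until it is fulfilled, then immediately
// copies the given tags onto the new instance.
func waitAndTag(svc *ec2.EC2, requestID *string, tags []*ec2.Tag) *string {
	for {
		out, err := svc.DescribeSpotInstanceRequests(
			&ec2.DescribeSpotInstanceRequestsInput{
				SpotInstanceRequestIds: []*string{requestID},
			})
		if err == nil && len(out.SpotInstanceRequests) == 1 {
			if id := out.SpotInstanceRequests[0].InstanceId; id != nil {
				// The request was fulfilled: tag the instance right away.
				if _, err := svc.CreateTags(&ec2.CreateTagsInput{
					Resources: []*string{id},
					Tags:      tags,
				}); err != nil {
					log.Println("tagging failed:", err)
				}
				return id
			}
		}
		time.Sleep(5 * time.Second)
	}
}

func main() {
	svc := ec2.New(session.Must(session.NewSession(&aws.Config{
		Region: aws.String("us-east-1"), // assumed region
	})))
	// "sir-example" and the tag below are hypothetical placeholders.
	id := waitAndTag(svc, aws.String("sir-example"),
		[]*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("my-asg")}})
	log.Println("tagged instance", *id)
}
```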
agent/autospotting/autoscaling.go: 213 changes (163 additions, 50 deletions)
@@ -37,7 +37,7 @@ func (a *autoScalingGroup) process() {
spotInstanceID, waitForNextRun := a.havingReadyToAttachSpotInstance()

if waitForNextRun == true {
logger.Println("Waiting for next run while processing ", a.name)
logger.Println("Waiting for next run while processing", a.name)
return
}

@@ -48,7 +48,7 @@ func (a *autoScalingGroup) process() {
a.replaceOnDemandInstanceWithSpot(spotInstanceID)
} else {
// find any given on-demand instance and try to replace it with a spot one
- onDemandInstance := a.findOndemandInstanceDetails()
+ onDemandInstance := a.findInstanceDetails(true)

if onDemandInstance == nil {
logger.Println(a.region.name, a.name,
@@ -86,12 +86,6 @@ func (a *autoScalingGroup) replaceOnDemandInstanceWithSpot(
minSize, maxSize := *asg.MinSize, *asg.MaxSize
desiredCapacity := *asg.DesiredCapacity

- // Tag the Spot instance with the tags of any of the existing on-demand
- // instances. The tags first need to be filtered from some invalid values
- // which can't just be duplicated.
- logger.Println(a.name, "Tagging instance", *spotInstanceID)
- a.region.tagInstance(spotInstanceID, a.filterInstanceTags())

// temporarily increase AutoScaling group in case it's of static size
if minSize == maxSize {
logger.Println(a.name, "Temporarily increasing MaxSize")
@@ -111,7 +105,7 @@ func (a *autoScalingGroup) replaceOnDemandInstanceWithSpot(
// find an on-demand instance from the same AZ as our spot instance
if odInst := a.findOndemandInstanceInAZ(az); odInst != nil {

logger.Println(a.name, "found", odInst.InstanceId,
logger.Println(a.name, "found", *odInst.InstanceId,
"attaching to the group")

// revert attach/detach order when running on minimum capacity
@@ -127,7 +121,7 @@ func (a *autoScalingGroup) replaceOnDemandInstanceWithSpot(
}

func (a *autoScalingGroup) getInstanceTags() []*ec2.Tag {
- if instance := a.findOndemandInstanceDetails(); instance != nil {
+ if instance := a.findInstanceDetails(false); instance != nil {
return instance.Tags
}
return nil
@@ -140,15 +134,21 @@ func (a *autoScalingGroup) findInstanceByID(instanceID *string) *ec2.Instance {

// Returns the information about the first on-demand running instance found
// while iterating over all instances from the group.
- func (a *autoScalingGroup) findOndemandInstanceDetails() *ec2.Instance {
+ func (a *autoScalingGroup) findInstanceDetails(
+ onDemandOnly bool) *ec2.Instance {

for _, instance := range a.asgRawData.Instances {
instanceData := a.region.instances[*instance.InstanceId]

- if instanceData != nil && *instanceData.State.Name == "running" &&
- // this attribute is non-nil only for spot instances, where it contains
- // the value "spot"
- instanceData.InstanceLifecycle == nil {
+ // instance is running
+ if instanceData != nil && *instanceData.State.Name == "running" {
+
+ // the InstanceLifecycle attribute is non-nil only for spot instances,
+ // where it contains the value "spot"; if we're looking for on-demand
+ // instances only, then we have to skip the current instance.
+ if onDemandOnly && instanceData.InstanceLifecycle != nil {
+ continue
+ }
return instanceData
}
}
@@ -188,68 +188,108 @@ func (a *autoScalingGroup) havingReadyToAttachSpotInstance() (*string, bool) {
// then we can launch a new spot instance
if len(a.spotInstanceRequests) == 0 {
logger.Println(a.name, "no spot bids were found")
- if inst := a.findOndemandInstanceDetails(); inst != nil {
+ if inst := a.findInstanceDetails(true); inst != nil {
logger.Println(a.name, "on-demand instances were found, proceeding to "+
"launch a replacement spot instance")
return nil, false
}
// Looks like we have no instances in the group, so we can stop here
logger.Println(a.name, "no on-demand instances were found, nothing to do")
return nil, true
}

logger.Println("spot bids were found, continuing")

+ // Here we search for open spot requests created for the current ASG, and try
+ // to wait for their instances to start.
for _, req := range a.spotInstanceRequests {
if *req.State == "open" || *req.State == "failed" {
logger.Println(a.name, "Open or failed bids found, "+
"waiting for the next run...")
return nil, true
if *req.State == "open" && *req.Tags[0].Value == a.name {
logger.Println(a.name, "Open bid found for current AutoScaling Group, "+
"waiting for the instance to start so it can be tagged...")

// Here we resume the wait for instances, initiated after requesting the
// spot instance. This may sometimes time out the entire lambda function
// run, just like it could time out the one done when we requested the
// new instance. In case of timeout the next run should continue waiting
// for the instance, and the process should continue until the new
// instance was found. In case of failed spot requests, the first lambda
// function timeout when waiting for the instances would break the loop,
// because the subsequent run would find a failed spot request instead
// of an open one.
a.waitForSpotInstance(req)
activeSpotInstanceRequest = req
}

// We found a spot request with a running instance.
if *req.State == "active" &&
*req.Status.Code == "fulfilled" {
logger.Println(a.name, "Active bid was found, with instance already "+
"started:", *req.InstanceId)

// If the instance is already in the group we don't need to do anything.
if a.hasInstance(*req.InstanceId) {
logger.Println(a.name, "Active bid was found, with instance already "+
"attached to the ASG, skipping...")
logger.Println(a.name, "Instance", *req.InstanceId,
"is already attached to the ASG, skipping...")
continue

// In case the instance wasn't yet attached, we prepare to attach it.
} else {
- if a.region.instances[*req.InstanceId].State != nil &&
+ logger.Println(a.name, "Instance", *req.InstanceId,
+ "is not yet attached to the ASG, checking if it's running")
+
+ if a.region.instances[*req.InstanceId] != nil &&
+ a.region.instances[*req.InstanceId].State != nil &&
*a.region.instances[*req.InstanceId].State.Name == "running" {
logger.Println(a.name, "Active bid was found, with running "+
"instances not yet attached to the ASG",
*req.InstanceId)
activeSpotInstanceRequest = req
break
} else {
logger.Println(a.name, "Active bid was found, with non-running "+
"instances, waiting for the next run...")
return nil, true
logger.Println(a.name, "Active bid was found, with no running "+
"instances, waiting for an instance to start ...")
a.waitForSpotInstance(req)
activeSpotInstanceRequest = req
}
}
}
}

- // in this case we can launch a new spot instance if we can
+ // In case we don't have any active spot requests with instances in the
+ // process of starting or already ready to be attached to the group, we can
+ // launch a new spot instance.
if activeSpotInstanceRequest == nil {
logger.Println(a.name, "No active unfulfilled bid was found")
return nil, false
}

// Show information about the found unattached spot instance
spotInstanceID := activeSpotInstanceRequest.InstanceId
logger.Println("Considering ", *spotInstanceID,
"for attaching to", a.name)

logger.Println(a.name, "found instance", *spotInstanceID, "tagging it first")

// Here we should have an unattached spot instance, trying to tag it with
// the EC2 tags set on the other instances already added to the autoscaling
// group.
a.region.tagInstance(spotInstanceID, a.filterInstanceTags())

logger.Println("Considering ", *spotInstanceID, "for attaching to", a.name)

instData := a.region.instances[*spotInstanceID]
gracePeriod := *a.asgRawData.HealthCheckGracePeriod

if instData.LaunchTime == nil {
return nil, true
}

instanceUpTime := time.Now().Unix() - instData.LaunchTime.Unix()

- // check if the spot instance is out of the grace period, so
- // in that case we can replace an on-demand instance with it
+ // Check if the spot instance is out of the grace period, so in that case we
+ // can replace an on-demand instance with it
if *instData.State.Name == "running" &&
instanceUpTime < gracePeriod {
logger.Println("The new spot instance", *spotInstanceID,
"is still in the grace period, waiting for it to be ready...")
"is still in the grace period,",
"waiting for it to be ready before we can attach it to the group...")
return nil, true
}
return spotInstanceID, false
@@ -264,6 +304,43 @@ func (a *autoScalingGroup) hasInstance(instanceID string) bool {
return false
}

+ func (a *autoScalingGroup) waitForSpotInstance(
+ spotRequest *ec2.SpotInstanceRequest) *string {
+
+ logger.Println(a.name, "Waiting for spot instance for",
+ *spotRequest.SpotInstanceRequestId)
+
+ ec2Client := a.region.services.ec2
+
+ // Keep trying until the instance is found.
+ for {
+ params := ec2.DescribeSpotInstanceRequestsInput{
+ SpotInstanceRequestIds: []*string{spotRequest.SpotInstanceRequestId},
+ }
+
+ requestDetails, err := ec2Client.DescribeSpotInstanceRequests(&params)
+ if err != nil {
+ logger.Println(a.name, "Failed to describe spot instance requests")
+ }
+
+ logger.Println(a.name, "Refreshed details for",
+ *spotRequest.SpotInstanceRequestId,
+ requestDetails,
+ "checking for a running instance")
+
+ // The err == nil check also guards against dereferencing a nil
+ // requestDetails after a failed API call.
+ if err == nil && len(requestDetails.SpotInstanceRequests) == 1 {
+ instanceID := requestDetails.SpotInstanceRequests[0].InstanceId
+
+ if instanceID != nil {
+ return instanceID
+ }
+ }
+
+ logger.Println(a.name, "Couldn't find instance, retrying in 5 seconds")
+ time.Sleep(5 * time.Second)
+ }
+ }

func (a *autoScalingGroup) hasEqualAvailibilityZones() bool {

var azInstanceCount = make(map[string]int)
@@ -306,7 +383,7 @@ func (a *autoScalingGroup) launchCheapestSpotInstance(azToLaunchIn *string) {
logger.Println("Trying to launch spot instance in", *azToLaunchIn,
"\nfirst finding an on-demand instance to use as a template")

- baseInstance := a.findOndemandInstanceDetails()
+ baseInstance := a.findInstanceDetails(true)

if baseInstance == nil {
logger.Println("Found no on-demand instances, nothing to do here...")
@@ -384,10 +461,29 @@ func (a *autoScalingGroup) bidForSpotInstance(
return
}

logger.Println(a.name, "Created spot instance request",
*resp.SpotInstanceRequests[0].SpotInstanceRequestId)
spotRequest := resp.SpotInstanceRequests[0]
spotRequestID := spotRequest.SpotInstanceRequestId

logger.Println(a.name, "Created spot instance request", *spotRequestID)

// tag the spot instance request to associate it with the current ASG, so we
// know where to attach the instance later.
a.tagSpotInstanceRequest(*spotRequestID)

- a.tagSpotInstanceRequest(*resp.SpotInstanceRequests[0].SpotInstanceRequestId)
+ // Wait for the instance to start so that we can then later tag it with
+ // the same tags originally set on the on-demand instances.
+ //
+ // This wait only returns after the instance is found, and it may be
+ // interrupted by the lambda function's timeout, so we also need to check in
+ // the next run if we have any open spot requests with no instances and
+ // resume the wait there.
+ spotInstanceID := a.waitForSpotInstance(spotRequest)
+
+ if spotInstanceID != nil {
+ logger.Println(a.name, "found new spot instance", *spotInstanceID,
+ "\nTagging it to match the other instances from the group")
+ a.region.tagInstance(spotInstanceID, a.filterInstanceTags())
+ }
}

func (a *autoScalingGroup) tagSpotInstanceRequest(requestID string) {
@@ -406,7 +502,8 @@ func (a *autoScalingGroup) tagSpotInstanceRequest(requestID string) {
if err != nil {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
logger.Println(a.name, "Failed to create tags for the spot instance request",
logger.Println(a.name,
"Failed to create tags for the spot instance request",
err.Error())
return
}
@@ -503,13 +600,17 @@ func convertLaunchConfigurationToSpotSpecification(

// Checks if the security groups are given by ID or by free-form names, which
// was possible in EC2 Classic
- func havingFreeFormSecurityGroupNames(lc *autoscaling.LaunchConfiguration) bool {
+ func havingFreeFormSecurityGroupNames(
+ lc *autoscaling.LaunchConfiguration) bool {

for _, sg := range lc.SecurityGroups {

if !strings.HasPrefix(*sg, "sg-") {
logger.Println(*sg)
return true
}
}

return false
}

@@ -569,8 +670,13 @@ func (a *autoScalingGroup) attachSpotInstance(spotInstanceID *string) {

// Terminates an on-demand instance from the group,
// but only after it was detached from the autoscaling group
- func (a *autoScalingGroup) detachAndTerminateOnDemandInstance(instanceID *string) {
- logger.Println(a.region.name, a.name, "Detaching and terminating instance:", *instanceID)
+ func (a *autoScalingGroup) detachAndTerminateOnDemandInstance(
+ instanceID *string) {
+
+ logger.Println(a.region.name,
+ a.name,
+ "Detaching and terminating instance:",
+ *instanceID)

// detach the on-demand instance
detachParams := autoscaling.DetachInstancesInput{
@@ -602,7 +708,8 @@ func (a *autoScalingGroup) detachAndTerminateOnDemandInstance(instanceID *string
}

func (a *autoScalingGroup) getCheapestCompatibleSpotInstanceType(
- availabilityZone string, baseInstance *ec2.Instance) *string {
+ availabilityZone string,
+ baseInstance *ec2.Instance) *string {

logger.Println("Getting cheapest spot instance compatible to ",
*baseInstance.InstanceId, " of type", *baseInstance.InstanceType)
@@ -693,12 +800,17 @@ func (a *autoScalingGroup) getCompatibleSpotInstanceTypes(

// checking how many spot instances of this type we already have, so that
// we can see how risky it is to launch a new one.
- spotInstanceCount := a.alreadyRunningSpotInstanceCount(inst.instanceType, availabilityZone)

- // We skip it in case we have more than 20% instances of this type already running
- if spotInstanceCount == 0 || (*a.asgRawData.DesiredCapacity/spotInstanceCount > 4) {
- logger.Println(a.name, "no redundancy issues found for", inst.instanceType,
- "existing", spotInstanceCount, "spot instances, adding for comparison",
+ spotInstanceCount := a.alreadyRunningSpotInstanceCount(
+ inst.instanceType, availabilityZone)
+
+ // We skip it in case we have more than 20% instances of this type already
+ // running
+ if spotInstanceCount == 0 ||
+ (*a.asgRawData.DesiredCapacity/spotInstanceCount > 4) {
+ logger.Println(a.name,
+ "no redundancy issues found for", inst.instanceType,
+ "existing", spotInstanceCount,
+ "spot instances, adding for comparison",
)

filteredInstanceTypes = append(filteredInstanceTypes, inst.instanceType)
@@ -732,6 +844,7 @@ func compatibleVirtualization(virtualizationType string,
return false
}

+ // Counts the number of already running spot instances.
func (a *autoScalingGroup) alreadyRunningSpotInstanceCount(
instanceType, availabilityZone string) int64 {

@@ -744,8 +857,8 @@ func (a *autoScalingGroup) alreadyRunningSpotInstanceCount(
*instDetails.Placement.AvailabilityZone == availabilityZone &&
instDetails.InstanceLifecycle != nil &&
*instDetails.InstanceLifecycle == "spot" {
logger.Println(a.name, "Found running spot instance ", *instDetails.InstanceId,
"of the same type:", instanceType)
logger.Println(a.name, "Found running spot instance ",
*instDetails.InstanceId, "of the same type:", instanceType)
count++
}
}
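The filterInstanceTags helper called above is not part of this diff, but the
commit history notes that some tag values can't simply be duplicated. Below is
a plausible sketch of that filtering, assuming the reserved "aws:" key
namespace, which CreateTags refuses to set, is what needs to be dropped:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
)

// filterTags drops tags that EC2 will not let us copy: keys in the "aws:"
// namespace are reserved, and CreateTags rejects attempts to set them.
func filterTags(in []*ec2.Tag) []*ec2.Tag {
	var out []*ec2.Tag
	for _, t := range in {
		if t.Key != nil && !strings.HasPrefix(*t.Key, "aws:") {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	tags := []*ec2.Tag{
		{Key: aws.String("aws:autoscaling:groupName"), Value: aws.String("my-asg")},
		{Key: aws.String("Name"), Value: aws.String("web")},
	}
	for _, t := range filterTags(tags) {
		fmt.Println(*t.Key, "=", *t.Value) // only "Name = web" survives
	}
}
```

Keys such as aws:autoscaling:groupName are set by AWS itself, so copying them
verbatim onto the new spot instance would make the CreateTags call fail.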
