diff --git a/agent/autospotting/autoscaling.go b/agent/autospotting/autoscaling.go
index 703bbb95..e3237739 100644
--- a/agent/autospotting/autoscaling.go
+++ b/agent/autospotting/autoscaling.go
@@ -37,7 +37,7 @@ func (a *autoScalingGroup) process() {
 	spotInstanceID, waitForNextRun := a.havingReadyToAttachSpotInstance()
 
 	if waitForNextRun == true {
-		logger.Println("Waiting for next run while processing ", a.name)
+		logger.Println("Waiting for next run while processing", a.name)
 		return
 	}
 
@@ -48,7 +48,7 @@ func (a *autoScalingGroup) process() {
 		a.replaceOnDemandInstanceWithSpot(spotInstanceID)
 	} else {
 		// find any given on-demand instance and try to replace it with a spot one
-		onDemandInstance := a.findOndemandInstanceDetails()
+		onDemandInstance := a.findInstanceDetails(true)
 
 		if onDemandInstance == nil {
 			logger.Println(a.region.name, a.name,
@@ -86,12 +86,6 @@ func (a *autoScalingGroup) replaceOnDemandInstanceWithSpot(
 	minSize, maxSize := *asg.MinSize, *asg.MaxSize
 	desiredCapacity := *asg.DesiredCapacity
 
-	// Tag the Spot instance with the tags of any of the existing on-demand
-	// instances. The tags first need to be filtered from some invalid values
-	// which can't just be duplicated.
-	logger.Println(a.name, "Tagging instance", *spotInstanceID)
-	a.region.tagInstance(spotInstanceID, a.filterInstanceTags())
-
 	// temporarily increase AutoScaling group in case it's of static size
 	if minSize == maxSize {
 		logger.Println(a.name, "Temporarily increasing MaxSize")
@@ -111,7 +105,7 @@ func (a *autoScalingGroup) replaceOnDemandInstanceWithSpot(
 
 	// find an on-demand instance from the same AZ as our spot instance
 	if odInst := a.findOndemandInstanceInAZ(az); odInst != nil {
-		logger.Println(a.name, "found", odInst.InstanceId,
+		logger.Println(a.name, "found", *odInst.InstanceId,
 			"attaching to the group")
 
 		// revert attach/detach order when running on minimum capacity
@@ -127,7 +121,7 @@ func (a *autoScalingGroup) replaceOnDemandInstanceWithSpot(
 }
 
 func (a *autoScalingGroup) getInstanceTags() []*ec2.Tag {
-	if instance := a.findOndemandInstanceDetails(); instance != nil {
+	if instance := a.findInstanceDetails(false); instance != nil {
 		return instance.Tags
 	}
 	return nil
@@ -140,15 +134,23 @@ func (a *autoScalingGroup) findInstanceByID(instanceID *string) *ec2.Instance {
 
-// Returns the information about the first on-demand running instance found
-// while iterating over all instances from the group.
-func (a *autoScalingGroup) findOndemandInstanceDetails() *ec2.Instance {
+// Returns the information about the first running instance found while
+// iterating over all instances from the group, optionally restricted to
+// on-demand instances only.
+func (a *autoScalingGroup) findInstanceDetails(
+	onDemandOnly bool) *ec2.Instance {
 	for _, instance := range a.asgRawData.Instances {
 		instanceData := a.region.instances[*instance.InstanceId]
 
-		if instanceData != nil && *instanceData.State.Name == "running" &&
-			// this attribute is non-nil only for spot instances, where it contains
-			// the value "spot"
-			instanceData.InstanceLifecycle == nil {
+		// instance is running
+		if instanceData != nil && *instanceData.State.Name == "running" {
+
+			// the InstanceLifecycle attribute is non-nil only for spot instances,
+			// where it contains the value "spot". If we're looking for on-demand
+			// instances only, we have to skip the current instance.
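+			// Otherwise any running instance will do, regardless of lifecycle.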
+			if onDemandOnly && instanceData.InstanceLifecycle != nil {
+				continue
+			}
 			return instanceData
 		}
 	}
 
@@ -188,32 +188,58 @@ func (a *autoScalingGroup) havingReadyToAttachSpotInstance() (*string, bool) {
 	// then we can launch a new spot instance
 	if len(a.spotInstanceRequests) == 0 {
 		logger.Println(a.name, "no spot bids were found")
-		if inst := a.findOndemandInstanceDetails(); inst != nil {
+		if inst := a.findInstanceDetails(true); inst != nil {
 			logger.Println(a.name, "on-demand instances were found, proceeding to "+
 				"launch a replacement spot instance")
 			return nil, false
 		}
+		// Looks like we have no instances in the group, so we can stop here
 		logger.Println(a.name, "no on-demand instances were found, nothing to do")
 		return nil, true
 	}
 
 	logger.Println("spot bids were found, continuing")
 
+	// Here we search for open spot requests created for the current ASG, and
+	// wait for their instances to start.
 	for _, req := range a.spotInstanceRequests {
-		if *req.State == "open" || *req.State == "failed" {
-			logger.Println(a.name, "Open or failed bids found, "+
-				"waiting for the next run...")
-			return nil, true
+		if *req.State == "open" && *req.Tags[0].Value == a.name {
+			logger.Println(a.name, "Open bid found for current AutoScaling Group, "+
+				"waiting for the instance to start so it can be tagged...")
+
+			// Here we resume the wait for instances, initiated after requesting the
+			// spot instance. This may sometimes time out the entire lambda function
+			// run, just like it could time out the initial wait performed when we
+			// requested the new instance. In case of timeout the next run should
+			// continue waiting for the instance, and the process should continue
+			// until the new instance is found. In case of failed spot requests, the
+			// first lambda function timeout when waiting for the instances would
+			// break the loop, because the subsequent run would find a failed spot
+			// request instead of an open one.
+			a.waitForSpotInstance(req)
+			activeSpotInstanceRequest = req
 		}
 
+		// We found a spot request with a running instance.
 		if *req.State == "active" && *req.Status.Code == "fulfilled" {
+			logger.Println(a.name, "Active bid was found, with instance already "+
+				"started:", *req.InstanceId)
+
+			// If the instance is already in the group we don't need to do anything.
 			if a.hasInstance(*req.InstanceId) {
-				logger.Println(a.name, "Active bid was found, with instance already "+
-					"attached to the ASG, skipping...")
+				logger.Println(a.name, "Instance", *req.InstanceId,
+					"is already attached to the ASG, skipping...")
 				continue
+
+				// In case the instance wasn't yet attached, we prepare to attach it.
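+				// This can happen when a previous run timed out while waiting for it.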
 			} else {
-				if a.region.instances[*req.InstanceId].State != nil &&
+				logger.Println(a.name, "Instance", *req.InstanceId,
+					"is not yet attached to the ASG, checking if it's running")
+
+				if a.region.instances[*req.InstanceId] != nil &&
+					a.region.instances[*req.InstanceId].State != nil &&
 					*a.region.instances[*req.InstanceId].State.Name == "running" {
 					logger.Println(a.name, "Active bid was found, with running "+
 						"instances not yet attached to the ASG",
@@ -221,35 +246,50 @@ func (a *autoScalingGroup) havingReadyToAttachSpotInstance() (*string, bool) {
 					activeSpotInstanceRequest = req
 					break
 				} else {
-					logger.Println(a.name, "Active bid was found, with non-running "+
-						"instances, waiting for the next run...")
-					return nil, true
+					logger.Println(a.name, "Active bid was found, with no running "+
+						"instances, waiting for an instance to start...")
+					a.waitForSpotInstance(req)
+					activeSpotInstanceRequest = req
 				}
 			}
 		}
 	}
 
-	// in this case we can launch a new spot instance if we can
+	// In case we don't have any active spot requests with instances either
+	// starting or already ready to be attached to the group, we can launch a
+	// new spot instance.
 	if activeSpotInstanceRequest == nil {
 		logger.Println(a.name, "No active unfulfilled bid was found")
 		return nil, false
 	}
 
-	// Show information about the found unattached spot instance
 	spotInstanceID := activeSpotInstanceRequest.InstanceId
-	logger.Println("Considering ", *spotInstanceID,
-		"for attaching to", a.name)
+
+	logger.Println(a.name, "found instance", *spotInstanceID, "tagging it first")
+
+	// Here we should have an unattached spot instance, so we try to tag it
+	// with the EC2 tags set on the other instances already added to the
+	// autoscaling group.
+	a.region.tagInstance(spotInstanceID, a.filterInstanceTags())
+
+	logger.Println("Considering", *spotInstanceID, "for attaching to", a.name)
 
 	instData := a.region.instances[*spotInstanceID]
 	gracePeriod := *a.asgRawData.HealthCheckGracePeriod
+
+	if instData == nil || instData.LaunchTime == nil {
+		return nil, true
+	}
+
 	instanceUpTime := time.Now().Unix() - instData.LaunchTime.Unix()
 
-	// check if the spot instance is out of the grace period, so
-	// in that case we can replace an on-demand instance with it
+	// Check if the spot instance is out of the grace period, in which case we
+	// can replace an on-demand instance with it
 	if *instData.State.Name == "running" && instanceUpTime < gracePeriod {
 		logger.Println("The new spot instance", *spotInstanceID,
-			"is still in the grace period, waiting for it to be ready...")
+			"is still in the grace period,",
+			"waiting for it to be ready before we can attach it to the group...")
 		return nil, true
 	}
 
 	return spotInstanceID, false
@@ -264,6 +304,47 @@ func (a *autoScalingGroup) hasInstance(instanceID string) bool {
 	return false
 }
 
+func (a *autoScalingGroup) waitForSpotInstance(
+	spotRequest *ec2.SpotInstanceRequest) *string {
+
+	logger.Println(a.name, "Waiting for spot instance for",
+		*spotRequest.SpotInstanceRequestId)
+
+	ec2Client := a.region.services.ec2
+
+	// Keep trying until the instance is found.
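+	// Note that this can block until the lambda function itself times out.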
+	for {
+		params := ec2.DescribeSpotInstanceRequestsInput{
+			SpotInstanceRequestIds: []*string{spotRequest.SpotInstanceRequestId},
+		}
+
+		requestDetails, err := ec2Client.DescribeSpotInstanceRequests(&params)
+		if err != nil {
+			logger.Println(a.name, "Failed to describe spot instance requests:",
+				err.Error())
+			time.Sleep(5 * time.Second)
+			continue
+		}
+
+		logger.Println(a.name, "Refreshed details for",
+			*spotRequest.SpotInstanceRequestId,
+			requestDetails,
+			"checking for a running instance")
+
+		if len(requestDetails.SpotInstanceRequests) == 1 {
+			instanceID := requestDetails.SpotInstanceRequests[0].InstanceId
+
+			if instanceID != nil {
+				return instanceID
+			}
+		}
+
+		logger.Println(a.name, "Couldn't find instance, retrying in 5 seconds")
+		time.Sleep(5 * time.Second)
+	}
+}
+
 func (a *autoScalingGroup) hasEqualAvailibilityZones() bool {
 	var azInstanceCount = make(map[string]int)
 
@@ -306,7 +383,7 @@ func (a *autoScalingGroup) launchCheapestSpotInstance(azToLaunchIn *string) {
 	logger.Println("Trying to launch spot instance in", *azToLaunchIn,
 		"\nfirst finding an on-demand instance to use as a template")
 
-	baseInstance := a.findOndemandInstanceDetails()
+	baseInstance := a.findInstanceDetails(true)
 
 	if baseInstance == nil {
 		logger.Println("Found no on-demand instances, nothing to do here...")
@@ -384,10 +461,29 @@ func (a *autoScalingGroup) bidForSpotInstance(
 		return
 	}
 
-	logger.Println(a.name, "Created spot instance request",
-		*resp.SpotInstanceRequests[0].SpotInstanceRequestId)
+	spotRequest := resp.SpotInstanceRequests[0]
+	spotRequestID := spotRequest.SpotInstanceRequestId
+
+	logger.Println(a.name, "Created spot instance request", *spotRequestID)
+
+	// tag the spot instance request to associate it with the current ASG, so we
+	// know where to attach the instance later.
+	a.tagSpotInstanceRequest(*spotRequestID)
 
-	a.tagSpotInstanceRequest(*resp.SpotInstanceRequests[0].SpotInstanceRequestId)
+	// Waiting for the instance to start so that we can later tag it with the
+	// same tags originally set on the on-demand instances.
+	//
+	// This wait only returns after the instance is found, and it may be
+	// interrupted by the lambda function's timeout, so we also need to check in
+	// the next run if we have any open spot requests with no instances and
+	// resume the wait there.
+	spotInstanceID := a.waitForSpotInstance(spotRequest)
+
+	if spotInstanceID != nil {
+		logger.Println(a.name, "found new spot instance", *spotInstanceID,
+			"\nTagging it to match the other instances from the group")
+		a.region.tagInstance(spotInstanceID, a.filterInstanceTags())
+	}
 }
 
 func (a *autoScalingGroup) tagSpotInstanceRequest(requestID string) {
@@ -406,7 +502,9 @@
 	if err != nil {
 		// Print the error, cast err to awserr.Error to get the Code and
 		// Message from an error.
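+		// If tagging fails we just log the error and give up for this run.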
-		logger.Println(a.name, "Failed to create tags for the spot instance request",
+		logger.Println(a.name,
+			"Failed to create tags for the spot instance request",
 			err.Error())
 		return
 	}
@@ -503,13 +600,17 @@ func convertLaunchConfigurationToSpotSpecification(
 
 // Checks if the security groups are given by ID or by free-form names, which
 // was possible in EC2 Classic
-func havingFreeFormSecurityGroupNames(lc *autoscaling.LaunchConfiguration) bool {
+func havingFreeFormSecurityGroupNames(
+	lc *autoscaling.LaunchConfiguration) bool {
+
 	for _, sg := range lc.SecurityGroups {
+
 		if !strings.HasPrefix(*sg, "sg-") {
 			logger.Println(*sg)
 			return true
 		}
 	}
+
 	return false
 }
 
@@ -569,8 +670,13 @@ func (a *autoScalingGroup) attachSpotInstance(spotInstanceID *string) {
 
 // Terminates an on-demand instance from the group,
 // but only after it was detached from the autoscaling group
-func (a *autoScalingGroup) detachAndTerminateOnDemandInstance(instanceID *string) {
-	logger.Println(a.region.name, a.name, "Detaching and terminating instance:", *instanceID)
+func (a *autoScalingGroup) detachAndTerminateOnDemandInstance(
+	instanceID *string) {
+
+	logger.Println(a.region.name,
+		a.name,
+		"Detaching and terminating instance:",
+		*instanceID)
 
 	// detach the on-demand instance
 	detachParams := autoscaling.DetachInstancesInput{
@@ -602,7 +708,8 @@
 }
 
 func (a *autoScalingGroup) getCheapestCompatibleSpotInstanceType(
-	availabilityZone string, baseInstance *ec2.Instance) *string {
+	availabilityZone string,
+	baseInstance *ec2.Instance) *string {
 
 	logger.Println("Getting cheapest spot instance compatible to ",
 		*baseInstance.InstanceId, " of type", *baseInstance.InstanceType)
@@ -693,12 +800,17 @@ func (a *autoScalingGroup) getCompatibleSpotInstanceTypes(
 
 		// checking how many spot instances of this type we already have, so that
 		// we can see how risky it is to launch a new one.
-		spotInstanceCount := a.alreadyRunningSpotInstanceCount(inst.instanceType, availabilityZone)
-
-		// We skip it in case we have more than 20% instances of this type already running
-		if spotInstanceCount == 0 || (*a.asgRawData.DesiredCapacity/spotInstanceCount > 4) {
-			logger.Println(a.name, "no redundancy issues found for", inst.instanceType,
-				"existing", spotInstanceCount, "spot instances, adding for comparison",
+		spotInstanceCount := a.alreadyRunningSpotInstanceCount(
+			inst.instanceType, availabilityZone)
+
+		// We skip it in case we have more than 20% instances of this type already
+		// running
+		if spotInstanceCount == 0 ||
+			(*a.asgRawData.DesiredCapacity/spotInstanceCount > 4) {
+			logger.Println(a.name,
+				"no redundancy issues found for", inst.instanceType,
+				"existing", spotInstanceCount,
+				"spot instances, adding for comparison",
 			)
 
 			filteredInstanceTypes = append(filteredInstanceTypes, inst.instanceType)
@@ -732,6 +844,8 @@ func compatibleVirtualization(virtualizationType string,
 	return false
 }
 
+// Counts the number of already running spot instances.
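+// It only counts instances of the given type in the given availability zone.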
 func (a *autoScalingGroup) alreadyRunningSpotInstanceCount(
 	instanceType, availabilityZone string) int64 {
 
@@ -744,8 +857,8 @@ func (a *autoScalingGroup) alreadyRunningSpotInstanceCount(
 			*instDetails.Placement.AvailabilityZone == availabilityZone &&
 			instDetails.InstanceLifecycle != nil &&
 			*instDetails.InstanceLifecycle == "spot" {
-			logger.Println(a.name, "Found running spot instance ", *instDetails.InstanceId,
-				"of the same type:", instanceType)
+			logger.Println(a.name, "Found running spot instance",
+				*instDetails.InstanceId, "of the same type:", instanceType)
 			count++
 		}
 	}
diff --git a/agent/autospotting/region.go b/agent/autospotting/region.go
index d486e06e..3eba0bf0 100644
--- a/agent/autospotting/region.go
+++ b/agent/autospotting/region.go
@@ -241,20 +241,24 @@ func (r *region) scanInstances() {
 
 func (r *region) tagInstance(instanceID *string, tags []*ec2.Tag) {
 	svc := r.services.ec2
-	params := &ec2.CreateTagsInput{
+	params := ec2.CreateTagsInput{
 		Resources: []*string{instanceID},
 		Tags:      tags,
 	}
 
-	_, err := svc.CreateTags(params)
+	logger.Println(r.name, "Tagging spot instance", *instanceID)
+
+	for _, err := svc.CreateTags(&params); err != nil; _, err = svc.CreateTags(&params) {
 
-	if err != nil {
-		// Print the error, cast err to awserr.Error to get the Code and
-		// Message from an error.
 		logger.Println(r.name,
-			"Failed to create tags for the spot instance request:", err.Error())
-		return
+			"Failed to create tags for the spot instance", *instanceID, err.Error())
+
+		logger.Println(r.name,
+			"Sleeping for 5 seconds before retrying")
+
+		time.Sleep(5 * time.Second)
 	}
 
-	logger.Println("Instance", *instanceID, "was tagged with the following tags:", tags)
+	logger.Println("Instance", *instanceID,
+		"was tagged with the following tags:", tags)
 }
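Note: outside of this patch, the retry pattern used in tagInstance above boils down to the following standalone sketch. This is an illustration only, not part of the diff; it assumes the aws-sdk-go v1 EC2 client, and the tagWithRetry name and the example instance ID are made up:

package main

import (
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

// tagWithRetry keeps calling CreateTags until it succeeds, sleeping five
// seconds between failed attempts, mirroring the loop in region.tagInstance.
func tagWithRetry(svc *ec2.EC2, instanceID *string, tags []*ec2.Tag) {
	params := ec2.CreateTagsInput{
		Resources: []*string{instanceID},
		Tags:      tags,
	}

	for _, err := svc.CreateTags(&params); err != nil; _, err = svc.CreateTags(&params) {
		log.Println("Failed to tag instance", *instanceID, err.Error(),
			"- retrying in 5 seconds")
		time.Sleep(5 * time.Second)
	}
}

func main() {
	svc := ec2.New(session.Must(session.NewSession()))
	tagWithRetry(svc, aws.String("i-0123456789abcdef0"),
		[]*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("example")}})
}

Like the loop in the diff, this retries forever; production code would likely bound the number of attempts or back off exponentially instead.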