Skip to content

Commit

Permalink
refined the instance replacement logic
Browse files Browse the repository at this point in the history
Change-Id: I85a064d0071e9cc025842bf1f7d9afb334ef1643
  • Loading branch information
cristim committed Jun 22, 2016
1 parent 162449c commit 584c077
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 63 deletions.
153 changes: 92 additions & 61 deletions agent/autospotting/autoscaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,21 @@ func (a *autoScalingGroup) process() {
logger.Println(a.region, "Attaching spot instance",
*spotInstanceID, "to", a.name)

a.replaceOnDemandInstanceWithSpot(*spotInstanceID)
a.replaceOnDemandInstanceWithSpot(spotInstanceID)
} else {
azToLaunchSpotIn := a.biggestOnDemandAvailablityZone()
// find any given on-demand instance and try to replace it with a spot one
onDemandInstance := a.findOndemandInstance()

if azToLaunchSpotIn == nil {
if onDemandInstance == nil {
logger.Println(a.region.name, a.name,
"No AZ can be used for launching new instances, nothing to do here...")
} else {
logger.Println(a.region.name, a.name,
"Would launch a spot instance in ", *azToLaunchSpotIn)
"No running on-demand instances were found, nothing to do here...")
return
}

azToLaunchSpotIn := onDemandInstance.Placement.AvailabilityZone
logger.Println(a.region.name, a.name,
"Would launch a spot instance in ", *azToLaunchSpotIn)

a.launchCheapestSpotInstance(azToLaunchSpotIn)
}
}
Expand All @@ -75,30 +79,42 @@ func (a *autoScalingGroup) filterInstanceTags() []*ec2.Tag {
}

func (a *autoScalingGroup) replaceOnDemandInstanceWithSpot(
spotInstanceID string) {

a.region.tagInstance(spotInstanceID, a.filterInstanceTags())
spotInstanceID *string) {

asg := a.asgRawData

minSize, maxSize := *asg.MinSize, *asg.MaxSize
desiredCapacity := *asg.DesiredCapacity

// temporarily increase AutoScaling group in case it's of fixed size
// Tag the Spot instance with the tags of any of the existing on-demand
// instances. The tags first need to be filtered from some invalid values
// which can't just be duplicated.
a.region.tagInstance(spotInstanceID, a.filterInstanceTags())

// temporarily increase AutoScaling group in case it's of static size
if minSize == maxSize {
a.setAutoScalingMaxSize(maxSize + 1)
defer a.setAutoScalingMaxSize(maxSize)
}

// revert attach/detach order when running on minimum capacity
if desiredCapacity == minSize {
a.attachSpotInstance(spotInstanceID)
} else {
defer a.attachSpotInstance(spotInstanceID)
}
// get the details of our spot instance so we can see its AZ
if spotInst := a.findInstanceByID(spotInstanceID); spotInst != nil {

az := spotInst.Placement.AvailabilityZone

a.detachAndTerminateOnDemandInstance()
// find an on-demand instance from the same AZ as our spot instance
if odInst := a.findOndemandInstanceInAZ(az); odInst != nil {

// revert attach/detach order when running on minimum capacity
if desiredCapacity == minSize {
a.attachSpotInstance(spotInstanceID)
} else {
defer a.attachSpotInstance(spotInstanceID)
}

a.detachAndTerminateOnDemandInstance(odInst.InstanceId)
}
}
}

func (a *autoScalingGroup) getInstanceTags() []*ec2.Tag {
Expand All @@ -109,22 +125,49 @@ func (a *autoScalingGroup) getInstanceTags() []*ec2.Tag {
return nil
}

// findInstance returns the details of the first instance of the group that is
// present in the region's instance map, or nil when none of them is.
func (a *autoScalingGroup) findInstance() *ec2.Instance {

	for _, instance := range a.asgRawData.Instances {
		// skip malformed entries instead of dereferencing a nil pointer
		if instance == nil || instance.InstanceId == nil {
			continue
		}

		// Keep scanning when this ID is missing from the map (the lookup then
		// yields nil), so that a later, known instance can still be returned.
		if details := a.region.instances[*instance.InstanceId]; details != nil {
			return details
		}
	}
	return nil
}

// findInstanceByID returns the detailed information the region holds about the
// instance with the given ID, or nil when the ID is nil or unknown.
func (a *autoScalingGroup) findInstanceByID(instanceID *string) *ec2.Instance {
	// guard the dereference below: a nil ID cannot match any instance
	if instanceID == nil {
		return nil
	}
	return a.region.instances[*instanceID]
}

func (a *autoScalingGroup) findOndemandInstance() *ec2.Instance {

for _, instance := range a.asgRawData.Instances {
instanceData := a.region.instances[*instance.InstanceId]
// this attribute is non-nil only for spot instances, where it contains
// the value "spot"
if instanceData != nil && instanceData.InstanceLifecycle == nil {

// return the first found on-demand running instance
if instanceData != nil &&
instanceData.State.String() == "running" &&
// this attribute is non-nil only for spot instances, where it contains
// the value "spot"
instanceData.InstanceLifecycle == nil {

return instanceData
}
}
return nil
}

func (a *autoScalingGroup) findOndemandInstanceInAZ(az *string) *ec2.Instance {

for _, instance := range a.asgRawData.Instances {
instanceData := a.region.instances[*instance.InstanceId]

// return the first found on-demand running instance
if instanceData != nil &&
instanceData.Placement.AvailabilityZone == az &&
instanceData.State.String() == "running" &&
// this attribute is non-nil only for spot instances, where it contains
// the value "spot"
instanceData.InstanceLifecycle == nil {

return instanceData
}
Expand Down Expand Up @@ -271,6 +314,7 @@ func (a *autoScalingGroup) smallestAvailablityZone() *string {
smallestAZ, min = k, v
}
}

logger.Println("Smallest AZ is ", smallestAZ)
return &smallestAZ
}
Expand Down Expand Up @@ -572,14 +616,14 @@ func copyBlockDeviceMappings(
return ec2BDMlist
}

func (a *autoScalingGroup) attachSpotInstance(spotInstanceID string) {
func (a *autoScalingGroup) attachSpotInstance(spotInstanceID *string) {

svc := a.region.services.autoScaling

params := autoscaling.AttachInstancesInput{
AutoScalingGroupName: aws.String(a.name),
InstanceIds: []*string{
&spotInstanceID,
spotInstanceID,
},
}

Expand All @@ -595,49 +639,36 @@ func (a *autoScalingGroup) attachSpotInstance(spotInstanceID string) {

// Terminates an on-demand instance from the group,
// but only after it was detached from the autoscaling group
func (a *autoScalingGroup) detachAndTerminateOnDemandInstance() {

for _, inst := range a.asgRawData.Instances {

instDetails := a.region.instances[*inst.InstanceId]
func (a *autoScalingGroup) detachAndTerminateOnDemandInstance(instanceID *string) {
logger.Println(a.region.name, a.name, "Detaching and terminating instance:", *instanceID)

// skip spot instances
if instDetails.InstanceLifecycle != nil &&
*instDetails.InstanceLifecycle == "spot" {
continue
}

detachParams := autoscaling.DetachInstancesInput{
AutoScalingGroupName: aws.String(a.name),
InstanceIds: []*string{
inst.InstanceId,
},
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
// detach the on-demand instance
detachParams := autoscaling.DetachInstancesInput{
AutoScalingGroupName: aws.String(a.name),
InstanceIds: []*string{
instanceID,
},
ShouldDecrementDesiredCapacity: aws.Bool(true),
}

asSvc := a.region.services.autoScaling
asResp, err := asSvc.DetachInstances(&detachParams)
logger.Println(asResp)
if err != nil {
logger.Println(err.Error())
}
asSvc := a.region.services.autoScaling

ec2Svc := a.region.services.ec2
if _, err := asSvc.DetachInstances(&detachParams); err != nil {
logger.Println(err.Error())
}

termParams := ec2.TerminateInstancesInput{
InstanceIds: []*string{
inst.InstanceId,
},
}
// then terminate it
ec2Svc := a.region.services.ec2

ec2Resp, err := ec2Svc.TerminateInstances(&termParams)
logger.Println(ec2Resp)
if err != nil {
logger.Println(err.Error())
}
return // we can exit after terminating a single instance from that AZ
termParams := ec2.TerminateInstancesInput{
InstanceIds: []*string{
instanceID,
},
}

if _, err := ec2Svc.TerminateInstances(&termParams); err != nil {
logger.Println(err.Error())
}
}

func (a *autoScalingGroup) getCheapestCompatibleSpotInstanceType(
Expand Down
4 changes: 2 additions & 2 deletions agent/autospotting/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ func (r *region) scanInstances() {
// logger.Println(r.instances)
}

func (r *region) tagInstance(instanceID string, tags []*ec2.Tag) {
func (r *region) tagInstance(instanceID *string, tags []*ec2.Tag) {
svc := r.services.ec2
params := &ec2.CreateTagsInput{
Resources: []*string{aws.String(instanceID)},
Resources: []*string{instanceID},
Tags: tags,
}

Expand Down

0 comments on commit 584c077

Please sign in to comment.