Skip to content

Commit

Permalink
Merged pull request influxdata#1622 from adityacs/master
Browse files Browse the repository at this point in the history
Add support for AWS EC2 autoscaling services.
  • Loading branch information
nathanielc committed Nov 28, 2017
2 parents a1a60f2 + 7bc1ac1 commit eac21fe
Show file tree
Hide file tree
Showing 22 changed files with 14,873 additions and 42 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## unreleased

### Features

- [#1622](https://github.com/influxdata/kapacitor/pull/1622): Add support for AWS EC2 autoscaling services.

### Bugfixes

- [#1250](https://github.com/influxdata/kapacitor/issues/1250): Fix VictorOps "data" field being a string instead of actual JSON.
Expand Down
94 changes: 94 additions & 0 deletions autoscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
ec2 "github.com/influxdata/kapacitor/services/ec2/client"
k8s "github.com/influxdata/kapacitor/services/k8s/client"
swarm "github.com/influxdata/kapacitor/services/swarm/client"
"github.com/influxdata/kapacitor/tick/ast"
Expand Down Expand Up @@ -536,3 +537,96 @@ func (a *swarmAutoscaler) SetResourceIDOnTags(id resourceID, tags models.Tags) {
tags[a.outputServiceNameTag] = id.ID()
}
}

/////////////////////////////////////////////
// EC2 implementation of Autoscaler

// ec2Autoscaler holds the EC2 client and the group naming configuration
// used by the EC2 autoscale node.
type ec2Autoscaler struct {
// client is the EC2 client used to look up and update groups.
client ec2.Client

// groupName is a fixed group name; when non-empty it is used for every point.
groupName string
// groupNameTag names the tag whose value is used as the group name when groupName is empty.
groupNameTag string
// outputGroupNameTag names the tag that receives the resource ID on output; when empty no tag is written.
outputGroupNameTag string
}

// newEc2AutoscaleNode builds an AutoscaleNode backed by the EC2 autoscaler.
//
// The EC2 client is requested from the task's EC2 service for the cluster
// configured on the pipeline node; a failure to obtain it is returned as an
// error. When no output tag is configured the group name tag is reused.
func newEc2AutoscaleNode(et *ExecutingTask, n *pipeline.Ec2AutoscaleNode, d NodeDiagnostic) (*AutoscaleNode, error) {
	ec2Client, clientErr := et.tm.EC2Service.Client(n.Cluster)
	if clientErr != nil {
		return nil, fmt.Errorf("cannot use the EC2Autoscale node, could not create ec2 client: %v", clientErr)
	}

	scaler := &ec2Autoscaler{
		client:             ec2Client,
		groupName:          n.GroupName,
		groupNameTag:       n.GroupNameTag,
		outputGroupNameTag: n.OutputGroupNameTag,
	}
	// Fall back to the input tag name when no explicit output tag is given.
	if scaler.outputGroupNameTag == "" {
		scaler.outputGroupNameTag = n.GroupNameTag
	}

	minReplicas, maxReplicas := int(n.Min), int(n.Max)
	return newAutoscaleNode(et, d, n, scaler, minReplicas, maxReplicas, n.IncreaseCooldown, n.DecreaseCooldown, n.CurrentField, n.Replicas)
}

// ec2ResourceID is a string-backed resource identifier for the EC2 autoscaler.
type ec2ResourceID string

// ID returns the identifier as a plain string.
func (id ec2ResourceID) ID() string {
	name := string(id)
	return name
}

// ResourceIDFromTags determines which autoscaling group a set of tags refers to.
//
// A configured groupName takes precedence; otherwise the name is read from
// the tag named by groupNameTag. An error is returned when neither is
// configured or when the resulting name is empty.
func (a *ec2Autoscaler) ResourceIDFromTags(tags models.Tags) (resourceID, error) {
	var name string
	switch {
	case a.groupName != "":
		name = a.groupName
	case a.groupNameTag != "":
		// A missing tag leaves name empty, which is reported below.
		if t, ok := tags[a.groupNameTag]; ok {
			name = t
		}
	default:
		return nil, errors.New("expected one of GroupName or GroupNameTag to be set")
	}
	if name == "" {
		return nil, errors.New("could not determine the name of the resource")
	}
	// Return the EC2 specific ID type, not the swarm one.
	return ec2ResourceID(name), nil
}

// Replicas returns the current desired capacity of the autoscaling group
// identified by id.
//
// An error is returned when the group lookup fails, or when the response
// contains no group with a desired capacity. This avoids dereferencing a nil
// pointer and avoids silently reporting zero replicas for a missing group.
func (a *ec2Autoscaler) Replicas(id resourceID) (int, error) {
	sid := id.ID()
	group, err := a.client.Group(sid)
	if err != nil {
		return 0, errors.Wrapf(err, "failed to get ec2 autoscaleGroup for %q", sid)
	}

	var (
		desiredCapacity int64
		found           bool
	)
	// If several groups are returned the last one with a capacity wins.
	for _, resp := range group.AutoScalingGroups {
		if resp.DesiredCapacity == nil {
			continue
		}
		desiredCapacity = *resp.DesiredCapacity
		found = true
	}
	if !found {
		return 0, fmt.Errorf("no desired capacity found for ec2 autoscaleGroup %q", sid)
	}
	return int(desiredCapacity), nil
}

// SetReplicas asks the EC2 client to update the group identified by id to
// the given replica count, returning whatever error the client reports.
func (a *ec2Autoscaler) SetReplicas(id resourceID, replicas int) error {
	desired := int64(replicas)
	return a.client.UpdateGroup(id.ID(), desired)
}

// SetResourceIDOnTags writes the resource ID into tags under the configured
// output tag name. Nothing is written when no output tag name is configured.
func (a *ec2Autoscaler) SetResourceIDOnTags(id resourceID, tags models.Tags) {
	tagName := a.outputGroupNameTag
	if tagName == "" {
		return
	}
	tags[tagName] = id.ID()
}
135 changes: 135 additions & 0 deletions pipeline/ec2_autoscale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package pipeline

import (
"errors"
"fmt"
"time"

"github.com/influxdata/kapacitor/tick/ast"
)

// Ec2AutoscaleNode triggers autoscale events for an AWS autoscaling group.
// The node also outputs points for the triggered events.
//
// Example:
//     // Target 80% cpu per ec2 instance
//     var target = 80.0
//     var min = 1
//     var max = 10
//     var period = 5m
//     var every = period
//     stream
//         |from()
//             .measurement('cpu')
//             .groupBy('host_name','group_name')
//             .where(lambda: "cpu" == 'cpu-total')
//         |eval(lambda: 100.0 - "usage_idle")
//             .as('usage_percent')
//         |window()
//             .period(period)
//             .every(every)
//         |mean('usage_percent')
//             .as('mean_cpu')
//         |groupBy('group_name')
//         |sum('mean_cpu')
//             .as('total_cpu')
//         |ec2Autoscale()
//             // Get the group name of the VM(EC2 instance) from "group_name" tag.
//             .groupNameTag('group_name')
//             .min(min)
//             .max(max)
//             // Set the desired number of replicas based on target.
//             .replicas(lambda: int(ceil("total_cpu" / target)))
//         |influxDBOut()
//             .database('deployments')
//             .measurement('scale_events')
//             .precision('s')
//
//
// The above example computes the mean of cpu usage_percent by host_name and group_name.
// Then the sum of the mean cpu usage is calculated as total_cpu.
// Using the total_cpu over the last time period a desired number of replicas is computed
// based on the target percentage usage of cpu.
//
// If the desired number of replicas has changed, Kapacitor makes the appropriate API call to the AWS autoscaling group
// to update the replicas spec.
//
// Any time the Ec2Autoscale node changes a replica count, it emits a point.
// The point is tagged with the group name,
// using the tag named by the outputGroupNameTag property.
// In addition the group by tags will be preserved on the emitted point.
// The point contains two fields: `old`, and `new` representing change in the replicas.
//
// Available Statistics:
//
//    * increase_events -- number of times the replica count was increased.
//    * decrease_events -- number of times the replica count was decreased.
//    * cooldown_drops -- number of times an event was dropped because of a cooldown timer.
//    * errors -- number of errors encountered, typically related to communicating with the AWS autoscaling API.
//
type Ec2AutoscaleNode struct {
chainnode

// Cluster is the ID of the ec2 cluster to use.
// The ID of the cluster is specified in the kapacitor configuration.
Cluster string

// GroupName is the name of the autoscaling group to autoscale.
GroupName string
// GroupNameTag is the name of a tag which contains the name of the autoscaling group to autoscale.
GroupNameTag string
// OutputGroupNameTag is the name of a tag into which the group name will be written for output autoscale events.
// Defaults to the value of GroupNameTag if it's not empty.
OutputGroupNameTag string

// CurrentField is the name of a field into which the current replica count will be set as an int.
// If empty no field will be set.
// Useful for computing deltas on the current state.
//
// Example:
//    |ec2Autoscale()
//        .currentField('replicas')
//        // Increase the replicas by 1 if the qps is over the threshold
//        .replicas(lambda: if("qps" > threshold, "replicas" + 1, "replicas"))
//
CurrentField string

// The maximum scale factor to set.
// If 0 then there is no upper limit.
// Default: 0, a.k.a no limit.
Max int64

// The minimum scale factor to set.
// Default: 1
Min int64

// Replicas is a lambda expression that should evaluate to the desired number of replicas for the resource.
Replicas *ast.LambdaNode

// Only one increase event can be triggered per resource every IncreaseCooldown interval.
IncreaseCooldown time.Duration
// Only one decrease event can be triggered per resource every DecreaseCooldown interval.
DecreaseCooldown time.Duration
}

// newEc2AutoscaleNode creates an Ec2AutoscaleNode that consumes edges of
// type e, provides stream edges, and starts with a minimum of 1.
func newEc2AutoscaleNode(e EdgeType) *Ec2AutoscaleNode {
	node := new(Ec2AutoscaleNode)
	node.chainnode = newBasicChainNode("ec2_autoscale", e, StreamEdge)
	node.Min = 1
	return node
}

// validate checks the node configuration: exactly one of GroupName and
// GroupNameTag must be set, Min must be at least 1, and a Replicas
// expression must be present. The first violation found is returned.
func (n *Ec2AutoscaleNode) validate() error {
	hasName := n.GroupName != ""
	hasTag := n.GroupNameTag != ""
	switch {
	case hasName == hasTag:
		// Either both are set or neither is.
		return fmt.Errorf("must specify exactly one of GroupName or GroupNameTag")
	case n.Min < 1:
		return fmt.Errorf("min must be >= 1, got %d", n.Min)
	case n.Replicas == nil:
		return errors.New("must provide a replicas lambda expression")
	}
	return nil
}
7 changes: 7 additions & 0 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,13 @@ func (n *chainnode) SwarmAutoscale() *SwarmAutoscaleNode {
return k
}

// Create a node that can trigger autoscale events for an EC2 autoscaling group.
func (n *chainnode) Ec2Autoscale() *Ec2AutoscaleNode {
	node := newEc2AutoscaleNode(n.Provides())
	// Attach the new node as a child of this node before handing it back.
	n.linkChild(node)
	return node
}

// Create a node that tracks duration in a given state.
func (n *chainnode) StateDuration(expression *ast.LambdaNode) *StateDurationNode {
sd := newStateDurationNode(n.provides, expression)
Expand Down
2 changes: 2 additions & 0 deletions pipeline/tick/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) {
return NewDelete(parents).Build(node)
case *pipeline.DerivativeNode:
return NewDerivative(parents).Build(node)
case *pipeline.Ec2AutoscaleNode:
return NewEc2Autoscale(parents).Build(node)
case *pipeline.EvalNode:
return NewEval(parents).Build(node)
case *pipeline.FlattenNode:
Expand Down
37 changes: 37 additions & 0 deletions pipeline/tick/ec2_autoscale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package tick

import (
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
)

// Ec2AutoscaleNode converts the ec2 autoscaling pipeline node into the TICKScript AST.
type Ec2AutoscaleNode struct {
// Function is embedded; the Parents field and the Pipe call used by this
// type's constructor and Build method are reached through it.
Function
}

// NewEc2Autoscale creates an Ec2Autoscale function builder whose parents
// are the given AST nodes.
func NewEc2Autoscale(parents []ast.Node) *Ec2AutoscaleNode {
	builder := &Ec2AutoscaleNode{}
	builder.Parents = parents
	return builder
}

// Build creates an Ec2Autoscale ast.Node from the pipeline node s.
//
// Every property of the pipeline node is emitted as a chained property call
// on the ec2Autoscale function. The accumulated node and error are returned.
func (n *Ec2AutoscaleNode) Build(s *pipeline.Ec2AutoscaleNode) (ast.Node, error) {
	n.Pipe("ec2Autoscale").
		Dot("cluster", s.Cluster).
		// groupName must carry the group name itself, not the tag name.
		Dot("groupName", s.GroupName).
		Dot("groupNameTag", s.GroupNameTag).
		Dot("outputGroupNameTag", s.OutputGroupNameTag).
		Dot("currentField", s.CurrentField).
		Dot("max", s.Max).
		Dot("min", s.Min).
		Dot("replicas", s.Replicas).
		Dot("increaseCooldown", s.IncreaseCooldown).
		Dot("decreaseCooldown", s.DecreaseCooldown)

	return n.prev, n.err
}
Loading

0 comments on commit eac21fe

Please sign in to comment.