Skip to content

Commit

Permalink
New API endpoint for dynamically overriding sections of the config (i…
Browse files Browse the repository at this point in the history
…nfluxdata#932)

* Initial work to add config-override service

* add deps

* git subrepo clone https://github.com/mitchellh/reflectwalk.git vendor/github.com/mitchellh/reflectwalk

subrepo:
  subdir:   "vendor/github.com/mitchellh/reflectwalk"
  merged:   "0c9480f"
upstream:
  origin:   "https://github.com/mitchellh/reflectwalk.git"
  branch:   "master"
  commit:   "0c9480f"
git-subrepo:
  version:  "0.3.0"
  origin:   "???"
  commit:   "???"

* git subrepo clone https://github.com/mitchellh/copystructure.git vendor/github.com/mitchellh/copystructure

subrepo:
  subdir:   "vendor/github.com/mitchellh/copystructure"
  merged:   "ad4c8fe"
upstream:
  origin:   "https://github.com/mitchellh/copystructure.git"
  branch:   "master"
  commit:   "ad4c8fe"
git-subrepo:
  version:  "0.3.0"
  origin:   "???"
  commit:   "???"

* Add config-override service
Update services to be dynamically updated
  • Loading branch information
Nathaniel Cook authored Oct 24, 2016
1 parent 61095ca commit dca5d71
Show file tree
Hide file tree
Showing 88 changed files with 11,138 additions and 1,407 deletions.
34 changes: 29 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,41 @@

### Release Notes

### Features
New K8sAutoscale node that allows you to auotmatically scale Kubernetes deployments driven by any metrics Kapacitor consumes.
For example, to scale a deployment `myapp` based off requests per second:

### Bugfixes
```
// The target requests per second per host
var target = 100.0
stream
|from()
.measurement('requests')
.where(lambda: "deployment" == 'myapp')
// Compute the moving average of the last 5 minutes
|movingAverage('requests', 5*60)
.as('mean_requests_per_second')
|k8sAutoscale()
.resourceName('app')
.kind('deployments')
.min(4)
.max(100)
// Compute the desired number of replicas based on target.
.replicas(lambda: int(ceil("mean_requests_per_second" / target)))
```


New API endpoints have been added to be able to configure InfluxDB clusters and alert handlers dynamically without needing to restart the Kapacitor daemon.
See the API docs for more details.

### Features

- [#980](https://github.com/influxdata/kapacitor/pull/980): Upgrade to using go 1.7
- [#931](https://github.com/influxdata/kapacitor/issues/931): Add a Kubernetes autoscaler node. You can now autoscale your Kubernetes deployments via Kapacitor.
- [#928](https://github.com/influxdata/kapacitor/issues/928): Add new API endpoint for dynamically overriding sections of the configuration.
- [#980](https://github.com/influxdata/kapacitor/pull/980): Upgrade to using go 1.7

### Bugfixes


## v1.0.2 [2016-10-06]

### Release Notes
Expand Down Expand Up @@ -454,7 +479,6 @@ If you have existing tasks which do not match this pattern they should continue

### Features


### Bugfixes

- [#545](https://github.com/influxdata/kapacitor/issues/545): Fixes inconsistancy with API docs for creating a task.
Expand Down
4 changes: 3 additions & 1 deletion LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
Dependencies
============

* github.com/boltdb/bolt [MIT](https://github.com/boltdb/bolt/blob/master/LICENSE)
* github.com/BurntSushi/toml [WTFPL](https://github.com/BurntSushi/toml/blob/master/COPYING)
* github.com/boltdb/bolt [MIT](https://github.com/boltdb/bolt/blob/master/LICENSE)
* github.com/cenkalti/backoff [MIT](https://github.com/cenkalti/backoff/blob/master/LICENSE)
* github.com/dgrijalva/jwt-go [MIT](https://github.com/dgrijalva/jwt-go/blob/master/LICENSE)
* github.com/dustin/go-humanize [MIT](https://github.com/dustin/go-humanize/blob/master/LICENSE)
* github.com/golang/protobuf [BSD](https://github.com/golang/protobuf/blob/master/LICENSE)
* github.com/gorhill/cronexpr [APLv2](https://github.com/gorhill/cronexpr/blob/master/APLv2)
* github.com/kimor79/gollectd [BSD](https://github.com/kimor79/gollectd/blob/master/LICENSE)
* github.com/mattn/go-runewidth [MIT](https://github.com/mattn/go-runewidth/blob/master/README.mkd)
* github.com/mitchellh/copystructure[MIT](https://github.com/mitchellh/copystructure/blob/master/LICENSE)
* github.com/mitchellh/reflectwalk [MIT](https://github.com/mitchellh/reflectwalk/blob/master/LICENSE)
* github.com/pkg/errors [BSD](https://github.com/pkg/errors/blob/master/LICENSE)
* github.com/russross/blackfriday [BSD](https://github.com/russross/blackfriday/blob/master/LICENSE.txt)
* github.com/serenize/snaker [MIT](https://github.com/serenize/snaker/blob/master/LICENSE.txt)
Expand Down
48 changes: 2 additions & 46 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,13 +950,8 @@ func (a *AlertNode) handleTcp(tcp *pipeline.TcpHandler, ad *AlertData) {
}

func (a *AlertNode) handleEmail(email *pipeline.EmailHandler, ad *AlertData) {
if a.et.tm.SMTPService != nil {
err := a.et.tm.SMTPService.SendMail(email.ToList, ad.Message, ad.Details)
if err != nil {
a.logger.Println("E!", err)
}
} else {
a.logger.Println("E! smtp service not enabled, cannot send email.")
if err := a.et.tm.SMTPService.SendMail(email.ToList, ad.Message, ad.Details); err != nil {
a.logger.Println("E! failed to send email:", err)
}
}

Expand Down Expand Up @@ -1001,10 +996,6 @@ func (a *AlertNode) handleLog(l *pipeline.LogHandler, ad *AlertData) {
}

func (a *AlertNode) handleVictorOps(vo *pipeline.VictorOpsHandler, ad *AlertData) {
if a.et.tm.VictorOpsService == nil {
a.logger.Println("E! failed to send VictorOps alert. VictorOps is not enabled")
return
}
var messageType string
switch ad.Level {
case OKAlert:
Expand All @@ -1027,10 +1018,6 @@ func (a *AlertNode) handleVictorOps(vo *pipeline.VictorOpsHandler, ad *AlertData
}

func (a *AlertNode) handlePagerDuty(pd *pipeline.PagerDutyHandler, ad *AlertData) {
if a.et.tm.PagerDutyService == nil {
a.logger.Println("E! failed to send PagerDuty alert. PagerDuty is not enabled")
return
}
err := a.et.tm.PagerDutyService.Alert(
pd.ServiceKey,
ad.ID,
Expand All @@ -1045,11 +1032,6 @@ func (a *AlertNode) handlePagerDuty(pd *pipeline.PagerDutyHandler, ad *AlertData
}

func (a *AlertNode) handleSensu(sensu *pipeline.SensuHandler, ad *AlertData) {
if a.et.tm.SensuService == nil {
a.logger.Println("E! failed to send Sensu message. Sensu is not enabled")
return
}

err := a.et.tm.SensuService.Alert(
ad.ID,
ad.Message,
Expand All @@ -1062,10 +1044,6 @@ func (a *AlertNode) handleSensu(sensu *pipeline.SensuHandler, ad *AlertData) {
}

func (a *AlertNode) handleSlack(slack *pipeline.SlackHandler, ad *AlertData) {
if a.et.tm.SlackService == nil {
a.logger.Println("E! failed to send Slack message. Slack is not enabled")
return
}
err := a.et.tm.SlackService.Alert(
slack.Channel,
ad.Message,
Expand All @@ -1078,10 +1056,6 @@ func (a *AlertNode) handleSlack(slack *pipeline.SlackHandler, ad *AlertData) {
}

func (a *AlertNode) handleTelegram(telegram *pipeline.TelegramHandler, ad *AlertData) {
if a.et.tm.TelegramService == nil {
a.logger.Println("E! failed to send Telegram message. Telegram is not enabled")
return
}
err := a.et.tm.TelegramService.Alert(
telegram.ChatId,
telegram.ParseMode,
Expand All @@ -1096,10 +1070,6 @@ func (a *AlertNode) handleTelegram(telegram *pipeline.TelegramHandler, ad *Alert
}

func (a *AlertNode) handleHipChat(hipchat *pipeline.HipChatHandler, ad *AlertData) {
if a.et.tm.HipChatService == nil {
a.logger.Println("E! failed to send HipChat message. HipChat is not enabled")
return
}
err := a.et.tm.HipChatService.Alert(
hipchat.Room,
hipchat.Token,
Expand All @@ -1123,11 +1093,6 @@ type alertaHandler struct {
}

func (a *AlertNode) handleAlerta(alerta alertaHandler, ad *AlertData) {
if a.et.tm.AlertaService == nil {
a.logger.Println("E! failed to send Alerta message. Alerta is not enabled")
return
}

var severity string

switch ad.Level {
Expand Down Expand Up @@ -1215,10 +1180,6 @@ func (a *AlertNode) handleAlerta(alerta alertaHandler, ad *AlertData) {
}

func (a *AlertNode) handleOpsGenie(og *pipeline.OpsGenieHandler, ad *AlertData) {
if a.et.tm.OpsGenieService == nil {
a.logger.Println("E! failed to send OpsGenie alert. OpsGenie is not enabled")
return
}
var messageType string
switch ad.Level {
case OKAlert:
Expand All @@ -1243,11 +1204,6 @@ func (a *AlertNode) handleOpsGenie(og *pipeline.OpsGenieHandler, ad *AlertData)
}

func (a *AlertNode) handleTalk(talk *pipeline.TalkHandler, ad *AlertData) {
if a.et.tm.TalkService == nil {
a.logger.Println("E! failed to send Talk message. Talk is not enabled")
return
}

err := a.et.tm.TalkService.Alert(
ad.ID,
ad.Message,
Expand Down
47 changes: 10 additions & 37 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kapacitor

import (
"bytes"
"errors"
"fmt"
"log"
"sync"
Expand All @@ -14,11 +13,11 @@ import (
"github.com/influxdata/kapacitor/influxdb"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/pkg/errors"
)

const (
statsQueryErrors = "query_errors"
statsConnectErrors = "connect_errors"
statsBatchesQueried = "batches_queried"
statsPointsQueried = "points_queried"
)
Expand Down Expand Up @@ -139,7 +138,6 @@ type QueryNode struct {
aborting chan struct{}

queryErrors *expvar.Int
connectErrors *expvar.Int
batchesQueried *expvar.Int
pointsQueried *expvar.Int
byName bool
Expand Down Expand Up @@ -265,20 +263,21 @@ func (b *QueryNode) Queries(start, stop time.Time) ([]*Query, error) {
func (b *QueryNode) doQuery() error {
defer b.ins[0].Close()
b.queryErrors = &expvar.Int{}
b.connectErrors = &expvar.Int{}
b.batchesQueried = &expvar.Int{}
b.pointsQueried = &expvar.Int{}

b.statMap.Set(statsQueryErrors, b.queryErrors)
b.statMap.Set(statsConnectErrors, b.connectErrors)
b.statMap.Set(statsBatchesQueried, b.batchesQueried)
b.statMap.Set(statsPointsQueried, b.pointsQueried)

if b.et.tm.InfluxDBService == nil {
return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query")
}

var con influxdb.Client
con, err := b.et.tm.InfluxDBService.NewNamedClient(b.b.Cluster)
if err != nil {
return errors.Wrap(err, "failed to get InfluxDB client")
}
tickC := b.ticker.Start()
for {
select {
Expand All @@ -288,48 +287,22 @@ func (b *QueryNode) doQuery() error {
return errors.New("batch doQuery aborted")
case now := <-tickC:
b.timer.Start()

// Update times for query
stop := now.Add(-1 * b.b.Offset)
b.query.SetStartTime(stop.Add(-1 * b.b.Period))
b.query.SetStopTime(stop)

b.logger.Println("D! starting next batch query:", b.query.String())
qStr := b.query.String()
b.logger.Println("D! starting next batch query:", qStr)

var err error
if con == nil {
if b.b.Cluster != "" {
con, err = b.et.tm.InfluxDBService.NewNamedClient(b.b.Cluster)
} else {
con, err = b.et.tm.InfluxDBService.NewDefaultClient()
}
if err != nil {
b.logger.Println("E! failed to connect to InfluxDB:", err)
b.connectErrors.Add(1)
// Ensure connection is nil
con = nil
b.timer.Stop()
break
}
}
// Execute query
q := influxdb.Query{
Command: b.query.String(),
Command: qStr,
}

// Execute query
resp, err := con.Query(q)
if err != nil {
b.logger.Println("E! query failed:", err)
b.queryErrors.Add(1)
// Get a new connection
con = nil
b.timer.Stop()
break
}

if err := resp.Error(); err != nil {
b.logger.Println("E! query returned error response:", err)
b.queryErrors.Add(1)
b.logger.Println("E!", err)
b.timer.Stop()
break
}
Expand Down
4 changes: 2 additions & 2 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,10 @@ def run(command, allow_failure=False, shell=False, printOutput=False):
out = out.decode('utf-8').strip()
if p.returncode != 0:
if allow_failure:
logging.warn("Command '{}' failed with error: {}".format(command, out))
logging.warn(u"Command '{}' failed with error: {}".format(command, out))
return None
else:
logging.error("Command '{}' failed with error: {}".format(command, out))
logging.error(u"Command '{}' failed with error: {}".format(command, out))
sys.exit(1)
except OSError as e:
if allow_failure:
Expand Down
Loading

0 comments on commit dca5d71

Please sign in to comment.