Skip to content

Commit

Permalink
Merge pull request influxdata#1535 from influxdata/md-log-v2
Browse files Browse the repository at this point in the history
WIP: Second attempt at logging refactor
  • Loading branch information
desa authored Sep 18, 2017
2 parents d36f371 + 2e65d67 commit 5d228b2
Show file tree
Hide file tree
Showing 109 changed files with 4,168 additions and 1,432 deletions.
86 changes: 42 additions & 44 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
html "html/template"
"log"
"os"
"sync"
text "text/template"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
alertservice "github.com/influxdata/kapacitor/services/alert"
Expand Down Expand Up @@ -75,9 +75,13 @@ type AlertNode struct {
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *AlertNode, err error) {
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (an *AlertNode, err error) {
ctx := []keyvalue.T{
keyvalue.KV("task", et.Task.ID),
}

an = &AlertNode{
node: node{Node: n, et: et, logger: l},
node: node{Node: n, et: et, diag: d},
a: n,
}
an.node.runF = an.runAlert
Expand Down Expand Up @@ -126,20 +130,20 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
c := alertservice.TCPHandlerConfig{
Address: tcp.Address,
}
h := alertservice.NewTCPHandler(c, l)
h := alertservice.NewTCPHandler(c, an.diag)
an.handlers = append(an.handlers, h)
}

for _, email := range n.EmailHandlers {
c := smtp.HandlerConfig{
To: email.ToList,
}
h := et.tm.SMTPService.Handler(c, l)
h := et.tm.SMTPService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.EmailHandlers) == 0 && (et.tm.SMTPService != nil && et.tm.SMTPService.Global()) {
c := smtp.HandlerConfig{}
h := et.tm.SMTPService.Handler(c, l)
h := et.tm.SMTPService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// If email has been configured with state changes only set it.
Expand All @@ -155,7 +159,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Args: e.Command[1:],
Commander: et.tm.Commander,
}
h := alertservice.NewExecHandler(c, l)
h := alertservice.NewExecHandler(c, an.diag)
an.handlers = append(an.handlers, h)
}

Expand All @@ -165,7 +169,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
if log.Mode != 0 {
c.Mode = os.FileMode(log.Mode)
}
h, err := alertservice.NewLogHandler(c, l)
h, err := alertservice.NewLogHandler(c, an.diag)
if err != nil {
return nil, errors.Wrap(err, "failed to create log alert handler")
}
Expand All @@ -176,25 +180,25 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
c := victorops.HandlerConfig{
RoutingKey: vo.RoutingKey,
}
h := et.tm.VictorOpsService.Handler(c, l)
h := et.tm.VictorOpsService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.VictorOpsHandlers) == 0 && (et.tm.VictorOpsService != nil && et.tm.VictorOpsService.Global()) {
c := victorops.HandlerConfig{}
h := et.tm.VictorOpsService.Handler(c, l)
h := et.tm.VictorOpsService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

for _, pd := range n.PagerDutyHandlers {
c := pagerduty.HandlerConfig{
ServiceKey: pd.ServiceKey,
}
h := et.tm.PagerDutyService.Handler(c, l)
h := et.tm.PagerDutyService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.PagerDutyHandlers) == 0 && (et.tm.PagerDutyService != nil && et.tm.PagerDutyService.Global()) {
c := pagerduty.HandlerConfig{}
h := et.tm.PagerDutyService.Handler(c, l)
h := et.tm.PagerDutyService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

Expand All @@ -203,7 +207,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Source: s.Source,
Handlers: s.HandlersList,
}
h, err := et.tm.SensuService.Handler(c, l)
h, err := et.tm.SensuService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create sensu alert handler")
}
Expand All @@ -216,11 +220,11 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Username: s.Username,
IconEmoji: s.IconEmoji,
}
h := et.tm.SlackService.Handler(c, l)
h := et.tm.SlackService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.SlackHandlers) == 0 && (et.tm.SlackService != nil && et.tm.SlackService.Global()) {
h := et.tm.SlackService.Handler(slack.HandlerConfig{}, l)
h := et.tm.SlackService.Handler(slack.HandlerConfig{}, ctx...)
an.handlers = append(an.handlers, h)
}
// If slack has been configured with state changes only set it.
Expand All @@ -237,7 +241,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
DisableWebPagePreview: t.IsDisableWebPagePreview,
DisableNotification: t.IsDisableNotification,
}
h := et.tm.TelegramService.Handler(c, l)
h := et.tm.TelegramService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

Expand All @@ -254,7 +258,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
TrapOid: s.TrapOid,
DataList: dataList,
}
h, err := et.tm.SNMPTrapService.Handler(c, l)
h, err := et.tm.SNMPTrapService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrapf(err, "failed to create SNMP handler")
}
Expand All @@ -263,7 +267,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *

if len(n.TelegramHandlers) == 0 && (et.tm.TelegramService != nil && et.tm.TelegramService.Global()) {
c := telegram.HandlerConfig{}
h := et.tm.TelegramService.Handler(c, l)
h := et.tm.TelegramService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// If telegram has been configured with state changes only set it.
Expand All @@ -278,12 +282,12 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Room: hc.Room,
Token: hc.Token,
}
h := et.tm.HipChatService.Handler(c, l)
h := et.tm.HipChatService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.HipChatHandlers) == 0 && (et.tm.HipChatService != nil && et.tm.HipChatService.Global()) {
c := hipchat.HandlerConfig{}
h := et.tm.HipChatService.Handler(c, l)
h := et.tm.HipChatService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// If HipChat has been configured with state changes only set it.
Expand Down Expand Up @@ -322,7 +326,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
if a.Timeout != 0 {
c.Timeout = a.Timeout
}
h, err := et.tm.AlertaService.Handler(c, l)
h, err := et.tm.AlertaService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create Alerta handler")
}
Expand All @@ -346,7 +350,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
if p.Sound != "" {
c.Sound = p.Sound
}
h := et.tm.PushoverService.Handler(c, l)
h := et.tm.PushoverService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

Expand All @@ -356,7 +360,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Endpoint: p.Endpoint,
Headers: p.Headers,
}
h := et.tm.HTTPPostService.Handler(c, l)
h := et.tm.HTTPPostService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

Expand All @@ -365,17 +369,17 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
TeamsList: og.TeamsList,
RecipientsList: og.RecipientsList,
}
h := et.tm.OpsGenieService.Handler(c, l)
h := et.tm.OpsGenieService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.OpsGenieHandlers) == 0 && (et.tm.OpsGenieService != nil && et.tm.OpsGenieService.Global()) {
c := opsgenie.HandlerConfig{}
h := et.tm.OpsGenieService.Handler(c, l)
h := et.tm.OpsGenieService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

for range n.TalkHandlers {
h := et.tm.TalkService.Handler(l)
h := et.tm.TalkService.Handler(ctx...)
an.handlers = append(an.handlers, h)
}

Expand All @@ -386,7 +390,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
QoS: mqtt.QoSLevel(m.Qos),
Retained: m.Retained,
}
h := et.tm.MQTTService.Handler(c, l)
h := et.tm.MQTTService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// Parse level expressions
Expand Down Expand Up @@ -560,8 +564,8 @@ func (n *AlertNode) restoreEvent(id string) (alert.Level, time.Time) {
// Check for previous state on anonTopic
if n.hasAnonTopic() {
if state, ok, err := n.et.tm.AlertService.EventState(n.anonTopic, id); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! failed to get event state for anonymous topic %s, event %s: %v", n.anonTopic, id, err)
n.diag.Error("failed to get event state for anonymous topic", err,
keyvalue.KV("topic", n.anonTopic), keyvalue.KV("event", id))
} else if ok {
anonTopicState = state
anonFound = true
Expand All @@ -570,8 +574,8 @@ func (n *AlertNode) restoreEvent(id string) (alert.Level, time.Time) {
// Check for previous state on topic.
if n.hasTopic() {
if state, ok, err := n.et.tm.AlertService.EventState(n.topic, id); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! failed to get event state for topic %s, event %s: %v", n.topic, id, err)
n.diag.Error("failed to get event state for topic", err,
keyvalue.KV("topic", n.anonTopic), keyvalue.KV("event", id))
} else if ok {
topicState = state
topicFound = true
Expand All @@ -581,14 +585,12 @@ func (n *AlertNode) restoreEvent(id string) (alert.Level, time.Time) {
if anonFound && topicFound {
// Anon topic takes precedence
if err := n.et.tm.AlertService.UpdateEvent(n.topic, anonTopicState); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! failed to update topic %q event state for event %q", n.topic, id)
n.diag.Error("failed to update topic event state", err, keyvalue.KV("topic", n.topic), keyvalue.KV("event", id))
}
} else if topicFound && n.hasAnonTopic() {
// Update event state for topic
if err := n.et.tm.AlertService.UpdateEvent(n.anonTopic, topicState); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! failed to update topic %q event state for event %q", n.topic, id)
n.diag.Error("failed to update topic event state", err, keyvalue.KV("topic", n.topic), keyvalue.KV("event", id))
}
} // else nothing was found, nothing to do
}
Expand Down Expand Up @@ -623,16 +625,15 @@ func (n *AlertNode) handleEvent(event alert.Event) {
case alert.Critical:
n.critsTriggered.Add(1)
}
n.logger.Printf("D! %v alert triggered id:%s msg:%s data:%v", event.State.Level, event.State.ID, event.State.Message, event.Data.Result.Series[0])
n.diag.AlertTriggered(event.State.Level, event.State.ID, event.State.Message, event.Data.Result.Series[0])

// If we have anon handlers, emit event to the anonTopic
if n.hasAnonTopic() {
event.Topic = n.anonTopic
err := n.et.tm.AlertService.Collect(event)
if err != nil {
n.eventsDropped.Add(1)
n.incrementErrorCount()
n.logger.Println("E!", err)
n.diag.Error("encountered error collecting event", err)
}
}

Expand All @@ -642,8 +643,7 @@ func (n *AlertNode) handleEvent(event alert.Event) {
err := n.et.tm.AlertService.Collect(event)
if err != nil {
n.eventsDropped.Add(1)
n.incrementErrorCount()
n.logger.Println("E!", err)
n.diag.Error("encountered error collecting event", err)
}
}
}
Expand All @@ -654,8 +654,7 @@ func (n *AlertNode) determineLevel(p edge.FieldsTagsTimeGetter, currentLevel ale
}
if rse := n.levelResets[currentLevel]; rse != nil {
if pass, err := EvalPredicate(rse, n.lrScopePools[currentLevel], p); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! error evaluating reset expression for current level %v: %s", currentLevel, err)
n.diag.Error("error evaluating reset expression for current level", err, keyvalue.KV("level", currentLevel.String()))
} else if !pass {
return currentLevel
}
Expand All @@ -676,8 +675,7 @@ func (n *AlertNode) findFirstMatchLevel(start alert.Level, stop alert.Level, p e
continue
}
if pass, err := EvalPredicate(se, n.scopePools[l], p); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! error evaluating expression for level %v: %s", alert.Level(l), err)
n.diag.Error("error evaluating expression for level", err, keyvalue.KV("level", alert.Level(l).String()))
continue
} else if pass {
return alert.Level(l), true
Expand Down
6 changes: 1 addition & 5 deletions alert/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package alert

import (
"fmt"
"log"
"path"
"sort"
"sync"
Expand All @@ -20,14 +19,11 @@ type Topics struct {
mu sync.RWMutex

topics map[string]*Topic

logger *log.Logger
}

func NewTopics(l *log.Logger) *Topics {
func NewTopics() *Topics {
s := &Topics{
topics: make(map[string]*Topic),
logger: l,
}
return s
}
Expand Down
Loading

0 comments on commit 5d228b2

Please sign in to comment.