Skip to content

Commit

Permalink
fix smtp for unauthed email servers
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Nov 25, 2015
1 parent 446d50d commit 1ed3f7b
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 37 deletions.
4 changes: 4 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err
}
if n.UseEmail || (et.tm.SMTPService != nil && et.tm.SMTPService.Global()) {
an.handlers = append(an.handlers, an.handleEmail)
// If email has been configured globally only send state changes.
if et.tm.SMTPService != nil && et.tm.SMTPService.Global() {
n.IsStateChangesOnly = true
}
}
if n.UseVictorOps || (et.tm.VictorOpsService != nil && et.tm.VictorOpsService.Global()) {
an.handlers = append(an.handlers, an.handleVictorOps)
Expand Down
39 changes: 28 additions & 11 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,23 @@ def run(command, allow_failure=False, shell=False):
def create_temp_dir():
return tempfile.mkdtemp(prefix='kapacitor-build')

def get_current_version():
def get_current_version_tag():
version = run("git describe --always --tags --abbrev=0").strip()
# Remove leading 'v'
return version[1:]
return version

def get_current_version():
version_tag = get_current_version_tag()
# Remove leading 'v' and possible '-rc\d+'
version = re.sub(r'-rc\d+', '', version_tag[1:])
return version

def get_current_rc():
rc = None
version_tag = get_current_version_tag()
matches = re.match(r'.*-rc(\d+)', version_tag)
if matches:
rc, = matches.groups(1)
return rc

def get_current_commit(short=False):
command = None
Expand Down Expand Up @@ -231,9 +244,6 @@ def build(version=None,
shutil.rmtree(outdir)
os.makedirs(outdir)

if rc:
# If a release candidate, update the version information accordingly
version = "{}rc{}".format(version, rc)

get_command = None
if update:
Expand Down Expand Up @@ -293,7 +303,7 @@ def package_scripts(build_root):
shutil.copy(LOGROTATE_CONFIG, os.path.join(build_root, LOGROTATE_CONFIG))
shutil.copy(DEFAULT_CONFIG, os.path.join(build_root, DEFAULT_CONFIG))

def build_packages(build_output, version):
def build_packages(build_output, version, rc):
outfiles = []
tmp_build_dir = create_temp_dir()
try:
Expand All @@ -317,12 +327,19 @@ def build_packages(build_output, version):
print "\t- Packaging directory '{}' as '{}'...".format(build_root, package_type),
name = 'kapacitor'
if package_type in ['zip', 'tar']:
name = '{}-{}_{}_{}'.format(name, version, p, a)
fpm_command = "fpm {} --name {} -t {} --version {} -C {} -p {}".format(
v = version
if rc is not None:
v = '{}-1.rc{}'.format(version, rc)
name = '{}-{}_{}_{}'.format(name, v, p, a)
iteration = '1'
if rc is not None:
iteration = '1.rc{}'.format(rc)
fpm_command = "fpm {} --name {} -t {} --version {} --iteration {} -C {} -p {} ".format(
fpm_common_args,
name,
package_type,
version,
iteration,
build_root,
current_location,
)
Expand Down Expand Up @@ -377,7 +394,7 @@ def main():
nightly_version = None
branch = None
version = get_current_version()
rc = None
rc = get_current_rc()
clean = False
package = False
update = False
Expand Down Expand Up @@ -500,7 +517,7 @@ def main():
if not check_path_for("fpm"):
print "!! Cannot package without command 'fpm'. Stopping."
sys.exit(1)
packages = build_packages(build_output, version)
packages = build_packages(build_output, version, rc)
if upload:
upload_packages(packages)
return 0
Expand Down
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ docker run --rm \
-e AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY" \
-v $DIR:/gopath/src/github.com/influxdb/kapacitor \
influxdb/kapacitor-builder \
--outdir=/tmp/kapacitor-build "$@"
"$@"
2 changes: 1 addition & 1 deletion circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ test:

deployment:
release:
tag: /v[0-9]+(\.[0-9]+)*/
tag: /v[0-9]+(\.[0-9]+)*(-rc[0-9]+)?/
commands:
- ./build.sh --clean --packages --upload --platform=all --arch=all
5 changes: 5 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ token = ""
port = 25
username = ""
password = ""
# From address for outgoing mail
from = ""
# List of default To addresses.
# to = ["[email protected]"]

# Skip TLS certificate verify when connecting to SMTP server
no-verify = false
# Close idle connections after timeout
Expand Down
62 changes: 42 additions & 20 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const defaultIDTmpl = "{{ .Name }}:{{ .Group }}"
// Default template for constructing a message.
const defaultMessageTmpl = "{{ .ID }} is {{ .Level }}"

// An AlertNode can trigger an event of varying severity levels
// An AlertNode can trigger an event of varying severity levels,
// and pass the event to alert handlers. The criteria for triggering
// an alert is specified via a [lambda expression](https://influxdb.com/docs/kapacitor/v0.1/tick/expr.html).
// See AlertNode.Info, AlertNode.Warn, and AlertNode.Cirt below.
Expand Down Expand Up @@ -60,10 +60,10 @@ const defaultMessageTmpl = "{{ .ID }} is {{ .Level }}"
//
//
// It is assumed that each successive level filters a subset
// of the previous level. As a result the filter will only be applied if
// of the previous level. As a result, the filter will only be applied if
// a data point passed the previous level.
// In the above example if value = 15 then the INFO and
// WARNING expressions would be evaluated but not the
// In the above example, if value = 15 then the INFO and
// WARNING expressions would be evaluated, but not the
// CRITICAL expression.
// Each expression maintains its own state.
type AlertNode struct {
Expand All @@ -73,9 +73,9 @@ type AlertNode struct {
//
// Available template data:
//
// * Name -- Measurement name
// * Name -- Measurement name.
// * Group -- Concatenation of all group-by tags of the form [key=value,]+.
// If no groupBy is performed equal to literal 'nil'
// If no groupBy is performed equal to literal 'nil'.
// * Tags -- Map of tags. Use '{{ index .Tags "key" }}' to get a specific tag value.
//
// Example:
Expand Down Expand Up @@ -110,9 +110,9 @@ type AlertNode struct {
// Available template data:
//
// * ID -- The ID of the alert.
// * Name -- Measurement name
// * Name -- Measurement name.
// * Group -- Concatenation of all group-by tags of the form [key=value,]+.
// If no groupBy is performed equal to literal 'nil'
// If no groupBy is performed equal to literal 'nil'.
// * Tags -- Map of tags. Use '{{ index .Tags "key" }}' to get a specific tag value.
// * Level -- Alert Level, one of: INFO, WARNING, CRITICAL.
// * Fields -- Map of fields. Use '{{ index .Fields "key" }}' to get a specific field value.
Expand Down Expand Up @@ -215,10 +215,34 @@ func (a *AlertNode) Exec(executable string, args ...string) *AlertNode {
}

// Email the alert data.
// If the To list is empty uses the To addresses from the configuration.
//
// If the To list is empty, the To addresses from the configuration are used.
// The email subject is the AlertNode.Message property.
// The email body is the JSON alert data.
//
// If the 'smtp' section in the configuration has the option: global = true
// then all alerts are sent via email without the need to explicitly state it
// in the TICKscript.
//
// Example:
// [smtp]
// enabled = true
// host = "localhost"
// port = 25
// username = ""
// password = ""
// from = "[email protected]"
// to = ["[email protected]"]
// # Set global to true so all alert trigger emails.
// global = true
//
// Example:
// stream...
// .alert()
//
// Send email to '[email protected]' from '[email protected]'
//
// **NOTE**: The global option for email also implies stateChangesOnly is set on all alerts.
// tick:property
func (a *AlertNode) Email(to ...string) *AlertNode {
a.UseEmail = true
Expand All @@ -233,11 +257,11 @@ func (a *AlertNode) Email(to ...string) *AlertNode {
// Each different alerting level is considered a different state.
// The low and high thresholds are inverted thresholds of a percentage of state changes.
// Meaning that if the percentage of state changes goes above the `high`
// threshold the alert enters a flapping state. The alert remains in the flapping state
// threshold, the alert enters a flapping state. The alert remains in the flapping state
// until the percentage of state changes goes below the `low` threshold.
// Typical values are low: 0.25 and high: 0.5. The percentage values represent the number state changes
// over the total possible number of state changes. A percentage change of 0.5 means that the alert changed
// state in half of the recorded history and remained the same in the other half of the history.
// state in half of the recorded history, and remained the same in the other half of the history.
// tick:property
func (a *AlertNode) Flapping(low, high float64) Node {
a.UseFlapping = true
Expand Down Expand Up @@ -313,7 +337,7 @@ func (a *AlertNode) RoutingKey(routingKey string) *AlertNode {
// 3. Click the "Add Service" button.
// 4. Once the service is created, you'll be taken to the service page. On this page, you'll see the "Service key", which is needed to access the API
//
// Place the 'service key' into the 'pagerduty' section of the Kapacitor configuration as the option 'service-key'
// Place the 'service key' into the 'pagerduty' section of the Kapacitor configuration as the option 'service-key'.
//
// Example:
// [pagerduty]
Expand All @@ -327,8 +351,6 @@ func (a *AlertNode) RoutingKey(routingKey string) *AlertNode {
// .alert()
// .pagerDuty()
//
// Send alerts to PagerDuty.
//
// If the 'pagerduty' section in the configuration has the option: global = true
// then all alerts are sent to PagerDuty without the need to explicitly state it
// in the TICKscript.
Expand All @@ -351,7 +373,7 @@ func (a *AlertNode) PagerDuty() *AlertNode {
}

// Send the alert to Slack.
// To allow Kapacitor to post to Slack
// To allow Kapacitor to post to Slack,
// go to the URL https://slack.com/services/new/incoming-webhook
// and create a new incoming webhook and place the generated URL
// in the 'slack' configuration section.
Expand Down Expand Up @@ -405,7 +427,7 @@ func (a *AlertNode) PagerDuty() *AlertNode {
// .alert()
//
// Send alert to Slack using default channel '#general'.
// *NOTE*: The global option for Slack also implies stateChangesOnly is set on all alerts.
// **NOTE**: The global option for Slack also implies stateChangesOnly is set on all alerts.
// tick:property
func (a *AlertNode) Slack() *AlertNode {
a.UseSlack = true
Expand Down Expand Up @@ -434,10 +456,10 @@ func (a *AlertNode) Channel(channel string) *AlertNode {
// .stateChangesOnly()
// .slack()
//
// If the "value" is greater than 10 for a total of 60s then
// only two events will be sent. First when the value crosses
// the threshold and second when it falls back into an OK state.
// Without stateChangesOnly the alert would have triggered 7 times:
// If the "value" is greater than 10 for a total of 60s, then
// only two events will be sent. First, when the value crosses
// the threshold, and second, when it falls back into an OK state.
// Without stateChangesOnly, the alert would have triggered 7 times:
// 6 times for each 10s period where the condition was met and once more
// for the recovery.
//
Expand Down
4 changes: 2 additions & 2 deletions services/smtp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ type Config struct {
NoVerify bool `toml:"no-verify"`
// Whether all alerts should trigger an email.
Global bool `toml:"global"`
// Default from address
// From address
From string `toml:"from"`
// Default to addresses
// Default To addresses
To []string `toml:"to"`
// Close connection to SMTP server after idle timeout has elapsed
IdleTimeout toml.Duration `toml:"idle-timeout"`
Expand Down
11 changes: 10 additions & 1 deletion services/smtp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package smtp

import (
"crypto/tls"
"errors"
"log"
"sync"
"time"
Expand All @@ -25,6 +26,9 @@ func NewService(c Config, l *log.Logger) *Service {
}

func (s *Service) Open() error {
if s.c.From == "" {
return errors.New("cannot open smtp service: missing from address in configuration")
}
s.wg.Add(1)
go s.runMailer()
return nil
Expand All @@ -42,7 +46,12 @@ func (s *Service) Global() bool {

func (s *Service) runMailer() {
defer s.wg.Done()
d := gomail.NewPlainDialer(s.c.Host, s.c.Port, s.c.Username, s.c.Password)
var d *gomail.Dialer
if s.c.Username == "" {
d = &gomail.Dialer{Host: s.c.Host, Port: s.c.Port}
} else {
d = gomail.NewPlainDialer(s.c.Host, s.c.Port, s.c.Username, s.c.Password)
}
if s.c.NoVerify {
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
}
Expand Down
12 changes: 12 additions & 0 deletions services/stats/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package stats

import (
"errors"
"log"
"sync"
"time"
Expand All @@ -43,7 +44,9 @@ type Service struct {
db string
rp string

open bool
closing chan struct{}
mu sync.Mutex
wg sync.WaitGroup

logger *log.Logger
Expand All @@ -59,6 +62,9 @@ func NewService(c Config, l *log.Logger) *Service {
}

func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
s.open = true
s.closing = make(chan struct{})
s.wg.Add(1)
go s.sendStats()
Expand All @@ -67,6 +73,12 @@ func (s *Service) Open() error {
}

func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if !s.open {
return errors.New("error closing stats service: service not open")
}
s.open = false
close(s.closing)
s.wg.Wait()
s.logger.Println("I! closed service")
Expand Down
2 changes: 1 addition & 1 deletion tick/cmd/tickdoc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (n *Node) Render(buf *bytes.Buffer, r Renderer, nodes map[string]*Node) err
if len(n.Properties) > 0 {
r.Header(buf, func() bool { buf.Write([]byte("Properties")); return true }, 2, "")
r.Paragraph(buf, func() bool {
buf.Write([]byte("Property methods modify state on the calling node. They do not add another node to the pipeline and always return a reference to the calling node."))
buf.Write([]byte("Property methods modify state on the calling node. They do not add another node to the pipeline, and always return a reference to the calling node."))
return true
})
props := make([]string, len(n.Properties))
Expand Down

0 comments on commit 1ed3f7b

Please sign in to comment.