Skip to content

Commit

Permalink
Moved the collection of stats into its own go routine
Browse files Browse the repository at this point in the history
  • Loading branch information
Per Abich committed Feb 18, 2020
1 parent 3e8a3a4 commit 26d0642
Showing 1 changed file with 63 additions and 31 deletions.
94 changes: 63 additions & 31 deletions postfix_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"github.com/alecthomas/kingpin"
Expand Down Expand Up @@ -293,7 +294,7 @@ var (
)

// CollectFromLogline collects metrict from a Postfix log line.
func (e *PostfixExporter) CollectFromLogline(line string) {
func (e *PostfixExporter) CollectFromLogLine(line string) {
// Strip off timestamp, hostname, etc.
if logMatches := logLine.FindStringSubmatch(line); logMatches != nil {
// Group patterns to check by Postfix service.
Expand Down Expand Up @@ -397,7 +398,7 @@ func (e *PostfixExporter) CollectFromLogline(line string) {
}
e.smtpDelays.WithLabelValues("transmission").Observe(xdelay)

if smtpMatches := smtpStatusDeferredLine.FindStringSubmatch(logMatches[2]) ; smtpMatches != nil {
if smtpMatches := smtpStatusDeferredLine.FindStringSubmatch(logMatches[2]); smtpMatches != nil {
e.smtpStatusDeferred.Inc()
}
} else if smtpTLSMatches := smtpTLSLine.FindStringSubmatch(logMatches[2]); smtpTLSMatches != nil {
Expand Down Expand Up @@ -438,14 +439,25 @@ func (e *PostfixExporter) CollectFromLogline(line string) {
}

// CollectLogfileFromFile tails a Postfix log file and collects entries from it.
func (e *PostfixExporter) CollectLogfileFromFile() error {
func (e *PostfixExporter) CollectLogfileFromFile(ctx context.Context) {
gaugeVec := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "postfix",
Subsystem: "",
Name: "up",
Help: "Whether scraping Postfix's metrics was successful.",
},
[]string{"path"})
gauge := gaugeVec.With(prometheus.Labels{"path": e.tailer.Filename})
for {
select {
case line := <-e.tailer.Lines:
e.CollectFromLogline(line.Text)
default:
return nil
e.CollectFromLogLine(line.Text)
case <-ctx.Done():
gauge.Set(0)
return
}
gauge.Set(1)
}
}

Expand Down Expand Up @@ -596,7 +608,6 @@ func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal)
Name: "smtp_status_deferred",
Help: "Total number of messages deferred.",
}),

}, nil
}

Expand Down Expand Up @@ -627,6 +638,48 @@ func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) {
e.unsupportedLogEntries.Describe(ch)
}

func (e *PostfixExporter) foreverCollectFromJournal(ctx context.Context) {
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "postfix",
Subsystem: "",
Name: "up",
Help: "Whether scraping Postfix's metrics was successful.",
},
[]string{"path"}).With(prometheus.Labels{"path": e.journal.Path})
select {
case <-ctx.Done():
gauge.Set(0)
return
default:
err := e.CollectLogfileFromJournal()
if err != nil {
log.Printf("Couldn't read journal: %v", err)
gauge.Set(0)
} else {
gauge.Set(1)
}
}
}

func (e *PostfixExporter) StartMetricCollection(ctx context.Context) {
if e.journal != nil {
e.foreverCollectFromJournal(ctx)
} else {
e.CollectLogfileFromFile(ctx)
}

prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "postfix",
Subsystem: "",
Name: "up",
Help: "Whether scraping Postfix's metrics was successful.",
},
[]string{"path"})
return
}

// Collect metrics from Postfix's showq socket and its log file.
func (e *PostfixExporter) Collect(ch chan<- prometheus.Metric) {
err := CollectShowqFromSocket(e.showqPath, ch)
Expand All @@ -645,29 +698,6 @@ func (e *PostfixExporter) Collect(ch chan<- prometheus.Metric) {
e.showqPath)
}

var src string
if e.journal != nil {
err = e.CollectLogfileFromJournal()
src = e.journal.Path
} else {
err = e.CollectLogfileFromFile()
src = e.tailer.Filename
}
if err == nil {
ch <- prometheus.MustNewConstMetric(
postfixUpDesc,
prometheus.GaugeValue,
1.0,
src)
} else {
log.Printf("Failed to scrape log: %s", err)
ch <- prometheus.MustNewConstMetric(
postfixUpDesc,
prometheus.GaugeValue,
0.0,
src)
}

ch <- e.cleanupProcesses
ch <- e.cleanupRejects
ch <- e.cleanupNotAccepted
Expand Down Expand Up @@ -742,7 +772,9 @@ func main() {
panic(err)
}
})

ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
go exporter.StartMetricCollection(ctx)
log.Print("Listening on ", *listenAddress)
log.Fatal(http.ListenAndServe(*listenAddress, nil))
}

0 comments on commit 26d0642

Please sign in to comment.