Skip to content

Commit

Permalink
feat: Prometheus Integration
Browse files Browse the repository at this point in the history
  • Loading branch information
estahn committed Apr 8, 2019
1 parent 8fc1be4 commit d7eb4a8
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 3 deletions.
34 changes: 34 additions & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package collector

import "github.com/prometheus/client_golang/prometheus"

const (
namespace = "rabbitmq_cli_consumer"
)

var (
ProcessCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "process_total",
Help: "The total number of processes executed.",
},
[]string{"exit_code"},
)

ProcessDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "process_duration_seconds",
Help: "The time spent by the consumer to process the message.",
},
)

MessageDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "message_duration_seconds",
Help: "The time spent from publishing to finished processing the message.",
},
)
)
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
module github.com/corvus-ch/rabbitmq-cli-consumer

require (
bou.ke/monkey v1.0.1
github.com/bketelsen/logr v0.0.0-20170116012416-f3d070bdd1c5
github.com/codegangsta/cli v1.20.1-0.20190203184040-693af58b4d51
github.com/corvus-ch/logr v0.0.0-20180917163152-45217966b77e
github.com/magiconair/properties v1.8.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2
github.com/sebdah/goldie v0.0.0-20190305024101-629351c67c53
github.com/streadway/amqp v0.0.0-20190224195609-f16568b23ee6
github.com/stretchr/testify v1.3.0
gopkg.in/gcfg.v1 v1.2.3
gopkg.in/warnings.v0 v0.1.2 // indirect
)
29 changes: 27 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,38 @@
bou.ke/monkey v1.0.1 h1:zEMLInw9xvNakzUUPjfS4Ds6jYPqCFx3m7bRmG5NH2U=
bou.ke/monkey v1.0.1/go.mod h1:FgHuK96Rv2Nlf+0u1OOVDpCMdsWyOFmeeketDHE7LIg=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bketelsen/logr v0.0.0-20170116012416-f3d070bdd1c5 h1:k5oblHm+AdEGmDZA1NwxXjKuinRB6WsEuloKQ3i5u5g=
github.com/bketelsen/logr v0.0.0-20170116012416-f3d070bdd1c5/go.mod h1:to4EbfYTEzdQuHxVGz1ug+d7a3bqOjR1r02gJ1Xv4z8=
github.com/codegangsta/cli v1.20.0 h1:iX1FXEgwzd5+XN6wk5cVHOGQj6Q3Dcp20lUeS4lHNTw=
github.com/codegangsta/cli v1.20.0/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA=
github.com/codegangsta/cli v1.20.1-0.20190203184040-693af58b4d51 h1:onG4micSoSE6AwEVQcPY2Z2H28kf/SFG6URw3Vs9Nz8=
github.com/codegangsta/cli v1.20.1-0.20190203184040-693af58b4d51/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA=
github.com/corvus-ch/logr v0.0.0-20180917163152-45217966b77e h1:wL16/ZX3hdvCz4Xuz2oWSA9g4tV1N4sNFD3MYEUdDQM=
github.com/corvus-ch/logr v0.0.0-20180917163152-45217966b77e/go.mod h1:vDjLhcz5/6EgBgZdQ5BxE5YAzHHkkfX8XLMPOBhlg2c=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sebdah/goldie v0.0.0-20190305024101-629351c67c53 h1:qR9Fm180+oJEk1tyg+3wysrby5LvT9Y2vD4uKdLZlYw=
github.com/sebdah/goldie v0.0.0-20190305024101-629351c67c53/go.mod h1:lvjGftC8oe7XPtyrOidaMi0rp5B9+XY/ZRUynGnuaxQ=
github.com/streadway/amqp v0.0.0-20190224195609-f16568b23ee6 h1:AyOf9zHp1p9orBslizUk3QhgmX6PaOoR/2FQar0C9Lw=
github.com/streadway/amqp v0.0.0-20190224195609-f16568b23ee6/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/gcfg.v1 v1.2.3 h1:m8OOJ4ccYHnx2f4gQwpno8nAX5OGOh7RLaaz0pj3Ogs=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
Expand Down
57 changes: 56 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ import (
"context"
"fmt"
stdlog "log"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/bketelsen/logr"
"github.com/codegangsta/cli"
"github.com/corvus-ch/rabbitmq-cli-consumer/acknowledger"
"github.com/corvus-ch/rabbitmq-cli-consumer/collector"
"github.com/corvus-ch/rabbitmq-cli-consumer/command"
"github.com/corvus-ch/rabbitmq-cli-consumer/config"
"github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
"github.com/corvus-ch/rabbitmq-cli-consumer/log"
"github.com/corvus-ch/rabbitmq-cli-consumer/processor"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/streadway/amqp"
)

Expand All @@ -27,7 +32,7 @@ var (
)

// flags is the list of global flags known to the application.
var flags = []cli.Flag{
var flags []cli.Flag = []cli.Flag{
cli.StringFlag{
Name: "url, u",
Usage: "Connect with RabbitMQ using `URL`",
Expand Down Expand Up @@ -73,6 +78,21 @@ var flags = []cli.Flag{
Name: "no-declare",
Usage: "prevents the queue from being declared.",
},
cli.StringFlag{
Name: "web.listen-address",
Usage: "Address on which to expose metrics and web interface.",
Value: ":8080",
},
cli.StringFlag{
Name: "web.telemetry-path",
Usage: "Path under which to expose metrics.",
Value: "/metrics",
},
//cli.StringSliceFlag{
// Name: "web.message-duration-buckets",
// Usage: "Buckets for the message duration histogram.",
// Value: &cli.StringSlice{"0.005", "0.01", "0.025", "0.05", "0.1", "0.25", "0.5", "1", "2.5", "5", "10"},
//},
}

var ll logr.Logger
Expand Down Expand Up @@ -126,9 +146,44 @@ func Action(c *cli.Context) error {
}
defer client.Close()

setupPromServer(
c.String("web.listen-address"),
c.String("web.telemetry-path"),
)

return consume(client, l)
}

func setupPromServer(addr string, path string) {
srv := &http.Server{
Addr: addr,
// Good practice to set timeouts to avoid Slowloris attacks.
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
IdleTimeout: time.Second * 60,
}

prometheus.MustRegister(collector.ProcessCounter)
prometheus.MustRegister(collector.ProcessDuration)
prometheus.MustRegister(collector.MessageDuration)

http.Handle(path, promhttp.Handler())
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`<html>
<head><title>rabbitmq-cli-consumer</title></head>
<body>
<h1>rabbitmq-cli-consumer</h1>
<p><a href='` + path + `'>Metrics</a></p>
</body>
</html>`))
})
go func() {
if err := srv.ListenAndServe(); err != nil {
ll.Errorf("Failed to bind on %s: ", addr)
}
}()
}

func consume(client *consumer.Consumer, l logr.Logger) error {
done := make(chan error)
sig := make(chan os.Signal, 1)
Expand Down
11 changes: 11 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package processor

import (
"os/exec"
"strconv"
"sync"
"syscall"
"time"

"github.com/bketelsen/logr"
"github.com/corvus-ch/rabbitmq-cli-consumer/acknowledger"
"github.com/corvus-ch/rabbitmq-cli-consumer/collector"
"github.com/corvus-ch/rabbitmq-cli-consumer/command"
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
"github.com/prometheus/client_golang/prometheus"
)

// Processor describes the interface used by the consumer to process messages.
Expand Down Expand Up @@ -47,8 +51,15 @@ func (p *processor) Process(d delivery.Delivery) error {
return NewCreateCommandError(err)
}

start := time.Now()
exitCode := p.run()

collector.ProcessCounter.With(prometheus.Labels{"exit_code": strconv.Itoa(exitCode)}).Inc()
collector.ProcessDuration.Observe(time.Since(start).Seconds())
if !d.Properties().Timestamp.IsZero() {
collector.MessageDuration.Observe(time.Since(d.Properties().Timestamp).Seconds())
}

if err := p.ack.Ack(d, exitCode); err != nil {
return NewAcknowledgmentError(err)
}
Expand Down

0 comments on commit d7eb4a8

Please sign in to comment.