Add the dependency to the project:
go get github.com/trustwallet/go-libs/metrics
- The handler.go file contains a simple method to register the Prometheus middleware with the gin-gonic engine.
- The metrics.go file is the place for generic metrics services. PerformanceMetric tracks generic job performance: start time, duration, and successful and failed executions.
- The register.go file contains a function that registers collectors with a custom scope (Prometheus labels or nil) and a target Prometheus Registerer instance (which could be the default Registerer).
- The pusher.go file configures the Prometheus Pushgateway client (see Push mode below).
- The go-libs/worker and go-libs/mq packages integrate with it for automatic job performance tracking.
When collecting system metrics, there are several approaches available:
- Case 1 - There is a system (service) that might have been built without Prometheus in mind
- Case 2 - There is a custom Prometheus Collector
- Case 3 - There is a service that is Prometheus aware, but manages its inner collectors itself
For Case 1, a custom Prometheus Collector is recommended: it wraps the existing service and invokes the service's functions whenever its Collect() method is called.
See the example in the prometheus/client_golang repo.
The go-libs/metrics package offers the metrics.Register(labels, registerer, collector) function for easier registration of custom collectors.
This case is much the same as Case 1, but the collector itself knows the logic required to collect the data and set the collectors' values.
Example of the APIMetricsCollector:
import (
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus" // assumed logger
	// plus the project's own cache package providing cache.Data
)

// Gauge updated by the APIMetricsCollector below.
var tickersCachedTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "market",
	Subsystem: "api",
	Name:      "tickers_cached_total",
	Help:      "Total number of tickers cached",
}, nil)

type APIMetricsCollector struct {
	tickersCache cache.Data
}

func NewAPIMetricsCollector(tickersCache cache.Data) *APIMetricsCollector {
	return &APIMetricsCollector{
		tickersCache: tickersCache,
	}
}

// Describe is implemented with DescribeByCollect. That's possible because the
// Collect method will always return the same metrics with the same descriptors.
func (c *APIMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
	prometheus.DescribeByCollect(c, ch)
}

// Collect first triggers the internal collect() to fetch data from the underlying services
// and update the collectors' values.
// Then it delegates to tickersCachedTotal.Collect().
func (c *APIMetricsCollector) Collect(ch chan<- prometheus.Metric) {
	log.Info("collect market api metrics")
	if err := c.collect(); err != nil {
		log.Error(err)
	}
	tickersCachedTotal.Collect(ch)
}

func (c *APIMetricsCollector) collect() error {
	log.Info("export market api metrics")
	tCached, err := c.tickersCache.GetAllTickers()
	if err != nil {
		return errors.Wrap(err, "failed to get all cached tickers")
	}
	tickersCachedTotal.WithLabelValues().Set(float64(len(tCached)))
	return nil
}
An instance of this collector has to be registered with the Prometheus client:
// main.go
func initMetrics(engine *gin.Engine, tickersCache *memory.DataInstance) {
	// disable the default Go collector, which produces a lot of noise
	prometheus.Unregister(collectors.NewGoCollector())
	// register the Prometheus HTTP handler
	metrics.InitHandler(engine, "/metrics")
	// register the collector (nil labels: no extra scope)
	metrics.Register(nil, prometheus.DefaultRegisterer, api.NewAPIMetricsCollector(tickersCache))
}
An example of Case 3 is the PerformanceMetric, which ships as part of this package.
Internally it manages several collectors and registers itself with the passed Prometheus Registerer. The passed prometheus.Labels make it possible to initialize multiple instances of the service that track the execution of different target jobs without collisions (from Prometheus's perspective).
Initializing the metric:
metric := metrics.NewPerformanceMetric(
	"market_api",
	prometheus.Labels{"module": "tickers"},
	prometheus.DefaultRegisterer,
)
Using the metric service:
func (j *job) SomeJob() error {
	// Start() runs now; Duration(...) runs when SomeJob returns
	defer j.metric.Duration(j.metric.Start())
	err := doSomeWork()
	if err != nil {
		j.metric.Failure()
		return err
	}
	j.metric.Success()
	return nil
}
Web applications that serve incoming requests usually expose a /metrics (or similar) endpoint.
Exposing the registered collectors to the Prometheus scraper takes one extra line in the main application logic:
engine := gin.New()
// register prometheus http handler
metrics.InitHandler(engine, "/metrics")
Worker applications are launched without the capability to serve incoming requests, so the /metrics endpoint (Prometheus handler) cannot be used there. Instead, use the Prometheus Pushgateway, an intermediary service that accepts metrics pushed from jobs which cannot be scraped directly.
Assume the PerformanceMetric (see above) is configured to capture worker performance.
To push the collectors' values to a Prometheus Pushgateway server, initialize the Pushgateway client and set up a worker that pushes the registered collectors' values:
func initMetrics() (worker.Worker, error) {
	pusher := metrics.NewPusher(pushgatewayURL, "market_worker")
	// check the connection to the Pushgateway with an initial push
	err := pusher.Push()
	if err != nil {
		log.WithError(err).
			Error("cannot connect to pushgateway, metrics won't be pushed")
		return nil, err
	}
	return worker.InitWorker(
		"metrics_pusher",
		worker.DefaultWorkerOptions(pushInterval),
		pusher.Push,
	), nil
}

// start the metrics_pusher worker
metricsPusher, err := initMetrics()
if err == nil {
	metricsPusher.Start(ctx, wg)
}
📎 When pushing the collectors' values to the Pushgateway, the instance label is set automatically from the DYNO (set by Heroku) or INSTANCE_ID (a generic variable that is easy to set) environment variable; otherwise, instance is set to local.
The latest worker package is already integrated with the metrics package and tracks worker function performance automatically.
The following code is already part of the worker package:
func (w *worker) invoke() {
	metric := w.options.PerformanceMetric
	// no-op perf metric
	if metric == nil {
		metric = &metrics.NullablePerformanceMetric{}
	}
	// collect worker start time and duration on func return
	defer metric.Duration(metric.Start())
	// invoke worker function
	err := w.workerFn()
	if err != nil {
		// increment failure counter on error
		metric.Failure()
		log.WithField("worker", w.name).Error(err)
	} else {
		// increment success counter
		metric.Success()
	}
}
If the PerformanceMetric option isn't set, performance tracking is a no-op.
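The nil check in invoke() is the null object pattern: a do-nothing implementation replaces a missing dependency, so the rest of the function needs no branches. A stdlib-only sketch with hypothetical names (performanceMetric, nullMetric, runTracked, countingMetric):

```go
package main

import "errors"

// performanceMetric is a hypothetical subset of what a worker needs.
type performanceMetric interface {
	Success()
	Failure()
}

// nullMetric satisfies the interface but does nothing.
type nullMetric struct{}

func (nullMetric) Success() {}
func (nullMetric) Failure() {}

// runTracked swaps a nil metric for the no-op one, so the code below never
// needs a nil check. Note: a typed nil pointer stored in the interface is
// not == nil and would not be replaced.
func runTracked(m performanceMetric, fn func() error) error {
	if m == nil {
		m = nullMetric{}
	}
	if err := fn(); err != nil {
		m.Failure()
		return err
	}
	m.Success()
	return nil
}

// countingMetric records calls so the behaviour can be observed.
type countingMetric struct{ ok, failed int }

func (c *countingMetric) Success() { c.ok++ }
func (c *countingMetric) Failure() { c.failed++ }

var errBoom = errors.New("boom")
```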
To enable worker performance tracking, use the following code:
options := worker.DefaultWorkerOptions(interval)
if metricsEnabled {
	options.PerformanceMetric = metrics.NewPerformanceMetric(
		"market_worker",
		prometheus.Labels{"worker": workerName},
		prometheus.DefaultRegisterer,
	)
}
w := worker.InitWorker(workerName, options, service.DoWork)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // stops the worker when the surrounding function returns
wg := &sync.WaitGroup{}
w.Start(ctx, wg)
This example assumes that either the Prometheus handler was initialized for scrape mode or the Pushgateway pusher was started with the metrics_pusher worker (see above).
The latest mq package is also integrated with the metrics package and automatically tracks the performance of the functions that process incoming messages.
The following code is already part of the mq package:
func (c *consumer) process(queueName string, body []byte) error {
	metric := c.options.PerformanceMetric
	// no-op perf metric
	if metric == nil {
		metric = &metrics.NullablePerformanceMetric{}
	}
	// collect processing start time and duration on func return
	defer metric.Duration(metric.Start())
	// invoke message process function
	err := c.fn(body)
	if err != nil {
		// increment failure counter on error
		metric.Failure()
	} else {
		// increment success counter
		metric.Success()
	}
	return err
}
The registration is very similar:
options := mq.DefaultConsumerOptions(workersCount)
if maxRetries > 0 {
	options.MaxRetries = maxRetries
}
if metricsEnabled {
	options.PerformanceMetric = metrics.NewPerformanceMetric(
		"market_consumer",
		prometheus.Labels{"queue_name": string(queueName)},
		prometheus.DefaultRegisterer,
	)
}
mqClient, err := mq.Connect(rabbitmqURL)
if err != nil {
	log.WithError(err).Fatal("failed to init Rabbit MQ client")
}
consumer := mqClient.InitConsumer(queueName, options, service.DoMessageProcessing)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // stops the consumers when the surrounding function returns
if err := mqClient.StartConsumers(ctx, consumer); err != nil {
	log.WithError(err).Fatal("failed to start Rabbit MQ consumers")
}