Skip to content

Commit

Permalink
Report result set sizes returned from LogPoller (smartcontractkit#9889)
Browse files Browse the repository at this point in the history
* Report result set sizes returned from LogPoller

* More granular buckets for measuring performance

* Post review fixes
  • Loading branch information
mateusz-sekara authored Jul 25, 2023
1 parent 04d9554 commit 016aa43
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 34 deletions.
65 changes: 44 additions & 21 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,42 @@ var (
float64(1 * time.Millisecond),
float64(5 * time.Millisecond),
float64(10 * time.Millisecond),
float64(25 * time.Millisecond),
float64(20 * time.Millisecond),
float64(30 * time.Millisecond),
float64(40 * time.Millisecond),
float64(50 * time.Millisecond),
float64(75 * time.Millisecond),
float64(60 * time.Millisecond),
float64(70 * time.Millisecond),
float64(80 * time.Millisecond),
float64(90 * time.Millisecond),
float64(100 * time.Millisecond),
float64(250 * time.Millisecond),
float64(200 * time.Millisecond),
float64(300 * time.Millisecond),
float64(400 * time.Millisecond),
float64(500 * time.Millisecond),
float64(750 * time.Millisecond),
float64(1 * time.Second),
float64(2 * time.Second),
float64(5 * time.Second),
}
lpQueryHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
lpQueryDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "log_poller_query_duration",
Help: "Measures duration of Log Poller's queries fetching logs",
Buckets: sqlLatencyBuckets,
}, []string{"evmChainID", "query"})
lpQueryDataSets = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "log_poller_query_dataset_size",
Help: "Measures size of the datasets returned by Log Poller's queries",
}, []string{"evmChainID", "query"})
)

// ObservedLogPoller is a decorator layer for LogPoller, responsible for adding pushing histogram metrics for some of the queries.
// ObservedLogPoller is a decorator layer for LogPoller, responsible for pushing Prometheus metrics reporting duration and size of result set for some of the queries.
// It doesn't change internal logic, because all calls are delegated to the origin LogPoller
type ObservedLogPoller struct {
LogPoller
histogram *prometheus.HistogramVec
chainId string
queryDuration *prometheus.HistogramVec
datasetSize *prometheus.GaugeVec
chainId string
}

// NewObservedLogPoller creates an observed version of log poller created by NewLogPoller
Expand All @@ -49,14 +61,15 @@ func NewObservedLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod ti
finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepBlocksDepth int64) LogPoller {

return &ObservedLogPoller{
LogPoller: NewLogPoller(orm, ec, lggr, pollPeriod, finalityDepth, backfillBatchSize, rpcBatchSize, keepBlocksDepth),
histogram: lpQueryHistogram,
chainId: orm.chainID.String(),
LogPoller: NewLogPoller(orm, ec, lggr, pollPeriod, finalityDepth, backfillBatchSize, rpcBatchSize, keepBlocksDepth),
queryDuration: lpQueryDuration,
datasetSize: lpQueryDataSets,
chainId: orm.chainID.String(),
}
}

func (o *ObservedLogPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "LogsCreatedAfter", func() ([]Log, error) {
return withObservedQueryAndResults(o, "LogsCreatedAfter", func() ([]Log, error) {
return o.LogPoller.LogsCreatedAfter(eventSig, address, after, confs, qopts...)
})
}
Expand All @@ -68,7 +81,7 @@ func (o *ObservedLogPoller) LatestLogByEventSigWithConfs(eventSig common.Hash, a
}

func (o *ObservedLogPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "LatestLogEventSigsAddrsWithConfs", func() ([]Log, error) {
return withObservedQueryAndResults(o, "LatestLogEventSigsAddrsWithConfs", func() ([]Log, error) {
return o.LogPoller.LatestLogEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...)
})
}
Expand All @@ -80,57 +93,67 @@ func (o *ObservedLogPoller) LatestBlockByEventSigsAddrsWithConfs(eventSigs []com
}

func (o *ObservedLogPoller) IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "IndexedLogs", func() ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogs", func() ([]Log, error) {
return o.LogPoller.IndexedLogs(eventSig, address, topicIndex, topicValues, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "IndexedLogsByBlockRange", func() ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogsByBlockRange", func() ([]Log, error) {
return o.LogPoller.IndexedLogsByBlockRange(start, end, eventSig, address, topicIndex, topicValues, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "IndexedLogsCreatedAfter", func() ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogsCreatedAfter", func() ([]Log, error) {
return o.LogPoller.IndexedLogsCreatedAfter(eventSig, address, topicIndex, topicValues, after, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "IndexedLogsTopicGreaterThan", func() ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogsTopicGreaterThan", func() ([]Log, error) {
return o.LogPoller.IndexedLogsTopicGreaterThan(eventSig, address, topicIndex, topicValueMin, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "IndexedLogsTopicRange", func() ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogsTopicRange", func() ([]Log, error) {
return o.LogPoller.IndexedLogsTopicRange(eventSig, address, topicIndex, topicValueMin, topicValueMax, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "IndexedLogsWithSigsExcluding", func() ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogsWithSigsExcluding", func() ([]Log, error) {
return o.LogPoller.IndexedLogsWithSigsExcluding(address, eventSigA, eventSigB, topicIndex, fromBlock, toBlock, confs, qopts...)
})
}

func (o *ObservedLogPoller) LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "LogsDataWordRange", func() ([]Log, error) {
return withObservedQueryAndResults(o, "LogsDataWordRange", func() ([]Log, error) {
return o.LogPoller.LogsDataWordRange(eventSig, address, wordIndex, wordValueMin, wordValueMax, confs, qopts...)
})
}

func (o *ObservedLogPoller) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o, "LogsDataWordGreaterThan", func() ([]Log, error) {
return withObservedQueryAndResults(o, "LogsDataWordGreaterThan", func() ([]Log, error) {
return o.LogPoller.LogsDataWordGreaterThan(eventSig, address, wordIndex, wordValueMin, confs, qopts...)
})
}

// withObservedQueryAndResults wraps a list-returning query with duration
// measurement and, when the query succeeds, records the number of returned
// rows in the dataset-size gauge. On error the gauge is left untouched.
func withObservedQueryAndResults[T any](o *ObservedLogPoller, queryName string, query func() ([]T, error)) ([]T, error) {
	rows, err := withObservedQuery(o, queryName, query)
	if err != nil {
		return rows, err
	}
	o.datasetSize.WithLabelValues(o.chainId, queryName).Set(float64(len(rows)))
	return rows, nil
}

func withObservedQuery[T any](o *ObservedLogPoller, queryName string, query func() (T, error)) (T, error) {
queryStarted := time.Now()
defer func() {
o.histogram.
o.queryDuration.
WithLabelValues(o.chainId, queryName).
Observe(float64(time.Since(queryStarted)))
}()
Expand Down
52 changes: 39 additions & 13 deletions core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logpoller

import (
"fmt"
"math/big"
"testing"
"time"
Expand All @@ -22,7 +23,7 @@ import (
func TestMultipleMetricsArePublished(t *testing.T) {
ctx := testutils.Context(t)
lp := createObservedPollLogger(t, 100)
require.Equal(t, 0, testutil.CollectAndCount(lp.histogram))
require.Equal(t, 0, testutil.CollectAndCount(lp.queryDuration))

_, _ = lp.IndexedLogs(common.Hash{}, common.Address{}, 1, []common.Hash{}, 1, pg.WithParentCtx(ctx))
_, _ = lp.IndexedLogsByBlockRange(0, 1, common.Hash{}, common.Address{}, 1, []common.Hash{}, pg.WithParentCtx(ctx))
Expand All @@ -36,54 +37,73 @@ func TestMultipleMetricsArePublished(t *testing.T) {
_, _ = lp.LatestLogEventSigsAddrsWithConfs(0, []common.Hash{{}}, []common.Address{{}}, 1, pg.WithParentCtx(ctx))
_, _ = lp.IndexedLogsCreatedAfter(common.Hash{}, common.Address{}, 0, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx))

require.Equal(t, 11, testutil.CollectAndCount(lp.histogram))
require.Equal(t, 11, testutil.CollectAndCount(lp.queryDuration))
require.Equal(t, 10, testutil.CollectAndCount(lp.datasetSize))
resetMetrics(*lp)
}

func TestShouldPublishMetricInCaseOfError(t *testing.T) {
func TestShouldPublishDurationInCaseOfError(t *testing.T) {
ctx := testutils.Context(t)
lp := createObservedPollLogger(t, 200)
require.Equal(t, 0, testutil.CollectAndCount(lp.histogram))
require.Equal(t, 0, testutil.CollectAndCount(lp.queryDuration))

_, err := lp.LatestLogByEventSigWithConfs(common.Hash{}, common.Address{}, 0, pg.WithParentCtx(ctx))
require.Error(t, err)

require.Equal(t, 1, testutil.CollectAndCount(lp.histogram))
require.Equal(t, 1, counterFromHistogramByLabels(t, lp.histogram, "200", "LatestLogByEventSigWithConfs"))
require.Equal(t, 1, testutil.CollectAndCount(lp.queryDuration))
require.Equal(t, 1, counterFromHistogramByLabels(t, lp.queryDuration, "200", "LatestLogByEventSigWithConfs"))

resetMetrics(*lp)
}

func TestNotObservedFunctions(t *testing.T) {
ctx := testutils.Context(t)
lp := createObservedPollLogger(t, 300)
require.Equal(t, 0, testutil.CollectAndCount(lp.histogram))
require.Equal(t, 0, testutil.CollectAndCount(lp.queryDuration))

_, err := lp.Logs(0, 1, common.Hash{}, common.Address{}, pg.WithParentCtx(ctx))
require.NoError(t, err)

_, err = lp.LogsWithSigs(0, 1, []common.Hash{{}}, common.Address{}, pg.WithParentCtx(ctx))
require.NoError(t, err)

require.Equal(t, 0, testutil.CollectAndCount(lp.histogram))
require.Equal(t, 0, testutil.CollectAndCount(lp.queryDuration))
require.Equal(t, 0, testutil.CollectAndCount(lp.datasetSize))
resetMetrics(*lp)
}

func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) {
lp := createObservedPollLogger(t, 420)
expectedCount := 9
expectedSize := 2

for i := 0; i < expectedCount; i++ {
_, err := withObservedQuery(lp, "query", func() (string, error) { return "value", nil })
_, err := withObservedQueryAndResults(lp, "query", func() ([]string, error) { return []string{"value1", "value2"}, nil })
require.NoError(t, err)
}

require.Equal(t, expectedCount, counterFromHistogramByLabels(t, lp.histogram, "420", "query"))
require.Equal(t, 0, counterFromHistogramByLabels(t, lp.histogram, "420", "other_query"))
require.Equal(t, 0, counterFromHistogramByLabels(t, lp.histogram, "5", "query"))
require.Equal(t, expectedCount, counterFromHistogramByLabels(t, lp.queryDuration, "420", "query"))
require.Equal(t, expectedSize, counterFromGaugeByLabels(lp.datasetSize, "420", "query"))

require.Equal(t, 0, counterFromHistogramByLabels(t, lp.queryDuration, "420", "other_query"))
require.Equal(t, 0, counterFromHistogramByLabels(t, lp.queryDuration, "5", "query"))

require.Equal(t, 0, counterFromGaugeByLabels(lp.datasetSize, "420", "other_query"))
require.Equal(t, 0, counterFromGaugeByLabels(lp.datasetSize, "5", "query"))

resetMetrics(*lp)
}

// TestNotPublishingDatasetSizeInCaseOfError verifies that a failing query still
// records its duration, but does not publish anything to the dataset-size gauge.
func TestNotPublishingDatasetSizeInCaseOfError(t *testing.T) {
	lp := createObservedPollLogger(t, 420)

	_, err := withObservedQueryAndResults(lp, "errorQuery", func() ([]string, error) { return nil, fmt.Errorf("error") })
	require.Error(t, err)

	require.Equal(t, 1, counterFromHistogramByLabels(t, lp.queryDuration, "420", "errorQuery"))
	require.Equal(t, 0, counterFromGaugeByLabels(lp.datasetSize, "420", "errorQuery"))

	// The metric vectors are package-level (promauto) and shared across tests;
	// reset them so the label values recorded here don't leak into other tests,
	// matching the cleanup done by the other tests in this file.
	resetMetrics(*lp)
}

func createObservedPollLogger(t *testing.T, chainId int64) *ObservedLogPoller {
lggr, _ := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
db := pgtest.NewSqlxDB(t)
Expand All @@ -94,7 +114,13 @@ func createObservedPollLogger(t *testing.T, chainId int64) *ObservedLogPoller {
}

func resetMetrics(lp ObservedLogPoller) {
lp.histogram.Reset()
lp.queryDuration.Reset()
lp.datasetSize.Reset()
}

// counterFromGaugeByLabels returns the current value of the gauge child
// identified by the given label values, truncated to an int.
func counterFromGaugeByLabels(gaugeVec *prometheus.GaugeVec, labels ...string) int {
	return int(testutil.ToFloat64(gaugeVec.WithLabelValues(labels...)))
}

func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int {
Expand Down

0 comments on commit 016aa43

Please sign in to comment.