Skip to content

Commit

Permalink
fix(metrics): Unregister callback on stop for several metrics impleme…
Browse files Browse the repository at this point in the history
…ntations (celestiaorg#3281)

Self explanatory
  • Loading branch information
renaynay authored Apr 21, 2024
1 parent e8165e6 commit 8bc46ce
Show file tree
Hide file tree
Showing 15 changed files with 131 additions and 23 deletions.
5 changes: 5 additions & 0 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ func (d *DASer) Stop(ctx context.Context) error {
}

d.cancel()

if err := d.sampler.metrics.close(); err != nil {
log.Warnw("closing metrics", "err", err)
}

if err = d.sampler.wait(ctx); err != nil {
return fmt.Errorf("DASer force quit: %w", err)
}
Expand Down
11 changes: 10 additions & 1 deletion das/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type metrics struct {
newHead metric.Int64Counter

lastSampledTS uint64

clientReg metric.Registration
}

func (d *DASer) InitMetrics() error {
Expand Down Expand Up @@ -119,7 +121,7 @@ func (d *DASer) InitMetrics() error {
return nil
}

_, err = meter.RegisterCallback(callback,
d.sampler.metrics.clientReg, err = meter.RegisterCallback(callback,
lastSampledTS,
busyWorkers,
networkHead,
Expand All @@ -133,6 +135,13 @@ func (d *DASer) InitMetrics() error {
return nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.clientReg.Unregister()
}

// observeSample records the time it took to sample a header +
// the amount of sampled contiguous headers
func (m *metrics) observeSample(
Expand Down
21 changes: 18 additions & 3 deletions nodebuilder/node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"context"
"time"

logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/fx"
)

var log = logging.Logger("module/node")

var meter = otel.Meter("node")

var (
Expand All @@ -17,7 +21,7 @@ var (
)

// WithMetrics registers node metrics.
func WithMetrics() error {
func WithMetrics(lc fx.Lifecycle) error {
nodeStartTS, err := meter.Int64ObservableGauge(
"node_start_ts",
metric.WithDescription("timestamp when the node was started"),
Expand Down Expand Up @@ -66,7 +70,18 @@ func WithMetrics() error {
return nil
}

_, err = meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge)
clientReg, err := meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge)
if err != nil {
return nil
}

return err
lc.Append(
fx.Hook{OnStop: func(context.Context) error {
if err := clientReg.Unregister(); err != nil {
log.Warn("failed to close metrics", "err", err)
}
return nil
}},
)
return nil
}
4 changes: 2 additions & 2 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
baseComponents := fx.Options(
fx.Supply(metricOpts),
fx.Invoke(initializeMetrics),
fx.Invoke(func(ca *state.CoreAccessor) {
fx.Invoke(func(lc fx.Lifecycle, ca *state.CoreAccessor) {
if ca == nil {
return
}
state.WithMetrics(ca)
state.WithMetrics(lc, ca)
}),
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
Expand Down
7 changes: 5 additions & 2 deletions share/eds/cache/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,13 @@ func (bc *AccessorCache) Remove(key shard.Key) error {
}

// EnableMetrics enables metrics for the cache.
func (bc *AccessorCache) EnableMetrics() error {
func (bc *AccessorCache) EnableMetrics() (CloseMetricsFn, error) {
var err error
bc.metrics, err = newMetrics(bc)
return err
if err != nil {
return nil, err
}
return bc.metrics.close, err
}

// refCloser manages references to accessor from provided reader and removes the ref, when the
Expand Down
4 changes: 3 additions & 1 deletion share/eds/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ var (
errCacheMiss = errors.New("accessor not found in blockstore cache")
)

type CloseMetricsFn func() error

// Cache is an interface that defines the basic Cache operations.
type Cache interface {
// Get retrieves an item from the Cache.
Expand All @@ -37,7 +39,7 @@ type Cache interface {
Remove(shard.Key) error

// EnableMetrics enables metrics in Cache
EnableMetrics() error
EnableMetrics() (CloseMetricsFn, error)
}

// Accessor is a interface type returned by cache, that allows to read raw data by reader or create
Expand Down
19 changes: 15 additions & 4 deletions share/eds/cache/doublecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,20 @@ func (mc *DoubleCache) Second() Cache {
return mc.second
}

func (mc *DoubleCache) EnableMetrics() error {
if err := mc.first.EnableMetrics(); err != nil {
return err
func (mc *DoubleCache) EnableMetrics() (CloseMetricsFn, error) {
firstCloser, err := mc.first.EnableMetrics()
if err != nil {
return nil, err
}
return mc.second.EnableMetrics()
secondCloser, err := mc.second.EnableMetrics()
if err != nil {
return nil, err
}

return func() error {
if err := errors.Join(firstCloser(), secondCloser()); err != nil {
log.Warnw("failed to close metrics", "err", err)
}
return nil
}, nil
}
17 changes: 15 additions & 2 deletions share/eds/cache/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
type metrics struct {
getCounter metric.Int64Counter
evictedCounter metric.Int64Counter

clientReg metric.Registration
}

func newMetrics(bc *AccessorCache) (*metrics, error) {
Expand Down Expand Up @@ -43,12 +45,23 @@ func newMetrics(bc *AccessorCache) (*metrics, error) {
observer.ObserveInt64(cacheSize, int64(bc.cache.Len()))
return nil
}
_, err = meter.RegisterCallback(callback, cacheSize)
clientReg, err := meter.RegisterCallback(callback, cacheSize)
if err != nil {
return nil, err
}

return &metrics{
getCounter: getCounter,
evictedCounter: evictedCounter,
}, err
clientReg: clientReg,
}, nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.clientReg.Unregister()
}

func (m *metrics) observeEvicted(failed bool) {
Expand Down
4 changes: 2 additions & 2 deletions share/eds/cache/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (n NoopCache) Remove(shard.Key) error {
return nil
}

func (n NoopCache) EnableMetrics() error {
return nil
func (n NoopCache) EnableMetrics() (CloseMetricsFn, error) {
return func() error { return nil }, nil
}

var _ Accessor = (*NoopAccessor)(nil)
Expand Down
20 changes: 18 additions & 2 deletions share/eds/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eds

import (
"context"
"errors"
"time"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -49,6 +50,9 @@ type metrics struct {

longOpTime metric.Float64Histogram
gcTime metric.Float64Histogram

clientReg metric.Registration
closerFn func() error
}

func (s *Store) WithMetrics() error {
Expand Down Expand Up @@ -124,7 +128,8 @@ func (s *Store) WithMetrics() error {
return err
}

if err = s.cache.Load().EnableMetrics(); err != nil {
closerFn, err := s.cache.Load().EnableMetrics()
if err != nil {
return err
}

Expand All @@ -139,7 +144,8 @@ func (s *Store) WithMetrics() error {
return nil
}

if _, err := meter.RegisterCallback(callback, dagStoreShards); err != nil {
clientReg, err := meter.RegisterCallback(callback, dagStoreShards)
if err != nil {
return err
}

Expand All @@ -155,10 +161,20 @@ func (s *Store) WithMetrics() error {
shardFailureCount: shardFailureCount,
longOpTime: longOpTime,
gcTime: gcTime,
clientReg: clientReg,
closerFn: closerFn,
}
return nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}

return errors.Join(m.closerFn(), m.clientReg.Unregister())
}

func (m *metrics) observeGCtime(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
Expand Down
5 changes: 5 additions & 0 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (s *Store) Start(ctx context.Context) error {
// Stop stops the underlying DAGStore.
func (s *Store) Stop(context.Context) error {
defer s.cancel()

if err := s.metrics.close(); err != nil {
log.Warnw("failed to close metrics", "err", err)
}

if err := s.invertedIdx.close(); err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion share/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ func (d *Discovery) Start(context.Context) error {

func (d *Discovery) Stop(context.Context) error {
d.cancel()
return d.metrics.close()

if err := d.metrics.close(); err != nil {
log.Warnw("failed to close metrics", "err", err)
}

return nil
}

// Peers provides a list of discovered peers in the given topic.
Expand Down
4 changes: 4 additions & 0 deletions share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (m *Manager) Start(startCtx context.Context) error {
func (m *Manager) Stop(ctx context.Context) error {
m.cancel()

if err := m.metrics.close(); err != nil {
log.Warnw("closing metrics", "err", err)
}

// we do not need to wait for headersub and disconnected peers to finish
// here, since they were never started
if m.headerSub == nil && m.shrexSub == nil {
Expand Down
11 changes: 10 additions & 1 deletion share/p2p/peers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type metrics struct {
fullNodesPool metric.Int64ObservableGauge // attributes: pool_status
blacklistedPeersByReason sync.Map
blacklistedPeers metric.Int64ObservableGauge // attributes: blacklist_reason

clientReg metric.Registration
}

func initMetrics(manager *Manager) (*metrics, error) {
Expand Down Expand Up @@ -154,13 +156,20 @@ func initMetrics(manager *Manager) (*metrics, error) {
})
return nil
}
_, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted)
metrics.clientReg, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted)
if err != nil {
return nil, fmt.Errorf("registering metrics callback: %w", err)
}
return metrics, nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.clientReg.Unregister()
}

func (m *metrics) observeGetPeer(
ctx context.Context,
source peerSource, poolSize int, waitTime time.Duration,
Expand Down
15 changes: 13 additions & 2 deletions state/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.uber.org/fx"
)

var meter = otel.Meter("state")

func WithMetrics(ca *CoreAccessor) {
func WithMetrics(lc fx.Lifecycle, ca *CoreAccessor) {
pfbCounter, _ := meter.Int64ObservableCounter(
"pfb_count",
metric.WithDescription("Total count of submitted PayForBlob transactions"),
Expand All @@ -24,8 +25,18 @@ func WithMetrics(ca *CoreAccessor) {
observer.ObserveInt64(lastPfbTimestamp, ca.LastPayForBlob())
return nil
}
_, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp)

clientReg, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp)
if err != nil {
panic(err)
}

lc.Append(fx.Hook{
OnStop: func(context.Context) error {
if err := clientReg.Unregister(); err != nil {
log.Warnw("failed to close metrics", "err", err)
}
return nil
},
})
}

0 comments on commit 8bc46ce

Please sign in to comment.