Skip to content

Commit

Permalink
metrics: add metrics to fraud package (celestiaorg#1047)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Oct 6, 2022
1 parent 069b47f commit 02cd7ca
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 4 deletions.
45 changes: 45 additions & 0 deletions fraud/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package fraud

import (
"context"

"github.com/ipfs/go-datastore"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/unit"
)

// WithMetrics enables metrics to monitor fraud proofs.
func WithMetrics(store Getter) {
proofTypes := registeredProofTypes()
for _, proofType := range proofTypes {
counter, _ := meter.AsyncInt64().Gauge(string(proofType),
instrument.WithUnit(unit.Dimensionless),
instrument.WithDescription("Stored fraud proof"),
)
err := meter.RegisterCallback(
[]instrument.Asynchronous{
counter,
},
func(ctx context.Context) {
proofs, err := store.Get(ctx, proofType)
switch err {
case nil:
counter.Observe(ctx,
int64(len(proofs)),
attribute.String("proof_type", string(proofType)),
)
case datastore.ErrNotFound:
counter.Observe(ctx, 0, attribute.String("err", "not_found"))
return
default:
counter.Observe(ctx, 0, attribute.String("err", "unknown"))
}
},
)

if err != nil {
panic(err)
}
}
}
8 changes: 8 additions & 0 deletions fraud/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ import (
"encoding"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric/global"

"github.com/celestiaorg/celestia-node/header"
)

var (
meter = global.MeterProvider().Meter("fraud")
tracer = otel.Tracer("fraud")
)

type ErrFraudExists struct {
Proof []Proof
}
Expand Down
4 changes: 2 additions & 2 deletions fraud/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func Register(p Proof) {
}
}

// getRegisteredProofTypes returns all available proofTypes.
func getRegisteredProofTypes() []ProofType {
// registeredProofTypes returns all available proofTypes.
func registeredProofTypes() []ProofType {
unmarshalersLk.Lock()
defer unmarshalersLk.Unlock()
proofs := make([]ProofType, 0, len(defaultUnmarshalers))
Expand Down
28 changes: 27 additions & 1 deletion fraud/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/celestiaorg/celestia-node/params"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// fraudRequests is the amount of external requests that will be tried to get fraud proofs from other peers.
Expand Down Expand Up @@ -77,7 +81,7 @@ func (f *ProofService) registerProofTopics(proofTypes ...ProofType) error {
// Start joins fraud proofs topics, sets the stream handler for fraudProtocolID and starts syncing if syncer is enabled.
func (f *ProofService) Start(context.Context) error {
f.ctx, f.cancel = context.WithCancel(context.Background())
if err := f.registerProofTopics(getRegisteredProofTypes()...); err != nil {
if err := f.registerProofTopics(registeredProofTypes()...); err != nil {
return err
}
f.host.SetStreamHandler(fraudProtocolID, f.handleFraudMessageRequest)
Expand Down Expand Up @@ -129,6 +133,11 @@ func (f *ProofService) processIncoming(
from peer.ID,
msg *pubsub.Message,
) pubsub.ValidationResult {
ctx, span := tracer.Start(ctx, "process_proof", trace.WithAttributes(
attribute.String("proof_type", string(proofType)),
))
defer span.End()

// unmarshal message to the Proof.
// Peer will be added to black list if unmarshalling fails.
proof, err := Unmarshal(proofType, msg.Data)
Expand All @@ -137,10 +146,17 @@ func (f *ProofService) processIncoming(
if !errors.Is(err, &errNoUnmarshaler{}) {
f.pubsub.BlacklistPeer(from)
}
span.RecordError(err)
return pubsub.ValidationReject
}
// check the fraud proof locally and ignore if it has been already stored locally.
if f.verifyLocal(ctx, proofType, hex.EncodeToString(proof.HeaderHash()), msg.Data) {
span.AddEvent("received_known_fraud_proof", trace.WithAttributes(
attribute.String("proof_type", string(proof.Type())),
attribute.Int("block_height", int(proof.Height())),
attribute.String("block_hash", hex.EncodeToString(proof.HeaderHash())),
attribute.String("from_peer", from.String()),
))
return pubsub.ValidationIgnore
}

Expand All @@ -160,15 +176,25 @@ func (f *ProofService) processIncoming(
log.Errorw("proof validation err: ",
"err", err, "proofType", proof.Type(), "height", proof.Height())
f.pubsub.BlacklistPeer(from)
span.RecordError(err)
return pubsub.ValidationReject
}

span.AddEvent("received_valid_proof", trace.WithAttributes(
attribute.String("proof_type", string(proof.Type())),
attribute.Int("block_height", int(proof.Height())),
attribute.String("block_hash", hex.EncodeToString(proof.HeaderHash())),
attribute.String("from_peer", from.String()),
))

// add the fraud proof to storage.
err = f.put(ctx, proof.Type(), hex.EncodeToString(proof.HeaderHash()), msg.Data)
if err != nil {
log.Errorw("failed to store fraud proof", "err", err)
span.RecordError(err)
}

span.SetStatus(codes.Ok, "")
return pubsub.ValidationAccept
}

Expand Down
14 changes: 14 additions & 0 deletions fraud/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/celestiaorg/go-libp2p-messenger/serde"

Expand Down Expand Up @@ -57,14 +59,24 @@ func (f *ProofService) syncFraudProofs(ctx context.Context) {
peerCache[connStatus.Peer] = struct{}{}
// valid peer found, so go send proof requests
go func(pid peer.ID) {
ctx, span := tracer.Start(ctx, "sync_proofs")
defer span.End()

span.SetAttributes(
attribute.String("peer_id", pid.String()),
attribute.StringSlice("proof_types", proofTypes),
)
log.Debugw("requesting proofs from peer", "pid", pid)
respProofs, err := requestProofs(ctx, f.host, pid, proofTypes)
if err != nil {
log.Errorw("error while requesting fraud proofs", "err", err, "peer", pid)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return
}
if len(respProofs) == 0 {
log.Debugw("peer did not return any proofs", "pid", pid)
span.SetStatus(codes.Ok, "")
return
}
log.Debugw("got fraud proofs from peer", "pid", connStatus.Peer)
Expand All @@ -87,9 +99,11 @@ func (f *ProofService) syncFraudProofs(ctx context.Context) {
)
if err != nil {
log.Error(err)
span.RecordError(err)
}
}
}
span.SetStatus(codes.Ok, "")
}(connStatus.Peer)
}
}
Expand Down
3 changes: 2 additions & 1 deletion header/p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/celestia-node/header"
p2p_pb "github.com/celestiaorg/celestia-node/header/p2p/pb"
header_pb "github.com/celestiaorg/celestia-node/header/pb"
"github.com/celestiaorg/celestia-node/params"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)

var log = logging.Logger("header/p2p")
Expand Down
6 changes: 6 additions & 0 deletions nodebuilder/fraud/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@ import (
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

var log = logging.Logger("fraud-module")

func ConstructModule(tp node.Type) fx.Option {
baseComponent := fx.Provide(func(module Module) fraud.Getter {
return module
})
switch tp {
case node.Light:
return fx.Module(
"fraud",
baseComponent,
fx.Provide(ModuleWithSyncer),
)
case node.Full, node.Bridge:
return fx.Module(
"fraud",
baseComponent,
fx.Provide(NewModule),
)
default:
Expand Down
2 changes: 2 additions & 0 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/daser"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
Expand All @@ -40,6 +41,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
fx.Invoke(initializeMetrics),
fx.Invoke(header.WithMetrics),
fx.Invoke(state.WithMetrics),
fx.Invoke(fraud.WithMetrics),
)

var opts fx.Option
Expand Down

0 comments on commit 02cd7ca

Please sign in to comment.