From 6716a7af4e9f9c93d16dff55bfb9ac31d4992728 Mon Sep 17 00:00:00 2001 From: Alejo Acosta Date: Thu, 12 Oct 2023 18:05:47 -0300 Subject: [PATCH] add foundational files to support gossipsub --- go.mod | 2 ++ go.sum | 4 +++ p2p/node/node.go | 11 +++++++ p2p/pubsub/gossipsub/cache.go | 38 +++++++++++++++++++++ p2p/pubsub/gossipsub/handlers.go | 21 ++++++++++++ p2p/pubsub/gossipsub/validators.go | 23 +++++++++++++ p2p/pubsub/pubsub.go | 53 ++++++++++++++++++++++++++++++ 7 files changed, 152 insertions(+) create mode 100644 p2p/pubsub/gossipsub/cache.go create mode 100644 p2p/pubsub/gossipsub/handlers.go create mode 100644 p2p/pubsub/gossipsub/validators.go create mode 100644 p2p/pubsub/pubsub.go diff --git a/go.mod b/go.mod index d7ed375794..850a0eb900 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/huin/goupnp v1.2.0 // indirect github.com/ipfs/boxo v0.10.0 // indirect @@ -127,6 +128,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/libp2p/go-libp2p v0.31.0 github.com/libp2p/go-libp2p-kad-dht v0.25.1 + github.com/libp2p/go-libp2p-pubsub v0.9.3 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.7.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index 03c153a716..89e23dfa91 100644 --- a/go.sum +++ b/go.sum @@ -232,6 +232,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= +github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/huin/goupnp v1.2.0 h1:uOKW26NG1hsSSbXIZ1IR7XP9Gjd1U8pnLaCMgntmkmY= @@ -304,6 +306,8 @@ github.com/libp2p/go-libp2p-kad-dht v0.25.1 h1:ofFNrf6MMEy4vi3R1VbJ7LOcTn3Csh0cD github.com/libp2p/go-libp2p-kad-dht v0.25.1/go.mod h1:6za56ncRHYXX4Nc2vn8z7CZK0P4QiMcrn77acKLM2Oo= github.com/libp2p/go-libp2p-kbucket v0.6.3 h1:p507271wWzpy2f1XxPzCQG9NiN6R6lHL9GiSErbQQo0= github.com/libp2p/go-libp2p-kbucket v0.6.3/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0= +github.com/libp2p/go-libp2p-pubsub v0.9.3 h1:ihcz9oIBMaCK9kcx+yHWm3mLAFBMAUsM4ux42aikDxo= +github.com/libp2p/go-libp2p-pubsub v0.9.3/go.mod h1:RYA7aM9jIic5VV47WXu4GkcRxRhrdElWf8xtyli+Dzc= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= github.com/libp2p/go-libp2p-routing-helpers v0.7.2 h1:xJMFyhQ3Iuqnk9Q2dYE1eUTzsah7NLw3Qs2zjUV78T0= diff --git a/p2p/node/node.go b/p2p/node/node.go index 79b46fbe3d..5aabe2f06d 100644 --- a/p2p/node/node.go +++ b/p2p/node/node.go @@ -6,6 +6,7 @@ import ( "github.com/dominant-strategies/go-quai/log" "github.com/dominant-strategies/go-quai/p2p/discovery" + "github.com/dominant-strategies/go-quai/p2p/pubsub" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -23,6 +24,7 @@ type P2PNode struct { host.Host dht discovery.DHT ctx context.Context + ps *pubsub.PubSubManager } // returns a new libp2p node setup with the given IP address, address and private key @@ -67,6 +69,15 @@ func NewNode(ctx context.Context, ipaddr string, port string, privKeyFile string Host: node, ctx: ctx, } + + // Initialize the PubSub manager with default options + psMgr, err := pubsub.NewPubSubManager(ctx, node, nil) + if err != nil { + log.Errorf("error initializing PubSub manager: %s", err) + return nil, err + } + p2pNode.ps = psMgr + return p2pNode, nil } diff --git a/p2p/pubsub/gossipsub/cache.go b/p2p/pubsub/gossipsub/cache.go new file mode 100644 index 0000000000..d928a03353 --- /dev/null +++ b/p2p/pubsub/gossipsub/cache.go @@ -0,0 +1,38 @@ +package gossipsub + +import ( + "time" + + lru "github.com/hashicorp/golang-lru" +) + +const cacheSize = 1000 + +// Gossipsub message cache. +// Mostly used for duplicate messages and handling of recently seen messages. +var msgCache *lru.Cache + +func init() { + msgCache, _ = lru.NewWithEvict(cacheSize, onEvicted) +} + +func AddMessageToCache(msgID string, msgData interface{}) { + msgCache.Add(msgID, msgData) +} + +func GetMessageFromCache(msgID string) (interface{}, bool) { + return msgCache.Get(msgID) +} + +func onEvicted(key interface{}, value interface{}) { + // TODO: Handle any logic needed when a message is evicted from cache +} + +func CleanupCache(tickDuration time.Duration) { + ticker := time.NewTicker(tickDuration) + defer ticker.Stop() + + for range ticker.C { + // TODO: Add cleanup logic + } +} diff --git a/p2p/pubsub/gossipsub/handlers.go b/p2p/pubsub/gossipsub/handlers.go new file mode 100644 index 0000000000..327960a6b5 --- /dev/null +++ b/p2p/pubsub/gossipsub/handlers.go @@ -0,0 +1,21 @@ +package gossipsub + +import ( + gossip "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" +) + +// Handle incoming Gossipsub message +func HandleIncomingMessage(msg *gossip.Message) { + // TODO: implement +} + +// Handle logic when a peer joins the network +func HandlePeerJoin(peerID peer.ID) { + // TODO: implement +} + +// Handle logic when a peer leaves the network +func HandlePeerLeave(peerID peer.ID) { + // TODO: implement +} diff --git a/p2p/pubsub/gossipsub/validators.go b/p2p/pubsub/gossipsub/validators.go new file mode 100644 index 0000000000..9ef524a7e3 --- /dev/null +++ b/p2p/pubsub/gossipsub/validators.go @@ -0,0 +1,23 @@ +package gossipsub + +import ( + "context" + + gossip "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" +) + +// Register a validator for a topic. +// Validators help in ensuring that incoming Gossipsub messages meet certain criteria +// before they are processed or propagated. +func RegisterValidators(ps *gossip.PubSub) { + // TODO: implement + ps.RegisterTopicValidator("example-topic", exampleTopicValidator) +} + +func exampleTopicValidator(ctx context.Context, peerID peer.ID, msg *gossip.Message) bool { + // TODO: implement + // Validate the message for the "example-topic" + // Return true if valid, false otherwise + return true +} diff --git a/p2p/pubsub/pubsub.go b/p2p/pubsub/pubsub.go new file mode 100644 index 0000000000..93fa7d51a1 --- /dev/null +++ b/p2p/pubsub/pubsub.go @@ -0,0 +1,53 @@ +package pubsub + +import ( + "context" + + "github.com/dominant-strategies/go-quai/log" + gossip "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" +) + +// Base instance for the pub-sub service. +// It manages the Gossipsub instance and provides utility methods +type PubSubManager struct { + ps *gossip.PubSub +} + +func NewPubSubManager(ctx context.Context, h host.Host, opts []gossip.Option) (*PubSubManager, error) { + ps, err := gossip.NewGossipSub(ctx, h, opts...) + if err != nil { + return nil, err + } + return &PubSubManager{ps: ps}, nil +} + +// Join a topic +func (manager *PubSubManager) Join(topic string) (*gossip.Topic, error) { + return manager.ps.Join(topic) +} + +// Join a topic and subscribe to it +func (manager *PubSubManager) Subscribe(topic string) (*gossip.Subscription, error) { + topicHandle, err := manager.ps.Join(topic) + if err != nil { + log.Errorf("error joining topic: %s", err) + return nil, err + } + return topicHandle.Subscribe() +} + +// Publish a message to a topic +func (manager *PubSubManager) Publish(topic string, data []byte) error { + topicHandle, err := manager.ps.Join(topic) + if err != nil { + log.Errorf("error joining topic: %s", err) + return err + } + return topicHandle.Publish(context.Background(), data) +} + +func (manager *PubSubManager) ListPeers(topic string) []peer.ID { + return manager.ps.ListPeers(topic) +}