Skip to content

Commit

Permalink
Implement topic Unsubscribe()
Browse files Browse the repository at this point in the history
  • Loading branch information
wizeguyy committed Apr 29, 2024
1 parent 0e8ae82 commit 6c0b4e2
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 17 deletions.
6 changes: 6 additions & 0 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (p *P2PNode) Subscribe(location common.Location, datatype interface{}) erro
return p.pubsub.Subscribe(location, datatype)
}

func (p *P2PNode) Unsubscribe(location common.Location, datatype interface{}) {
p.pubsub.Unsubscribe(location, datatype)
}

func (p *P2PNode) Broadcast(location common.Location, data interface{}) error {
return p.pubsub.Broadcast(location, data)
}
Expand All @@ -79,6 +83,7 @@ func (p *P2PNode) Stop() error {
p.Host.Close,
p.dht.Close,
p.peerManager.Stop,
p.pubsub.Stop,
}
// create a channel to collect errors
errs := make(chan error, len(stopFuncs))
Expand Down Expand Up @@ -111,6 +116,7 @@ func (p *P2PNode) Stop() error {
allErrors = append(allErrors, err)
}
}

close(errs)
if len(allErrors) > 0 {
return errors.Errorf("errors during shutdown: %v", allErrors)
Expand Down
55 changes: 40 additions & 15 deletions p2p/pubsubManager/gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"runtime/debug"
"sync"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -23,8 +24,8 @@ var (
type PubsubManager struct {
*pubsub.PubSub
ctx context.Context
subscriptions map[string]*pubsub.Subscription
topics map[string]*pubsub.Topic
subscriptions *sync.Map
topics *sync.Map
consensus quai.ConsensusAPI
genesis common.Hash

Expand All @@ -43,8 +44,8 @@ func NewGossipSubManager(ctx context.Context, h host.Host) (*PubsubManager, erro
return &PubsubManager{
ps,
ctx,
make(map[string]*pubsub.Subscription),
make(map[string]*pubsub.Topic),
new(sync.Map),
new(sync.Map),
nil,
utils.MakeGenesis().ToBlock(0).Hash(),
nil,
Expand All @@ -61,15 +62,22 @@ func (g *PubsubManager) Start(receiveCb func(peer.ID, interface{}, common.Locati
g.onReceived = receiveCb
}

func (g *PubsubManager) Stop() error {
g.UnsubscribeAll()
return nil
}

func (g *PubsubManager) UnsubscribeAll() {
for k, sub := range g.subscriptions {
sub.Cancel()
delete(g.subscriptions, k)
}
for k, t := range g.topics {
t.Close()
delete(g.topics, k)
}
g.subscriptions.Range(func(key, value any) bool {
value.(*pubsub.Subscription).Cancel()
g.subscriptions.Delete(key)
return true
})
g.topics.Range(func(key, value any) bool {
value.(*pubsub.Topic).Close()
g.topics.Delete(key)
return true
})
}

// subscribe to broadcasts of the given type of data
Expand All @@ -85,15 +93,15 @@ func (g *PubsubManager) Subscribe(location common.Location, datatype interface{}
if err != nil {
return err
}
g.topics[topicName] = topic
g.topics.Store(topicName, topic)
g.PubSub.RegisterTopicValidator(topic.String(), g.consensus.ValidatorFunc())

// subscribe to the topic
subscription, err := topic.Subscribe()
if err != nil {
return err
}
g.subscriptions[topicName] = subscription
g.subscriptions.Store(topicName, subscription)

go func(location common.Location, sub *pubsub.Subscription) {
defer func() {
Expand Down Expand Up @@ -135,6 +143,20 @@ func (g *PubsubManager) Subscribe(location common.Location, datatype interface{}
return nil
}

// unsubscribe from broadcasts of the given type of data
func (g *PubsubManager) Unsubscribe(location common.Location, datatype interface{}) {
if topicName, err := TopicName(g.genesis, location, datatype); err != nil {
if value, ok := g.subscriptions.Load(topicName); ok {
value.(*pubsub.Subscription).Cancel()
g.subscriptions.Delete(topicName)
}
if value, ok := g.topics.Load(topicName); ok {
value.(*pubsub.Topic).Close()
g.topics.Delete(topicName)
}
}
}

// broadcasts data to subscribing peers
func (g *PubsubManager) Broadcast(location common.Location, datatype interface{}) error {
topicName, err := TopicName(g.genesis, location, datatype)
Expand All @@ -145,5 +167,8 @@ func (g *PubsubManager) Broadcast(location common.Location, datatype interface{}
if err != nil {
return err
}
return g.topics[topicName].Publish(g.ctx, protoData)
if value, ok := g.topics.Load(topicName); ok {
return value.(*pubsub.Topic).Publish(g.ctx, protoData)
}
return errors.New("no topic for requested data")
}
7 changes: 6 additions & 1 deletion p2p/pubsubManager/utils.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package pubsubManager

import (
"errors"
"strings"

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand Down Expand Up @@ -41,5 +43,8 @@ func (g *PubsubManager) PeersForTopic(location common.Location, data interface{}
if err != nil {
return nil, err
}
return g.topics[topicName].ListPeers(), nil
if value, ok := g.topics.Load(topicName); ok {
return value.(*pubsub.Topic).ListPeers(), nil
}
return nil, errors.New("no topic for requested data")
}
3 changes: 2 additions & 1 deletion quai/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type NetworkingAPI interface {
// Stop the p2p node
Stop() error

// Specify location and data type to subscribe to
// Subscribe/UnSubscribe to a type of data from a given location
Subscribe(common.Location, interface{}) error
Unsubscribe(common.Location, interface{})

// Method to broadcast data to the network
// Specify location and the data to send
Expand Down

0 comments on commit 6c0b4e2

Please sign in to comment.