Skip to content

Commit

Permalink
update track,discover,sdkmgt (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
AstaFrode authored Nov 28, 2023
1 parent 8b5b98e commit 00ccff9
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 219 deletions.
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ go 1.20

require (
github.com/AstaFrode/go-libp2p v0.26.4-0.20231113143058-912296254d44
github.com/CESSProject/cess-go-sdk v0.3.21-0.20231114061951-fc77d8141ff0
github.com/CESSProject/cess-go-sdk v0.3.21-0.20231124070841-63bee15bbc25
github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde
github.com/CESSProject/p2p-go v0.2.6-0.20231122035207-4ef352825668
github.com/CESSProject/p2p-go v0.2.6-0.20231127085809-61bf3338888b
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cbergoon/merkletree v0.2.0
github.com/centrifuge/go-substrate-rpc-client/v4 v4.1.0
Expand All @@ -30,7 +30,6 @@ require (
github.com/vedhavyas/go-subkey/v2 v2.0.0
go.uber.org/zap v1.25.0
golang.org/x/crypto v0.14.0
golang.org/x/time v0.1.0
)

require (
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ github.com/AstaFrode/go-peertaskqueue v0.8.2-0.20231108073729-990e433425a4/go.mo
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CESSProject/cess-go-sdk v0.3.21-0.20231114061951-fc77d8141ff0 h1:WelCunG4c8Ohmb4/ZiEzmJ9LCiU6cNSDMPxZb35ioGU=
github.com/CESSProject/cess-go-sdk v0.3.21-0.20231114061951-fc77d8141ff0/go.mod h1:KWlHxDKfinyfEI3w7BqMqgo+oi7grsUVayDw/2NYSks=
github.com/CESSProject/cess-go-sdk v0.3.21-0.20231124070841-63bee15bbc25 h1:vsGDRpU39knFBI3FcZe2HxEi4YoUiftHU2/JX9O0TcA=
github.com/CESSProject/cess-go-sdk v0.3.21-0.20231124070841-63bee15bbc25/go.mod h1:KWlHxDKfinyfEI3w7BqMqgo+oi7grsUVayDw/2NYSks=
github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde h1:5MDRjjtg6PEhqyVjupwaapN96cOZiddOGAYwKQeaTu0=
github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde/go.mod h1:RUXBd3ROP98MYepEEa0Y0l/T0vQlIKqFJxI/ocdnRLM=
github.com/CESSProject/p2p-go v0.2.6-0.20231122035207-4ef352825668 h1:Enqu6i1fEnm8kORGlYopu94klrbPmMr70DpRYaLnslE=
github.com/CESSProject/p2p-go v0.2.6-0.20231122035207-4ef352825668/go.mod h1:SMwJt5Zpk98k+d2J5gsN+0Forr7MbqOYWbHLd3mBLrI=
github.com/CESSProject/p2p-go v0.2.6-0.20231127085809-61bf3338888b h1:mgOe7+QChOVMt360EUIkmSJq2/WcnwJ7G+pG/+bGM5M=
github.com/CESSProject/p2p-go v0.2.6-0.20231127085809-61bf3338888b/go.mod h1:SMwJt5Zpk98k+d2J5gsN+0Forr7MbqOYWbHLd3mBLrI=
github.com/ChainSafe/go-schnorrkel v1.0.0 h1:3aDA67lAykLaG1y3AOjs88dMxC88PgUuHRrLeDnvGIM=
github.com/ChainSafe/go-schnorrkel v1.0.0/go.mod h1:dpzHYVxLZcp8pjlV+O+UR8K0Hp/z7vcchBSbMBEhCw4=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
Expand Down Expand Up @@ -940,8 +940,6 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030000716-a0a13e073c7b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
212 changes: 25 additions & 187 deletions node/discoverMgt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,77 +8,26 @@
package node

import (
"fmt"
"time"

"github.com/AstaFrode/go-libp2p/core/peer"
peerstore "github.com/AstaFrode/go-libp2p/core/peerstore"
"github.com/CESSProject/DeOSS/pkg/utils"
"github.com/multiformats/go-multiaddr"
"golang.org/x/time/rate"
)

func (n *Node) findPeers(ch chan<- bool) {
defer func() {
ch <- true
if err := recover(); err != nil {
n.Pnc(utils.RecoverError(err))
}
ch <- true
}()

n.Discover("info", ">>>>> start findPeers <<<<<")

var ok bool
var err error
var foundPeer peer.AddrInfo
var interval time.Duration = 30
var findInterval time.Duration = 1
var tick = time.NewTicker(time.Second * findInterval)
defer tick.Stop()

for {
select {
case <-tick.C:
findInterval += interval
if findInterval > 3600 {
findInterval = interval
err = n.SavePeersToDisk(n.peersPath)
if err != nil {
n.Discover("err", err.Error())
}
}
tick.Reset(time.Second * findInterval)
peerChan, err := n.GetRoutingTable().FindPeers(n.GetCtxQueryFromCtxCancel(), n.GetRendezvousVersion())
if err != nil {
continue
}
ok = true
for ok {
select {
case foundPeer, ok = <-peerChan:
if !ok {
break
}
if foundPeer.ID == n.ID() {
continue
}
err := n.Connect(n.GetCtxQueryFromCtxCancel(), foundPeer)
if err != nil {
// n.Peerstore().RemovePeer(foundPeer.ID)
n.GetDht().RoutingTable().RemovePeer(foundPeer.ID)
} else {
// for _, addr := range foundPeer.Addrs {
// n.Peerstore().AddAddr(foundPeer.ID, addr, peerstore.AddressTTL)
// }
n.GetDht().RoutingTable().TryAddPeer(foundPeer.ID, true, true)
n.SavePeer(foundPeer.ID.Pretty(), peer.AddrInfo{
ID: foundPeer.ID,
Addrs: foundPeer.Addrs,
})
}
}
}
err := n.findpeer()
if err != nil {
n.Discover("err", err.Error())
}
time.Sleep(time.Second)
}
}

Expand All @@ -98,7 +47,6 @@ func (n *Node) recvPeers(ch chan<- bool) {
for _, v := range foundPeer.Responses {
if v != nil {
if len(v.Addrs) > 0 {
// n.Peerstore().AddAddrs(v.ID, v.Addrs, peerstore.AddressTTL)
n.GetDht().RoutingTable().TryAddPeer(foundPeer.ID, true, true)
n.SavePeer(v.ID.Pretty(), peer.AddrInfo{
ID: v.ID,
Expand All @@ -111,139 +59,29 @@ func (n *Node) recvPeers(ch chan<- bool) {
}
}

func (n *Node) discoverMgt2(ch chan<- bool) {
defer func() {
ch <- true
if err := recover(); err != nil {
n.Pnc(utils.RecoverError(err))
}
}()

n.Discover("info", ">>>>> start discoverMgt <<<<<")

go func() {
for {
select {
case foundPeer := <-n.GetDiscoveredPeers():
for _, v := range foundPeer.Responses {
//fmt.Println("***************************Fount id:", v.ID.String(), "addr: ", v.Addrs)
n.Peerstore().AddAddrs(v.ID, v.Addrs, peerstore.AddressTTL)
}
}
}
}()

tickDiscover := time.NewTicker(time.Second * 10)
defer tickDiscover.Stop()
for {
select {
case <-tickDiscover.C:
//peerChan := kademliaDHT.FindProvidersAsync(ctx, cid.Cid{}, 1)
peerChan, err := n.RouteTableFindPeers(0)
if err != nil {
fmt.Println("xxx FindPeers err: ", err)
continue
}
var ok = true
var aPeer peer.AddrInfo
for ok {
select {
case aPeer, ok = <-peerChan:
if !ok {
break
}
if aPeer.ID.Pretty() == n.GetDht().Host().ID().Pretty() {
continue
}
err := n.Connect(n.GetCtxQueryFromCtxCancel(), aPeer)
if err != nil {
fmt.Println("xxx Failed connecting to ", aPeer.ID.Pretty(), ", error:", err)
} else {
for _, addr := range aPeer.Addrs {
n.Peerstore().AddAddr(aPeer.ID, addr, peerstore.PermanentAddrTTL)
}
fmt.Println("+++ Connected to:", aPeer.ID.Pretty())
}
}
}
}
}
}

func (n *Node) discoverMgt(ch chan<- bool) {
defer func() {
ch <- true
if err := recover(); err != nil {
n.Pnc(utils.RecoverError(err))
}
}()

n.Discover("info", ">>>>> start discoverMgt <<<<<")
tickDiscover := time.NewTicker(time.Minute * 10)
defer tickDiscover.Stop()

var r1 = rate.Every(time.Second * 5)
var limit = rate.NewLimiter(r1, 1)

var r2 = rate.Every(time.Minute * 30)
var printLimit = rate.NewLimiter(r2, 1)

err := n.LoadPeersFromDisk(n.peersPath)
func (n *Node) findpeer() error {
peerChan, err := n.GetRoutingTable().FindPeers(
n.GetCtxQueryFromCtxCancel(),
n.GetRendezvousVersion(),
)
if err != nil {
n.Discover("err", err.Error())
} else {
n.RemovePeerIntranetAddr()
return err
}

n.RouteTableFindPeers(0)

for {
select {
case discoveredPeer, _ := <-n.GetDiscoveredPeers():
if limit.Allow() {
n.Discover("info", "reset")
tickDiscover.Reset(time.Minute * 10)
}
if len(discoveredPeer.Responses) == 0 {
break
}
for _, v := range discoveredPeer.Responses {
var addrInfo peer.AddrInfo
var addrs []multiaddr.Multiaddr
for _, addr := range v.Addrs {
if !utils.InterfaceIsNIL(addr) {
if ipv4, ok := utils.FildIpv4([]byte(addr.String())); ok {
if ok, err := utils.IsIntranetIpv4(ipv4); err == nil {
if !ok {
addrs = append(addrs, addr)
}
}
}
}
}
if len(addrs) > 0 {
addrInfo.ID = v.ID
addrInfo.Addrs = utils.RemoveRepeatedAddr(addrs)
n.SavePeer(v.ID.Pretty(), addrInfo)
}
}
case <-tickDiscover.C:
if printLimit.Allow() {
allpeer := n.GetAllPeerId()
for _, v := range allpeer {
n.Discover("info", fmt.Sprintf("found %s", v))
}
n.RemovePeerIntranetAddr()
err = n.SavePeersToDisk(n.peersPath)
if err != nil {
n.Discover("err", err.Error())
}
}
n.Discover("info", "RouteTableFindPeers")
_, err := n.RouteTableFindPeers(len(n.peers) + 20)
if err != nil {
n.Discover("err", err.Error())
}
for onePeer := range peerChan {
if onePeer.ID == n.ID() {
continue
}
err := n.Connect(n.GetCtxQueryFromCtxCancel(), onePeer)
if err != nil {
n.GetDht().RoutingTable().RemovePeer(onePeer.ID)
} else {
n.GetDht().RoutingTable().TryAddPeer(onePeer.ID, true, true)
n.SavePeer(onePeer.ID.Pretty(), peer.AddrInfo{
ID: onePeer.ID,
Addrs: onePeer.Addrs,
})
}
}
return nil
}
35 changes: 35 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ type Node struct {
trackLock *sync.RWMutex
lock *sync.RWMutex
blacklistLock *sync.RWMutex
storagePeersLock *sync.RWMutex
peers map[string]peer.AddrInfo
storagePeers map[string]struct{}
blacklist map[string]int64
trackDir string
fadebackDir string
Expand All @@ -69,8 +71,10 @@ func New() *Node {
trackLock: new(sync.RWMutex),
lock: new(sync.RWMutex),
blacklistLock: new(sync.RWMutex),
storagePeersLock: new(sync.RWMutex),
processingFiles: make([]string, 0),
peers: make(map[string]peer.AddrInfo, 0),
storagePeers: make(map[string]struct{}, 0),
blacklist: make(map[string]int64, 0),
}
}
Expand Down Expand Up @@ -120,6 +124,37 @@ func (n *Node) HasPeer(peerid string) bool {
return ok
}

func (n *Node) SaveStoragePeer(peerid string) {
n.storagePeersLock.Lock()
n.storagePeers[peerid] = struct{}{}
n.storagePeersLock.Unlock()
}

func (n *Node) DeleteStoragePeer(peerid string) {
n.storagePeersLock.Lock()
delete(n.storagePeers, peerid)
n.storagePeersLock.Unlock()
}

func (n *Node) HasStoragePeer(peerid string) bool {
n.storagePeersLock.RLock()
defer n.storagePeersLock.RUnlock()
_, ok := n.storagePeers[peerid]
return ok
}

func (n *Node) GetAllStoragePeerId() []string {
n.storagePeersLock.RLock()
defer n.storagePeersLock.RUnlock()
var result = make([]string, len(n.storagePeers))
var i int
for k, _ := range n.storagePeers {
result[i] = k
i++
}
return result
}

func (n *Node) GetPeer(peerid string) (peer.AddrInfo, bool) {
n.lock.RLock()
result, ok := n.peers[peerid]
Expand Down
Loading

0 comments on commit 00ccff9

Please sign in to comment.