Skip to content

Commit

Permalink
p2p/discv5: fixed bootnode connect issues
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Nov 14, 2016
1 parent a0c6649 commit e33e576
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 69 deletions.
5 changes: 3 additions & 2 deletions les/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (pm *ProtocolManager) findServers() {
if pm.p2pServer == nil || pm.topicDisc == nil {
return
}
glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic))
enodes := make(chan string, 100)
stop := make(chan struct{})
go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
Expand Down Expand Up @@ -280,9 +281,9 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) {
} else {
if pm.topicDisc != nil {
go func() {
glog.V(logger.Debug).Infoln("Starting topic register")
glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic))
pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
glog.V(logger.Debug).Infoln("Stopped topic register")
glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic))
}()
}
go func() {
Expand Down
90 changes: 53 additions & 37 deletions p2p/discv5/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ var (
)

const (
autoRefreshInterval = 1 * time.Hour
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
autoRefreshInterval = 1 * time.Hour
bucketRefreshInterval = 1 * time.Minute
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
)

const testTopic = "foo"
Expand Down Expand Up @@ -82,7 +83,6 @@ type Network struct {
tableOpResp chan struct{}
topicRegisterReq chan topicRegisterReq
topicSearchReq chan topicSearchReq
bucketFillChn chan chan struct{}

// State of the main loop.
tab *Table
Expand Down Expand Up @@ -169,7 +169,6 @@ func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, d
queryReq: make(chan *findnodeQuery),
topicRegisterReq: make(chan topicRegisterReq),
topicSearchReq: make(chan topicSearchReq),
bucketFillChn: make(chan chan struct{}, 1),
nodes: make(map[NodeID]*Node),
}
go net.loop()
Expand Down Expand Up @@ -353,8 +352,9 @@ func (net *Network) reqTableOp(f func()) (called bool) {

func (net *Network) loop() {
var (
refreshTimer = time.NewTicker(autoRefreshInterval)
refreshDone chan struct{} // closed when the 'refresh' lookup has ended
refreshTimer = time.NewTicker(autoRefreshInterval)
bucketRefreshTimer = time.NewTimer(bucketRefreshInterval)
refreshDone chan struct{} // closed when the 'refresh' lookup has ended
)

// Tracking the next ticket to register.
Expand Down Expand Up @@ -389,6 +389,7 @@ func (net *Network) loop() {
topicRegisterLookupDone chan []*Node
topicRegisterLookupTick = time.NewTimer(0)
topicSearchLookupTarget lookupInfo
searchReqWhenRefreshDone []topicSearchReq
)
topicSearchLookupDone := make(chan []*Node, 1)
<-topicRegisterLookupTick.C
Expand All @@ -406,6 +407,7 @@ loop:

// Ingress packet handling.
case pkt := <-net.read:
//fmt.Println("read", pkt.ev)
debugLog("<-net.read")
n := net.internNode(&pkt)
prestate := n.state
Expand Down Expand Up @@ -503,14 +505,18 @@ loop:
net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)

case req := <-net.topicSearchReq:
debugLog("<-net.topicSearchReq")
if req.found == nil {
net.ticketStore.removeSearchTopic(req.topic)
continue
}
net.ticketStore.addSearchTopic(req.topic, req.found)
if (topicSearchLookupTarget.target == common.Hash{}) {
topicSearchLookupDone <- nil
if refreshDone == nil {
debugLog("<-net.topicSearchReq")
if req.found == nil {
net.ticketStore.removeSearchTopic(req.topic)
continue
}
net.ticketStore.addSearchTopic(req.topic, req.found)
if (topicSearchLookupTarget.target == common.Hash{}) {
topicSearchLookupDone <- nil
}
} else {
searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
}

case nodes := <-topicSearchLookupDone:
Expand All @@ -519,7 +525,14 @@ loop:
net.ping(n, n.addr())
return n.pingEcho
}, func(n *Node, topic Topic) []byte {
return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
if n.state == known {
return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
} else {
if n.state == unknown {
net.ping(n, n.addr())
}
return nil
}
})
topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
target := topicSearchLookupTarget.target
Expand Down Expand Up @@ -564,9 +577,12 @@ loop:
refreshDone = make(chan struct{})
net.refresh(refreshDone)
}
case doneChn := <-net.bucketFillChn:
debugLog("bucketFill")
net.bucketFill(doneChn)
case <-bucketRefreshTimer.C:
target := net.tab.chooseBucketRefreshTarget()
go func() {
net.lookup(target, false)
bucketRefreshTimer.Reset(bucketRefreshInterval)
}()
case newNursery := <-net.refreshReq:
debugLog("<-net.refreshReq")
if newNursery != nil {
Expand All @@ -580,6 +596,13 @@ loop:
case <-refreshDone:
debugLog("<-net.refreshDone")
refreshDone = nil
list := searchReqWhenRefreshDone
searchReqWhenRefreshDone = nil
go func() {
for _, req := range list {
net.topicSearchReq <- req
}
}()
}
}
debugLog("loop stopped")
Expand Down Expand Up @@ -643,28 +666,13 @@ func (net *Network) refresh(done chan<- struct{}) {
}()
}

func (net *Network) bucketFill(done chan<- struct{}) {
target := net.tab.chooseBucketFillTarget()
go func() {
net.lookup(target, false)
close(done)
}()
}

func (net *Network) BucketFill() {
done := make(chan struct{})
select {
case net.bucketFillChn <- done:
<-done
case <-net.closed:
close(done)
}
}

// Node Interning.

func (net *Network) internNode(pkt *ingressPacket) *Node {
if n := net.nodes[pkt.remoteID]; n != nil {
n.IP = pkt.remoteAddr.IP
n.UDP = uint16(pkt.remoteAddr.Port)
n.TCP = uint16(pkt.remoteAddr.Port)
return n
}
n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port))
Expand Down Expand Up @@ -967,8 +975,10 @@ func init() {

// handle processes packets sent by n and events related to n.
func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
//fmt.Println("handle", n.addr().String(), n.state, ev)
if pkt != nil {
if err := net.checkPacket(n, ev, pkt); err != nil {
//fmt.Println("check err:", err)
return err
}
// Start the background expiration goroutine after the first
Expand All @@ -985,6 +995,7 @@ func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
}
next, err := n.state.handle(net, n, ev, pkt)
net.transition(n, next)
//fmt.Println("new state:", n.state)
return err
}

Expand Down Expand Up @@ -1040,6 +1051,11 @@ func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) {
}

func (net *Network) ping(n *Node, addr *net.UDPAddr) {
//fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex())
if n.pingEcho != nil || n.ID == net.tab.self.ID {
//fmt.Println(" not sent")
return
}
debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8]))
n.pingTopics = net.ticketStore.regTopicSet()
n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)
Expand Down
76 changes: 47 additions & 29 deletions p2p/discv5/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package discv5
import (
"crypto/rand"
"encoding/binary"
"fmt"
"net"
"sort"

Expand Down Expand Up @@ -64,42 +65,54 @@ func newTable(ourID NodeID, ourAddr *net.UDPAddr) *Table {
return tab
}

func (tab *Table) chooseBucketFillTarget() common.Hash {
bucketCount := nBuckets
for bucketCount > 0 && len(tab.buckets[nBuckets-bucketCount].entries) == 0 {
bucketCount--
const printTable = false

// chooseBucketRefreshTarget selects random refresh targets to keep all Kademlia
// buckets filled with live connections and keep the network topology healthy.
// This requires selecting addresses closer to our own with a higher probability
// in order to refresh closer buckets too.
//
// This algorithm approximates the distance distribution of existing nodes in the
// table by selecting a random node from the table and selecting a target address
// with a distance less than twice of that of the selected node.
// This algorithm will be improved later to specifically target the least recently
// used buckets.
func (tab *Table) chooseBucketRefreshTarget() common.Hash {
entries := 0
if printTable {
fmt.Println()
}
var bucket int
for {
// select a target hash that could go into a certain randomly selected bucket
// buckets are chosen with an even chance out of the existing ones that contain
// less that bucketSize entries, plus a potential new one beyond these
bucket = nBuckets - 1 - int(randUint(uint32(bucketCount+1)))
if bucket == bucketCount || len(tab.buckets[bucket].entries) < bucketSize {
break
for i, b := range tab.buckets {
entries += len(b.entries)
if printTable {
for _, e := range b.entries {
fmt.Println(i, e.state, e.addr().String(), e.ID.String(), e.sha.Hex())
}
}
}

// calculate target that has the desired log distance from our own address hash
target := tab.self.sha.Bytes()
prefix := binary.BigEndian.Uint64(target[0:8])
shift := uint(nBuckets - 1 - bucket)
if bucket != bucketCount {
shift++
prefix := binary.BigEndian.Uint64(tab.self.sha[0:8])
dist := ^uint64(0)
entry := int(randUint(uint32(entries + 1)))
for _, b := range tab.buckets {
if entry < len(b.entries) {
n := b.entries[entry]
dist = binary.BigEndian.Uint64(n.sha[0:8]) ^ prefix
break
}
entry -= len(b.entries)
}
var b [8]byte
rand.Read(b[:])
rnd := binary.BigEndian.Uint64(b[:])
rndMask := (^uint64(0)) >> shift
addrMask := ^rndMask
xorMask := uint64(0)
if bucket != bucketCount {
xorMask = rndMask + 1

ddist := ^uint64(0)
if dist+dist > dist {
ddist = dist
}
prefix = (prefix&addrMask ^ xorMask) | (rnd & rndMask)
binary.BigEndian.PutUint64(target[0:8], prefix)
targetPrefix := prefix ^ randUint64n(ddist)

var target common.Hash
binary.BigEndian.PutUint64(target[0:8], targetPrefix)
rand.Read(target[8:])
return common.BytesToHash(target)
return target
}

// readRandomNodes fills the given slice with random nodes from the
Expand Down Expand Up @@ -175,6 +188,10 @@ func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
// bucket has space available, adding the node succeeds immediately.
// Otherwise, the node is added to the replacement cache for the bucket.
func (tab *Table) add(n *Node) (contested *Node) {
//fmt.Println("add", n.addr().String(), n.ID.String(), n.sha.Hex())
if n.ID == tab.self.ID {
return
}
b := tab.buckets[logdist(tab.self.sha, n.sha)]
switch {
case b.bump(n):
Expand Down Expand Up @@ -228,6 +245,7 @@ outer:
// delete removes an entry from the node table (used to evacuate
// failed/non-bonded discovery peers).
func (tab *Table) delete(node *Node) {
//fmt.Println("delete", node.addr().String(), node.ID.String(), node.sha.Hex())
bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
for i := range bucket.entries {
if bucket.entries[i].ID == node.ID {
Expand Down
4 changes: 3 additions & 1 deletion p2p/discv5/ticket.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,9 @@ func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, ping fu
} // else {
if s.canQueryTopic(n, lookup.topic) {
hash := query(n, lookup.topic)
s.addTopicQuery(common.BytesToHash(hash), n, lookup)
if hash != nil {
s.addTopicQuery(common.BytesToHash(hash), n, lookup)
}
}
//}
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/discv5/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,17 @@ func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node)
}

func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) {
//fmt.Println("sendPacket", nodeEvent(ptype), toaddr.String(), toid.String())
packet, hash, err := encodePacket(t.priv, ptype, req)
if err != nil {
//fmt.Println(err)
return hash, err
}
glog.V(logger.Detail).Infof(">>> %v to %x@%v\n", nodeEvent(ptype), toid[:8], toaddr)
if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
glog.V(logger.Detail).Infoln("UDP send failed:", err)
}
//fmt.Println(err)
return hash, err
}

Expand Down Expand Up @@ -406,6 +409,7 @@ func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
pkt := ingressPacket{remoteAddr: from}
if err := decodePacket(buf, &pkt); err != nil {
glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err)
//fmt.Println("bad packet", err)
return err
}
t.net.reqReadPacket(pkt)
Expand Down

0 comments on commit e33e576

Please sign in to comment.