Skip to content

Commit

Permalink
les: remove clientPeerSet and serverSet (ethereum#21566)
Browse files Browse the repository at this point in the history
* les: move NodeStateMachine from clientPool to LesServer

* les: new header broadcaster

* les: peerCommons.headInfo always contains last announced head

* les: remove clientPeerSet and serverSet

* les: fixed panic

* les: fixed --nodiscover option

* les: disconnect all peers at ns.Stop()

* les: added comments and fixed signed broadcasts

* les: removed unused parameter, fixed tests
  • Loading branch information
zsfelfoldi authored Oct 21, 2020
1 parent 3e82c9e commit 85d81b2
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 332 deletions.
8 changes: 1 addition & 7 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,7 @@ func (h *clientHandler) handle(p *serverPeer) error {
p.Log().Debug("Light Ethereum peer connected", "name", p.Name())

// Execute the LES handshake
var (
head = h.backend.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = h.backend.blockchain.GetTd(hash, number)
)
if err := p.Handshake(td, hash, number, h.backend.blockchain.Genesis().Hash(), nil); err != nil {
if err := p.Handshake(h.backend.blockchain.Genesis().Hash()); err != nil {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}
Expand Down
49 changes: 16 additions & 33 deletions les/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package les

import (
"fmt"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -46,19 +45,6 @@ const (
inactiveTimeout = time.Second * 10
)

var (
clientPoolSetup = &nodestate.Setup{}
clientField = clientPoolSetup.NewField("clientInfo", reflect.TypeOf(&clientInfo{}))
connAddressField = clientPoolSetup.NewField("connAddr", reflect.TypeOf(""))
balanceTrackerSetup = lps.NewBalanceTrackerSetup(clientPoolSetup)
priorityPoolSetup = lps.NewPriorityPoolSetup(clientPoolSetup)
)

func init() {
balanceTrackerSetup.Connect(connAddressField, priorityPoolSetup.CapacityField)
priorityPoolSetup.Connect(balanceTrackerSetup.BalanceField, balanceTrackerSetup.UpdateFlag) // NodeBalance implements nodePriority
}

// clientPool implements a client database that assigns a priority to each client
// based on a positive and negative balance. Positive balance is externally assigned
// to prioritized clients and is decreased with connection time and processed
Expand Down Expand Up @@ -119,8 +105,7 @@ type clientInfo struct {
}

// newClientPool creates a new client pool
func newClientPool(lespayDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
ns := nodestate.NewNodeStateMachine(nil, nil, clock, clientPoolSetup)
func newClientPool(ns *nodestate.NodeStateMachine, lespayDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
pool := &clientPool{
ns: ns,
BalanceTrackerSetup: balanceTrackerSetup,
Expand All @@ -147,7 +132,7 @@ func newClientPool(lespayDb ethdb.Database, minCap uint64, connectedBias time.Du
})

ns.SubscribeState(pool.ActiveFlag.Or(pool.PriorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
c, _ := ns.GetField(node, clientField).(*clientInfo)
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil {
return
}
Expand All @@ -172,7 +157,7 @@ func newClientPool(lespayDb ethdb.Database, minCap uint64, connectedBias time.Du
if oldState.Equals(pool.ActiveFlag) && newState.Equals(pool.InactiveFlag) {
clientDeactivatedMeter.Mark(1)
log.Debug("Client deactivated", "id", node.ID())
c, _ := ns.GetField(node, clientField).(*clientInfo)
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil || !c.peer.allowInactive() {
pool.removePeer(node.ID())
}
Expand All @@ -190,13 +175,11 @@ func newClientPool(lespayDb ethdb.Database, minCap uint64, connectedBias time.Du
newCap, _ := newValue.(uint64)
totalConnected += newCap - oldCap
totalConnectedGauge.Update(int64(totalConnected))
c, _ := ns.GetField(node, clientField).(*clientInfo)
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
if c != nil {
c.peer.updateCapacity(newCap)
}
})

ns.Start()
return pool
}

Expand All @@ -210,7 +193,6 @@ func (f *clientPool) stop() {
f.disconnectNode(node)
})
f.bt.Stop()
f.ns.Stop()
}

// connect should be called after a successful handshake. If the connection was
Expand All @@ -225,7 +207,7 @@ func (f *clientPool) connect(peer clientPoolPeer) (uint64, error) {
}
// Dedup connected peers.
node, freeID := peer.Node(), peer.freeClientId()
if f.ns.GetField(node, clientField) != nil {
if f.ns.GetField(node, clientInfoField) != nil {
log.Debug("Client already connected", "address", freeID, "id", node.ID().String())
return 0, fmt.Errorf("Client already connected address=%s id=%s", freeID, node.ID().String())
}
Expand All @@ -237,7 +219,7 @@ func (f *clientPool) connect(peer clientPoolPeer) (uint64, error) {
connected: true,
connectedAt: now,
}
f.ns.SetField(node, clientField, c)
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, freeID)
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*lps.NodeBalance); c.balance == nil {
f.disconnect(peer)
Expand Down Expand Up @@ -280,7 +262,7 @@ func (f *clientPool) disconnect(p clientPoolPeer) {
// disconnectNode removes node fields and flags related to connected status
func (f *clientPool) disconnectNode(node *enode.Node) {
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientField, nil)
f.ns.SetField(node, clientInfoField, nil)
}

// setDefaultFactors sets the default price factors applied to subsequently connected clients
Expand All @@ -299,7 +281,8 @@ func (f *clientPool) capacityInfo() (uint64, uint64, uint64) {
defer f.lock.Unlock()

// total priority active cap will be supported when the token issuer module is added
return f.capLimit, f.pp.ActiveCapacity(), 0
_, activeCap := f.pp.Active()
return f.capLimit, activeCap, 0
}

// setLimits sets the maximum number and total capacity of connected clients,
Expand All @@ -314,21 +297,21 @@ func (f *clientPool) setLimits(totalConn int, totalCap uint64) {

// setCapacity sets the assigned capacity of a connected client
func (f *clientPool) setCapacity(node *enode.Node, freeID string, capacity uint64, bias time.Duration, setCap bool) (uint64, error) {
c, _ := f.ns.GetField(node, clientField).(*clientInfo)
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil {
if setCap {
return 0, fmt.Errorf("client %064x is not connected", node.ID())
}
c = &clientInfo{node: node}
f.ns.SetField(node, clientField, c)
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, freeID)
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*lps.NodeBalance); c.balance == nil {
log.Error("BalanceField is missing", "node", node.ID())
return 0, fmt.Errorf("BalanceField of %064x is missing", node.ID())
}
defer func() {
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientField, nil)
f.ns.SetField(node, clientInfoField, nil)
}()
}
var (
Expand Down Expand Up @@ -370,7 +353,7 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {

if len(ids) == 0 {
f.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
c, _ := f.ns.GetField(node, clientField).(*clientInfo)
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c != nil {
cb(c)
}
Expand All @@ -381,20 +364,20 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {
if node == nil {
node = enode.SignNull(&enr.Record{}, id)
}
c, _ := f.ns.GetField(node, clientField).(*clientInfo)
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c != nil {
cb(c)
} else {
c = &clientInfo{node: node}
f.ns.SetField(node, clientField, c)
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, "")
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*lps.NodeBalance); c.balance != nil {
cb(c)
} else {
log.Error("BalanceField is missing")
}
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientField, nil)
f.ns.SetField(node, clientInfoField, nil)
}
}
}
Expand Down
43 changes: 30 additions & 13 deletions les/clientpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type poolTestPeer struct {
inactiveAllowed bool
}

func testStateMachine() *nodestate.NodeStateMachine {
return nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)

}

func newPoolTestPeer(i int, disconnCh chan int) *poolTestPeer {
return &poolTestPeer{
index: i,
Expand Down Expand Up @@ -91,7 +96,7 @@ func (i *poolTestPeer) allowInactive() bool {
}

func getBalance(pool *clientPool, p *poolTestPeer) (pos, neg uint64) {
temp := pool.ns.GetField(p.node, clientField) == nil
temp := pool.ns.GetField(p.node, clientInfoField) == nil
if temp {
pool.ns.SetField(p.node, connAddressField, p.freeClientId())
}
Expand Down Expand Up @@ -128,8 +133,9 @@ func testClientPool(t *testing.T, activeLimit, clientCount, paidCount int, rando
disconnFn = func(id enode.ID) {
disconnCh <- int(id[0]) + int(id[1])<<8
}
pool = newClientPool(db, 1, 0, &clock, disconnFn)
pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn)
)
pool.ns.Start()

pool.setLimits(activeLimit, uint64(activeLimit))
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand Down Expand Up @@ -233,7 +239,8 @@ func TestConnectPaidClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10))
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand All @@ -248,7 +255,8 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand All @@ -266,7 +274,8 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
db = rawdb.NewMemoryDatabase()
)
removeFn := func(enode.ID) {} // Noop
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand Down Expand Up @@ -295,7 +304,8 @@ func TestPaidClientKickedOut(t *testing.T) {
removeFn := func(id enode.ID) {
kickedCh <- int(id[0])
}
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool.ns.Start()
pool.bt.SetExpirationTCs(0, 0)
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand Down Expand Up @@ -325,7 +335,8 @@ func TestConnectFreeClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10))
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand All @@ -341,7 +352,8 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
db = rawdb.NewMemoryDatabase()
)
removeFn := func(enode.ID) {} // Noop
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand Down Expand Up @@ -370,7 +382,8 @@ func TestFreeClientKickedOut(t *testing.T) {
kicked = make(chan int, 100)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) }
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand Down Expand Up @@ -411,7 +424,8 @@ func TestPositiveBalanceCalculation(t *testing.T) {
kicked = make(chan int, 10)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand All @@ -434,7 +448,8 @@ func TestDowngradePriorityClient(t *testing.T) {
kicked = make(chan int, 10)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
Expand Down Expand Up @@ -468,7 +483,8 @@ func TestNegativeBalanceCalculation(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1e-3, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1e-3, CapacityFactor: 0, RequestFactor: 1})
Expand Down Expand Up @@ -503,7 +519,8 @@ func TestInactiveClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool.ns.Start()
defer pool.stop()
pool.setLimits(2, uint64(2))

Expand Down
2 changes: 1 addition & 1 deletion les/enr_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (e lesEntry) ENRKey() string {

// setupDiscovery creates the node discovery source for the eth protocol.
func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
if /*cfg.NoDiscovery || */ len(eth.config.DiscoveryURLs) == 0 {
if cfg.NoDiscovery || len(eth.config.DiscoveryURLs) == 0 {
return nil, nil
}
client := dnsdisc.NewClient(dnsdisc.Config{})
Expand Down
6 changes: 3 additions & 3 deletions les/lespay/server/prioritypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,12 @@ func (pp *PriorityPool) SetActiveBias(bias time.Duration) {
pp.tryActivate()
}

// ActiveCapacity returns the total capacity of currently active nodes
func (pp *PriorityPool) ActiveCapacity() uint64 {
// Active returns the number and total capacity of currently active nodes
func (pp *PriorityPool) Active() (uint64, uint64) {
pp.lock.Lock()
defer pp.lock.Unlock()

return pp.activeCap
return pp.activeCount, pp.activeCap
}

// inactiveSetIndex callback updates ppNodeInfo item index in inactiveQueue
Expand Down
Loading

0 comments on commit 85d81b2

Please sign in to comment.