Skip to content

Commit

Permalink
Prepare to allow max conns per torrent to be configured
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Jul 5, 2016
1 parent 12191db commit 326b365
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 89 deletions.
72 changes: 7 additions & 65 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
}
}()
cl = &Client{
halfOpenLimit: socketsPerTorrent,
halfOpenLimit: defaultHalfOpenConnsPerTorrent,
config: *cfg,
defaultStorage: cfg.DefaultStorage,
dopplegangerAddrs: make(map[string]struct{}),
Expand Down Expand Up @@ -382,7 +382,7 @@ func (cl *Client) waitAccept() {
defer cl.mu.Unlock()
for {
for _, t := range cl.torrents {
if cl.wantConns(t) {
if t.wantConns() {
return
}
}
Expand Down Expand Up @@ -936,7 +936,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
c.rw,
}
completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
if !cl.addConnection(t, c) {
if !t.addConnection(c) {
return
}
defer t.dropConnection(c)
Expand Down Expand Up @@ -1337,67 +1337,10 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
}
}

// Returns true if the connection is added.
func (cl *Client) addConnection(t *Torrent, c *connection) bool {
if cl.closed.IsSet() {
return false
}
if !cl.wantConns(t) {
return false
}
for _, c0 := range t.conns {
if c.PeerID == c0.PeerID {
// Already connected to a client with that ID.
duplicateClientConns.Add(1)
return false
}
}
if len(t.conns) >= socketsPerTorrent {
c := t.worstBadConn(cl)
if c == nil {
return false
}
if cl.config.Debug && missinggo.CryHeard() {
log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
}
c.Close()
t.deleteConnection(c)
}
if len(t.conns) >= socketsPerTorrent {
panic(len(t.conns))
}
t.conns = append(t.conns, c)
c.t = t
return true
}

func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
if c.closed.IsSet() {
return false
}
if !t.haveInfo() {
return c.supportsExtension("ut_metadata")
}
if t.seeding() {
return c.PeerInterested
}
return t.connHasWantedPieces(c)
}

func (cl *Client) wantConns(t *Torrent) bool {
if !t.seeding() && !t.needData() {
return false
}
if len(t.conns) < socketsPerTorrent {
return true
}
return t.worstBadConn(cl) != nil
}

func (cl *Client) openNewConns(t *Torrent) {
defer t.updateWantPeersEvent()
for len(t.peers) != 0 {
if !cl.wantConns(t) {
if !t.wantConns() {
return
}
if len(t.halfOpen) >= cl.halfOpenLimit {
Expand Down Expand Up @@ -1431,9 +1374,7 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
return false
}

// Prepare a Torrent without any attachment to a Client. That means we can
// initialize fields all fields that don't require the Client without locking
// it.
// Return a Torrent ready for insertion into a Client.
func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
t = &Torrent{
cl: cl,
Expand All @@ -1444,7 +1385,8 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
halfOpen: make(map[string]struct{}),
pieceStateChanges: pubsub.NewPubSub(),

storageOpener: cl.defaultStorage,
storageOpener: cl.defaultStorage,
maxEstablishedConns: defaultEstablishedConnsPerTorrent,
}
return
}
Expand Down
16 changes: 16 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,3 +653,19 @@ func (cn *connection) wroteBytes(b []byte) {
cn.stats.wroteBytes(b)
cn.t.stats.wroteBytes(b)
}

// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
t := c.t
if c.closed.IsSet() {
return false
}
if !t.haveInfo() {
return c.supportsExtension("ut_metadata")
}
if t.seeding() {
return c.PeerInterested
}
return t.connHasWantedPieces(c)
}
7 changes: 4 additions & 3 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ const (
// http://www.bittorrent.org/beps/bep_0005.html
defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"

socketsPerTorrent = 80
torrentPeersHighWater = 200
torrentPeersLowWater = 50
defaultEstablishedConnsPerTorrent = 80
defaultHalfOpenConnsPerTorrent = 80
torrentPeersHighWater = 200
torrentPeersLowWater = 50

// Limit how long handshake can take. This is to reduce the lingering
// impact of a few bad apples. 4s loses 1% of successful handshakes that
Expand Down
67 changes: 52 additions & 15 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type Torrent struct {
// The info dict. nil if we don't have it (yet).
info *metainfo.InfoEx
// Active peer connections, running message stream loops.
conns []*connection
conns []*connection
maxEstablishedConns int
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
halfOpen map[string]struct{}
Expand Down Expand Up @@ -133,12 +134,8 @@ func (t *Torrent) addrActive(addr string) bool {
return false
}

func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
wcs = &worstConns{
c: make([]*connection, 0, len(t.conns)),
t: t,
cl: cl,
}
func (t *Torrent) worstConns() (wcs *worstConns) {
wcs = &worstConns{make([]*connection, 0, len(t.conns))}
for _, c := range t.conns {
if !c.closed.IsSet() {
wcs.c = append(wcs.c, c)
Expand Down Expand Up @@ -446,11 +443,7 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
sort.Sort(&worstConns{
c: t.conns,
t: t,
cl: cl,
})
sort.Sort(&worstConns{t.conns})
for i, c := range t.conns {
fmt.Fprintf(w, "%2d. ", i+1)
c.WriteStatus(w, t)
Expand Down Expand Up @@ -740,15 +733,15 @@ func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
return
}

func (t *Torrent) worstBadConn(cl *Client) *connection {
wcs := t.worstConns(cl)
func (t *Torrent) worstBadConn() *connection {
wcs := t.worstConns()
heap.Init(wcs)
for wcs.Len() != 0 {
c := heap.Pop(wcs).(*connection)
if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
return c
}
if wcs.Len() >= (socketsPerTorrent+1)/2 {
if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
// Give connections 1 minute to prove themselves.
if time.Since(c.completedHandshake) > time.Minute {
return c
Expand Down Expand Up @@ -1273,3 +1266,47 @@ func (t *Torrent) addPeers(peers []Peer) {
func (t *Torrent) Stats() TorrentStats {
return t.stats
}

// Returns true if the connection is added.
func (t *Torrent) addConnection(c *connection) bool {
if t.cl.closed.IsSet() {
return false
}
if !t.wantConns() {
return false
}
for _, c0 := range t.conns {
if c.PeerID == c0.PeerID {
// Already connected to a client with that ID.
duplicateClientConns.Add(1)
return false
}
}
if len(t.conns) >= t.maxEstablishedConns {
c := t.worstBadConn()
if c == nil {
return false
}
if t.cl.config.Debug && missinggo.CryHeard() {
log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
}
c.Close()
t.deleteConnection(c)
}
if len(t.conns) >= t.maxEstablishedConns {
panic(len(t.conns))
}
t.conns = append(t.conns, c)
c.t = t
return true
}

func (t *Torrent) wantConns() bool {
if !t.seeding() && !t.needData() {
return false
}
if len(t.conns) < t.maxEstablishedConns {
return true
}
return t.worstBadConn() != nil
}
10 changes: 4 additions & 6 deletions worst_conns.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"time"
)

// Implements heap functions such that [0] is the worst connection.
// Implements a heap of connections by how useful they are or have been.
type worstConns struct {
c []*connection
t *Torrent
cl *Client
c []*connection
}

func (wc *worstConns) Len() int { return len(wc.c) }
Expand Down Expand Up @@ -44,8 +42,8 @@ func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool {

func (wc *worstConns) key(i int) (key worstConnsSortKey) {
c := wc.c[i]
key.useful = wc.cl.usefulConn(wc.t, c)
if wc.t.seeding() {
key.useful = c.useful()
if c.t.seeding() {
key.lastHelpful = c.lastChunkSent
}
// Intentionally consider the last time a chunk was received when seeding,
Expand Down

0 comments on commit 326b365

Please sign in to comment.