forked from golang/net
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconn_flow.go
144 lines (127 loc) · 4.81 KB
/
conn_flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.21
package quic
import (
"sync/atomic"
"time"
)
// connInflow tracks connection-level flow control for data sent by the peer to us.
//
// There are four byte offsets of significance in the stream of data received from the peer,
// each >= the previous:
//
//   - bytes read by the user
//   - bytes received from the peer
//   - limit sent to the peer in a MAX_DATA frame
//   - potential new limit to send to the peer
//
// We maintain a flow control window, so as bytes are read by the user
// the potential limit is extended correspondingly.
//
// We keep an atomic counter of bytes read by the user and not yet applied to the
// potential limit (credit). When this count grows large enough, we update the
// new limit to send and mark that we need to send a new MAX_DATA frame.
type connInflow struct {
	sent      sentVal // set when we need to send a MAX_DATA update to the peer
	usedLimit int64   // total bytes sent by the peer; exceeding sentLimit is a flow control error
	sentLimit int64   // last MAX_DATA sent to the peer
	newLimit  int64   // new MAX_DATA to send
	// credit counts bytes read but not yet applied to extending the
	// flow-control window. It is atomic because it is added to both
	// on the conn's loop and off it (from user reads).
	credit atomic.Int64
}
// inflowInit sets up connection-level inbound flow control.
//
// Both the last-sent limit and the pending new limit start out at the
// configured connection read buffer size.
func (c *Conn) inflowInit() {
	// The peer learns the initial MAX_DATA limit from our transport
	// parameters, so it is recorded as already sent.
	limit := c.config.maxConnReadBufferSize()
	c.streams.inflow.sentLimit = limit
	c.streams.inflow.newLimit = limit
}
// handleStreamBytesReadOffLoop notes that the user has consumed n bytes
// of stream data, which may let us extend the peer's flow control window.
//
// It runs off the conn's loop: the user reaches it indirectly
// through Read or CloseRead.
func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
	if n == 0 {
		return
	}
	credit := c.streams.inflow.credit.Add(n)
	if !c.shouldUpdateFlowControl(credit) {
		return
	}
	// Enough credit has built up to justify a MAX_DATA update.
	// Hand the work to the Conn's main loop.
	c.sendMsg(func(_ time.Time, c *Conn) {
		// Another update may have drained the credit by the time
		// this runs, so re-evaluate before acting.
		pending := c.streams.inflow.credit.Load()
		if c.shouldUpdateFlowControl(pending) {
			c.sendMaxDataUpdate()
		}
	})
}
// handleStreamBytesReadOnLoop credits n bytes back to the connection's
// flow control window after data was thrown away because of a
// RESET_STREAM frame, and schedules a MAX_DATA update if warranted.
//
// It must be called on the conn's loop.
func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
	credit := c.streams.inflow.credit.Add(n)
	if !c.shouldUpdateFlowControl(credit) {
		return
	}
	c.sendMaxDataUpdate()
}
// sendMaxDataUpdate flags that a MAX_DATA frame needs to be sent and
// moves all accumulated read credit into the new limit.
func (c *Conn) sendMaxDataUpdate() {
	c.streams.inflow.sent.setUnsent()

	// Folding the credit in here is not strictly required, because
	// appendMaxDataFrame does the same before writing the frame.
	// Doing it now zeroes the counter, so we are not sent down this
	// path again while the MAX_DATA frame is still waiting to go out.
	unapplied := c.streams.inflow.credit.Swap(0)
	c.streams.inflow.newLimit += unapplied
}
// shouldUpdateFlowControl reports the result of the package-level
// shouldUpdateFlowControl for the given unapplied credit, using the
// configured connection read buffer size as the window size.
func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
	maxWindow := c.config.maxConnReadBufferSize()
	return shouldUpdateFlowControl(maxWindow, credit)
}
// handleStreamBytesReceived accounts for n more bytes of stream data
// sent by the peer.
//
// It returns a localTransportError with code errFlowControl if the
// running total now exceeds the last limit we sent, nil otherwise.
// The total is updated in either case.
func (c *Conn) handleStreamBytesReceived(n int64) error {
	total := c.streams.inflow.usedLimit + n
	c.streams.inflow.usedLimit = total
	if limit := c.streams.inflow.sentLimit; total > limit {
		return localTransportError{
			code:   errFlowControl,
			reason: "stream exceeded flow control limit",
		}
	}
	return nil
}
// appendMaxDataFrame appends a MAX_DATA frame to the current packet
// when sent.shouldSendPTO(pto) reports that one is due.
//
// Any unapplied read credit is folded into newLimit before the frame is
// written. Once the frame is in the packet, sentLimit is set to the value
// just advertised. MAX_DATA carries an absolute limit, so the limit we
// enforce against the peer must equal the last value sent rather than an
// accumulation of every value sent; this also keeps a resend of the same
// limit from inflating it.
//
// It returns true if no more frames need appending,
// false if it could not fit a frame in the current packet.
func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
	if !c.streams.inflow.sent.shouldSendPTO(pto) {
		return true
	}
	// Add any unapplied credit to the new limit now.
	c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
	if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
		return false
	}
	// newLimit is the absolute limit the peer has now been told about.
	c.streams.inflow.sentLimit = c.streams.inflow.newLimit
	c.streams.inflow.sent.setSent(pnum)
	return true
}
// ackOrLossMaxData forwards the fate (acked or lost) of the packet
// numbered pnum, which carried a MAX_DATA frame, to the inflow's
// sent-state tracker.
func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
	state := &c.streams.inflow.sent
	state.ackLatestOrLoss(pnum, fate)
}
// connOutflow tracks connection-level flow control for data we send to the peer.
type connOutflow struct {
	// max is the largest MAX_DATA value received from the peer.
	max int64
	// used is the total number of bytes of STREAM data sent to the peer.
	used int64
}

// setMaxData raises the connection-level flow control limit to maxData.
// The value may be the initial limit from the transport parameters or
// one carried by a MAX_DATA frame. A value that is not larger than the
// current limit is ignored, so the limit never shrinks.
func (f *connOutflow) setMaxData(maxData int64) {
	if maxData > f.max {
		f.max = maxData
	}
}

// avail reports how many bytes of connection-level flow control remain:
// the current limit minus the bytes already consumed.
func (f *connOutflow) avail() int64 {
	remaining := f.max - f.used
	return remaining
}

// consume adds n bytes to the amount of flow control already used.
func (f *connOutflow) consume(n int64) {
	f.used = f.used + n
}