Skip to content

Commit

Permalink
add request packet size token limit (ContentSquare#299)
Browse files Browse the repository at this point in the history
* add request packet size token limit

* update packet size token reserve way and some other details

* fix config_test

---------

Co-authored-by: wangqian445 <[email protected]>
  • Loading branch information
Smelly-calf and wangqian445 authored Feb 21, 2023
1 parent ed1bf8c commit 7b2f4ec
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 24 deletions.
16 changes: 16 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ max_execution_time: <duration> | optional | default = 0
# By default there are no per-minute limits
requests_per_minute: <int> | optional | default = 0

# The burst of request packet size token bucket for user
# By default there are no request packet size limits
request_packet_size_tokens_burst: <byte_size> | optional | default = 0

# The request packet size tokens produced rate per second for user
# By default there are no request packet size limits
request_packet_size_tokens_rate: <byte_size> | optional | default = 0

# Maximum number of requests waiting for execution in the queue.
# By default requests are executed without waiting in the queue
max_queue_size: <int> | optional | default = 0
Expand Down Expand Up @@ -362,6 +370,14 @@ max_execution_time: <duration> | optional | default = 0
# By default there are no per-minute limits
requests_per_minute: <int> | optional | default = 0

# The burst of request packet size token bucket for user
# By default there are no request packet size limits
request_packet_size_tokens_burst: <byte_size> | optional | default = 0

# The request packet size tokens produced rate per second for user
# By default there are no request packet size limits
request_packet_size_tokens_rate: <byte_size> | optional | default = 0

# Maximum number of requests waiting for execution in the queue.
# By default requests are executed without waiting in the queue
max_queue_size: <int> | optional | default = 0
Expand Down
24 changes: 24 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,14 @@ type User struct {
// if omitted or zero - no limits would be applied
ReqPerMin uint32 `yaml:"requests_per_minute,omitempty"`

// The burst of request packet size token bucket for user
// if omitted or zero - no limits would be applied
ReqPacketSizeTokensBurst ByteSize `yaml:"request_packet_size_tokens_burst,omitempty"`

// The request packet size tokens produced rate per second for user
// if omitted or zero - no limits would be applied
ReqPacketSizeTokensRate ByteSize `yaml:"request_packet_size_tokens_rate,omitempty"`

// Maximum number of queries waiting for execution in the queue
// if omitted or zero - queries are executed without waiting
// in the queue
Expand Down Expand Up @@ -615,6 +623,10 @@ func (u *User) UnmarshalYAML(unmarshal func(interface{}) error) error {
}
}

if u.ReqPacketSizeTokensBurst > 0 && u.ReqPacketSizeTokensRate == 0 {
return fmt.Errorf("`request_packet_size_tokens_rate` must be set if `request_packet_size_tokens_burst` is set for %q", u.Name)
}

return checkOverflow(u.XXX, fmt.Sprintf("user %q", u.Name))
}

Expand Down Expand Up @@ -826,6 +838,14 @@ type ClusterUser struct {
// if omitted or zero - no limits would be applied
ReqPerMin uint32 `yaml:"requests_per_minute,omitempty"`

// The burst of request packet size token bucket for user
// if omitted or zero - no limits would be applied
ReqPacketSizeTokensBurst ByteSize `yaml:"request_packet_size_tokens_burst,omitempty"`

// The request packet size tokens produced rate for user
// if omitted or zero - no limits would be applied
ReqPacketSizeTokensRate ByteSize `yaml:"request_packet_size_tokens_rate,omitempty"`

// Maximum number of queries waiting for execution in the queue
// if omitted or zero - queries are executed without waiting
// in the queue
Expand Down Expand Up @@ -861,6 +881,10 @@ func (cu *ClusterUser) UnmarshalYAML(unmarshal func(interface{}) error) error {
return fmt.Errorf("`max_queue_size` must be set if `max_queue_time` is set for %q", cu.Name)
}

if cu.ReqPacketSizeTokensBurst > 0 && cu.ReqPacketSizeTokensRate == 0 {
return fmt.Errorf("`request_packet_size_tokens_rate` must be set if `request_packet_size_tokens_burst` is set for %q", cu.Name)
}

return checkOverflow(cu.XXX, fmt.Sprintf("cluster.user %q", cu.Name))
}

Expand Down
10 changes: 10 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,16 @@ func TestBadConfig(t *testing.T) {
"testdata/bad.queue_size_time_cluster_user.yml",
"`max_queue_size` must be set if `max_queue_time` is set for \"default\"",
},
{
"packet size token burst and rate on user",
"testdata/bad.packet_size_token_burst_rate_user.yml",
"`request_packet_size_tokens_rate` must be set if `request_packet_size_tokens_burst` is set for \"default\"",
},
{
"packet size token burst and rate on user on cluster_user",
"testdata/bad.packet_size_token_burst_rate_cluster_user.yml",
"`request_packet_size_tokens_rate` must be set if `request_packet_size_tokens_burst` is set for \"default\"",
},
{
"cache max size",
"testdata/bad.cache_max_size.yml",
Expand Down
16 changes: 16 additions & 0 deletions config/testdata/bad.packet_size_token_burst_rate_cluster_user.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
server:
http:
listen_addr: ":8080"
allowed_networks: ["127.0.0.1"]

users:
- name: "default"
to_cluster: "cluster"
to_user: "default"

clusters:
- name: "cluster"
nodes: ["127.0.0.1:8123"]
users:
- name: "default"
request_packet_size_tokens_burst: 10
14 changes: 14 additions & 0 deletions config/testdata/bad.packet_size_token_burst_rate_user.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
server:
http:
listen_addr: ":8080"
allowed_networks: ["127.0.0.1"]

users:
- name: "default"
to_cluster: "cluster"
to_user: "default"
request_packet_size_tokens_burst: 10

clusters:
- name: "cluster"
nodes: ["127.0.0.1:8123"]
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/redis/go-redis/v9 v9.0.2
github.com/stretchr/testify v1.8.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/time v0.3.0
gopkg.in/yaml.v2 v2.4.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
Expand Down
6 changes: 6 additions & 0 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,5 +831,11 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) {
}

s := newScope(req, u, c, cu, sessionId, sessionTimeout)

q, err := getFullQuery(req)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("%s: cannot read query: %w", s, err)
}
s.requestPacketSize = len(q)
return s, 0, nil
}
57 changes: 57 additions & 0 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"golang.org/x/time/rate"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -578,6 +579,62 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
return makeCustomRequest(p, req)
},
},
{
cfg: goodCfg,
name: "request packet size token limit for user",
expResponse: "limits for user \"default\" is exceeded: request_packet_size_tokens_burst limit: 4",
expStatusCode: http.StatusTooManyRequests,
f: func(p *reverseProxy) *http.Response {
p.users[defaultUsername].reqPacketSizeTokensBurst = 4
p.users[defaultUsername].reqPacketSizeTokenLimiter = rate.NewLimiter(
rate.Limit(1), 4)
go makeHeavyRequest(p, time.Millisecond*20)
return makeHeavyRequest(p, time.Millisecond*200)
},
},
{
cfg: goodCfg,
name: "request packet size token limit for cluster user",
expResponse: "limits for cluster user \"web\" is exceeded: request_packet_size_tokens_burst limit: 4",
expStatusCode: http.StatusTooManyRequests,
f: func(p *reverseProxy) *http.Response {
p.clusters["cluster"].users["web"].reqPacketSizeTokensBurst = 4
p.clusters["cluster"].users["web"].reqPacketSizeTokenLimiter = rate.NewLimiter(
rate.Limit(1), 4)
go makeHeavyRequest(p, time.Millisecond*20)
return makeHeavyRequest(p, time.Millisecond*200)
},
},
{
cfg: goodCfg,
name: "queuing request packet size token limit for user",
expResponse: okResponse,
expStatusCode: http.StatusOK,
f: func(p *reverseProxy) *http.Response {
p.users[defaultUsername].reqPacketSizeTokensBurst = 5
p.users[defaultUsername].reqPacketSizeTokenLimiter = rate.NewLimiter(
rate.Limit(1), 5)
p.users[defaultUsername].queueCh = make(chan struct{}, 2)
p.users[defaultUsername].maxQueueTime = 10 * time.Second
runHeavyRequestInGoroutine(p, 1, true)
return makeHeavyRequest(p, time.Millisecond*200)
},
},
{
cfg: goodCfg,
name: "queuing request with packet size token limit for cluster user",
expResponse: okResponse,
expStatusCode: http.StatusOK,
f: func(p *reverseProxy) *http.Response {
p.clusters["cluster"].users["web"].reqPacketSizeTokensBurst = 5
p.clusters["cluster"].users["web"].reqPacketSizeTokenLimiter = rate.NewLimiter(
rate.Limit(1), 5)
p.clusters["cluster"].users["web"].queueCh = make(chan struct{}, 2)
p.clusters["cluster"].users["web"].maxQueueTime = 10 * time.Second
runHeavyRequestInGoroutine(p, 1, true)
return makeHeavyRequest(p, time.Millisecond*200)
},
},
}

for _, tc := range testCases {
Expand Down
83 changes: 59 additions & 24 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/contentsquare/chproxy/config"
"github.com/contentsquare/chproxy/log"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

type scopeID uint64
Expand Down Expand Up @@ -50,6 +51,8 @@ type scope struct {
canceled bool

labels prometheus.Labels

requestPacketSize int
}

func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser, sessionId string, sessionTimeout int) *scope {
Expand Down Expand Up @@ -224,6 +227,24 @@ func (s *scope) inc() error {
s.clusterUser.name, s.clusterUser.reqPerMin)
}

// reserving tokens num s.requestPacketSize
if s.user.reqPacketSizeTokensBurst > 0 {
tl := s.user.reqPacketSizeTokenLimiter
ok := tl.AllowN(time.Now(), s.requestPacketSize)
if !ok {
err = fmt.Errorf("limits for user %q is exceeded: request_packet_size_tokens_burst limit: %d",
s.user.name, s.user.reqPacketSizeTokensBurst)
}
}
if s.clusterUser.reqPacketSizeTokensBurst > 0 {
tl := s.clusterUser.reqPacketSizeTokenLimiter
ok := tl.AllowN(time.Now(), s.requestPacketSize)
if !ok {
err = fmt.Errorf("limits for cluster user %q is exceeded: request_packet_size_tokens_burst limit: %d",
s.clusterUser.name, s.clusterUser.reqPacketSizeTokensBurst)
}
}

if err != nil {
s.user.queryCounter.dec()
s.clusterUser.queryCounter.dec()
Expand Down Expand Up @@ -469,6 +490,10 @@ type user struct {
reqPerMin uint32
rateLimiter rateLimiter

reqPacketSizeTokenLimiter *rate.Limiter
reqPacketSizeTokensBurst config.ByteSize
reqPacketSizeTokensRate config.ByteSize

queueCh chan struct{}
maxQueueTime time.Duration

Expand Down Expand Up @@ -541,22 +566,25 @@ func (up usersProfile) newUser(u config.User) (*user, error) {
}

return &user{
name: u.Name,
password: u.Password,
toCluster: u.ToCluster,
toUser: u.ToUser,
maxConcurrentQueries: u.MaxConcurrentQueries,
maxExecutionTime: time.Duration(u.MaxExecutionTime),
reqPerMin: u.ReqPerMin,
queueCh: queueCh,
maxQueueTime: time.Duration(u.MaxQueueTime),
allowedNetworks: u.AllowedNetworks,
denyHTTP: u.DenyHTTP,
denyHTTPS: u.DenyHTTPS,
allowCORS: u.AllowCORS,
isWildcarded: u.IsWildcarded,
cache: cc,
params: params,
name: u.Name,
password: u.Password,
toCluster: u.ToCluster,
toUser: u.ToUser,
maxConcurrentQueries: u.MaxConcurrentQueries,
maxExecutionTime: time.Duration(u.MaxExecutionTime),
reqPerMin: u.ReqPerMin,
queueCh: queueCh,
maxQueueTime: time.Duration(u.MaxQueueTime),
reqPacketSizeTokenLimiter: rate.NewLimiter(rate.Limit(u.ReqPacketSizeTokensRate), int(u.ReqPacketSizeTokensBurst)),
reqPacketSizeTokensBurst: u.ReqPacketSizeTokensBurst,
reqPacketSizeTokensRate: u.ReqPacketSizeTokensRate,
allowedNetworks: u.AllowedNetworks,
denyHTTP: u.DenyHTTP,
denyHTTPS: u.DenyHTTPS,
allowCORS: u.AllowCORS,
isWildcarded: u.IsWildcarded,
cache: cc,
params: params,
}, nil
}

Expand All @@ -575,6 +603,10 @@ type clusterUser struct {
queueCh chan struct{}
maxQueueTime time.Duration

reqPacketSizeTokenLimiter *rate.Limiter
reqPacketSizeTokensBurst config.ByteSize
reqPacketSizeTokensRate config.ByteSize

allowedNetworks config.Networks
isWildcarded bool
}
Expand Down Expand Up @@ -602,14 +634,17 @@ func newClusterUser(cu config.ClusterUser) *clusterUser {
queueCh = make(chan struct{}, cu.MaxQueueSize)
}
return &clusterUser{
name: cu.Name,
password: cu.Password,
maxConcurrentQueries: cu.MaxConcurrentQueries,
maxExecutionTime: time.Duration(cu.MaxExecutionTime),
reqPerMin: cu.ReqPerMin,
queueCh: queueCh,
maxQueueTime: time.Duration(cu.MaxQueueTime),
allowedNetworks: cu.AllowedNetworks,
name: cu.Name,
password: cu.Password,
maxConcurrentQueries: cu.MaxConcurrentQueries,
maxExecutionTime: time.Duration(cu.MaxExecutionTime),
reqPerMin: cu.ReqPerMin,
reqPacketSizeTokenLimiter: rate.NewLimiter(rate.Limit(cu.ReqPacketSizeTokensRate), int(cu.ReqPacketSizeTokensBurst)),
reqPacketSizeTokensBurst: cu.ReqPacketSizeTokensBurst,
reqPacketSizeTokensRate: cu.ReqPacketSizeTokensRate,
queueCh: queueCh,
maxQueueTime: time.Duration(cu.MaxQueueTime),
allowedNetworks: cu.AllowedNetworks,
}
}

Expand Down

0 comments on commit 7b2f4ec

Please sign in to comment.