Skip to content

Commit 6618b8e

Browse files
committed
client-go: make retry in Request thread safe
1 parent 11a6146 commit 6618b8e

File tree

3 files changed

+56
-45
lines changed

3 files changed

+56
-45
lines changed

staging/src/k8s.io/client-go/rest/request.go

+34-18
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string {
8282

8383
var noBackoff = &NoBackoff{}
8484

85+
type requestRetryFunc func(maxRetries int) WithRetry
86+
87+
func defaultRequestRetryFn(maxRetries int) WithRetry {
88+
return &withRetry{maxRetries: maxRetries}
89+
}
90+
8591
// Request allows for building up a request to a server in a chained fashion.
8692
// Any errors are stored until the end of your call, so you only have to
8793
// check once.
@@ -93,6 +99,7 @@ type Request struct {
9399
rateLimiter flowcontrol.RateLimiter
94100
backoff BackoffManager
95101
timeout time.Duration
102+
maxRetries int
96103

97104
// generic components accessible via method setters
98105
verb string
@@ -109,9 +116,10 @@ type Request struct {
109116
subresource string
110117

111118
// output
112-
err error
113-
body io.Reader
114-
retry WithRetry
119+
err error
120+
body io.Reader
121+
122+
retryFn requestRetryFunc
115123
}
116124

117125
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
@@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request {
142150
backoff: backoff,
143151
timeout: timeout,
144152
pathPrefix: pathPrefix,
145-
retry: &withRetry{maxRetries: 10},
153+
maxRetries: 10,
154+
retryFn: defaultRequestRetryFn,
146155
warningHandler: c.warningHandler,
147156
}
148157

@@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request {
408417
// function is specifically called with a different value.
409418
// A zero maxRetries prevent it from doing retires and return an error immediately.
410419
func (r *Request) MaxRetries(maxRetries int) *Request {
411-
r.retry.SetMaxRetries(maxRetries)
420+
if maxRetries < 0 {
421+
maxRetries = 0
422+
}
423+
r.maxRetries = maxRetries
412424
return r
413425
}
414426

@@ -612,27 +624,29 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
612624
}
613625
return false
614626
}
627+
retry := r.retryFn(r.maxRetries)
615628
url := r.URL().String()
616629
for {
617-
if err := r.retry.Before(ctx, r); err != nil {
618-
return nil, r.retry.WrapPreviousError(err)
630+
if err := retry.Before(ctx, r); err != nil {
631+
return nil, retry.WrapPreviousError(err)
619632
}
620633

621634
req, err := r.newHTTPRequest(ctx)
622635
if err != nil {
623636
return nil, err
624637
}
638+
625639
resp, err := client.Do(req)
626640
updateURLMetrics(ctx, r, resp, err)
627-
r.retry.After(ctx, r, resp, err)
641+
retry.After(ctx, r, resp, err)
628642
if err == nil && resp.StatusCode == http.StatusOK {
629643
return r.newStreamWatcher(resp)
630644
}
631645

632646
done, transformErr := func() (bool, error) {
633647
defer readAndCloseResponseBody(resp)
634648

635-
if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
649+
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
636650
return false, nil
637651
}
638652

@@ -654,7 +668,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
654668
// we need to return the error object from that.
655669
err = transformErr
656670
}
657-
return nil, r.retry.WrapPreviousError(err)
671+
return nil, retry.WrapPreviousError(err)
658672
}
659673
}
660674
}
@@ -719,9 +733,10 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
719733
client = http.DefaultClient
720734
}
721735

736+
retry := r.retryFn(r.maxRetries)
722737
url := r.URL().String()
723738
for {
724-
if err := r.retry.Before(ctx, r); err != nil {
739+
if err := retry.Before(ctx, r); err != nil {
725740
return nil, err
726741
}
727742

@@ -734,7 +749,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
734749
}
735750
resp, err := client.Do(req)
736751
updateURLMetrics(ctx, r, resp, err)
737-
r.retry.After(ctx, r, resp, err)
752+
retry.After(ctx, r, resp, err)
738753
if err != nil {
739754
// we only retry on an HTTP response with 'Retry-After' header
740755
return nil, err
@@ -749,7 +764,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
749764
done, transformErr := func() (bool, error) {
750765
defer resp.Body.Close()
751766

752-
if r.retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
767+
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
753768
return false, nil
754769
}
755770
result := r.transformResponse(resp, req)
@@ -856,9 +871,10 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
856871
}
857872

858873
// Right now we make about ten retry attempts if we get a Retry-After response.
874+
retry := r.retryFn(r.maxRetries)
859875
for {
860-
if err := r.retry.Before(ctx, r); err != nil {
861-
return r.retry.WrapPreviousError(err)
876+
if err := retry.Before(ctx, r); err != nil {
877+
return retry.WrapPreviousError(err)
862878
}
863879
req, err := r.newHTTPRequest(ctx)
864880
if err != nil {
@@ -871,7 +887,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
871887
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
872888
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
873889
}
874-
r.retry.After(ctx, r, resp, err)
890+
retry.After(ctx, r, resp, err)
875891

876892
done := func() bool {
877893
defer readAndCloseResponseBody(resp)
@@ -884,15 +900,15 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
884900
fn(req, resp)
885901
}
886902

887-
if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
903+
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
888904
return false
889905
}
890906

891907
f(req, resp)
892908
return true
893909
}()
894910
if done {
895-
return r.retry.WrapPreviousError(err)
911+
return retry.WrapPreviousError(err)
896912
}
897913
}
898914
}

staging/src/k8s.io/client-go/rest/request_test.go

+22-14
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,8 @@ func TestRequestWatch(t *testing.T) {
998998
c.Client = client
999999
}
10001000
testCase.Request.backoff = &noSleepBackOff{}
1001-
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
1001+
testCase.Request.maxRetries = testCase.maxRetries
1002+
testCase.Request.retryFn = defaultRequestRetryFn
10021003

10031004
watch, err := testCase.Request.Watch(context.Background())
10041005

@@ -1211,7 +1212,8 @@ func TestRequestStream(t *testing.T) {
12111212
c.Client = client
12121213
}
12131214
testCase.Request.backoff = &noSleepBackOff{}
1214-
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
1215+
testCase.Request.maxRetries = testCase.maxRetries
1216+
testCase.Request.retryFn = defaultRequestRetryFn
12151217

12161218
body, err := testCase.Request.Stream(context.Background())
12171219

@@ -1266,7 +1268,7 @@ func TestRequestDo(t *testing.T) {
12661268
}
12671269
for i, testCase := range testCases {
12681270
testCase.Request.backoff = &NoBackoff{}
1269-
testCase.Request.retry = &withRetry{}
1271+
testCase.Request.retryFn = defaultRequestRetryFn
12701272
body, err := testCase.Request.Do(context.Background()).Raw()
12711273
hasErr := err != nil
12721274
if hasErr != testCase.Err {
@@ -1429,8 +1431,9 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) {
14291431
return nil, &net.OpError{Err: syscall.ECONNRESET}
14301432
}),
14311433
},
1432-
backoff: backoff,
1433-
retry: &withRetry{maxRetries: 10},
1434+
backoff: backoff,
1435+
maxRetries: 10,
1436+
retryFn: defaultRequestRetryFn,
14341437
}
14351438
// We expect two retries of "connection reset by peer" and the success.
14361439
_, err := req.Do(context.Background()).Raw()
@@ -2504,8 +2507,9 @@ func TestRequestWithRetry(t *testing.T) {
25042507
c: &RESTClient{
25052508
Client: client,
25062509
},
2507-
backoff: &noSleepBackOff{},
2508-
retry: &withRetry{maxRetries: 1},
2510+
backoff: &noSleepBackOff{},
2511+
maxRetries: 1,
2512+
retryFn: defaultRequestRetryFn,
25092513
}
25102514

25112515
var transformFuncInvoked int
@@ -2782,8 +2786,9 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
27822786
content: defaultContentConfig(),
27832787
Client: client,
27842788
},
2785-
backoff: &noSleepBackOff{},
2786-
retry: &withRetry{maxRetries: test.maxRetries},
2789+
backoff: &noSleepBackOff{},
2790+
maxRetries: test.maxRetries,
2791+
retryFn: defaultRequestRetryFn,
27872792
}
27882793

27892794
doFunc(context.Background(), req)
@@ -3006,7 +3011,8 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
30063011
pathPrefix: "/api/v1",
30073012
rateLimiter: interceptor,
30083013
backoff: interceptor,
3009-
retry: &withRetry{maxRetries: test.maxRetries},
3014+
maxRetries: test.maxRetries,
3015+
retryFn: defaultRequestRetryFn,
30103016
}
30113017

30123018
doFunc(ctx, req)
@@ -3140,7 +3146,7 @@ func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.
31403146
pathPrefix: "/api/v1",
31413147
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
31423148
backoff: &NoBackoff{},
3143-
retry: interceptor,
3149+
retryFn: func(_ int) WithRetry { return interceptor },
31443150
}
31453151

31463152
doFunc(context.Background(), req)
@@ -3315,7 +3321,8 @@ func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r
33153321
pathPrefix: "/api/v1",
33163322
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
33173323
backoff: &noSleepBackOff{},
3318-
retry: &withRetry{maxRetries: test.maxRetries},
3324+
maxRetries: test.maxRetries,
3325+
retryFn: defaultRequestRetryFn,
33193326
}
33203327

33213328
err = doFunc(context.Background(), req)
@@ -3618,8 +3625,9 @@ func TestRequestBodyResetOrder(t *testing.T) {
36183625
content: defaultContentConfig(),
36193626
Client: client,
36203627
},
3621-
backoff: &noSleepBackOff{},
3622-
retry: &withRetry{maxRetries: 1},
3628+
backoff: &noSleepBackOff{},
3629+
maxRetries: 1,
3630+
retryFn: defaultRequestRetryFn,
36233631
}
36243632

36253633
req.Do(context.Background())

staging/src/k8s.io/client-go/rest/with_retry.go

-13
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,6 @@ var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool {
5252
// Note that WithRetry is not safe for concurrent use by multiple
5353
// goroutines without additional locking or coordination.
5454
type WithRetry interface {
55-
// SetMaxRetries makes the request use the specified integer as a ceiling
56-
// for retries upon receiving a 429 status code and the "Retry-After" header
57-
// in the response.
58-
// A zero maxRetries should prevent from doing any retry and return immediately.
59-
SetMaxRetries(maxRetries int)
60-
6155
// IsNextRetry advances the retry counter appropriately
6256
// and returns true if the request should be retried,
6357
// otherwise it returns false, if:
@@ -144,13 +138,6 @@ type withRetry struct {
144138
previousErr, currentErr error
145139
}
146140

147-
func (r *withRetry) SetMaxRetries(maxRetries int) {
148-
if maxRetries < 0 {
149-
maxRetries = 0
150-
}
151-
r.maxRetries = maxRetries
152-
}
153-
154141
func (r *withRetry) trackPreviousError(err error) {
155142
// keep track of two most recent errors
156143
if r.currentErr != nil {

0 commit comments

Comments
 (0)