Skip to content

Commit 97bf298

Browse files
authored
Merge pull request kubernetes#109050 from MadhavJivrajani/client-go-retry
rest: Ensure response body is fully read and closed before retry
2 parents 92a1d0f + 68c8c45 commit 97bf298

File tree

3 files changed

+150
-55
lines changed

3 files changed

+150
-55
lines changed

staging/src/k8s.io/client-go/rest/request.go

+11-14
Original file line numberDiff line numberDiff line change
@@ -614,15 +614,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
614614
}
615615
url := r.URL().String()
616616
for {
617-
req, err := r.newHTTPRequest(ctx)
618-
if err != nil {
619-
return nil, err
620-
}
621-
622617
if err := r.retry.Before(ctx, r); err != nil {
623618
return nil, r.retry.WrapPreviousError(err)
624619
}
625620

621+
req, err := r.newHTTPRequest(ctx)
622+
if err != nil {
623+
return nil, err
624+
}
626625
resp, err := client.Do(req)
627626
updateURLMetrics(ctx, r, resp, err)
628627
r.retry.After(ctx, r, resp, err)
@@ -722,18 +721,17 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
722721

723722
url := r.URL().String()
724723
for {
724+
if err := r.retry.Before(ctx, r); err != nil {
725+
return nil, err
726+
}
727+
725728
req, err := r.newHTTPRequest(ctx)
726729
if err != nil {
727730
return nil, err
728731
}
729732
if r.body != nil {
730733
req.Body = ioutil.NopCloser(r.body)
731734
}
732-
733-
if err := r.retry.Before(ctx, r); err != nil {
734-
return nil, err
735-
}
736-
737735
resp, err := client.Do(req)
738736
updateURLMetrics(ctx, r, resp, err)
739737
r.retry.After(ctx, r, resp, err)
@@ -859,14 +857,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
859857

860858
// Right now we make about ten retry attempts if we get a Retry-After response.
861859
for {
860+
if err := r.retry.Before(ctx, r); err != nil {
861+
return r.retry.WrapPreviousError(err)
862+
}
862863
req, err := r.newHTTPRequest(ctx)
863864
if err != nil {
864865
return err
865866
}
866-
867-
if err := r.retry.Before(ctx, r); err != nil {
868-
return r.retry.WrapPreviousError(err)
869-
}
870867
resp, err := client.Do(req)
871868
updateURLMetrics(ctx, r, resp, err)
872869
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.

staging/src/k8s.io/client-go/rest/request_test.go

+118-9
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ func TestRequestWatch(t *testing.T) {
938938
},
939939
Err: true,
940940
ErrFn: func(err error) bool {
941-
return apierrors.IsInternalError(err)
941+
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
942942
},
943943
},
944944
{
@@ -954,7 +954,10 @@ func TestRequestWatch(t *testing.T) {
954954
serverReturns: []responseErr{
955955
{response: nil, err: io.EOF},
956956
},
957-
Empty: true,
957+
Err: true,
958+
ErrFn: func(err error) bool {
959+
return !apierrors.IsInternalError(err)
960+
},
958961
},
959962
{
960963
name: "max retries 2, server always returns a response with Retry-After header",
@@ -1130,7 +1133,7 @@ func TestRequestStream(t *testing.T) {
11301133
},
11311134
Err: true,
11321135
ErrFn: func(err error) bool {
1133-
return apierrors.IsInternalError(err)
1136+
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
11341137
},
11351138
},
11361139
{
@@ -1371,8 +1374,6 @@ func (b *testBackoffManager) Sleep(d time.Duration) {
13711374
}
13721375

13731376
func TestCheckRetryClosesBody(t *testing.T) {
1374-
// unblock CI until http://issue.k8s.io/108906 is resolved in 1.24
1375-
t.Skip("http://issue.k8s.io/108906")
13761377
count := 0
13771378
ch := make(chan struct{})
13781379
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@@ -2435,6 +2436,7 @@ func TestRequestWithRetry(t *testing.T) {
24352436
body io.Reader
24362437
serverReturns responseErr
24372438
errExpected error
2439+
errContains string
24382440
transformFuncInvokedExpected int
24392441
roundTripInvokedExpected int
24402442
}{
@@ -2451,7 +2453,7 @@ func TestRequestWithRetry(t *testing.T) {
24512453
body: &readSeeker{err: io.EOF},
24522454
serverReturns: responseErr{response: retryAfterResponse(), err: nil},
24532455
errExpected: nil,
2454-
transformFuncInvokedExpected: 1,
2456+
transformFuncInvokedExpected: 0,
24552457
roundTripInvokedExpected: 1,
24562458
},
24572459
{
@@ -2474,7 +2476,7 @@ func TestRequestWithRetry(t *testing.T) {
24742476
name: "server returns retryable err, request body Seek returns error, retry aborted",
24752477
body: &readSeeker{err: io.EOF},
24762478
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
2477-
errExpected: io.ErrUnexpectedEOF,
2479+
errContains: "failed to reset the request body while retrying a request: EOF",
24782480
transformFuncInvokedExpected: 0,
24792481
roundTripInvokedExpected: 1,
24802482
},
@@ -2517,8 +2519,15 @@ func TestRequestWithRetry(t *testing.T) {
25172519
if test.transformFuncInvokedExpected != transformFuncInvoked {
25182520
t.Errorf("Expected transform func to be invoked %d times, but got: %d", test.transformFuncInvokedExpected, transformFuncInvoked)
25192521
}
2520-
if test.errExpected != unWrap(err) {
2521-
t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
2522+
switch {
2523+
case test.errExpected != nil:
2524+
if test.errExpected != unWrap(err) {
2525+
t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
2526+
}
2527+
case len(test.errContains) > 0:
2528+
if !strings.Contains(err.Error(), test.errContains) {
2529+
t.Errorf("Expected error message to caontain: %q, but got: %q", test.errContains, err.Error())
2530+
}
25222531
}
25232532
})
25242533
}
@@ -3531,3 +3540,103 @@ func TestTransportConcurrency(t *testing.T) {
35313540
})
35323541
}
35333542
}
3543+
3544+
// TODO: see if we can consolidate the other trackers into one.
3545+
type requestBodyTracker struct {
3546+
io.ReadSeeker
3547+
f func(string)
3548+
}
3549+
3550+
func (t *requestBodyTracker) Read(p []byte) (int, error) {
3551+
t.f("Request.Body.Read")
3552+
return t.ReadSeeker.Read(p)
3553+
}
3554+
3555+
func (t *requestBodyTracker) Seek(offset int64, whence int) (int64, error) {
3556+
t.f("Request.Body.Seek")
3557+
return t.ReadSeeker.Seek(offset, whence)
3558+
}
3559+
3560+
type responseBodyTracker struct {
3561+
io.ReadCloser
3562+
f func(string)
3563+
}
3564+
3565+
func (t *responseBodyTracker) Read(p []byte) (int, error) {
3566+
t.f("Response.Body.Read")
3567+
return t.ReadCloser.Read(p)
3568+
}
3569+
3570+
func (t *responseBodyTracker) Close() error {
3571+
t.f("Response.Body.Close")
3572+
return t.ReadCloser.Close()
3573+
}
3574+
3575+
type recorder struct {
3576+
order []string
3577+
}
3578+
3579+
func (r *recorder) record(call string) {
3580+
r.order = append(r.order, call)
3581+
}
3582+
3583+
func TestRequestBodyResetOrder(t *testing.T) {
3584+
recorder := &recorder{}
3585+
respBodyTracker := &responseBodyTracker{
3586+
ReadCloser: nil, // the server will fill it
3587+
f: recorder.record,
3588+
}
3589+
3590+
var attempts int
3591+
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3592+
defer func() {
3593+
attempts++
3594+
}()
3595+
3596+
// read the request body.
3597+
ioutil.ReadAll(req.Body)
3598+
3599+
// first attempt, we send a retry-after
3600+
if attempts == 0 {
3601+
resp := retryAfterResponse()
3602+
respBodyTracker.ReadCloser = ioutil.NopCloser(bytes.NewReader([]byte{}))
3603+
resp.Body = respBodyTracker
3604+
return resp, nil
3605+
}
3606+
3607+
return &http.Response{StatusCode: http.StatusOK}, nil
3608+
})
3609+
3610+
reqBodyTracker := &requestBodyTracker{
3611+
ReadSeeker: bytes.NewReader([]byte{}), // empty body ensures one Read operation at most.
3612+
f: recorder.record,
3613+
}
3614+
req := &Request{
3615+
verb: "POST",
3616+
body: reqBodyTracker,
3617+
c: &RESTClient{
3618+
content: defaultContentConfig(),
3619+
Client: client,
3620+
},
3621+
backoff: &noSleepBackOff{},
3622+
retry: &withRetry{maxRetries: 1},
3623+
}
3624+
3625+
req.Do(context.Background())
3626+
3627+
expected := []string{
3628+
// 1st attempt: the server handler reads the request body
3629+
"Request.Body.Read",
3630+
// the server sends a retry-after, client reads the
3631+
// response body, and closes it
3632+
"Response.Body.Read",
3633+
"Response.Body.Close",
3634+
// client retry logic seeks to the beginning of the request body
3635+
"Request.Body.Seek",
3636+
// 2nd attempt: the server reads the request body
3637+
"Request.Body.Read",
3638+
}
3639+
if !reflect.DeepEqual(expected, recorder.order) {
3640+
t.Errorf("Expected invocation request and response body operations for retry do not match: %s", cmp.Diff(expected, recorder.order))
3641+
}
3642+
}

staging/src/k8s.io/client-go/rest/with_retry.go

+21-32
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,12 @@ type WithRetry interface {
7878
IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool
7979

8080
// Before should be invoked prior to each attempt, including
81-
// the first one. if an error is returned, the request
82-
// should be aborted immediately.
81+
// the first one. If an error is returned, the request should
82+
// be aborted immediately.
83+
//
84+
// Before may also be additionally responsible for preparing
85+
// the request for the next retry, namely in terms of resetting
86+
// the request body in case it has been read.
8387
Before(ctx context.Context, r *Request) error
8488

8589
// After should be invoked immediately after an attempt is made.
@@ -194,46 +198,18 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *
194198
r.retryAfter.Wait = time.Duration(seconds) * time.Second
195199
r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
196200

197-
if err := r.prepareForNextRetry(ctx, restReq); err != nil {
198-
klog.V(4).Infof("Could not retry request - %v", err)
199-
return false
200-
}
201-
202201
return true
203202
}
204203

205-
// prepareForNextRetry is responsible for carrying out operations that need
206-
// to be completed before the next retry is initiated:
207-
// - if the request context is already canceled there is no need to
208-
// retry, the function will return ctx.Err().
209-
// - we need to seek to the beginning of the request body before we
210-
// initiate the next retry, the function should return an error if
211-
// it fails to do so.
212-
func (r *withRetry) prepareForNextRetry(ctx context.Context, request *Request) error {
213-
if ctx.Err() != nil {
214-
return ctx.Err()
215-
}
216-
217-
// Ensure the response body is fully read and closed before
218-
// we reconnect, so that we reuse the same TCP connection.
219-
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
220-
if _, err := seeker.Seek(0, 0); err != nil {
221-
return fmt.Errorf("can't Seek() back to beginning of body for %T", request)
222-
}
223-
}
224-
225-
klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
226-
return nil
227-
}
228-
229204
func (r *withRetry) Before(ctx context.Context, request *Request) error {
205+
// If the request context is already canceled there
206+
// is no need to retry.
230207
if ctx.Err() != nil {
231208
r.trackPreviousError(ctx.Err())
232209
return ctx.Err()
233210
}
234211

235212
url := request.URL()
236-
237213
// r.retryAfter represents the retry after parameters calculated
238214
// from the (response, err) tuple from the last attempt, so 'Before'
239215
// can apply these retry after parameters prior to the next attempt.
@@ -245,6 +221,18 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
245221
return nil
246222
}
247223

224+
// At this point we've made atleast one attempt, post which the response
225+
// body should have been fully read and closed in order for it to be safe
226+
// to reset the request body before we reconnect, in order for us to reuse
227+
// the same TCP connection.
228+
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
229+
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
230+
err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err)
231+
r.trackPreviousError(err)
232+
return err
233+
}
234+
}
235+
248236
// if we are here, we have made attempt(s) al least once before.
249237
if request.backoff != nil {
250238
// TODO(tkashem) with default set to use exponential backoff
@@ -263,6 +251,7 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
263251
return err
264252
}
265253

254+
klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
266255
return nil
267256
}
268257

0 commit comments

Comments
 (0)