Skip to content

Commit

Permalink
boskos-client: HTTP retry on timeout
Browse files Browse the repository at this point in the history
This patch implements retry logic for HTTP requests that timeout when
attempting to connect to the remote endpoint.

A retry count of one results in at most two round-trip requests -- the
initial request and a possible retry.

Fixes kubernetes#9549 - cc @krzyzacy
  • Loading branch information
akutz committed Feb 20, 2019
1 parent 050bdfd commit 65a8195
Showing 1 changed file with 65 additions and 10 deletions.
75 changes: 65 additions & 10 deletions boskos/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

Expand All @@ -44,20 +45,31 @@ var (

// Client defines the public Boskos client object
type Client struct {
// RetryCount is the number of times an HTTP request issued by this client
// is retried when the initial request fails due an inaccessible endpoint.
RetryCount uint

// RetryDuration is the interval to wait before retrying an HTTP operation
// that failed due to an inaccessible endpoint.
RetryWait time.Duration

owner string
url string
lock sync.Mutex

storage storage.PersistenceLayer
}

// NewClient creates a boskos client, with boskos url and owner of the client.
// NewClient creates a boskos client, with boskos url, owner of the client,
// a RetryCount of 3, and a RetryWait interval of 10s.
func NewClient(owner string, url string) *Client {

client := &Client{
url: url,
owner: owner,
storage: storage.NewMemoryStorage(),
RetryCount: 3,
RetryWait: 10 * time.Second,
url: url,
owner: owner,
storage: storage.NewMemoryStorage(),
}

return client
Expand Down Expand Up @@ -240,7 +252,7 @@ func (c *Client) updateLocalResource(i common.Item, state string, data *common.U
}

func (c *Client) acquire(rtype, state, dest string) (*common.Resource, error) {
resp, err := http.Post(fmt.Sprintf("%v/acquire?type=%v&state=%v&dest=%v&owner=%v",
resp, err := c.httpPost(fmt.Sprintf("%v/acquire?type=%v&state=%v&dest=%v&owner=%v",
c.url, rtype, state, dest, c.owner), "", nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -269,7 +281,7 @@ func (c *Client) acquire(rtype, state, dest string) (*common.Resource, error) {
}

func (c *Client) acquireByState(state, dest string, names []string) ([]common.Resource, error) {
resp, err := http.Post(fmt.Sprintf("%v/acquirebystate?state=%v&dest=%v&owner=%v&names=%v",
resp, err := c.httpPost(fmt.Sprintf("%v/acquirebystate?state=%v&dest=%v&owner=%v&names=%v",
c.url, state, dest, c.owner, strings.Join(names, ",")), "", nil)
if err != nil {
return nil, err
Expand All @@ -292,7 +304,7 @@ func (c *Client) acquireByState(state, dest string, names []string) ([]common.Re
}

func (c *Client) release(name, dest string) error {
resp, err := http.Post(fmt.Sprintf("%v/release?name=%v&dest=%v&owner=%v",
resp, err := c.httpPost(fmt.Sprintf("%v/release?name=%v&dest=%v&owner=%v",
c.url, name, dest, c.owner), "", nil)
if err != nil {
return err
Expand All @@ -315,7 +327,7 @@ func (c *Client) update(name, state string, userData *common.UserData) error {
}
body = b
}
resp, err := http.Post(fmt.Sprintf("%v/update?name=%v&owner=%v&state=%v",
resp, err := c.httpPost(fmt.Sprintf("%v/update?name=%v&owner=%v&state=%v",
c.url, name, c.owner, state), "application/json", body)
if err != nil {
return err
Expand All @@ -330,7 +342,7 @@ func (c *Client) update(name, state string, userData *common.UserData) error {

func (c *Client) reset(rtype, state string, expire time.Duration, dest string) (map[string]string, error) {
rmap := make(map[string]string)
resp, err := http.Post(fmt.Sprintf("%v/reset?type=%v&state=%v&expire=%v&dest=%v",
resp, err := c.httpPost(fmt.Sprintf("%v/reset?type=%v&state=%v&expire=%v&dest=%v",
c.url, rtype, state, expire.String(), dest), "", nil)
if err != nil {
return rmap, err
Expand All @@ -352,7 +364,7 @@ func (c *Client) reset(rtype, state string, expire time.Duration, dest string) (

func (c *Client) metric(rtype string) (common.Metric, error) {
var metric common.Metric
resp, err := http.Get(fmt.Sprintf("%v/metric?type=%v", c.url, rtype))
resp, err := c.httpGet(fmt.Sprintf("%v/metric?type=%v", c.url, rtype))
if err != nil {
return metric, err
}
Expand All @@ -370,3 +382,46 @@ func (c *Client) metric(rtype string) (common.Metric, error) {
err = json.Unmarshal(body, &metric)
return metric, err
}

func (c *Client) httpGet(url string) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
return c.httpDo(req)
}

func (c *Client) httpPost(url, contentType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, url, body)
if err != nil {
return nil, err
}
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
return c.httpDo(req)
}

func (c *Client) httpDo(req *http.Request) (*http.Response, error) {
// Always bump the retryCount by 1 in order to equal the actual number of
// attempts. For example, if a retryCount of 2 is specified, the intent
// is for three attempts -- the initial attempt with two retries in case
// the initial attempt times out.
retryCount := c.RetryCount + 1
retryWait := c.RetryWait
i := uint(0)
for {
res, err := http.DefaultClient.Do(req)
if err != nil {
if err2, ok := err.(*url.Error); ok && err2.Timeout() {
if i < retryCount-1 {
i++
time.Sleep(retryWait)
continue
}
}
return nil, err
}
return res, nil
}
}

0 comments on commit 65a8195

Please sign in to comment.