Skip to content

Commit

Permalink
discovery/triton: pass context to the client (prometheus#5235)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier authored Feb 26, 2019
1 parent 9de0ab3 commit fe7a1bc
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 34 deletions.
19 changes: 12 additions & 7 deletions discovery/triton/triton.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer ticker.Stop()

// Get an initial set right away.
tg, err := d.refresh()
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Refreshing targets failed", "err", err)
} else {
Expand All @@ -169,7 +169,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
for {
select {
case <-ticker.C:
tg, err := d.refresh()
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Refreshing targets failed", "err", err)
} else {
Expand All @@ -181,7 +181,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
}

func (d *Discovery) refresh() (tg *targetgroup.Group, err error) {
func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) {
t0 := time.Now()
defer func() {
refreshDuration.Observe(time.Since(t0).Seconds())
Expand All @@ -200,22 +200,27 @@ func (d *Discovery) refresh() (tg *targetgroup.Group, err error) {
Source: endpoint,
}

resp, err := d.client.Get(endpoint)
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return tg, fmt.Errorf("an error occurred when requesting targets from the discovery endpoint. %s", err)
return nil, err
}
req = req.WithContext(ctx)
resp, err := d.client.Do(req)
if err != nil {
return nil, fmt.Errorf("an error occurred when requesting targets from the discovery endpoint: %s", err)
}

defer resp.Body.Close()

data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return tg, fmt.Errorf("an error occurred when reading the response body. %s", err)
return nil, fmt.Errorf("an error occurred when reading the response body: %s", err)
}

dr := DiscoveryResponse{}
err = json.Unmarshal(data, &dr)
if err != nil {
return tg, fmt.Errorf("an error occurred unmarshaling the discovery response json. %s", err)
return nil, fmt.Errorf("an error occurred unmarshaling the discovery response json: %s", err)
}

for _, container := range dr.Containers {
Expand Down
60 changes: 33 additions & 27 deletions discovery/triton/triton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ var (
}
)

func newTritonDiscovery(c SDConfig) (*Discovery, error) {
return New(nil, &c)
}

func TestTritonSDNew(t *testing.T) {
td, err := New(nil, &conf)
td, err := newTritonDiscovery(conf)
testutil.Ok(t, err)
testutil.Assert(t, td != nil, "")
testutil.Assert(t, td.client != nil, "")
Expand All @@ -81,13 +85,13 @@ func TestTritonSDNew(t *testing.T) {
}

func TestTritonSDNewBadConfig(t *testing.T) {
td, err := New(nil, &badconf)
td, err := newTritonDiscovery(badconf)
testutil.NotOk(t, err, "")
testutil.Assert(t, td == nil, "")
}

func TestTritonSDNewGroupsConfig(t *testing.T) {
td, err := New(nil, &groupsconf)
td, err := newTritonDiscovery(groupsconf)
testutil.Ok(t, err)
testutil.Assert(t, td != nil, "")
testutil.Assert(t, td.client != nil, "")
Expand All @@ -102,14 +106,11 @@ func TestTritonSDNewGroupsConfig(t *testing.T) {

func TestTritonSDRun(t *testing.T) {
var (
td, err = New(nil, &conf)
td, _ = newTritonDiscovery(conf)
ch = make(chan []*targetgroup.Group)
ctx, cancel = context.WithCancel(context.Background())
)

testutil.Ok(t, err)
testutil.Assert(t, td != nil, "")

wait := make(chan struct{})
go func() {
td.Run(ctx, ch)
Expand Down Expand Up @@ -160,47 +161,52 @@ func TestTritonSDRefreshMultipleTargets(t *testing.T) {

func TestTritonSDRefreshNoServer(t *testing.T) {
var (
td, err = New(nil, &conf)
td, _ = newTritonDiscovery(conf)
)
testutil.Ok(t, err)
testutil.Assert(t, td != nil, "")

tg, rerr := td.refresh()
testutil.NotOk(t, rerr, "")
testutil.Equals(t, strings.Contains(rerr.Error(), "an error occurred when requesting targets from the discovery endpoint."), true)
testutil.Assert(t, tg != nil, "")
testutil.Assert(t, tg.Targets == nil, "")
_, err := td.refresh(context.Background())
testutil.NotOk(t, err, "")
testutil.Equals(t, strings.Contains(err.Error(), "an error occurred when requesting targets from the discovery endpoint"), true)
}

func TestTritonSDRefreshCancelled(t *testing.T) {
var (
td, _ = newTritonDiscovery(conf)
)

ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := td.refresh(ctx)
testutil.NotOk(t, err, "")
testutil.Equals(t, strings.Contains(err.Error(), context.Canceled.Error()), true)
}

func testTritonSDRefresh(t *testing.T, dstr string) []model.LabelSet {
var (
td, err = New(nil, &conf)
s = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
td, _ = newTritonDiscovery(conf)
s = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, dstr)
}))
)

defer s.Close()

u, uperr := url.Parse(s.URL)
testutil.Ok(t, uperr)
u, err := url.Parse(s.URL)
testutil.Ok(t, err)
testutil.Assert(t, u != nil, "")

host, strport, sherr := net.SplitHostPort(u.Host)
testutil.Ok(t, sherr)
host, strport, err := net.SplitHostPort(u.Host)
testutil.Ok(t, err)
testutil.Assert(t, host != "", "")
testutil.Assert(t, strport != "", "")

port, atoierr := strconv.Atoi(strport)
testutil.Ok(t, atoierr)
port, err := strconv.Atoi(strport)
testutil.Ok(t, err)
testutil.Assert(t, port != 0, "")

td.sdConfig.Port = port

testutil.Ok(t, err)
testutil.Assert(t, td != nil, "")

tg, err := td.refresh()
tg, err := td.refresh(context.Background())
testutil.Ok(t, err)
testutil.Assert(t, tg != nil, "")

Expand Down

0 comments on commit fe7a1bc

Please sign in to comment.