Skip to content

Commit

Permalink
Fix/deadlock frontend queue (grafana#1490)
Browse files Browse the repository at this point in the history
* buffers split-by-interval channels to prevent deadlocks

* improves codec merge test requiring merging from multiple intervals

* bounds concurrency by min(parallelism, jobs)

* approximates goroutine test
  • Loading branch information
owen-d authored and cyriltovena committed Jan 8, 2020
1 parent 7cc93b2 commit a1d090e
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 9 deletions.
8 changes: 5 additions & 3 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func Test_codec_MergeResponse(t *testing.T) {
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.BACKWARD,
Limit: 4,
Limit: 6,
Version: 1,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Expand All @@ -363,7 +363,7 @@ func Test_codec_MergeResponse(t *testing.T) {
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.BACKWARD,
Limit: 4,
Limit: 6,
Version: 1,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Expand All @@ -389,7 +389,7 @@ func Test_codec_MergeResponse(t *testing.T) {
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.BACKWARD,
Limit: 4,
Limit: 6,
Version: 1,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Expand All @@ -399,13 +399,15 @@ func Test_codec_MergeResponse(t *testing.T) {
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 10), Line: "10"},
{Timestamp: time.Unix(0, 9), Line: "9"},
{Timestamp: time.Unix(0, 9), Line: "9"},
},
},
{
Labels: `{foo="bar", level="debug"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 16), Line: "16"},
{Timestamp: time.Unix(0, 15), Line: "15"},
{Timestamp: time.Unix(0, 6), Line: "6"},
},
},
},
Expand Down
11 changes: 8 additions & 3 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,19 @@ func TestUnhandledPath(t *testing.T) {
require.NoError(t, err)
}

type fakeLimits struct{}
// fakeLimits is a test stub for the limits interface consumed by the
// query-range tripperware (see MaxQueryLength / MaxQueryParallelism).
type fakeLimits struct {
	// maxQueryParallelism is returned by MaxQueryParallelism; the zero
	// value falls back to a parallelism of 1.
	maxQueryParallelism int
}

// MaxQueryLength returns a fixed seven-hour window for every user.
func (fakeLimits) MaxQueryLength(string) time.Duration {
	return 7 * time.Hour
}

func (fakeLimits) MaxQueryParallelism(string) int {
return 1
// MaxQueryParallelism reports the configured parallelism for any user,
// falling back to 1 when the struct was left zero-valued.
func (f fakeLimits) MaxQueryParallelism(string) int {
	if p := f.maxQueryParallelism; p != 0 {
		return p
	}
	return 1
}

func counter() (*int, http.Handler) {
Expand Down
15 changes: 12 additions & 3 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,18 @@ func (h *splitByInterval) Process(
threshold int64,
input []*lokiResult,
) (responses []queryrange.Response, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

ch := h.Feed(ctx, input)

for i := 0; i < parallelism; i++ {
// don't spawn unnecessary goroutines
var p int = parallelism
if len(input) < parallelism {
p = len(input)
}

for i := 0; i < p; i++ {
go h.loop(ctx, ch)
}

Expand Down Expand Up @@ -118,8 +127,8 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra
for _, interval := range intervals {
input = append(input, &lokiResult{
req: interval,
resp: make(chan queryrange.Response),
err: make(chan error),
resp: make(chan queryrange.Response, 1),
err: make(chan error, 1),
})
}

Expand Down
68 changes: 68 additions & 0 deletions pkg/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queryrange
import (
"context"
"fmt"
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -327,3 +328,70 @@ func Test_ExitEarly(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expected, res)
}

// Test_DoesntDeadlock guards against the frontend-queue deadlock fixed by
// buffering the per-interval response channels: it splits one request into
// n sub-requests with a limit of n/2, so roughly half of the sub-responses
// are never consumed, and then checks that the worker goroutines are still
// cleaned up instead of blocking forever on a send nobody receives.
func Test_DoesntDeadlock(t *testing.T) {
	n := 10

	split := splitByInterval{
		next: queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) {
			// Each sub-request answers with a single entry stamped with its
			// own start time so merged results are distinguishable.
			return &LokiResponse{
				Status:    loghttp.QueryStatusSuccess,
				Direction: r.(*LokiRequest).Direction,
				Limit:     r.(*LokiRequest).Limit,
				Version:   uint32(loghttp.VersionV1),
				Data: LokiData{
					ResultType: loghttp.ResultTypeStream,
					Result: []logproto.Stream{
						{
							Labels: `{foo="bar", level="debug"}`,
							Entries: []logproto.Entry{
								{
									Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()),
									Line:      fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()),
								},
							},
						},
					},
				},
			}, nil
		}),
		limits: fakeLimits{
			maxQueryParallelism: n,
		},
		merger:   lokiCodec,
		interval: time.Hour,
	}

	// Split into n requests w/ n/2 limit, ensuring unused responses are
	// cleaned up properly.
	req := &LokiRequest{
		StartTs:   time.Unix(0, 0),
		EndTs:     time.Unix(0, (time.Duration(n) * time.Hour).Nanoseconds()),
		Query:     "",
		Limit:     uint32(n / 2),
		Step:      1,
		Direction: logproto.FORWARD,
		Path:      "/api/prom/query_range",
	}

	ctx := user.InjectOrgID(context.Background(), "1")

	startingGoroutines := runtime.NumGoroutine()

	// Goroutines shouldn't blow up across 100 rounds.
	for i := 0; i < 100; i++ {
		res, err := split.Do(ctx, req)
		require.NoError(t, err)
		require.Equal(t, 1, len(res.(*LokiResponse).Data.Result))
		require.Equal(t, n/2, len(res.(*LokiResponse).Data.Result[0].Entries))
	}
	runtime.GC()
	endingGoroutines := runtime.NumGoroutine()

	// Give the runtime a bit of slack when catching up -- this isn't an
	// exact science :( Allow a 1% increase in goroutines, but always at
	// least one: with a small starting count the integer division in
	// startingGoroutines*101/100 truncates to zero slack, which made the
	// original check needlessly strict and flaky.
	allowed := startingGoroutines * 101 / 100
	if allowed < startingGoroutines+1 {
		allowed = startingGoroutines + 1
	}
	require.LessOrEqual(t, endingGoroutines, allowed)
}

0 comments on commit a1d090e

Please sign in to comment.