Skip to content

Commit

Permalink
rebased and removed some unwanted code
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored and cyriltovena committed Aug 5, 2019
1 parent 3457f44 commit bddf4fc
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 126 deletions.
48 changes: 0 additions & 48 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
token_util "github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/helpers"
Expand Down Expand Up @@ -287,59 +285,13 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
time.Duration(req.DelayFor)*time.Second,
tailClients,
reversedIterator,
func(from, to time.Time, labels string) (iterator iter.EntryIterator, e error) {
return q.queryDroppedStreams(queryCtx, req, from, to, labels)
},
func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
return q.tailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr)
},
q.cfg.TailMaxDuration,
), nil
}

// passed to tailer for querying dropped streams
func (q *Querier) queryDroppedStreams(ctx context.Context, req *logproto.TailRequest, start, end time.Time, labels string) (iter.EntryIterator, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

key := token_util.TokenFor(userID, labels)
replicationSet, err := q.ring.Get(key, ring.Read)
if err != nil {
return nil, err
}

query := logproto.QueryRequest{
Direction: logproto.FORWARD,
Start: start,
End: end,
Limit: 10000,
Query: req.Query,
Regex: req.Regex,
}

clients, err := q.forGivenIngesters(replicationSet, func(client logproto.QuerierClient) (interface{}, error) {
return client.Query(ctx, &query)
})
if err != nil {
return nil, err
}

ingesterIterators := make([]iter.EntryIterator, len(clients))
for i := range clients {
ingesterIterators[i] = iter.NewQueryClientIterator(clients[i].response.(logproto.Querier_QueryClient), query.Direction)
}

chunkStoreIterators, err := q.store.LazyQuery(ctx, &query)
if err != nil {
return nil, err
}

iterators := append(ingesterIterators, chunkStoreIterators)
return iter.NewHeapIterator(iterators, query.Direction), nil
}

// passed to tailer for (re)connecting to new or disconnected ingesters
func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.TailRequest, connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
tailClients := make(map[string]logproto.Querier_TailClient)
Expand Down
83 changes: 5 additions & 78 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,51 +33,15 @@ type TailResponse struct {
DroppedEntries []droppedEntry `json:"dropped_entries"`
}

/*// dropped streams are collected into a heap to quickly find dropped stream which has oldest timestamp
type droppedStreamsIterator []logproto.DroppedStream
func (h droppedStreamsIterator) Len() int { return len(h) }
func (h droppedStreamsIterator) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h droppedStreamsIterator) Peek() time.Time {
return h[0].From
}
func (h *droppedStreamsIterator) Push(x interface{}) {
*h = append(*h, x.(logproto.DroppedStream))
}
func (h *droppedStreamsIterator) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
func (h droppedStreamsIterator) Less(i, j int) bool {
t1, t2 := h[i].From, h[j].From
if !t1.Equal(t2) {
return t1.Before(t2)
}
return h[i].Labels < h[j].Labels
}*/

// Tailer manages complete lifecycle of a tail request
type Tailer struct {
// openStreamIterator is for streams already open which can be complete streams returned by ingester or
// dropped streams queried from ingester and store
// openStreamIterator is for streams already open
openStreamIterator iter.HeapIterator
/*droppedStreamsIterator interface { // for holding dropped stream metadata
heap.Interface
Peek() time.Time
}*/
streamMtx sync.Mutex // for synchronizing access to openStreamIterator and droppedStreamsIterator
streamMtx sync.Mutex // for synchronizing access to openStreamIterator

currEntry logproto.Entry
currLabels string

queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error)
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error)

querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters
Expand Down Expand Up @@ -164,7 +128,6 @@ func (t *Tailer) loop() {
tailResponse.DroppedEntries = t.popDroppedEntries()
}

//response := []tailResponse{{Stream: logproto.Stream{Labels: t.currLabels, Entries: responses[t.currLabels]}, DroppedEntries: t.popDroppedEntries()}}
select {
case t.responseChan <- tailResponse:
default:
Expand Down Expand Up @@ -231,48 +194,17 @@ func (t *Tailer) pushTailResponseFromIngester(resp *logproto.TailResponse) {
defer t.streamMtx.Unlock()

t.openStreamIterator.Push(iter.NewStreamIterator(resp.Stream))
/*if resp.DroppedStreams != nil {
for idx := range resp.DroppedStreams {
heap.Push(t.droppedStreamsIterator, *resp.DroppedStreams[idx])
}
}*/
}

// finds oldest entry by peeking at open stream iterator and dropped stream iterator.
// if open stream iterator has oldest entry then pop it for sending it to tail client
// else pop dropped stream details, to query from ingester and store.
// Response from ingester and store is pushed to open stream for further processing
// finds oldest entry by peeking at open stream iterator.
// Response from ingester is pushed to open stream for further processing
func (t *Tailer) next() bool {
t.streamMtx.Lock()
defer t.streamMtx.Unlock()

if t.openStreamIterator.Len() == 0 || !time.Now().After(t.openStreamIterator.Peek().Add(t.delayFor)) || !t.openStreamIterator.Next() {
return false
}
/*// if we don't have any entries or any of the entries are not older than now()-delay then return false
if !((t.openStreamIterator.Len() != 0 && time.Now().After(t.openStreamIterator.Peek().Add(t.delayFor))) || (t.droppedStreamsIterator.Len() != 0 && time.Now().After(t.droppedStreamsIterator.Peek().Add(t.delayFor)))) {
return false
}
// If any of the dropped streams are older than open streams, pop dropped stream details for querying them
if t.droppedStreamsIterator.Len() != 0 {
oldestTsFromDroppedStreams := t.droppedStreamsIterator.Peek()
if t.droppedStreamsIterator.Len() != 0 && (t.openStreamIterator.Len() == 0 || t.openStreamIterator.Peek().After(t.droppedStreamsIterator.Peek())) {
for t.droppedStreamsIterator.Len() != 0 && t.droppedStreamsIterator.Peek().Equal(oldestTsFromDroppedStreams) {
droppedStream := heap.Pop(t.droppedStreamsIterator).(logproto.DroppedStream)
iterator, err := t.queryDroppedStreams(droppedStream.From, droppedStream.To.Add(1), droppedStream.Labels)
if err != nil {
level.Error(util.Logger).Log("Error querying dropped streams", fmt.Sprintf("%v", err))
continue
}
t.openStreamIterator.Push(iterator)
}
}
}
if !t.openStreamIterator.Next() {
return false
}*/

t.currEntry = t.openStreamIterator.Entry()
t.currLabels = t.openStreamIterator.Labels()
Expand Down Expand Up @@ -328,24 +260,19 @@ func newTailer(
delayFor time.Duration,
querierTailClients map[string]logproto.Querier_TailClient,
historicEntries iter.EntryIterator,
queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error),
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error),
tailMaxDuration time.Duration,
) *Tailer {
t := Tailer{
openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD),
//droppedStreamsIterator: &droppedStreamsIterator{},
openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{historicEntries}, logproto.FORWARD),
querierTailClients: querierTailClients,
queryDroppedStreams: queryDroppedStreams,
delayFor: delayFor,
responseChan: make(chan *TailResponse, bufferSizeForTailResponse),
closeErrChan: make(chan error),
tailDisconnectedIngesters: tailDisconnectedIngesters,
tailMaxDuration: tailMaxDuration,
}

t.openStreamIterator.Push(historicEntries)

t.readTailClients()
go t.loop()
return &t
Expand Down

0 comments on commit bddf4fc

Please sign in to comment.