forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtail.go
55 lines (44 loc) · 1.17 KB
/
tail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main
import (
"time"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
const tailIteratorIncrement = 10 * time.Second
func tailQuery() iter.EntryIterator {
return &tailIterator{
from: time.Now().Add(-tailIteratorIncrement),
}
}
type tailIterator struct {
from time.Time
err error
iter.EntryIterator
}
func (t *tailIterator) Next() bool {
for t.EntryIterator == nil || !t.EntryIterator.Next() {
through, now := t.from.Add(tailIteratorIncrement), time.Now()
if through.After(now) {
time.Sleep(through.Sub(now))
}
resp, err := query(t.from, through, logproto.FORWARD)
if err != nil {
t.err = err
return false
}
// We store the through time such that if we don't see any entries, we will
// still make forward progress. This is overwritten by any entries we might
// see to ensure pagination works.
t.from = through
t.EntryIterator = iter.NewQueryResponseIterator(resp, logproto.FORWARD)
}
return true
}
func (t *tailIterator) Entry() logproto.Entry {
entry := t.EntryIterator.Entry()
t.from = entry.Timestamp.Add(1 * time.Nanosecond)
return entry
}
func (t *tailIterator) Error() error {
return t.err
}