Skip to content

Commit

Permalink
Addition of a /suspend endpoint to Loki Canary (grafana#1891)
Browse files Browse the repository at this point in the history
* First pass loki canary pause

Signed-off-by: Joe Elliott <[email protected]>

* Write suspending msg to stderr

Signed-off-by: Joe Elliott <[email protected]>

* Added resume endpoint

Signed-off-by: Joe Elliott <[email protected]>

* Added documentation on new endpoints

Signed-off-by: Joe Elliott <[email protected]>

* Pass both signals to the same channel

Signed-off-by: Joe Elliott <[email protected]>

* lint

Signed-off-by: Joe Elliott <[email protected]>

* Removed return on startCanary

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Apr 6, 2020
1 parent 7573078 commit 383eb6e
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 27 deletions.
73 changes: 52 additions & 21 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

Expand All @@ -19,6 +20,14 @@ import (
"github.com/grafana/loki/pkg/canary/writer"
)

type canary struct {
lock sync.Mutex

writer *writer.Writer
reader *reader.Reader
comparator *comparator.Comparator
}

func main() {

lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector")
Expand Down Expand Up @@ -52,10 +61,28 @@ func main() {
sentChan := make(chan time.Time)
receivedChan := make(chan time.Time)

w := writer.NewWriter(os.Stdout, sentChan, *interval, *size)
r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r, true)
c := &canary{}
startCanary := func() {
c.stop()

c.lock.Lock()
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, c.reader, true)
}

startCanary()

http.HandleFunc("/resume", func(_ http.ResponseWriter, _ *http.Request) {
_, _ = fmt.Fprintf(os.Stderr, "restarting\n")
startCanary()
})
http.HandleFunc("/suspend", func(_ http.ResponseWriter, _ *http.Request) {
_, _ = fmt.Fprintf(os.Stderr, "suspending\n")
c.stop()
})
http.Handle("/metrics", promhttp.Handler())
go func() {
err := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
Expand All @@ -64,25 +91,29 @@ func main() {
}
}()

interrupt := make(chan os.Signal, 1)
terminate := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
signal.Notify(terminate, syscall.SIGTERM)

for {
select {
case <-interrupt:
_, _ = fmt.Fprintf(os.Stderr, "suspending indefinitely\n")
w.Stop()
r.Stop()
c.Stop()
case <-terminate:
_, _ = fmt.Fprintf(os.Stderr, "shutting down\n")
w.Stop()
r.Stop()
c.Stop()
return
}
signal.Notify(terminate, syscall.SIGTERM, os.Interrupt)

for range terminate {
_, _ = fmt.Fprintf(os.Stderr, "shutting down\n")
c.stop()
return
}
}

func (c *canary) stop() {
c.lock.Lock()
defer c.lock.Unlock()

if c.writer == nil || c.reader == nil || c.comparator == nil {
return
}

c.writer.Stop()
c.reader.Stop()
c.comparator.Stop()

c.writer = nil
c.reader = nil
c.comparator = nil
}
7 changes: 7 additions & 0 deletions docs/operations/loki-canary.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ determine if they are truly missing or only missing from the WebSocket. If
missing entries are not found in the direct query, the `missing_entries` counter
is incremented.

### Control

Loki Canary responds to two endpoints to allow dynamic suspending/resuming of the
canary process. This can be useful if you'd like to quickly disable or reenable the
canary. To stop or start the canary issue an HTTP GET request against the `/suspend` or
`/resume` endpoints.

## Installation

### Binary
Expand Down
14 changes: 8 additions & 6 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.D
done: make(chan struct{}),
}

responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "response_latency",
Help: "is how long it takes for log lines to be returned from Loki in seconds.",
Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets),
})
if responseLatency == nil {
responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "response_latency",
Help: "is how long it takes for log lines to be returned from Loki in seconds.",
Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets),
})
}

go c.run()

Expand Down

0 comments on commit 383eb6e

Please sign in to comment.