Skip to content

Commit

Permalink
Include the stream's labels in OOO error responses. (grafana#304)
Browse files Browse the repository at this point in the history
* Include the stream's labels in OOO error responses.

Also, read the body of error responses and log them, and retry 500s and connection errors.

Signed-off-by: Tom Wilkie <[email protected]>

* Log close errors.

Signed-off-by: Tom Wilkie <[email protected]>

* Update pkg/promtail/client/client.go

Co-Authored-By: tomwilkie <[email protected]>

* Log retries at warn, final errors at error.

Signed-off-by: Tom Wilkie <[email protected]>
  • Loading branch information
tomwilkie authored Feb 11, 2019
1 parent 61fc02a commit 40ae03b
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 35 deletions.
7 changes: 2 additions & 5 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@ package chunkenc
import (
"errors"
"io"
"net/http"
"time"

"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)

// Errors returned by the chunk interface.
var (
ErrChunkFull = errors.New("Chunk full")
ErrOutOfOrder = httpgrpc.Errorf(http.StatusBadRequest, "Entry out of order")
ErrChunkFull = errors.New("chunk full")
ErrOutOfOrder = errors.New("entry out of order")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid checksum")
Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package ingester

import (
"context"
"net/http"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -93,6 +95,10 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
chunk.lastUpdated = entries[i].Timestamp
}

if appendErr == chunkenc.ErrOutOfOrder {
return httpgrpc.Errorf(http.StatusBadRequest, "entry out of order for stream: %s", client.FromLabelPairsToLabels(s.labels).String())
}

return appendErr
}

Expand Down
107 changes: 77 additions & 30 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
package client

import (
"bufio"
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/cortexproject/cortex/pkg/util/flagext"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logproto"
)

const contentType = "application/x-protobuf"
const maxErrMsgLen = 1024

var (
sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
Expand All @@ -47,14 +52,19 @@ type Config struct {
BatchWait time.Duration
BatchSize int

ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
}

// RegisterFlags registers the client's command-line flags on the given flag set.
//
// Every flag, including the backoff settings, is registered on flags rather
// than on the global flag.CommandLine, so callers that parse their own FlagSet
// see all of the client options.
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
	flags.Var(&c.URL, "client.url", "URL of log server")
	flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
	flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")

	// Retry/backoff behaviour used when a batch fails to send.
	flags.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retries when sending batches.")
	flags.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
	flags.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
}

// Client for pushing logs in snappy-compressed protos over HTTP.
Expand All @@ -76,10 +86,11 @@ type entry struct {
// New makes a new Client.
func New(cfg Config, logger log.Logger) (*Client, error) {
c := &Client{
logger: logger,
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),
logger: logger,
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),

externalLabels: cfg.ExternalLabels,
}
c.wg.Add(1)
Expand All @@ -93,9 +104,7 @@ func (c *Client) run() {
maxWait := time.NewTimer(c.cfg.BatchWait)

defer func() {
if err := c.send(batch); err != nil {
level.Error(c.logger).Log("msg", "error sending batch", "error", err)
}
c.sendBatch(batch)
c.wg.Done()
}()

Expand All @@ -104,11 +113,10 @@ func (c *Client) run() {
select {
case <-c.quit:
return

case e := <-c.entries:
if batchSize+len(e.Line) > c.cfg.BatchSize {
if err := c.send(batch); err != nil {
level.Error(c.logger).Log("msg", "error sending batch", "error", err)
}
c.sendBatch(batch)
batchSize = 0
batch = map[model.Fingerprint]*logproto.Stream{}
}
Expand All @@ -123,49 +131,88 @@ func (c *Client) run() {
batch[fp] = stream
}
stream.Entries = append(stream.Entries, e.Entry)

case <-maxWait.C:
if len(batch) > 0 {
if err := c.send(batch); err != nil {
level.Error(c.logger).Log("msg", "error sending batch", "error", err)
}
c.sendBatch(batch)
batchSize = 0
batch = map[model.Fingerprint]*logproto.Stream{}
}
}
}
}

func (c *Client) send(batch map[model.Fingerprint]*logproto.Stream) error {
// sendBatch encodes the batch and pushes it to the server, retrying with
// backoff as configured by c.cfg.BackoffConfig. Failures are logged rather
// than returned: each retried attempt at warn level, and the last error at
// error level.
func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
	payload, err := encodeBatch(batch)
	if err != nil {
		level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
		return
	}
	// Counted once per batch, before any attempt is made.
	sentBytes.Add(float64(len(payload)))

	var (
		ctx    = context.Background()
		retry  = util.NewBackoff(ctx, c.cfg.BackoffConfig)
		status int
	)
	for retry.Ongoing() {
		began := time.Now()
		status, err = c.send(ctx, payload)
		// Every attempt is observed, labelled with the status it produced.
		requestDuration.WithLabelValues(strconv.Itoa(status)).Observe(time.Since(began).Seconds())
		if err == nil {
			return
		}

		// Only 5xx responses and connection-level errors (reported with a
		// non-positive status) are worth retrying; anything else is final.
		retryable := status <= 0 || status/100 == 5
		if !retryable {
			break
		}

		level.Warn(c.logger).Log("msg", "error sending batch", "status", status, "error", err)
		retry.Wait()
	}

	if err != nil {
		level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
	}
}

func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
req := logproto.PushRequest{
Streams: make([]*logproto.Stream, 0, len(batch)),
}
count := 0
for _, stream := range batch {
req.Streams = append(req.Streams, stream)
count += len(stream.Entries)
}
buf, err := proto.Marshal(&req)
if err != nil {
return err
return nil, err
}
buf = snappy.Encode(nil, buf)
sentBytes.Add(float64(len(buf)))
return buf, nil
}

start := time.Now()
resp, err := http.Post(c.cfg.URL.String(), contentType, bytes.NewReader(buf))
func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
if err != nil {
requestDuration.WithLabelValues("failed").Observe(time.Since(start).Seconds())
return err
return -1, err
}
if err := resp.Body.Close(); err != nil {
return err
req = req.WithContext(ctx)
req.Header.Set("Content-Type", contentType)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return -1, err
}
requestDuration.WithLabelValues(strconv.Itoa(resp.StatusCode)).Observe(time.Since(start).Seconds())
defer helpers.LogError("closing response body", resp.Body.Close)

if resp.StatusCode/100 != 2 {
return fmt.Errorf("Error doing write: %d - %s", resp.StatusCode, resp.Status)
scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
}
return nil
return resp.StatusCode, err
}

// Stop the client.
Expand Down

0 comments on commit 40ae03b

Please sign in to comment.