Skip to content

Commit

Permalink
Validate labels in the distributor. (grafana#251)
Browse files Browse the repository at this point in the history
Also, make ErrOutOfOrder a HTTP 4xx error and continue to append even when we encourter an out of order entry.

Signed-off-by: Tom Wilkie <[email protected]>
  • Loading branch information
tomwilkie authored Jan 31, 2019
1 parent 071ea04 commit ab4c5be
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 37 deletions.
5 changes: 4 additions & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ package chunkenc
import (
"errors"
"io"
"net/http"
"time"

"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)

// Errors returned by the chunk interface.
var (
ErrChunkFull = errors.New("Chunk full")
ErrOutOfOrder = errors.New("Entry out of order")
ErrOutOfOrder = httpgrpc.Errorf(http.StatusBadRequest, "Entry out of order")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid checksum")
Expand Down
27 changes: 23 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -17,6 +18,7 @@ import (

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
)

var (
Expand Down Expand Up @@ -56,11 +58,12 @@ type Distributor struct {
cfg Config
clientCfg client.Config
ring ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool
}

// New a distributor creates.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Distributor, error) {
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
factory := func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(clientCfg, addr)
}
Expand All @@ -69,7 +72,8 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Distributor,
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, util.Logger),
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
}, nil
}

Expand Down Expand Up @@ -114,7 +118,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// We also work out the hash value at the same time.
streams := make([]streamTracker, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
var validationErr error
for i, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
validationErr = err
continue
}

keys = append(keys, tokenFor(userID, stream.Labels))
streams[i].stream = stream
}
Expand Down Expand Up @@ -160,10 +170,19 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
case err := <-tracker.err:
return nil, err
case <-tracker.done:
return &logproto.PushResponse{}, nil
return &logproto.PushResponse{}, validationErr
}
}

func (d *Distributor) validateLabels(userID, labels string) error {
ls, err := util.ToClientLabels(labels)
if err != nil {
return err
}

return d.overrides.ValidateLabels(userID, ls)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
err := d.sendSamplesErr(ctx, ingester, streamTrackers)
Expand Down
29 changes: 8 additions & 21 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
"github.com/cortexproject/cortex/pkg/util/wire"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/parser"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/util"
)

const queryBatchSize = 128
Expand Down Expand Up @@ -67,10 +67,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

var appendErr error
for _, s := range req.Streams {
labels, err := toClientLabels(s.Labels)
labels, err := util.ToClientLabels(s.Labels)
if err != nil {
return err
appendErr = err
continue
}

fp := client.FastFingerprint(labels)
Expand All @@ -83,11 +85,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
}

if err := stream.Push(ctx, s.Entries); err != nil {
return err
appendErr = err
continue
}
}

return nil
return appendErr
}

func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
Expand Down Expand Up @@ -183,19 +186,3 @@ func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer,
}
return nil
}

func toClientLabels(labels string) ([]client.LabelPair, error) {
ls, err := parser.Labels(labels)
if err != nil {
return nil, err
}

pairs := make([]client.LabelPair, 0, len(ls))
for i := 0; i < len(ls); i++ {
pairs = append(pairs, client.LabelPair{
Name: wire.Bytes(ls[i].Name),
Value: wire.Bytes(ls[i].Value),
})
}
return pairs, nil
}
7 changes: 5 additions & 2 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
chunksCreatedTotal.Inc()
}

// Don't fail on the first append error - if samples are sent out of order,
// we still want to append the later ones.
var appendErr error
for i := range entries {
if s.chunks[0].closed || !s.chunks[0].chunk.SpaceFor(&entries[i]) {
samplesPerChunk.Observe(float64(s.chunks[0].chunk.Size()))
Expand All @@ -78,11 +81,11 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
chunksCreatedTotal.Inc()
}
if err := s.chunks[len(s.chunks)-1].chunk.Append(&entries[i]); err != nil {
return err
appendErr = err
}
}

return nil
return appendErr
}

// Returns an iterator.
Expand Down
1 change: 1 addition & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Loki struct {

server *server.Server
ring *ring.Ring
overrides *validation.Overrides
distributor *distributor.Distributor
ingester *ingester.Ingester
querier *querier.Querier
Expand Down
28 changes: 19 additions & 9 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type moduleName int
// The various modules that make up Loki.
const (
Ring moduleName = iota
Overrides
Server
Distributor
Ingester
Expand All @@ -38,6 +39,8 @@ func (m moduleName) String() string {
switch m {
case Ring:
return "ring"
case Overrides:
return "overrides"
case Server:
return "server"
case Distributor:
Expand All @@ -60,6 +63,9 @@ func (m *moduleName) Set(s string) error {
case "ring":
*m = Ring
return nil
case "overrides":
*m = Overrides
return nil
case "server":
*m = Server
return nil
Expand Down Expand Up @@ -97,8 +103,13 @@ func (t *Loki) initRing() (err error) {
return
}

func (t *Loki) initOverrides() (err error) {
t.overrides, err = validation.NewOverrides(t.cfg.LimitsConfig)
return err
}

func (t *Loki) initDistributor() (err error) {
t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring)
t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.overrides)
if err != nil {
return
}
Expand Down Expand Up @@ -158,13 +169,7 @@ func (t *Loki) stopIngester() error {
}

func (t *Loki) initStore() (err error) {
var overrides *validation.Overrides
overrides, err = validation.NewOverrides(t.cfg.LimitsConfig)
if err != nil {
return err
}

t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, overrides)
t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
return
}

Expand All @@ -189,12 +194,17 @@ var modules = map[moduleName]module{
init: (*Loki).initRing,
},

Overrides: {
init: (*Loki).initOverrides,
},

Distributor: {
deps: []moduleName{Ring, Server},
deps: []moduleName{Ring, Server, Overrides},
init: (*Loki).initDistributor,
},

Store: {
deps: []moduleName{Overrides},
init: (*Loki).initStore,
stop: (*Loki).stopStore,
},
Expand Down
24 changes: 24 additions & 0 deletions pkg/util/conv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package util

import (
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/wire"
"github.com/grafana/loki/pkg/parser"
)

// ToClientLabels parses the labels and converts them to the Cortex type.
func ToClientLabels(labels string) ([]client.LabelPair, error) {
ls, err := parser.Labels(labels)
if err != nil {
return nil, err
}

pairs := make([]client.LabelPair, 0, len(ls))
for i := 0; i < len(ls); i++ {
pairs = append(pairs, client.LabelPair{
Name: wire.Bytes(ls[i].Name),
Value: wire.Bytes(ls[i].Value),
})
}
return pairs, nil
}

0 comments on commit ab4c5be

Please sign in to comment.