forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvalidator.go
103 lines (90 loc) · 4.26 KB
/
validator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package distributor
import (
"errors"
"net/http"
"strings"
"time"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
type Validator struct {
Limits
}
func NewValidator(l Limits) (*Validator, error) {
if l == nil {
return nil, errors.New("nil Limits")
}
return &Validator{l}, nil
}
// ValidateEntry returns an error if the entry is invalid
func (v Validator) ValidateEntry(userID string, labels string, entry logproto.Entry) error {
if v.RejectOldSamples(userID) && entry.Timestamp.UnixNano() < time.Now().Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano() {
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(labels, entry.Timestamp))
}
if entry.Timestamp.UnixNano() > time.Now().Add(v.CreationGracePeriod(userID)).UnixNano() {
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(labels, entry.Timestamp))
}
if maxSize := v.MaxLineSize(userID); maxSize != 0 && len(entry.Line) > maxSize {
// I wish we didn't return httpgrpc errors here as it seems
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
}
return nil
}
// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, stream logproto.Stream) error {
ls, err := util.ToClientLabels(stream.Labels)
if err != nil {
// I wish we didn't return httpgrpc errors here as it seems
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
return httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
}
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(cortex_client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, v.MaxLabelNamesPerSeries(userID)))
}
maxLabelNameLength := v.MaxLabelNameLength(userID)
maxLabelValueLength := v.MaxLabelValueLength(userID)
lastLabelName := ""
for _, l := range ls {
if len(l.Name) > maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name))
} else if len(l.Value) > maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
}
lastLabelName = l.Name
}
return nil
}
func updateMetrics(reason, userID string, stream logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(bytes))
}