Rewrite sampler to fix oversampling (uber-go#290)
Instead of using time.AfterFunc and resetting timers, use the timestamps on the entries and maintain a sliding window for the current "tick" with atomics to track when the counters should be reset. This avoids all allocations and contention on the global timer heap.
prashantv authored and Akshay Shah committed Feb 15, 2017
1 parent a98f6bb commit a5dbac8
Showing 3 changed files with 158 additions and 82 deletions.
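
The commit message above describes the core idea: instead of arming a timer per message, each counter remembers (in an atomic) the timestamp at which its current window expires, and the first increment after that deadline resets the count and claims the next window with a compare-and-swap. A minimal, self-contained sketch of that sliding-window counter, written against the standard library's sync/atomic rather than the go.uber.org/atomic wrappers the real code uses (names and demo values are illustrative, not part of the commit):

// Sketch only: sliding-window counter reset driven by entry timestamps.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type windowCounter struct {
	resetAt int64  // unix nanos at which the current window expires
	count   uint64 // entries seen in the current window
}

// incCheckReset bumps the counter for time t, starting a fresh window of
// length tick whenever the previous one has expired.
func (c *windowCounter) incCheckReset(t time.Time, tick time.Duration) uint64 {
	tn := t.UnixNano()
	resetAfter := atomic.LoadInt64(&c.resetAt)
	if resetAfter > tn {
		// Still inside the current window: just increment.
		return atomic.AddUint64(&c.count, 1)
	}

	// Window expired: restart the count and try to claim the new window.
	atomic.StoreUint64(&c.count, 1)
	if !atomic.CompareAndSwapInt64(&c.resetAt, resetAfter, tn+tick.Nanoseconds()) {
		// Another goroutine claimed the new window first; count ourselves in it.
		return atomic.AddUint64(&c.count, 1)
	}
	return 1
}

func main() {
	var c windowCounter
	now := time.Now()
	for i := 0; i < 3; i++ {
		fmt.Println(c.incCheckReset(now, time.Second)) // 1, 2, 3 within one window
	}
	fmt.Println(c.incCheckReset(now.Add(2*time.Second), time.Second)) // 1 again: new window
}

Because the reset is driven by the entry's own timestamp, no timers need to be scheduled or cancelled on the hot path, and the CAS keeps concurrent resets from double-counting: whichever goroutine wins the swap owns the 1, and the losers fall back to a plain increment.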
2 changes: 1 addition & 1 deletion benchmarks/zap_bench_test.go
@@ -237,7 +237,7 @@ func BenchmarkZapSampleCheckWithoutFields(b *testing.B) {
 		benchEncoder(),
 		&testutils.Discarder{},
 		zap.DebugLevel,
-	), time.Second, 10, 10000))
+	), 50*time.Millisecond, 10, 10000))
 	b.ResetTimer()
 	b.RunParallel(func(pb *testing.PB) {
 		i := 0
50 changes: 32 additions & 18 deletions zapcore/sampler.go
@@ -31,24 +31,21 @@ const (
 	_countersPerLevel = 4096
 )

-func newCounters() *counters {
-	return &counters{}
+type counter struct {
+	resetAt atomic.Int64
+	counter atomic.Uint64
 }

-type counters [_numLevels][_countersPerLevel]atomic.Uint64
-
-func (c *counters) Inc(lvl Level, key string) uint64 {
-	return c.get(lvl, key).Inc()
-}
+type counters [_numLevels][_countersPerLevel]counter

-func (c *counters) Reset(lvl Level, key string) {
-	c.get(lvl, key).Store(0)
+func newCounters() *counters {
+	return &counters{}
 }

-func (c *counters) get(lvl Level, key string) *atomic.Uint64 {
+func (cs *counters) get(lvl Level, key string) *counter {
 	i := lvl - _minLevel
 	j := fnv32a(key) % _countersPerLevel
-	return &c[i][j]
+	return &cs[i][j]
 }

 // fnv32a, adapted from "hash/fnv", but without a []byte(string) alloc
@@ -65,6 +62,25 @@ func fnv32a(s string) uint32 {
 	return hash
 }

+func (c *counter) IncCheckReset(t time.Time, tick time.Duration) uint64 {
+	tn := t.UnixNano()
+	resetAfter := c.resetAt.Load()
+	if resetAfter > tn {
+		return c.counter.Inc()
+	}
+
+	c.counter.Store(1)
+
+	newResetAfter := tn + tick.Nanoseconds()
+	if !c.resetAt.CAS(resetAfter, newResetAfter) {
+		// We raced with another goroutine trying to reset, and it also reset
+		// the counter to 1, so we need to reincrement the counter.
+		return c.counter.Inc()
+	}
+
+	return 1
+}
+
 // Sample creates a Facility that samples incoming entries, which caps the CPU
 // and I/O load of logging while attempting to preserve a representative subset
 // of your logs.
@@ -110,13 +126,11 @@ func (s *sampler) Check(ent Entry, ce *CheckedEntry) *CheckedEntry {
 		return ce
 	}

-	if n := s.counts.Inc(ent.Level, ent.Message); n > s.first {
-		if n == s.first+1 {
-			time.AfterFunc(s.tick, func() { s.counts.Reset(ent.Level, ent.Message) })
-		}
-		if (n-s.first)%s.thereafter != 0 {
-			return ce
-		}
+	counter := s.counts.get(ent.Level, ent.Message)
+	n := counter.IncCheckReset(ent.Time, s.tick)
+	if n > s.first && (n-s.first)%s.thereafter != 0 {
+		return ce
 	}

 	return s.Facility.Check(ent, ce)
 }
188 changes: 125 additions & 63 deletions zapcore/sampler_test.go
@@ -27,7 +27,9 @@ import (
 	"time"

 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"

+	"go.uber.org/atomic"
 	"go.uber.org/zap/internal/observer"
 	"go.uber.org/zap/testutils"
 	. "go.uber.org/zap/zapcore"
@@ -40,103 +42,163 @@ func fakeSampler(lvl LevelEnabler, tick time.Duration, first, thereafter int) (F
 	return fac, &logs
 }

-func buildExpectation(level Level, nums ...int) []observer.LoggedEntry {
-	var expected []observer.LoggedEntry
-	for _, n := range nums {
-		expected = append(expected, observer.LoggedEntry{
-			Entry:   Entry{Level: level},
-			Context: []Field{makeInt64Field("iter", n)},
-		})
+func assertSequence(t testing.TB, logs []observer.LoggedEntry, lvl Level, seq ...int64) {
+	seen := make([]int64, len(logs))
+	for i, entry := range logs {
+		require.Equal(t, "", entry.Message, "Message wasn't created by writeSequence.")
+		require.Equal(t, 1, len(entry.Context), "Unexpected number of fields.")
+		require.Equal(t, lvl, entry.Level, "Unexpected level.")
+		f := entry.Context[0]
+		require.Equal(t, "iter", f.Key, "Unexpected field key.")
+		require.Equal(t, Int64Type, f.Type, "Unexpected field type")
+		seen[i] = f.Integer
 	}
-	return expected
+	assert.Equal(t, seq, seen, "Unexpected sequence logged at level %v.", lvl)
 }

-func writeIter(fac Facility, n int, lvl Level) {
+func writeSequence(fac Facility, n int, lvl Level) {
+	// All tests using writeSequence verify that counters are shared between
+	// parent and child facilities.
 	fac = fac.With([]Field{makeInt64Field("iter", n)})
-	if ce := fac.Check(Entry{Level: lvl}, nil); ce != nil {
+	if ce := fac.Check(Entry{Level: lvl, Time: time.Now().UTC()}, nil); ce != nil {
 		ce.Write()
 	}
 }

 func TestSampler(t *testing.T) {
-	for _, lvl := range []Level{DebugLevel, InfoLevel, WarnLevel, ErrorLevel, DPanicLevel} {
+	for _, lvl := range []Level{DebugLevel, InfoLevel, WarnLevel, ErrorLevel, DPanicLevel, PanicLevel, FatalLevel} {
 		sampler, logs := fakeSampler(DebugLevel, time.Minute, 2, 3)

+		// Ensure that counts aren't shared between levels.
+		probeLevel := DebugLevel
+		if lvl == DebugLevel {
+			probeLevel = InfoLevel
+		}
+		for i := 0; i < 10; i++ {
+			writeSequence(sampler, 1, probeLevel)
+		}
+		// Clear any output.
+		logs.TakeAll()
+
 		for i := 1; i < 10; i++ {
-			writeIter(sampler, i, lvl)
+			writeSequence(sampler, i, lvl)
 		}
-		assert.Equal(t, buildExpectation(lvl, 1, 2, 5, 8), logs.All(), "Unexpected output from sampled logger.")
+		assertSequence(t, logs.TakeAll(), lvl, 1, 2, 5, 8)
 	}
 }

 func TestSamplerDisabledLevels(t *testing.T) {
 	sampler, logs := fakeSampler(InfoLevel, time.Minute, 1, 100)

 	// Shouldn't be counted, because debug logging isn't enabled.
-	writeIter(sampler, 1, DebugLevel)
-	writeIter(sampler, 2, InfoLevel)
-	expected := buildExpectation(InfoLevel, 2)
-	assert.Equal(t, expected, logs.All(), "Expected to disregard disabled log levels.")
+	writeSequence(sampler, 1, DebugLevel)
+	writeSequence(sampler, 2, InfoLevel)
+	assertSequence(t, logs.TakeAll(), InfoLevel, 2)
 }

-func TestSamplerWithSharesCounters(t *testing.T) {
-	sampler, logs := fakeSampler(DebugLevel, time.Minute, 1, 100)
+func TestSamplerTicking(t *testing.T) {
+	// Ensure that we're resetting the sampler's counter every tick.
+	sampler, logs := fakeSampler(DebugLevel, 10*time.Millisecond, 5, 10)

-	first := sampler.With([]Field{makeInt64Field("child", 1)})
-	for i := 1; i < 10; i++ {
-		writeIter(first, i, InfoLevel)
+	// If we log five or fewer messages every tick, none of them should be
+	// dropped.
+	for tick := 0; tick < 2; tick++ {
+		for i := 1; i <= 5; i++ {
+			writeSequence(sampler, i, InfoLevel)
+		}
+		testutils.Sleep(15 * time.Millisecond)
 	}
-	second := sampler.With([]Field{makeInt64Field("child", 2)})
-	// The new child logger should share the same counters, so we don't expect to
-	// write these logs.
-	for i := 10; i < 20; i++ {
-		writeIter(second, i, InfoLevel)
+	assertSequence(
+		t,
+		logs.TakeAll(),
+		InfoLevel,
+		1, 2, 3, 4, 5, // first tick
+		1, 2, 3, 4, 5, // second tick
+	)
+
+	// If we log quickly, we should drop some logs. The first five statements
+	// each tick should be logged, then every tenth.
+	for tick := 0; tick < 3; tick++ {
+		for i := 1; i < 18; i++ {
+			writeSequence(sampler, i, InfoLevel)
+		}
+		testutils.Sleep(10 * time.Millisecond)
 	}

-	expected := []observer.LoggedEntry{{
-		Entry:   Entry{Level: InfoLevel},
-		Context: []Field{makeInt64Field("child", 1), makeInt64Field("iter", 1)},
-	}}
-	assert.Equal(t, expected, logs.All(), "Expected child loggers to share counters.")
+	assertSequence(
+		t,
+		logs.TakeAll(),
+		InfoLevel,
+		1, 2, 3, 4, 5, 15, // first tick
+		1, 2, 3, 4, 5, 15, // second tick
+		1, 2, 3, 4, 5, 15, // third tick
+	)
 }

-func TestSamplerTicks(t *testing.T) {
-	// Ensure that we're resetting the sampler's counter every tick.
-	sampler, logs := fakeSampler(DebugLevel, time.Millisecond, 1, 1000)
-
-	// The first statement should be logged, the second should be skipped but
-	// start the reset timer, and then we sleep. After sleeping for more than a
-	// tick, the third statement should be logged.
-	for i := 1; i < 4; i++ {
-		if i == 3 {
-			testutils.Sleep(5 * time.Millisecond)
-		}
-		writeIter(sampler, i, InfoLevel)
-	}
+type countingFacility struct {
+	logs atomic.Uint32
+}

-	expected := buildExpectation(InfoLevel, 1, 3)
-	assert.Equal(t, expected, logs.All(), "Expected sleeping for a tick to reset sampler.")
+func (c *countingFacility) Enabled(Level) bool {
+	return true
 }

-func TestSamplerCheck(t *testing.T) {
-	sampler, logs := fakeSampler(InfoLevel, time.Millisecond, 1, 10)
+func (c *countingFacility) Check(ent Entry, ce *CheckedEntry) *CheckedEntry {
+	return ce.AddFacility(ent, c)
+}

-	t.Run("always drops ignored levels (Debug)", func(t *testing.T) {
-		assert.Nil(t, sampler.Check(Entry{Level: DebugLevel}, nil), "Expected a nil CheckedMessage at disabled log levels.")
-	})
+func (c *countingFacility) Write(Entry, []Field) error {
+	c.logs.Inc()
+	return nil
+}

-	for _, lvl := range []Level{
-		InfoLevel, WarnLevel, ErrorLevel, PanicLevel, FatalLevel,
-	} {
-		t.Run(fmt.Sprintf("samples each level independently (%v)", lvl), func(t *testing.T) {
-			for i := 1; i < 12; i++ {
-				if cm := sampler.Check(Entry{Level: lvl}, nil); cm != nil {
-					cm.Write(makeInt64Field("iter", i))
+func (c *countingFacility) With([]Field) Facility {
+	return c
+}
+
+func TestSamplerConcurrent(t *testing.T) {
+	const (
+		logsPerTick   = 10
+		numMessages   = 5
+		numTicks      = 100
+		numGoroutines = 10
+	)
+
+	var tick = testutils.Timeout(time.Millisecond)
+	cf := &countingFacility{}
+	sampler := Sample(cf, tick, logsPerTick, 100000)
+
+	var done atomic.Bool
+	var wg sync.WaitGroup
+	for i := 0; i < numGoroutines; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+
+			for {
+				if done.Load() {
+					return
+				}
+				msg := fmt.Sprintf("msg%v", i%numMessages)
+				ent := Entry{Level: DebugLevel, Message: msg, Time: time.Now()}
+				if ce := sampler.Check(ent, nil); ce != nil {
+					ce.Write()
 				}
+
+				// Give a chance for other goroutines to run.
+				time.Sleep(time.Microsecond)
 			}
-			expected := buildExpectation(lvl, 1, 11)
-			assert.Equal(t, expected, logs.TakeAll(), "Unexpected output when sampling with Check.")
-		})
+		}(i)
 	}
+
+	time.AfterFunc(numTicks*tick, func() {
+		done.Store(true)
+	})
+	wg.Wait()
+
+	// We expect numMessages*logsPerTick in each tick, and we have 100 ticks.
+	assert.InDelta(t, numMessages*logsPerTick*numTicks, cf.logs.Load(), 500,
+		"Unexpected number of logs")
 }

 func TestSamplerRaces(t *testing.T) {
@@ -150,7 +212,7 @@ func TestSamplerRaces(t *testing.T) {
 		go func() {
 			<-start
 			for j := 0; j < 100; j++ {
-				writeIter(sampler, j, InfoLevel)
+				writeSequence(sampler, j, InfoLevel)
 			}
 			wg.Done()
 		}()
