Skip to content

Commit

Permalink
profiler: v3 upload and metrics (DataDog#781)
Browse files Browse the repository at this point in the history
* profile V3 upload format

The new format replaces ordinal parallel arrays for the attachments,
substituting canonical names for the attachment types.

- heap
- cpu
- block
- mutex
- goroutines

Example upload
```bash
 curl -i \
  -H "DD-API-KEY:redacted" \
  -F recording-start=$(date -u --iso-8601=seconds | sed 's/\+.*/Z/') \
  -F recording-end=$(date -d '+1 min' -u --iso-8601=seconds | sed 's/\+.*/Z/') \
  -F tags\[\]=host:test-host \
  -F tags\[\]=service:test-service \
  -F tags\[\]=language:go \
  -F data\[heap.pprof\][email protected] \
  -F data\[cpu.pprof\][email protected] \
  -F data\[goroutines.pprof\][email protected] \
  -F data\[block.pprof\][email protected] \
  -F data\[mutex.pprof\][email protected] \
  -F version=3 \
  -F family=go \
  https://intake.profile.datadoghq.com/v1/input
```

* drive-by clarifying comment on batch end field

* gofmt fixes

* profiles: add custom metrics payload (DataDog#801)

* normalize upload filenames and string ids

* add metric profile and heap metrics

* add gc metrics

* fixup copyright date
  • Loading branch information
pmbauer authored Jan 7, 2021
1 parent 4ce6b1a commit 634b231
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 82 deletions.
130 changes: 130 additions & 0 deletions profiler/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package profiler

import (
"bytes"
"encoding/json"
"fmt"
"math"
"runtime"
"time"
)

type point struct {
metric string
value float64
}

// MarshalJSON serialize points as array tuples
func (p point) MarshalJSON() ([]byte, error) {
return json.Marshal([]interface{}{
p.metric,
p.value,
})
}

type collectionTooFrequent struct {
min time.Duration
observed time.Duration
}

func (e collectionTooFrequent) Error() string {
return fmt.Sprintf("period between metrics collection is too small min=%v observed=%v", e.min, e.observed)
}

type metrics struct {
collectedAt time.Time
stats runtime.MemStats
compute func(*runtime.MemStats, *runtime.MemStats, time.Duration, time.Time) []point
}

func newMetrics() *metrics {
return &metrics{
compute: computeMetrics,
}
}

func (m *metrics) reset(now time.Time) {
m.collectedAt = now
runtime.ReadMemStats(&m.stats)
}

func (m *metrics) report(now time.Time, buf *bytes.Buffer) error {
period := now.Sub(m.collectedAt)

if period < time.Second {
// Profiler could be mis-configured to report more frequently than every second
// or a system clock issue causes time to run backwards.
// We can't emit valid metrics in either case.
return collectionTooFrequent{min: time.Second, observed: period}
}

previousStats := m.stats
m.reset(now)

points := m.compute(&previousStats, &m.stats, period, now)
data, err := json.Marshal(removeInvalid(points))

if err != nil {
// NB the minimum period check and removeInvalid ensures we don't hit this case
return err
}

if _, err := buf.Write(data); err != nil {
return err
}

return nil
}

func computeMetrics(prev *runtime.MemStats, curr *runtime.MemStats, period time.Duration, now time.Time) []point {
return []point{
{metric: "go_alloc_bytes_per_sec", value: rate(curr.TotalAlloc, prev.TotalAlloc, period/time.Second)},
{metric: "go_allocs_per_sec", value: rate(curr.Mallocs, prev.Mallocs, period/time.Second)},
{metric: "go_frees_per_sec", value: rate(curr.Frees, prev.Frees, period/time.Second)},
{metric: "go_heap_growth_bytes_per_sec", value: rate(curr.HeapAlloc, prev.HeapAlloc, period/time.Second)},
{metric: "go_gcs_per_sec", value: rate(uint64(curr.NumGC), uint64(prev.NumGC), period/time.Second)},
{metric: "go_gc_pause_time", value: rate(curr.PauseTotalNs, prev.PauseTotalNs, period)}, // % of time spent paused
{metric: "go_max_gc_pause_time", value: float64(maxPauseNs(curr, now.Add(-period)))},
}
}

func rate(curr, prev uint64, period time.Duration) float64 {
return float64(int64(curr)-int64(prev)) / float64(period)
}

// maxPauseNs returns maximum pause time within the recent period, assumes stats populated at period end
func maxPauseNs(stats *runtime.MemStats, periodStart time.Time) (max uint64) {
// NB
// stats.PauseEnd is a circular buffer of recent GC pause end times as nanoseconds since the epoch.
// stats.PauseNs is a circular buffer of recent GC pause times in nanoseconds.
// The most recent pause is indexed by (stats.NumGC+255)%256

for i := stats.NumGC + 255; i >= stats.NumGC; i-- {
// Stop searching if we find a PauseEnd outside the period
if time.Unix(0, int64(stats.PauseEnd[i%256])).Before(periodStart) {
break
}
if stats.PauseNs[i%256] > max {
max = stats.PauseNs[i%256]
}
}
return max
}

// removeInvalid removes NaN and +/-Inf values as they can't be json-serialized
// This is an extra safety check to ensure we don't emit bad data in case of
// a metric computation coding error
func removeInvalid(points []point) (result []point) {
for _, p := range points {
if math.IsNaN(p.value) || math.IsInf(p.value, 0) {
continue
}
result = append(result, p)
}
return result
}
156 changes: 156 additions & 0 deletions profiler/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package profiler

import (
"bytes"
"math"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func newTestMetrics(now time.Time) *metrics {
m := newMetrics()
m.reset(now)
return m
}

func valsRing(vals ...time.Duration) [256]uint64 {
var ring [256]uint64
for i := 0; i < len(vals) && i < 256; i++ {
ring[i] = uint64(vals[i])
}
return ring
}

func timeRing(vals ...time.Time) [256]uint64 {
var ring [256]uint64
for i := 0; i < len(vals) && i < 256; i++ {
ring[i] = uint64(vals[i].UnixNano())
}
return ring
}

func TestMetricsCompute(t *testing.T) {
now := now()
prev := runtime.MemStats{
TotalAlloc: 100,
Mallocs: 10,
Frees: 2,
HeapAlloc: 75,
NumGC: 1,
PauseTotalNs: uint64(2 * time.Second),
PauseEnd: timeRing(now.Add(-11 * time.Second)),
PauseNs: valsRing(2 * time.Second),
}
curr := runtime.MemStats{
TotalAlloc: 150,
Mallocs: 14,
Frees: 30,
HeapAlloc: 50,
NumGC: 3,
PauseTotalNs: uint64(3 * time.Second),
PauseEnd: timeRing(now.Add(-11*time.Second), now.Add(-9*time.Second), now.Add(-time.Second)),
PauseNs: valsRing(time.Second, time.Second/2, time.Second/2),
}

assert.Equal(t,
[]point{
{metric: "go_alloc_bytes_per_sec", value: 5},
{metric: "go_allocs_per_sec", value: 0.4},
{metric: "go_frees_per_sec", value: 2.8},
{metric: "go_heap_growth_bytes_per_sec", value: -2.5},
{metric: "go_gcs_per_sec", value: 0.2},
{metric: "go_gc_pause_time", value: 0.1}, // % of time spent paused
{metric: "go_max_gc_pause_time", value: float64(time.Second / 2)},
},
computeMetrics(&prev, &curr, 10*time.Second, now))

assert.Equal(t,
[]point{
{metric: "go_alloc_bytes_per_sec", value: 0},
{metric: "go_allocs_per_sec", value: 0},
{metric: "go_frees_per_sec", value: 0},
{metric: "go_heap_growth_bytes_per_sec", value: 0},
{metric: "go_gcs_per_sec", value: 0},
{metric: "go_gc_pause_time", value: 0},
{metric: "go_max_gc_pause_time", value: 0},
},
computeMetrics(&prev, &prev, 10*time.Second, now),
"identical memstats")
}

func TestMetricsMaxPauseNs(t *testing.T) {
start := now()

assert.Equal(t, uint64(0),
maxPauseNs(&runtime.MemStats{}, start),
"max is 0 for empty pause buffers")

assert.Equal(t, uint64(time.Second),
maxPauseNs(
&runtime.MemStats{
NumGC: 3,
PauseNs: valsRing(time.Minute, time.Second, time.Millisecond),
PauseEnd: timeRing(start.Add(-1), start, start.Add(1)),
},
start,
),
"only values in period are considered")

assert.Equal(t, uint64(time.Minute),
maxPauseNs(
&runtime.MemStats{
NumGC: 1000,
PauseNs: valsRing(time.Second, time.Minute, time.Millisecond),
PauseEnd: timeRing(),
},
time.Unix(0, 0),
),
"should terminate if all values are in period")
}

func TestMetricsReport(t *testing.T) {
now := now()
var err error
var buf bytes.Buffer
m := newTestMetrics(now)

m.compute = func(_ *runtime.MemStats, _ *runtime.MemStats, _ time.Duration, _ time.Time) []point {
return []point{
{metric: "metric_name", value: 1.1},
{metric: "does_not_include_NaN", value: math.NaN()},
{metric: "does_not_include_+Inf", value: math.Inf(1)},
{metric: "does_not_include_-Inf", value: math.Inf(-1)},
}
}

err = m.report(now.Add(time.Second), &buf)
assert.NoError(t, err)
assert.Equal(t, "[[\"metric_name\",1.1]]", buf.String())
}

func TestMetricsCollectFrequency(t *testing.T) {
now := now()
var err error
var buf bytes.Buffer
m := newTestMetrics(now)

err = m.report(now.Add(-time.Second), &buf)
assert.Error(t, err, "collection call times must be monotonically increasing")
assert.Empty(t, buf)

err = m.report(now.Add(time.Second-1), &buf)
assert.Error(t, err, "must be at least one second between collection calls")
assert.Empty(t, buf)

err = m.report(now.Add(time.Second), &buf)
assert.NoError(t, err, "one second between calls should work")
assert.NotEmpty(t, buf)
}
3 changes: 2 additions & 1 deletion profiler/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var defaultClient = &http.Client{
Timeout: defaultHTTPTimeout,
}

var defaultProfileTypes = []ProfileType{CPUProfile, HeapProfile}
var defaultProfileTypes = []ProfileType{MetricsProfile, CPUProfile, HeapProfile}

type config struct {
apiKey string
Expand Down Expand Up @@ -224,6 +224,7 @@ func WithProfileTypes(types ...ProfileType) Option {
for k := range cfg.types {
delete(cfg.types, k)
}
cfg.addProfileType(MetricsProfile) // always report metrics
for _, t := range types {
cfg.addProfileType(t)
}
Expand Down
2 changes: 1 addition & 1 deletion profiler/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestOptions(t *testing.T) {
WithProfileTypes(HeapProfile)(&cfg)
_, ok := cfg.types[HeapProfile]
assert.True(t, ok)
assert.Len(t, cfg.types, 1)
assert.Len(t, cfg.types, 2)
})

t.Run("WithService", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 634b231

Please sign in to comment.