From 634b23176a259a4274f46d4c2793d478717d3b7c Mon Sep 17 00:00:00 2001
From: Paul Bauer <paul.bauer@datadoghq.com>
Date: Thu, 7 Jan 2021 15:32:41 -0500
Subject: [PATCH] profiler: v3 upload and metrics (#781)

* profile V3 upload format

The new format replaces the ordinal parallel form fields (types[i], data[i])
previously used for attachments with canonical names, one per attachment type
(see the curl example and the Go sketch below):

- heap
- cpu
- block
- mutex
- goroutines

Example upload
```bash
 curl -i \
  -H "DD-API-KEY:redacted" \
  -F recording-start=$(date -u --iso-8601=seconds | sed 's/\+.*/Z/') \
  -F recording-end=$(date -d '+1 min' -u --iso-8601=seconds | sed 's/\+.*/Z/') \
  -F tags\[\]=host:test-host \
  -F tags\[\]=service:test-service \
  -F tags\[\]=language:go \
  -F data\[heap.pprof\]=@heap.pprof \
  -F data\[cpu.pprof\]=@cpu.pprof \
  -F data\[goroutines.pprof\]=@goroutines.pprof \
  -F data\[block.pprof\]=@block.pprof \
  -F data\[mutex.pprof\]=@mutex.pprof \
  -F version=3 \
  -F family=go \
  https://intake.profile.datadoghq.com/v1/input
```
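
For comparison, a minimal Go sketch of assembling the same v3 multipart body. This is illustrative only, not the profiler's upload code; the field and attachment names follow the curl example above, and the pprof bytes are assumed to have been collected elsewhere.

```go
package example

import (
	"bytes"
	"mime/multipart"
)

// buildV3Body sketches a v3 upload body containing a single heap profile.
// heapData is assumed to be gzipped pprof bytes produced by the runtime.
func buildV3Body(heapData []byte) (*bytes.Buffer, string, error) {
	var body bytes.Buffer
	mw := multipart.NewWriter(&body)
	// Error handling on WriteField elided for brevity.
	mw.WriteField("version", "3")
	mw.WriteField("family", "go")
	mw.WriteField("tags[]", "service:test-service")
	// Attachments are keyed by canonical name instead of data[0], data[1], ...
	f, err := mw.CreateFormFile("data[heap.pprof]", "pprof-data")
	if err != nil {
		return nil, "", err
	}
	if _, err := f.Write(heapData); err != nil {
		return nil, "", err
	}
	if err := mw.Close(); err != nil {
		return nil, "", err
	}
	// POST &body to https://intake.profile.datadoghq.com/v1/input with the
	// DD-API-KEY header and Content-Type set to the returned content type.
	return &body, mw.FormDataContentType(), nil
}
```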

* drive-by clarifying comment on batch end field

* gofmt fixes

* profiles: add custom metrics payload (#801); see the payload sketch after this list

* normalize upload filenames and string ids

* add metric profile and heap metrics

* add gc metrics

* fixup copyright date
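
The metrics attachment introduced above is a JSON document rather than a pprof. A minimal sketch of its shape, assuming the [name, value] tuple encoding added in profiler/metrics.go; the values are illustrative (go_alloc_bytes_per_sec, for example, is the change in TotalAlloc divided by the period in seconds):

```go
package example

import (
	"encoding/json"
	"fmt"
)

// point mirrors the unexported point type added in profiler/metrics.go:
// each metric marshals as a [name, value] tuple.
type point struct {
	metric string
	value  float64
}

func (p point) MarshalJSON() ([]byte, error) {
	return json.Marshal([]interface{}{p.metric, p.value})
}

// PayloadExample prints the shape of a metrics.json payload. Values are
// made up: e.g. a TotalAlloc delta of 50 over a 10s period gives 5 bytes/sec.
func PayloadExample() {
	points := []point{
		{metric: "go_alloc_bytes_per_sec", value: 5},
		{metric: "go_gcs_per_sec", value: 0.2},
	}
	data, _ := json.Marshal(points)
	fmt.Println(string(data)) // [["go_alloc_bytes_per_sec",5],["go_gcs_per_sec",0.2]]
}
```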
---
 profiler/metrics.go       | 130 +++++++++++++++++++++++++++++++
 profiler/metrics_test.go  | 156 ++++++++++++++++++++++++++++++++++++++
 profiler/options.go       |   3 +-
 profiler/options_test.go  |   2 +-
 profiler/profile.go       |  92 ++++++++++++++++------
 profiler/profile_test.go  |  20 ++---
 profiler/profiler.go      |  14 +++-
 profiler/profiler_test.go |  17 +----
 profiler/upload.go        |  16 ++--
 profiler/upload_test.go   |  30 ++++----
 10 files changed, 398 insertions(+), 82 deletions(-)
 create mode 100644 profiler/metrics.go
 create mode 100644 profiler/metrics_test.go

diff --git a/profiler/metrics.go b/profiler/metrics.go
new file mode 100644
index 0000000000..e9be511ef9
--- /dev/null
+++ b/profiler/metrics.go
@@ -0,0 +1,130 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package profiler
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"math"
+	"runtime"
+	"time"
+)
+
+type point struct {
+	metric string
+	value  float64
+}
+
+// MarshalJSON serializes a point as a [name, value] array tuple.
+func (p point) MarshalJSON() ([]byte, error) {
+	return json.Marshal([]interface{}{
+		p.metric,
+		p.value,
+	})
+}
+
+type collectionTooFrequent struct {
+	min      time.Duration
+	observed time.Duration
+}
+
+func (e collectionTooFrequent) Error() string {
+	return fmt.Sprintf("period between metrics collection is too small min=%v observed=%v", e.min, e.observed)
+}
+
+type metrics struct {
+	collectedAt time.Time
+	stats       runtime.MemStats
+	compute     func(*runtime.MemStats, *runtime.MemStats, time.Duration, time.Time) []point
+}
+
+func newMetrics() *metrics {
+	return &metrics{
+		compute: computeMetrics,
+	}
+}
+
+func (m *metrics) reset(now time.Time) {
+	m.collectedAt = now
+	runtime.ReadMemStats(&m.stats)
+}
+
+func (m *metrics) report(now time.Time, buf *bytes.Buffer) error {
+	period := now.Sub(m.collectedAt)
+
+	if period < time.Second {
+		// The profiler could be misconfigured to report more frequently than
+		// once per second, or a system clock issue could make time appear to
+		// run backwards. We can't emit valid metrics in either case.
+		return collectionTooFrequent{min: time.Second, observed: period}
+	}
+
+	previousStats := m.stats
+	m.reset(now)
+
+	points := m.compute(&previousStats, &m.stats, period, now)
+	data, err := json.Marshal(removeInvalid(points))
+
+	if err != nil {
+		// NB: the minimum-period check and removeInvalid ensure we never hit this case.
+		return err
+	}
+
+	if _, err := buf.Write(data); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func computeMetrics(prev *runtime.MemStats, curr *runtime.MemStats, period time.Duration, now time.Time) []point {
+	return []point{
+		{metric: "go_alloc_bytes_per_sec", value: rate(curr.TotalAlloc, prev.TotalAlloc, period/time.Second)},
+		{metric: "go_allocs_per_sec", value: rate(curr.Mallocs, prev.Mallocs, period/time.Second)},
+		{metric: "go_frees_per_sec", value: rate(curr.Frees, prev.Frees, period/time.Second)},
+		{metric: "go_heap_growth_bytes_per_sec", value: rate(curr.HeapAlloc, prev.HeapAlloc, period/time.Second)},
+		{metric: "go_gcs_per_sec", value: rate(uint64(curr.NumGC), uint64(prev.NumGC), period/time.Second)},
+		{metric: "go_gc_pause_time", value: rate(curr.PauseTotalNs, prev.PauseTotalNs, period)}, // fraction of time spent paused
+		{metric: "go_max_gc_pause_time", value: float64(maxPauseNs(curr, now.Add(-period)))},
+	}
+}
+
+func rate(curr, prev uint64, period time.Duration) float64 {
+	return float64(int64(curr)-int64(prev)) / float64(period)
+}
+
+// maxPauseNs returns the maximum GC pause duration observed within the period that started at periodStart; it assumes stats were collected at the end of that period.
+func maxPauseNs(stats *runtime.MemStats, periodStart time.Time) (max uint64) {
+	// NB
+	// stats.PauseEnd is a circular buffer of recent GC pause end times as nanoseconds since the epoch.
+	// stats.PauseNs is a circular buffer of recent GC pause times in nanoseconds.
+	// The most recent pause is indexed by (stats.NumGC+255)%256
+
+	for i := stats.NumGC + 255; i >= stats.NumGC; i-- {
+		// Stop searching if we find a PauseEnd outside the period
+		if time.Unix(0, int64(stats.PauseEnd[i%256])).Before(periodStart) {
+			break
+		}
+		if stats.PauseNs[i%256] > max {
+			max = stats.PauseNs[i%256]
+		}
+	}
+	return max
+}
+
+// removeInvalid removes NaN and +/-Inf values, as they can't be JSON-serialized.
+// This is an extra safety check to ensure we don't emit bad data in case of
+// a coding error in a metric computation.
+func removeInvalid(points []point) (result []point) {
+	for _, p := range points {
+		if math.IsNaN(p.value) || math.IsInf(p.value, 0) {
+			continue
+		}
+		result = append(result, p)
+	}
+	return result
+}
diff --git a/profiler/metrics_test.go b/profiler/metrics_test.go
new file mode 100644
index 0000000000..bcf380a10d
--- /dev/null
+++ b/profiler/metrics_test.go
@@ -0,0 +1,156 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package profiler
+
+import (
+	"bytes"
+	"math"
+	"runtime"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func newTestMetrics(now time.Time) *metrics {
+	m := newMetrics()
+	m.reset(now)
+	return m
+}
+
+func valsRing(vals ...time.Duration) [256]uint64 {
+	var ring [256]uint64
+	for i := 0; i < len(vals) && i < 256; i++ {
+		ring[i] = uint64(vals[i])
+	}
+	return ring
+}
+
+func timeRing(vals ...time.Time) [256]uint64 {
+	var ring [256]uint64
+	for i := 0; i < len(vals) && i < 256; i++ {
+		ring[i] = uint64(vals[i].UnixNano())
+	}
+	return ring
+}
+
+func TestMetricsCompute(t *testing.T) {
+	now := now()
+	prev := runtime.MemStats{
+		TotalAlloc:   100,
+		Mallocs:      10,
+		Frees:        2,
+		HeapAlloc:    75,
+		NumGC:        1,
+		PauseTotalNs: uint64(2 * time.Second),
+		PauseEnd:     timeRing(now.Add(-11 * time.Second)),
+		PauseNs:      valsRing(2 * time.Second),
+	}
+	curr := runtime.MemStats{
+		TotalAlloc:   150,
+		Mallocs:      14,
+		Frees:        30,
+		HeapAlloc:    50,
+		NumGC:        3,
+		PauseTotalNs: uint64(3 * time.Second),
+		PauseEnd:     timeRing(now.Add(-11*time.Second), now.Add(-9*time.Second), now.Add(-time.Second)),
+		PauseNs:      valsRing(time.Second, time.Second/2, time.Second/2),
+	}
+
+	assert.Equal(t,
+		[]point{
+			{metric: "go_alloc_bytes_per_sec", value: 5},
+			{metric: "go_allocs_per_sec", value: 0.4},
+			{metric: "go_frees_per_sec", value: 2.8},
+			{metric: "go_heap_growth_bytes_per_sec", value: -2.5},
+			{metric: "go_gcs_per_sec", value: 0.2},
+			{metric: "go_gc_pause_time", value: 0.1}, // fraction of time spent paused
+			{metric: "go_max_gc_pause_time", value: float64(time.Second / 2)},
+		},
+		computeMetrics(&prev, &curr, 10*time.Second, now))
+
+	assert.Equal(t,
+		[]point{
+			{metric: "go_alloc_bytes_per_sec", value: 0},
+			{metric: "go_allocs_per_sec", value: 0},
+			{metric: "go_frees_per_sec", value: 0},
+			{metric: "go_heap_growth_bytes_per_sec", value: 0},
+			{metric: "go_gcs_per_sec", value: 0},
+			{metric: "go_gc_pause_time", value: 0},
+			{metric: "go_max_gc_pause_time", value: 0},
+		},
+		computeMetrics(&prev, &prev, 10*time.Second, now),
+		"identical memstats")
+}
+
+func TestMetricsMaxPauseNs(t *testing.T) {
+	start := now()
+
+	assert.Equal(t, uint64(0),
+		maxPauseNs(&runtime.MemStats{}, start),
+		"max is 0 for empty pause buffers")
+
+	assert.Equal(t, uint64(time.Second),
+		maxPauseNs(
+			&runtime.MemStats{
+				NumGC:    3,
+				PauseNs:  valsRing(time.Minute, time.Second, time.Millisecond),
+				PauseEnd: timeRing(start.Add(-1), start, start.Add(1)),
+			},
+			start,
+		),
+		"only values in period are considered")
+
+	assert.Equal(t, uint64(time.Minute),
+		maxPauseNs(
+			&runtime.MemStats{
+				NumGC:    1000,
+				PauseNs:  valsRing(time.Second, time.Minute, time.Millisecond),
+				PauseEnd: timeRing(),
+			},
+			time.Unix(0, 0),
+		),
+		"should terminate if all values are in period")
+}
+
+func TestMetricsReport(t *testing.T) {
+	now := now()
+	var err error
+	var buf bytes.Buffer
+	m := newTestMetrics(now)
+
+	m.compute = func(_ *runtime.MemStats, _ *runtime.MemStats, _ time.Duration, _ time.Time) []point {
+		return []point{
+			{metric: "metric_name", value: 1.1},
+			{metric: "does_not_include_NaN", value: math.NaN()},
+			{metric: "does_not_include_+Inf", value: math.Inf(1)},
+			{metric: "does_not_include_-Inf", value: math.Inf(-1)},
+		}
+	}
+
+	err = m.report(now.Add(time.Second), &buf)
+	assert.NoError(t, err)
+	assert.Equal(t, "[[\"metric_name\",1.1]]", buf.String())
+}
+
+func TestMetricsCollectFrequency(t *testing.T) {
+	now := now()
+	var err error
+	var buf bytes.Buffer
+	m := newTestMetrics(now)
+
+	err = m.report(now.Add(-time.Second), &buf)
+	assert.Error(t, err, "collection call times must be monotonically increasing")
+	assert.Empty(t, buf)
+
+	err = m.report(now.Add(time.Second-1), &buf)
+	assert.Error(t, err, "must be at least one second between collection calls")
+	assert.Empty(t, buf)
+
+	err = m.report(now.Add(time.Second), &buf)
+	assert.NoError(t, err, "one second between calls should work")
+	assert.NotEmpty(t, buf)
+}
diff --git a/profiler/options.go b/profiler/options.go
index f5d5ff4659..d404e03beb 100644
--- a/profiler/options.go
+++ b/profiler/options.go
@@ -68,7 +68,7 @@ var defaultClient = &http.Client{
 	Timeout: defaultHTTPTimeout,
 }
 
-var defaultProfileTypes = []ProfileType{CPUProfile, HeapProfile}
+var defaultProfileTypes = []ProfileType{MetricsProfile, CPUProfile, HeapProfile}
 
 type config struct {
 	apiKey string
@@ -224,6 +224,7 @@ func WithProfileTypes(types ...ProfileType) Option {
 		for k := range cfg.types {
 			delete(cfg.types, k)
 		}
+		cfg.addProfileType(MetricsProfile) // always report metrics
 		for _, t := range types {
 			cfg.addProfileType(t)
 		}
diff --git a/profiler/options_test.go b/profiler/options_test.go
index b1d0e1d29c..e3b72b6b1b 100644
--- a/profiler/options_test.go
+++ b/profiler/options_test.go
@@ -101,7 +101,7 @@ func TestOptions(t *testing.T) {
 		WithProfileTypes(HeapProfile)(&cfg)
 		_, ok := cfg.types[HeapProfile]
 		assert.True(t, ok)
-		assert.Len(t, cfg.types, 1)
+		assert.Len(t, cfg.types, 2)
 	})
 
 	t.Run("WithService", func(t *testing.T) {
diff --git a/profiler/profile.go b/profiler/profile.go
index 4b0d8a0e65..b5501f233e 100644
--- a/profiler/profile.go
+++ b/profiler/profile.go
@@ -8,6 +8,7 @@ package profiler
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"io"
 	"runtime/pprof"
 	"time"
@@ -31,6 +32,8 @@ const (
 	MutexProfile
 	// GoroutineProfile reports stack traces of all current goroutines
 	GoroutineProfile
+	// MetricsProfile reports top-line metrics associated with user-specified profiles
+	MetricsProfile
 )
 
 func (t ProfileType) String() string {
@@ -45,16 +48,44 @@ func (t ProfileType) String() string {
 		return "block"
 	case GoroutineProfile:
 		return "goroutine"
+	case MetricsProfile:
+		return "metrics"
 	default:
 		return "unknown"
 	}
 }
 
-// profile specifies a pprof's data (gzipped protobuf), and the types contained
-// within it.
+// Filename returns the name used to identify this profile type's attachment on upload.
+func (t ProfileType) Filename() string {
+	// The filename root can differ subtly from String(): for example, GoroutineProfile uploads as goroutines.pprof.
+	switch t {
+	case HeapProfile:
+		return "heap.pprof"
+	case CPUProfile:
+		return "cpu.pprof"
+	case MutexProfile:
+		return "mutex.pprof"
+	case BlockProfile:
+		return "block.pprof"
+	case GoroutineProfile:
+		return "goroutines.pprof"
+	case MetricsProfile:
+		return "metrics.json"
+	default:
+		return "unknown"
+	}
+}
+
+// Tag returns the profile_type tag used on profile metadata.
+func (t ProfileType) Tag() string {
+	return fmt.Sprintf("profile_type:%s", t)
+}
+
+// profile holds a single profile's data (gzipped protobuf or JSON) and the name identifying its type and format.
 type profile struct {
-	types []string
-	data  []byte
+	// name indicates profile type and format (e.g. cpu.pprof, metrics.json)
+	name string
+	data []byte
 }
 
 // batch is a collection of profiles of different types, collected at roughly the same time. It maps
@@ -81,6 +112,8 @@ func (p *profiler) runProfile(t ProfileType) (*profile, error) {
 		return blockProfile(p.cfg)
 	case GoroutineProfile:
 		return goroutineProfile(p.cfg)
+	case MetricsProfile:
+		return p.collectMetrics()
 	default:
 		return nil, errors.New("profile type not implemented")
 	}
@@ -96,11 +129,11 @@ func heapProfile(cfg *config) (*profile, error) {
 		return nil, err
 	}
 	end := now()
-	tags := append(cfg.tags, "profile_type:heap")
+	tags := append(cfg.tags, HeapProfile.Tag())
 	cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
 	return &profile{
-		types: []string{"alloc_objects", "alloc_space", "inuse_objects", "inuse_space"},
-		data:  buf.Bytes(),
+		name: HeapProfile.Filename(),
+		data: buf.Bytes(),
 	}, nil
 }
 
@@ -120,11 +153,11 @@ func cpuProfile(cfg *config) (*profile, error) {
 	time.Sleep(cfg.cpuDuration)
 	stopCPUProfile()
 	end := now()
-	tags := append(cfg.tags, "profile_type:cpu")
+	tags := append(cfg.tags, CPUProfile.Tag())
 	cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
 	return &profile{
-		types: []string{"samples", "cpu"},
-		data:  buf.Bytes(),
+		name: CPUProfile.Filename(),
+		data: buf.Bytes(),
 	}, nil
 }
 
@@ -141,45 +174,60 @@ var lookupProfile = func(name string, w io.Writer) error {
 func blockProfile(cfg *config) (*profile, error) {
 	var buf bytes.Buffer
 	start := now()
-	if err := lookupProfile("block", &buf); err != nil {
+	if err := lookupProfile(BlockProfile.String(), &buf); err != nil {
 		return nil, err
 	}
 	end := now()
-	tags := append(cfg.tags, "profile_type:block")
+	tags := append(cfg.tags, BlockProfile.Tag())
 	cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
 	return &profile{
-		types: []string{"delay"},
-		data:  buf.Bytes(),
+		name: BlockProfile.Filename(),
+		data: buf.Bytes(),
 	}, nil
 }
 
 func mutexProfile(cfg *config) (*profile, error) {
 	var buf bytes.Buffer
 	start := now()
-	if err := lookupProfile("mutex", &buf); err != nil {
+	if err := lookupProfile(MutexProfile.String(), &buf); err != nil {
 		return nil, err
 	}
 	end := now()
-	tags := append(cfg.tags, "profile_type:mutex")
+	tags := append(cfg.tags, MutexProfile.Tag())
 	cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
 	return &profile{
-		types: []string{"contentions"},
-		data:  buf.Bytes(),
+		name: MutexProfile.Filename(),
+		data: buf.Bytes(),
 	}, nil
 }
 
 func goroutineProfile(cfg *config) (*profile, error) {
 	var buf bytes.Buffer
 	start := now()
-	if err := lookupProfile("goroutine", &buf); err != nil {
+	if err := lookupProfile(GoroutineProfile.String(), &buf); err != nil {
 		return nil, err
 	}
 	end := now()
-	tags := append(cfg.tags, "profile_type:goroutine")
+	tags := append(cfg.tags, GoroutineProfile.Tag())
 	cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
 	return &profile{
-		types: []string{"goroutines"},
-		data:  buf.Bytes(),
+		name: GoroutineProfile.Filename(),
+		data: buf.Bytes(),
+	}, nil
+}
+
+func (p *profiler) collectMetrics() (*profile, error) {
+	var buf bytes.Buffer
+	start := now()
+	if err := p.met.report(start, &buf); err != nil {
+		return nil, err
+	}
+	end := now()
+	tags := append(p.cfg.tags, MetricsProfile.Tag())
+	p.cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
+	return &profile{
+		name: MetricsProfile.Filename(),
+		data: buf.Bytes(),
 	}, nil
 }
 
diff --git a/profiler/profile_test.go b/profiler/profile_test.go
index 22657bbbb8..8ea9f381d8 100644
--- a/profiler/profile_test.go
+++ b/profiler/profile_test.go
@@ -24,9 +24,7 @@ func TestRunProfile(t *testing.T) {
 		p, err := unstartedProfiler()
 		prof, err := p.runProfile(HeapProfile)
 		require.NoError(t, err)
-		assert.ElementsMatch(t, []string{
-			"alloc_objects", "alloc_space", "inuse_objects", "inuse_space",
-		}, prof.types)
+		assert.Equal(t, "heap.pprof", prof.name)
 		assert.Equal(t, []byte("my-heap-profile"), prof.data)
 	})
 
@@ -44,10 +42,8 @@ func TestRunProfile(t *testing.T) {
 		prof, err := p.runProfile(CPUProfile)
 		end := time.Now()
 		require.NoError(t, err)
-		assert.ElementsMatch(t, []string{
-			"samples", "cpu",
-		}, prof.types)
 		assert.True(t, end.Sub(start) > 10*time.Millisecond)
+		assert.Equal(t, "cpu.pprof", prof.name)
 		assert.Equal(t, []byte("my-cpu-profile"), prof.data)
 	})
 
@@ -61,9 +57,7 @@ func TestRunProfile(t *testing.T) {
 		p, err := unstartedProfiler()
 		prof, err := p.runProfile(MutexProfile)
 		require.NoError(t, err)
-		assert.ElementsMatch(t, []string{
-			"contentions",
-		}, prof.types)
+		assert.Equal(t, "mutex.pprof", prof.name)
 		assert.Equal(t, []byte("mutex"), prof.data)
 	})
 
@@ -77,9 +71,7 @@ func TestRunProfile(t *testing.T) {
 		p, err := unstartedProfiler()
 		prof, err := p.runProfile(BlockProfile)
 		require.NoError(t, err)
-		assert.ElementsMatch(t, []string{
-			"delay",
-		}, prof.types)
+		assert.Equal(t, "block.pprof", prof.name)
 		assert.Equal(t, []byte("block"), prof.data)
 	})
 
@@ -93,9 +85,7 @@ func TestRunProfile(t *testing.T) {
 		p, err := unstartedProfiler()
 		prof, err := p.runProfile(GoroutineProfile)
 		require.NoError(t, err)
-		assert.ElementsMatch(t, []string{
-			"goroutines",
-		}, prof.types)
+		assert.Equal(t, "goroutines.pprof", prof.name)
 		assert.Equal(t, []byte("goroutine"), prof.data)
 	})
 }
diff --git a/profiler/profiler.go b/profiler/profiler.go
index 82642a8d06..794da9f071 100644
--- a/profiler/profiler.go
+++ b/profiler/profiler.go
@@ -62,6 +62,7 @@ type profiler struct {
 	exit       chan struct{}     // exit signals the profiler to stop; it is closed after stopping
 	stopOnce   sync.Once         // stopOnce ensures the profiler is stopped exactly once.
 	wg         sync.WaitGroup    // wg waits for all goroutines to exit when stopping.
+	met        *metrics          // metric collector state
 }
 
 // newProfiler creates a new, unstarted profiler.
@@ -92,6 +93,7 @@ func newProfiler(opts ...Option) (*profiler, error) {
 		cfg:  cfg,
 		out:  make(chan batch, outChannelSize),
 		exit: make(chan struct{}),
+		met:  newMetrics(),
 	}
 	p.uploadFunc = p.upload
 	return &p, nil
@@ -110,6 +112,7 @@ func (p *profiler) run() {
 		defer p.wg.Done()
 		tick := time.NewTicker(p.cfg.period)
 		defer tick.Stop()
+		p.met.reset(now()) // seed baseline memory stats at profiler start
 		p.collect(tick.C)
 	}()
 	p.wg.Add(1)
@@ -126,17 +129,21 @@ func (p *profiler) collect(ticker <-chan time.Time) {
 	for {
 		select {
 		case <-ticker:
-			now := time.Now().UTC()
+			now := now()
 			bat := batch{
 				host:  p.cfg.hostname,
 				start: now,
-				end:   now.Add(p.cfg.cpuDuration), // abstraction violation
+				// NB: while this is technically wrong in that it does not
+				// record the actual start and end timestamps for the batch,
+				// it is how the backend understands the client-side configured
+				// CPU profile duration (end - start).
+				end: now.Add(p.cfg.cpuDuration),
 			}
 			for t := range p.cfg.types {
 				prof, err := p.runProfile(t)
 				if err != nil {
 					log.Error("Error getting %s profile: %v; skipping.\n", t, err)
-					p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, fmt.Sprintf("profile_type:%v", t)), 1)
+					p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
 					continue
 				}
 				bat.addProfile(prof)
diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go
index a4eac3e4fe..88213ca75c 100644
--- a/profiler/profiler_test.go
+++ b/profiler/profiler_test.go
@@ -10,7 +10,6 @@ import (
 	"net"
 	"os"
 	"runtime"
-	"sort"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -166,14 +165,7 @@ func TestProfilerInternal(t *testing.T) {
 		assert.EqualValues(1, startCPU)
 		assert.EqualValues(1, stopCPU)
 
-		assert.Equal(2, len(bat.profiles))
-		firstTypes := []string{
-			bat.profiles[0].types[0],
-			bat.profiles[1].types[0],
-		}
-		sort.Strings(firstTypes)
-		assert.Equal("alloc_objects", firstTypes[0])
-		assert.Equal("samples", firstTypes[1])
+		assert.Equal(3, len(bat.profiles))
 
 		p.exit <- struct{}{}
 		<-wait
@@ -227,13 +219,6 @@ func TestProfilerPassthrough(t *testing.T) {
 
 	assert := assert.New(t)
 	assert.Equal(2, len(bat.profiles))
-	firstTypes := []string{
-		bat.profiles[0].types[0],
-		bat.profiles[1].types[0],
-	}
-	sort.Strings(firstTypes)
-	assert.Equal("alloc_objects", firstTypes[0])
-	assert.Equal("samples", firstTypes[1])
 	assert.NotEmpty(bat.profiles[0].data)
 	assert.NotEmpty(bat.profiles[1].data)
 }
diff --git a/profiler/upload.go b/profiler/upload.go
index a40af360da..5c48a6361b 100644
--- a/profiler/upload.go
+++ b/profiler/upload.go
@@ -14,7 +14,6 @@ import (
 	"math/rand"
 	"mime/multipart"
 	"net/http"
-	"strings"
 	"time"
 
 	"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
@@ -126,10 +125,10 @@ func encode(bat batch, tags []string) (contentType string, body io.Reader, err e
 			err = mw.WriteField(k, v)
 		}
 	}
-	writeField("format", "pprof")
-	writeField("runtime", "go")
-	writeField("recording-start", bat.start.Format(time.RFC3339))
-	writeField("recording-end", bat.end.Format(time.RFC3339))
+	writeField("version", "3")
+	writeField("family", "go")
+	writeField("start", bat.start.Format(time.RFC3339))
+	writeField("end", bat.end.Format(time.RFC3339))
 	if bat.host != "" {
 		writeField("tags[]", fmt.Sprintf("host:%s", bat.host))
 	}
@@ -137,14 +136,11 @@ func encode(bat batch, tags []string) (contentType string, body io.Reader, err e
 	for _, tag := range tags {
 		writeField("tags[]", tag)
 	}
-	for i, p := range bat.profiles {
-		writeField(fmt.Sprintf("types[%d]", i), strings.Join(p.types, ","))
-	}
 	if err != nil {
 		return "", nil, err
 	}
-	for i, p := range bat.profiles {
-		formFile, err := mw.CreateFormFile(fmt.Sprintf("data[%d]", i), "pprof-data")
+	for _, p := range bat.profiles {
+		formFile, err := mw.CreateFormFile(fmt.Sprintf("data[%s]", p.name), "pprof-data")
 		if err != nil {
 			return "", nil, err
 		}
diff --git a/profiler/upload_test.go b/profiler/upload_test.go
index 9b593e7f5c..c0c9b6c789 100644
--- a/profiler/upload_test.go
+++ b/profiler/upload_test.go
@@ -34,12 +34,12 @@ var testBatch = batch{
 	host:  "my-host",
 	profiles: []*profile{
 		{
-			types: []string{"cpu"},
-			data:  []byte("my-cpu-profile"),
+			name: CPUProfile.Filename(),
+			data: []byte("my-cpu-profile"),
 		},
 		{
-			types: []string{"alloc_objects", "alloc_space"},
-			data:  []byte("my-heap-profile"),
+			name: HeapProfile.Filename(),
+			data: []byte("my-heap-profile"),
 		},
 	},
 }
@@ -80,18 +80,20 @@ func TestTryUpload(t *testing.T) {
 		fmt.Sprintf("runtime-id:%s", globalconfig.RuntimeID()),
 	}, tags)
 	for k, v := range map[string]string{
-		"format":   "pprof",
-		"runtime":  "go",
-		"types[0]": "cpu",
-		"data[0]":  "my-cpu-profile",
-		"types[1]": "alloc_objects,alloc_space",
-		"data[1]":  "my-heap-profile",
+		"version":          "3",
+		"family":           "go",
+		"data[cpu.pprof]":  "my-cpu-profile",
+		"data[heap.pprof]": "my-heap-profile",
 	} {
 		assert.Equal(v, fields[k], k)
 	}
-	for _, k := range []string{"recording-start", "recording-end"} {
+	for _, k := range []string{"start", "end"} {
 		_, ok := fields[k]
-		assert.True(ok, k)
+		assert.True(ok, "key should be present: %s", k)
+	}
+	for _, k := range []string{"runtime", "format"} {
+		_, ok := fields[k]
+		assert.False(ok, "key should not be present: %s", k)
 	}
 }
 
@@ -159,8 +161,8 @@ func BenchmarkDoRequest(b *testing.B) {
 	}))
 	defer srv.Close()
 	prof := profile{
-		types: []string{"alloc_objects"},
-		data:  []byte("my-heap-profile"),
+		name: "heap",
+		data: []byte("my-heap-profile"),
 	}
 	bat := batch{
 		start:    time.Now().Add(-10 * time.Second),