use HdrHistogram & make maxLatency configurable in TPC-C (#65)
Signed-off-by: lysu <[email protected]>
lysu authored Nov 23, 2020
1 parent 1f80be7 commit 79c4a25
Showing 9 changed files with 573 additions and 106 deletions.
14 changes: 9 additions & 5 deletions ch/workload.go
@@ -53,9 +53,13 @@ type Workloader struct {
// NewWorkloader new work loader
func NewWorkloader(db *sql.DB, cfg *Config) workload.Workloader {
return Workloader{
db: db,
cfg: cfg,
measurement: measurement.NewMeasurement(),
db: db,
cfg: cfg,
measurement: measurement.NewMeasurement(func(m *measurement.Measurement) {
m.MinLatency = 100 * time.Microsecond
m.MaxLatency = 20 * time.Minute
m.SigFigs = 3
}),
}
}

@@ -224,8 +228,8 @@ func chSummary(h *measurement.Histogram) string {

buf := new(bytes.Buffer)
buf.WriteString(fmt.Sprintf("Count: %d, ", res.Count))
buf.WriteString(fmt.Sprintf("Sum(ms): %d, ", res.Sum))
buf.WriteString(fmt.Sprintf("Avg(ms): %d", res.Avg))
buf.WriteString(fmt.Sprintf("Sum(ms): %.1f, ", res.Sum))
buf.WriteString(fmt.Sprintf("Avg(ms): %.1f", res.Avg))

return buf.String()
}
2 changes: 2 additions & 0 deletions cmd/go-tpc/tpcc.go
@@ -8,6 +8,7 @@ import (
"os"
"runtime"

"github.com/pingcap/go-tpc/pkg/measurement"
"github.com/pingcap/go-tpc/pkg/workload"
"github.com/pingcap/go-tpc/tpcc"
"github.com/spf13/cobra"
@@ -89,6 +90,7 @@ func registerTpcc(root *cobra.Command) {
},
}
cmdRun.PersistentFlags().BoolVar(&tpccConfig.Wait, "wait", false, "including keying & thinking time described on TPC-C Standard Specification")
cmdRun.PersistentFlags().DurationVar(&tpccConfig.MaxMeasureLatency, "max-measure-latency", measurement.DefaultMaxLatency, "max measure latency in millisecond")

var cmdCleanup = &cobra.Command{
Use: "cleanup",
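Note: the new --max-measure-latency flag is declared with DurationVar, so it takes a Go duration string (e.g. 500ms, 30s, 1m) rather than a bare millisecond count, and it falls back to measurement.DefaultMaxLatency when omitted. A hypothetical invocation, assuming the usual go-tpc tpcc run entry point, might look like:

    go-tpc tpcc run --wait --max-measure-latency 1m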
3 changes: 2 additions & 1 deletion go.mod
@@ -3,7 +3,8 @@ module github.com/pingcap/go-tpc
go 1.13

require (
github.com/HdrHistogram/hdrhistogram-go v1.0.0
github.com/go-sql-driver/mysql v1.4.1
github.com/spf13/cobra v0.0.5
github.com/spf13/cobra v1.0.0
google.golang.org/appengine v1.6.2 // indirect
)
472 changes: 463 additions & 9 deletions go.sum

Large diffs are not rendered by default.

119 changes: 47 additions & 72 deletions pkg/measurement/hist.go
@@ -3,122 +3,97 @@ package measurement
import (
"bytes"
"fmt"
"sort"
"sync"
"time"

"github.com/HdrHistogram/hdrhistogram-go"
)

type Histogram struct {
m sync.RWMutex
bucketCount []int64
buckets []int
count int64
sum int64
startTime time.Time
*hdrhistogram.Histogram
m sync.RWMutex
sum int64
startTime time.Time
}

type HistInfo struct {
Elapsed float64
Sum int64
Sum float64
Count int64
Ops float64
Avg int64
P90 int64
P99 int64
P999 int64
Avg float64
P50 float64
P90 float64
P95 float64
P99 float64
P999 float64
Max float64
}

func NewHistogram() *Histogram {
h := new(Histogram)
h.startTime = time.Now()
// Unit 1ms
h.buckets = []int{1, 2, 4, 8, 9, 12, 16, 20, 24, 32, 40, 48, 64, 80,
96, 112, 128, 160, 192, 256, 512, 1000, 1500, 2000, 4000, 8000, 16000}
h.bucketCount = make([]int64, len(h.buckets))
return h
func NewHistogram(minLat, maxLat time.Duration, sf int) *Histogram {
return &Histogram{Histogram: hdrhistogram.New(minLat.Nanoseconds(), maxLat.Nanoseconds(), sf), startTime: time.Now()}
}

func (h *Histogram) Measure(latency time.Duration) {
n := int64(latency / time.Millisecond)

i := sort.SearchInts(h.buckets, int(n))
if i >= len(h.buckets) {
i = len(h.buckets) - 1
func (h *Histogram) Measure(rawLatency time.Duration) {
latency := rawLatency
if latency < time.Duration(h.LowestTrackableValue()) {
latency = time.Duration(h.LowestTrackableValue())
} else if latency > time.Duration(h.HighestTrackableValue()) {
latency = time.Duration(h.HighestTrackableValue())
}

h.m.Lock()
defer h.m.Unlock()

h.sum += n
h.count += 1

h.bucketCount[i] += 1
err := h.RecordValue(latency.Nanoseconds())
h.sum += rawLatency.Nanoseconds()
h.m.Unlock()
if err != nil {
panic(fmt.Sprintf(`recording value error: %s`, err))
}
}

func (h *Histogram) Empty() bool {
h.m.Lock()
defer h.m.Unlock()
return h.count == 0
return h.TotalCount() == 0
}

func (h *Histogram) Summary() string {
res := h.GetInfo()

buf := new(bytes.Buffer)
buf.WriteString(fmt.Sprintf("Takes(s): %.1f, ", res.Elapsed))
buf.WriteString(fmt.Sprintf("Count: %d, ", res.Count))
buf.WriteString(fmt.Sprintf("TPM: %.1f, ", res.Ops*60))
buf.WriteString(fmt.Sprintf("Sum(ms): %d, ", res.Sum))
buf.WriteString(fmt.Sprintf("Avg(ms): %d, ", res.Avg))
buf.WriteString(fmt.Sprintf("90th(ms): %d, ", res.P90))
buf.WriteString(fmt.Sprintf("99th(ms): %d, ", res.P99))
buf.WriteString(fmt.Sprintf("99.9th(ms): %d", res.P999))
buf.WriteString(fmt.Sprintf("Sum(ms): %.1f, ", res.Sum))
buf.WriteString(fmt.Sprintf("Avg(ms): %.1f, ", res.Avg))
buf.WriteString(fmt.Sprintf("50th(ms): %.1f, ", res.P50))
buf.WriteString(fmt.Sprintf("90th(ms): %.1f, ", res.P90))
buf.WriteString(fmt.Sprintf("95th(ms): %.1f, ", res.P95))
buf.WriteString(fmt.Sprintf("99th(ms): %.1f, ", res.P99))
buf.WriteString(fmt.Sprintf("99.9th(ms): %.1f, ", res.P999))
buf.WriteString(fmt.Sprintf("Max(ms): %.1f", res.Max))

return buf.String()
}

func (h *Histogram) GetInfo() HistInfo {
elapsed := time.Now().Sub(h.startTime).Seconds()

per90 := int64(0)
per99 := int64(0)
per999 := int64(0)
opCount := int64(0)

h.m.RLock()
defer h.m.RUnlock()

sum := h.sum
count := h.count

avg := int64(float64(sum) / float64(count))

for i, hc := range h.bucketCount {
opCount += hc
per := float64(opCount) / float64(count)
if per90 == 0 && per >= 0.90 {
per90 = int64(h.buckets[i])
}

if per99 == 0 && per >= 0.99 {
per99 = int64(h.buckets[i])
}

if per999 == 0 && per >= 0.999 {
per999 = int64(h.buckets[i])
}
}

sum := time.Duration(h.sum).Seconds() * 1000
avg := time.Duration(h.Mean()).Seconds() * 1000
elapsed := time.Now().Sub(h.startTime).Seconds()
count := h.TotalCount()
ops := float64(count) / elapsed
info := HistInfo{
Elapsed: elapsed,
Sum: sum,
Count: count,
Ops: ops,
Avg: avg,
P90: per90,
P99: per99,
P999: per999,
P50: time.Duration(h.ValueAtQuantile(50)).Seconds() * 1000,
P90: time.Duration(h.ValueAtQuantile(90)).Seconds() * 1000,
P95: time.Duration(h.ValueAtQuantile(95)).Seconds() * 1000,
P99: time.Duration(h.ValueAtQuantile(99)).Seconds() * 1000,
P999: time.Duration(h.ValueAtQuantile(99.9)).Seconds() * 1000,
Max: time.Duration(h.ValueAtQuantile(100)).Seconds() * 1000,
}
return info
}
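
For reference, a minimal standalone sketch (not part of this commit; the sample latencies are invented) of driving the HdrHistogram-backed Histogram through the constructor and methods added above:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/go-tpc/pkg/measurement"
)

func main() {
	// Track latencies from 1ms up to 20min at 3 significant figures.
	h := measurement.NewHistogram(time.Millisecond, 20*time.Minute, 3)
	for _, d := range []time.Duration{5 * time.Millisecond, 12 * time.Millisecond, 2 * time.Second} {
		h.Measure(d) // out-of-range values are clamped before RecordValue
	}
	fmt.Println(h.Summary())
	fmt.Printf("p99(ms): %.1f\n", h.GetInfo().P99)
}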
10 changes: 5 additions & 5 deletions pkg/measurement/hist_test.go
@@ -7,12 +7,12 @@ import (
)

func TestHist(t *testing.T) {
h := NewHistogram()
for i := 0; i < 100; i++ {
n := rand.Intn(100)
h := NewHistogram(1*time.Millisecond, 20*time.Minute, 1)
for i := 0; i < 10000; i++ {
n := rand.Intn(15020)
h.Measure(time.Millisecond * time.Duration(n))
}

h.Measure(time.Minute * 9)
h.Measure(time.Minute * 8)
t.Logf(h.Summary())
// t.Fail()
}
34 changes: 26 additions & 8 deletions pkg/measurement/measure.go
@@ -7,10 +7,19 @@ import (
"time"
)

const (
sigFigs = 1
defaultMinLatency = 1 * time.Millisecond
DefaultMaxLatency = 16 * time.Second
)

type Measurement struct {
warmUp int32 // use as bool, 1 means in warmup progress, 0 means warmup finished.
sync.RWMutex

MinLatency time.Duration
MaxLatency time.Duration
SigFigs int
OpCurMeasurement map[string]*Histogram
OpSumMeasurement map[string]*Histogram
}
@@ -31,8 +40,8 @@ func (m *Measurement) getHist(op string, err error, current bool) *Histogram {
opM, ok := opMeasurement[op]
m.RUnlock()
if !ok {
opM = NewHistogram()
opPairedM := NewHistogram()
opM = NewHistogram(m.MinLatency, m.MaxLatency, m.SigFigs)
opPairedM := NewHistogram(m.MinLatency, m.MaxLatency, m.SigFigs)
m.Lock()
opMeasurement[op] = opM
opMeasurement[opPairedKey] = opPairedM
@@ -101,11 +110,20 @@ func (m *Measurement) Measure(op string, lan time.Duration, err error) {
m.measure(op, err, lan)
}

func NewMeasurement() *Measurement {
return &Measurement{
0,
sync.RWMutex{},
make(map[string]*Histogram, 16),
make(map[string]*Histogram, 16),
func NewMeasurement(opts ...func(*Measurement)) *Measurement {
m := &Measurement{
warmUp: 0,
RWMutex: sync.RWMutex{},
MinLatency: defaultMinLatency,
MaxLatency: DefaultMaxLatency,
SigFigs: sigFigs,
OpCurMeasurement: make(map[string]*Histogram, 16),
OpSumMeasurement: make(map[string]*Histogram, 16),
}
for _, opt := range opts {
if opt != nil {
opt(m)
}
}
return m
}
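
A hedged usage sketch of the new option-style constructor (operation names and latencies are made up); fields the option does not touch keep the package defaults (defaultMinLatency, DefaultMaxLatency, sigFigs):

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/go-tpc/pkg/measurement"
)

func main() {
	// Override only the upper bound; MinLatency and SigFigs stay at their defaults.
	m := measurement.NewMeasurement(func(m *measurement.Measurement) {
		m.MaxLatency = 30 * time.Second
	})

	// Record a few synthetic per-operation latencies (warmUp starts at 0,
	// so these are assumed to land in the cumulative map).
	m.Measure("new_order", 12*time.Millisecond, nil)
	m.Measure("payment", 7*time.Millisecond, nil)

	// Print a summary per operation from the cumulative histograms.
	for op, hist := range m.OpSumMeasurement {
		if !hist.Empty() {
			fmt.Printf("%s: %s\n", op, hist.Summary())
		}
	}
}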
15 changes: 12 additions & 3 deletions tpcc/workload.go
@@ -57,6 +57,8 @@ type Config struct {
// whether to involve wait times(keying time&thinking time)
Wait bool

MaxMeasureLatency time.Duration

// for prepare sub-command only
OutputType string
OutputDir string
@@ -91,13 +93,17 @@ func NewWorkloader(db *sql.DB, cfg *Config) (workload.Workloader, error) {
panic(fmt.Errorf("number warehouses %d must >= partition %d", cfg.Warehouses, cfg.Parts))
}

resetMaxLat := func(m *measurement.Measurement) {
m.MaxLatency = cfg.MaxMeasureLatency
}

w := &Workloader{
db: db,
cfg: cfg,
initLoadTime: time.Now().Format(timeFormat),
ddlManager: newDDLManager(cfg.Parts, cfg.UseFK),
rtMeasurement: measurement.NewMeasurement(),
waitTimeMeasurement: measurement.NewMeasurement(),
rtMeasurement: measurement.NewMeasurement(resetMaxLat),
waitTimeMeasurement: measurement.NewMeasurement(resetMaxLat),
}

w.txns = []txn{
@@ -329,7 +335,10 @@ func (w *Workloader) OutputStats(ifSummaryReport bool) {
hist, e := w.rtMeasurement.OpSumMeasurement["new_order"]
if e && !hist.Empty() {
result := hist.GetInfo()
fmt.Printf("tpmC: %.1f\n", result.Ops*60)
const specWarehouseFactor = 12.86
tpmC := result.Ops * 60
efc := 100 * tpmC / (specWarehouseFactor * float64(w.cfg.Warehouses))
fmt.Printf("tpmC: %.1f, efficiency: %.1f%%\n", tpmC, efc)
}
}
}
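
For context on the new efficiency figure: the TPC-C specification caps throughput at 12.86 tpmC per warehouse (keying and think times bound how fast each terminal can submit new-orders, which make up at most roughly 45% of the transaction mix), which is where specWarehouseFactor comes from. As a worked example, 1,000 measured tpmC against 100 warehouses reports 100 * 1000 / (12.86 * 100) ≈ 77.8% efficiency; runs driven without the wait times (the default here) can legitimately exceed 100%.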
10 changes: 7 additions & 3 deletions tpch/workload.go
@@ -53,9 +53,13 @@ type Workloader struct {
// NewWorkloader new work loader
func NewWorkloader(db *sql.DB, cfg *Config) workload.Workloader {
return Workloader{
db: db,
cfg: cfg,
measurement: measurement.NewMeasurement(),
db: db,
cfg: cfg,
measurement: measurement.NewMeasurement(func(m *measurement.Measurement) {
m.MinLatency = 100 * time.Millisecond
m.MaxLatency = 20 * time.Minute
m.SigFigs = 3
}),
}
}
