Skip to content

Commit

Permalink
tpcc: add wait times on each transactions (#42)
Browse files Browse the repository at this point in the history
Signed-off-by: mahjonp <[email protected]>
  • Loading branch information
mahjonp authored May 21, 2020
1 parent 883c1a1 commit 3ed127b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 16 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ For example:
./bin/go-tpc tpcc --warehouses 4 --parts 4 prepare
# Run TPCC workloads
./bin/go-tpc tpcc --warehouses 4 run
# Run TPCC including wait times(keying & thinking time) on every transactions
./bin/go-tpc tpcc --warehouses 4 run --wait
# Cleanup
./bin/go-tpc tpcc --warehouses 4 cleanup
# Check consistency
Expand Down
1 change: 1 addition & 0 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func registerTpcc(root *cobra.Command) {
executeTpcc("run")
},
}
cmdRun.PersistentFlags().BoolVar(&tpccConfig.Wait, "wait", false, "including keying & thinking time described on TPC-C Standard Specification")

var cmdCleanup = &cobra.Command{
Use: "cleanup",
Expand Down
12 changes: 6 additions & 6 deletions pkg/measurement/hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type histInfo struct {
count int64
ops float64
avg int64
p95 int64
p90 int64
p99 int64
p999 int64
}
Expand Down Expand Up @@ -70,7 +70,7 @@ func (h *histogram) Summary() string {
buf.WriteString(fmt.Sprintf("TPM: %.1f, ", res.ops*60))
buf.WriteString(fmt.Sprintf("Sum(ms): %d, ", res.sum))
buf.WriteString(fmt.Sprintf("Avg(ms): %d, ", res.avg))
buf.WriteString(fmt.Sprintf("95th(ms): %d, ", res.p95))
buf.WriteString(fmt.Sprintf("90th(ms): %d, ", res.p90))
buf.WriteString(fmt.Sprintf("99th(ms): %d, ", res.p99))
buf.WriteString(fmt.Sprintf("99.9th(ms): %d", res.p999))

Expand All @@ -80,7 +80,7 @@ func (h *histogram) Summary() string {
func (h *histogram) getInfo() histInfo {
elapsed := time.Now().Sub(h.startTime).Seconds()

per95 := int64(0)
per90 := int64(0)
per99 := int64(0)
per999 := int64(0)
opCount := int64(0)
Expand All @@ -96,8 +96,8 @@ func (h *histogram) getInfo() histInfo {
for i, hc := range h.bucketCount {
opCount += hc
per := float64(opCount) / float64(count)
if per95 == 0 && per >= 0.95 {
per95 = int64(h.buckets[i])
if per90 == 0 && per >= 0.90 {
per90 = int64(h.buckets[i])
}

if per99 == 0 && per >= 0.99 {
Expand All @@ -116,7 +116,7 @@ func (h *histogram) getInfo() histInfo {
count: count,
ops: ops,
avg: avg,
p95: per95,
p90: per90,
p99: per99,
p999: per999,
}
Expand Down
43 changes: 33 additions & 10 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"fmt"
"math"
"math/rand"
"sync"
"time"

Expand All @@ -20,11 +22,11 @@ var tables = []string{tableItem, tableCustomer, tableDistrict, tableHistory,
tableNewOrder, tableOrderLine, tableOrders, tableStock, tableWareHouse}

type txn struct {
name string
action func(ctx context.Context, threadID int) error
weight int
// keyingTime time.Duration
// thinkingTime time.Duration
name string
action func(ctx context.Context, threadID int) error
weight int
keyingTime float64
thinkingTime float64
}

type tpccState struct {
Expand All @@ -50,6 +52,9 @@ type Config struct {
Isolation int
CheckAll bool

// whether to involve wait times(keying time&thinking time)
Wait bool

// for prepare sub-command only
OutputType string
OutputDir string
Expand Down Expand Up @@ -88,11 +93,11 @@ func NewWorkloader(db *sql.DB, cfg *Config) (workload.Workloader, error) {
}

w.txns = []txn{
{name: "new_order", action: w.runNewOrder, weight: 45},
{name: "payment", action: w.runPayment, weight: 43},
{name: "order_status", action: w.runOrderStatus, weight: 4},
{name: "delivery", action: w.runDelivery, weight: 4},
{name: "stock_level", action: w.runStockLevel, weight: 4},
{name: "new_order", action: w.runNewOrder, weight: 45, keyingTime: 18, thinkingTime: 12},
{name: "payment", action: w.runPayment, weight: 43, keyingTime: 3, thinkingTime: 12},
{name: "order_status", action: w.runOrderStatus, weight: 4, keyingTime: 2, thinkingTime: 10},
{name: "delivery", action: w.runDelivery, weight: 4, keyingTime: 2, thinkingTime: 5},
{name: "stock_level", action: w.runStockLevel, weight: 4, keyingTime: 2, thinkingTime: 5},
}

if w.db != nil {
Expand Down Expand Up @@ -233,11 +238,29 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
txnIndex := s.decks[s.R.Intn(len(s.decks))]
txn := w.txns[txnIndex]

// For each transaction type, the Keying Time is constant
// and must be a minimum of 18 seconds for New Order,
// 3 seconds for Payment,
// and 2 seconds each for Order-Status, Delivery, and Stock-Level.
if w.cfg.Wait {
time.Sleep(time.Duration(txn.keyingTime * float64(time.Second)))
}

start := time.Now()
err := txn.action(ctx, threadID)

measurement.Measure(txn.name, time.Now().Sub(start), err)

// 5.2.5.4, For each transaction type, think time is taken independently from a negative exponential distribution.
// Think time, T t , is computed from the following equation: Tt = -log(r) * (mean think time),
// r = random number uniformly distributed between 0 and 1
if w.cfg.Wait {
thinkTime := -math.Log(rand.Float64()) * txn.thinkingTime
if thinkTime > txn.thinkingTime*10 {
thinkTime = txn.thinkingTime * 10
}
time.Sleep(time.Duration(thinkTime * float64(time.Second)))
}
// TODO: add check
return err
}
Expand Down

0 comments on commit 3ed127b

Please sign in to comment.