Skip to content

Commit

Permalink
telemetry: add SQL statistics to telemetry report data (pingcap#24990)
Browse files Browse the repository at this point in the history
  • Loading branch information
YinWeiling authored Jun 6, 2021
1 parent 7cc1ebc commit 01dc05f
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 40 deletions.
46 changes: 6 additions & 40 deletions telemetry/data_slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
pmodel "github.com/prometheus/common/model"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -90,7 +88,12 @@ func updateCurrentSQB(ctx sessionctx.Context) (err error) {
}
}()

value, err := querySlowQueryMetric(ctx) //TODO: judge error here
pQueryCtx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
pQueryTs := time.Now().Add(-time.Minute)
promQL := "tidb_server_slow_query_process_duration_seconds_bucket{sql_type=\"general\"}"
value, err := querySQLMetric(pQueryCtx, pQueryTs, promQL)

if err != nil && err != infosync.ErrPrometheusAddrIsNotSet {
logutil.BgLogger().Info("querySlowQueryMetric got error")
return err
Expand All @@ -112,43 +115,6 @@ func updateCurrentSQB(ctx sessionctx.Context) (err error) {
return nil
}

func querySlowQueryMetric(sctx sessionctx.Context) (result pmodel.Value, err error) {
// Add retry to avoid network error.
var prometheusAddr string
for i := 0; i < 5; i++ {
//TODO: the prometheus will be Integrated into the PD, then we need to query the prometheus in PD directly, which need change the query API
prometheusAddr, err = infosync.GetPrometheusAddr()
if err == nil || err == infosync.ErrPrometheusAddrIsNotSet {
break
}
time.Sleep(100 * time.Millisecond)
}
if err != nil {
return nil, err
}
promClient, err := api.NewClient(api.Config{
Address: prometheusAddr,
})
if err != nil {
return nil, err
}
promQLAPI := promv1.NewAPI(promClient)
promQL := "tidb_server_slow_query_process_duration_seconds_bucket{sql_type=\"general\"}"

ts := time.Now().Add(-time.Minute)
// Add retry to avoid network error.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
for i := 0; i < 5; i++ {
result, _, err = promQLAPI.Query(ctx, promQL, ts)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
return result, err
}

// calculateDeltaSQB calculate the delta between current slow query bucket and last slow query bucket
func calculateDeltaSQB() *SlowQueryBucket {
deltaMap := make(SlowQueryBucket)
Expand Down
113 changes: 113 additions & 0 deletions telemetry/data_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,18 @@
package telemetry

import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
pmodel "github.com/prometheus/common/model"
"go.uber.org/atomic"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -51,13 +59,22 @@ const (

maxSubWindowLength = int(ReportInterval / SubWindowSize) // TODO: Ceiling?
maxSubWindowLengthInWindow = int(WindowSize / SubWindowSize) // TODO: Ceiling?
promReadTimeout = time.Second * 30
)

type windowData struct {
BeginAt time.Time `json:"beginAt"`
ExecuteCount uint64 `json:"executeCount"`
TiFlashUsage tiFlashUsageData `json:"tiFlashUsage"`
CoprCacheUsage coprCacheUsageData `json:"coprCacheUsage"`
SQLUsage sqlUsageData `json:"SQLUsage"`
}

type sqlType map[string]int64

type sqlUsageData struct {
SQLTotal int64 `json:"total"`
SQLType sqlType `json:"type"`
}

type coprCacheUsageData struct {
Expand All @@ -80,6 +97,75 @@ var (
subWindowsLock = sync.RWMutex{}
)

func getSQLSum(sqlTypeData *sqlType) int64 {
result := int64(0)
for _, v := range *sqlTypeData {
result += v
}
return result
}

func readSQLMetric(timepoint time.Time, SQLResult *sqlUsageData) error {
ctx := context.TODO()
promQL := "sum(tidb_executor_statement_total{}) by (instance,type)"
result, err := querySQLMetric(ctx, timepoint, promQL)
if err != nil {
if err1, ok := err.(*promv1.Error); ok {
return errors.Errorf("query metric error, msg: %v, detail: %v", err1.Msg, err1.Detail)
}
return errors.Errorf("query metric error: %v", err.Error())
}

anylisSQLUsage(result, SQLResult)
return nil
}

func querySQLMetric(ctx context.Context, queryTime time.Time, promQL string) (result pmodel.Value, err error) {
// Add retry to avoid network error.
var prometheusAddr string
for i := 0; i < 5; i++ {
//TODO: the prometheus will be Integrated into the PD, then we need to query the prometheus in PD directly, which need change the quire API
prometheusAddr, err = infosync.GetPrometheusAddr()
if err == nil || err == infosync.ErrPrometheusAddrIsNotSet {
break
}
time.Sleep(100 * time.Millisecond)
}
if err != nil {
return nil, err
}
promClient, err := api.NewClient(api.Config{
Address: prometheusAddr,
})
if err != nil {
return nil, err
}
promQLAPI := promv1.NewAPI(promClient)
ctx, cancel := context.WithTimeout(ctx, promReadTimeout)
defer cancel()
// Add retry to avoid network error.
for i := 0; i < 5; i++ {
result, _, err = promQLAPI.Query(ctx, promQL, queryTime)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
return result, err
}

func anylisSQLUsage(promResult pmodel.Value, SQLResult *sqlUsageData) {
switch promResult.Type() {
case pmodel.ValVector:
matrix := promResult.(pmodel.Vector)
for _, m := range matrix {
v := m.Value
promLable := string(m.Metric[pmodel.LabelName("type")])
SQLResult.SQLType[promLable] = int64(float64(v))
}
}
}

// RotateSubWindow rotates the telemetry sub window.
func RotateSubWindow() {
thisSubWindow := windowData{
Expand All @@ -98,7 +184,18 @@ func RotateSubWindow() {
GTE80: CurrentCoprCacheHitRatioGTE80Count.Swap(0),
GTE100: CurrentCoprCacheHitRatioGTE100Count.Swap(0),
},
SQLUsage: sqlUsageData{
SQLTotal: 0,
SQLType: make(sqlType),
},
}

if err := readSQLMetric(time.Now(), &thisSubWindow.SQLUsage); err != nil {
logutil.BgLogger().Error("Error exists when calling prometheus", zap.Error(err))

}
thisSubWindow.SQLUsage.SQLTotal = getSQLSum(&thisSubWindow.SQLUsage.SQLType)

subWindowsLock.Lock()
rotatedSubWindows = append(rotatedSubWindows, &thisSubWindow)
if len(rotatedSubWindows) > maxSubWindowLength {
Expand All @@ -108,6 +205,14 @@ func RotateSubWindow() {
subWindowsLock.Unlock()
}

func calDeltaSQLTypeMap(cur sqlType, last sqlType) sqlType {
deltaMap := make(sqlType)
for key, value := range cur {
deltaMap[key] = value - (last)[key]
}
return deltaMap
}

// getWindowData returns data aggregated by window size.
func getWindowData() []*windowData {
results := make([]*windowData, 0)
Expand All @@ -117,6 +222,12 @@ func getWindowData() []*windowData {
i := 0
for i < len(rotatedSubWindows) {
thisWindow := *rotatedSubWindows[i]
var startWindow windowData
if i == 0 {
startWindow = thisWindow
} else {
startWindow = *rotatedSubWindows[i-1]
}
aggregatedSubWindows := 1
// Aggregate later sub windows
i++
Expand All @@ -131,6 +242,8 @@ func getWindowData() []*windowData {
thisWindow.CoprCacheUsage.GTE40 += rotatedSubWindows[i].CoprCacheUsage.GTE40
thisWindow.CoprCacheUsage.GTE80 += rotatedSubWindows[i].CoprCacheUsage.GTE80
thisWindow.CoprCacheUsage.GTE100 += rotatedSubWindows[i].CoprCacheUsage.GTE100
thisWindow.SQLUsage.SQLTotal = rotatedSubWindows[i].SQLUsage.SQLTotal - startWindow.SQLUsage.SQLTotal
thisWindow.SQLUsage.SQLType = calDeltaSQLTypeMap(rotatedSubWindows[i].SQLUsage.SQLType, startWindow.SQLUsage.SQLType)
aggregatedSubWindows++
i++
}
Expand Down

0 comments on commit 01dc05f

Please sign in to comment.