Skip to content

Commit

Permalink
*: fix the way to use recover in ddl, and unify using recover in …
Browse files Browse the repository at this point in the history
…domain and ddl (pingcap#16798)
  • Loading branch information
zimulala authored May 18, 2020
1 parent 2efab88 commit 51015c1
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 99 deletions.
34 changes: 4 additions & 30 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -318,18 +317,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
d.quitCh = make(chan struct{})

d.wg.Add(1)
go func() {
defer d.wg.Done()
tidbutil.WithRecovery(
func() { d.limitDDLJobs() },
func(r interface{}) {
if r != nil {
logutil.BgLogger().Error("[ddl] limit DDL jobs meet panic",
zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
}
})
}()
go d.limitDDLJobs()

// If RunWorker is true, we need campaign owner and do DDL job.
// Otherwise, we needn't do that.
Expand All @@ -347,30 +335,16 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
for _, worker := range d.workers {
worker.wg.Add(1)
w := worker
go tidbutil.WithRecovery(
func() { w.start(d.ddlCtx) },
func(r interface{}) {
if r != nil {
logutil.Logger(w.logCtx).Error("[ddl] DDL worker meet panic", zap.String("ID", d.uuid))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDLWorker).Inc()
}
})
go w.start(d.ddlCtx)

metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, worker.String())).Inc()

// When the start function is called, we will send a fake job to let worker
// checks owner firstly and try to find whether a job exists and run.
asyncNotify(worker.ddlJobCh)
}

go tidbutil.WithRecovery(
func() { d.schemaSyncer.StartCleanWork() },
func(r interface{}) {
if r != nil {
logutil.BgLogger().Error("[ddl] DDL syncer clean worker meet panic",
zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDLSyncer).Inc()
}
})
go d.schemaSyncer.StartCleanWork()
metrics.DDLCounter.WithLabelValues(metrics.StartCleanWork).Inc()
}

Expand Down
22 changes: 2 additions & 20 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,19 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testleak"
"go.uber.org/zap"
)

type DDLForTest interface {
Expand Down Expand Up @@ -73,16 +69,7 @@ func (d *ddl) restartWorkers(ctx context.Context) {
d.quitCh = make(chan struct{})

d.wg.Add(1)
go func() {
defer d.wg.Done()
util.WithRecovery(
func() { d.limitDDLJobs() },
func(r interface{}) {
logutil.BgLogger().Error("[ddl] DDL add batch DDL jobs meet panic",
zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
})
}()
go d.limitDDLJobs()
if !RunWorker {
return
}
Expand All @@ -93,12 +80,7 @@ func (d *ddl) restartWorkers(ctx context.Context) {
worker.wg.Add(1)
worker.quitCh = make(chan struct{})
w := worker
go util.WithRecovery(func() { w.start(d.ddlCtx) },
func(r interface{}) {
if r != nil {
log.Error("[ddl] restart DDL worker meet panic", zap.String("worker", w.String()), zap.String("ID", d.uuid))
}
})
go w.start(d.ddlCtx)
asyncNotify(worker.ddlJobCh)
}
}
Expand Down
23 changes: 15 additions & 8 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (w *worker) close() {
func (w *worker) start(d *ddlCtx) {
logutil.Logger(w.logCtx).Info("[ddl] start DDL worker")
defer w.wg.Done()
defer tidbutil.Recover(
metrics.LabelDDLWorker,
fmt.Sprintf("DDL ID %s, %s start", d.uuid, w),
nil, true,
)

// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 1s.
Expand Down Expand Up @@ -196,6 +201,9 @@ func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
}

func (d *ddl) limitDDLJobs() {
defer d.wg.Done()
defer tidbutil.Recover(metrics.LabelDDL, "limitDDLJobs", nil, true)

tasks := make([]*limitJobTask, 0, batchAddingJobs)
for {
select {
Expand Down Expand Up @@ -460,14 +468,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {

// If running job meets error, we will save this error in job Error
// and retry later if the job is not cancelled.
tidbutil.WithRecovery(func() {
schemaVer, runJobErr = w.runDDLJob(d, t, job)
}, func(r interface{}) {
if r != nil {
// If run ddl job panic, just cancel the ddl jobs.
job.State = model.JobStateCancelling
}
})
schemaVer, runJobErr = w.runDDLJob(d, t, job)
if job.IsCancelled() {
txn.Discard()
err = w.finishDDLJob(t, job)
Expand Down Expand Up @@ -562,6 +563,12 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {

// runDDLJob runs a DDL job. It returns the current schema version in this transaction and the error.
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
defer tidbutil.Recover(metrics.LabelDDLWorker, fmt.Sprintf("%s runDDLJob", w),
func() {
// If run DDL job panic, just cancel the DDL jobs.
job.State = model.JobStateCancelling
}, false)

// Mock for run ddl job panic.
failpoint.Inject("mockPanicInRunDDLJob", func(val failpoint.Value) {})

Expand Down
18 changes: 4 additions & 14 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,15 +540,10 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer func() {
r := recover()
if r != nil {
buf := util.GetStack()
logutil.BgLogger().Error("[ddl] add table index panic", zap.Any("panic", r), zap.String("stack", string(buf)))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
defer util.Recover(metrics.LabelDDL, "onCreateIndex",
func() {
addIndexErr = errCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tblInfo.Name, indexInfo.Name)
}
}()
}, false)
return w.addTableIndex(tbl, indexInfo, reorgInfo)
})
if err != nil {
Expand Down Expand Up @@ -1192,14 +1187,9 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
func (w *addIndexWorker) run(d *ddlCtx) {
logutil.BgLogger().Info("[ddl] add index worker start", zap.Int("workerID", w.id))
defer func() {
r := recover()
if r != nil {
buf := util.GetStack()
logutil.BgLogger().Error("[ddl] add index worker panic", zap.Any("panic", r), zap.String("stack", string(buf)))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
}
w.resultCh <- &addIndexResult{err: errReorgPanic}
}()
defer util.Recover(metrics.LabelDDL, "addIndexWorker.run", nil, false)
for {
task, more := <-w.taskCh
if !more {
Expand Down
3 changes: 3 additions & 0 deletions ddl/util/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
Expand Down Expand Up @@ -421,6 +422,8 @@ const (
var NeededCleanTTL = int64(-60)

func (s *schemaVersionSyncer) StartCleanWork() {
defer tidbutil.Recover(metrics.LabelDDLSyncer, "StartCleanWorker", nil, false)

for {
select {
case <-s.notifyCleanExpiredPathsCh:
Expand Down
37 changes: 10 additions & 27 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package domain

import (
"context"
"os"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -407,7 +406,7 @@ func (do *Domain) ShowSlowQuery(showSlow *ast.ShowSlow) []*SlowQueryInfo {
}

func (do *Domain) topNSlowQueryLoop() {
defer recoverInDomain("topNSlowQueryLoop", false)
defer util.Recover(metrics.LabelDomain, "topNSlowQueryLoop", nil, false)
ticker := time.NewTicker(time.Minute * 10)
defer func() {
ticker.Stop()
Expand Down Expand Up @@ -442,7 +441,7 @@ func (do *Domain) infoSyncerKeeper() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("infoSyncerKeeper exited.")
recoverInDomain("infoSyncerKeeper", false)
util.Recover(metrics.LabelDomain, "infoSyncerKeeper", nil, false)
}()
ticker := time.NewTicker(infosync.ReportInterval)
defer ticker.Stop()
Expand All @@ -463,7 +462,7 @@ func (do *Domain) infoSyncerKeeper() {
}

func (do *Domain) topologySyncerKeeper() {
defer recoverInDomain("topologySyncerKeeper", false)
defer util.Recover(metrics.LabelDomain, "topologySyncerKeeper", nil, false)
ticker := time.NewTicker(infosync.TopologyTimeToRefresh)
defer func() {
ticker.Stop()
Expand Down Expand Up @@ -491,7 +490,7 @@ func (do *Domain) topologySyncerKeeper() {
}

func (do *Domain) loadSchemaInLoop(lease time.Duration) {
defer recoverInDomain("loadSchemaInLoop", true)
defer util.Recover(metrics.LabelDomain, "loadSchemaInLoop", nil, true)
// Lease renewal can run at any frequency.
// Use lease/2 here as recommend by paper.
ticker := time.NewTicker(lease / 2)
Expand Down Expand Up @@ -842,7 +841,7 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("loadPrivilegeInLoop exited.")
recoverInDomain("loadPrivilegeInLoop", false)
util.Recover(metrics.LabelDomain, "loadPrivilegeInLoop", nil, false)
}()
var count int
for {
Expand Down Expand Up @@ -906,7 +905,7 @@ func (do *Domain) globalBindHandleWorkerLoop() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("globalBindHandleWorkerLoop exited.")
recoverInDomain("globalBindHandleWorkerLoop", false)
util.Recover(metrics.LabelDomain, "globalBindHandleWorkerLoop", nil, false)
}()
bindWorkerTicker := time.NewTicker(bindinfo.Lease)
defer bindWorkerTicker.Stop()
Expand Down Expand Up @@ -935,7 +934,7 @@ func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context) {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("handleEvolvePlanTasksLoop exited.")
recoverInDomain("handleEvolvePlanTasksLoop", false)
util.Recover(metrics.LabelDomain, "handleEvolvePlanTasksLoop", nil, false)
}()
owner := do.newOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
for {
Expand Down Expand Up @@ -1026,7 +1025,7 @@ func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
}

func (do *Domain) loadStatsWorker() {
defer recoverInDomain("loadStatsWorker", false)
defer util.Recover(metrics.LabelDomain, "loadStatsWorker", nil, false)
lease := do.statsLease
if lease == 0 {
lease = 3 * time.Second
Expand Down Expand Up @@ -1063,7 +1062,7 @@ func (do *Domain) loadStatsWorker() {
}

func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) {
defer recoverInDomain("updateStatsWorker", false)
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
lease := do.statsLease
deltaUpdateTicker := time.NewTicker(20 * lease)
gcStatsTicker := time.NewTicker(100 * lease)
Expand Down Expand Up @@ -1124,7 +1123,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
}

func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
defer recoverInDomain("autoAnalyzeWorker", false)
defer util.Recover(metrics.LabelDomain, "autoAnalyzeWorker", nil, false)
statsHandle := do.StatsHandle()
analyzeTicker := time.NewTicker(do.statsLease)
defer func() {
Expand Down Expand Up @@ -1173,22 +1172,6 @@ func (do *Domain) NotifyUpdatePrivilege(ctx sessionctx.Context) {
}
}

func recoverInDomain(funcName string, quit bool) {
r := recover()
if r == nil {
return
}
buf := util.GetStack()
logutil.BgLogger().Error("recover in domain failed", zap.String("funcName", funcName),
zap.Any("error", r), zap.String("buffer", string(buf)))
metrics.PanicCounter.WithLabelValues(metrics.LabelDomain).Inc()
if quit {
// Wait for metrics to be pushed.
time.Sleep(time.Second * 15)
os.Exit(1)
}
}

var (
// ErrInfoSchemaExpired returns the error that information schema is out of date.
ErrInfoSchemaExpired = terror.ClassDomain.New(errno.ErrInfoSchemaExpired, errno.MySQLErrName[errno.ErrInfoSchemaExpired])
Expand Down
30 changes: 30 additions & 0 deletions util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"os"
"runtime"
"strconv"
"strings"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -93,6 +95,34 @@ func WithRecovery(exec func(), recoverFn func(r interface{})) {
exec()
}

// Recover includes operations such as recovering, clearing,and printing information.
// It will dump current goroutine stack into log if catch any recover result.
// metricsLabel: The label of PanicCounter metrics.
// funcInfo: Some information for the panic function.
// recoverFn: Handler will be called after recover and before dump stack, passing `nil` means noop.
// quit: If this value is true, the current program exits after recovery.
func Recover(metricsLabel, funcInfo string, recoverFn func(), quit bool) {
r := recover()
if r == nil {
return
}

if recoverFn != nil {
recoverFn()
}
logutil.BgLogger().Error("panic in the recoverable goroutine",
zap.String("label", metricsLabel),
zap.String("funcInfo", funcInfo),
zap.Reflect("r", r),
zap.String("stack", string(GetStack())))
metrics.PanicCounter.WithLabelValues(metricsLabel).Inc()
if quit {
// Wait for metrics to be pushed.
time.Sleep(time.Second * 15)
os.Exit(1)
}
}

// CompatibleParseGCTime parses a string with `GCTimeFormat` and returns a time.Time. If `value` can't be parsed as that
// format, truncate to last space and try again. This function is only useful when loading times that saved by
// gc_worker. We have changed the format that gc_worker saves time (removed the last field), but when loading times it
Expand Down

0 comments on commit 51015c1

Please sign in to comment.