domain,owner: unify and normalize the format of the log (pingcap#9646)
xiekeyi98 authored and zz-jason committed Mar 25, 2019
1 parent 0313cbb commit 1154456
Showing 4 changed files with 94 additions and 78 deletions.
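
The whole change is one mechanical pattern: drop the logrus import, then turn every formatted log call into a call on logutil.Logger(context.Background()) with a constant message plus typed zap fields. A minimal stand-alone sketch of that before/after pattern (it uses zap directly, since logutil.Logger is TiDB-internal; the versions and the error value here are made up for the demo):

package main

import (
	"errors"
	"time"

	"go.uber.org/zap"
)

func main() {
	// zap.NewExample writes JSON to stdout; in TiDB the logger would come
	// from logutil.Logger(context.Background()) instead.
	logger := zap.NewExample()
	defer logger.Sync()

	usedSchemaVersion, latestSchemaVersion := int64(10), int64(12)
	err := errors.New("etcd session is done") // hypothetical error for the demo
	startTime := time.Now()

	// Before (logrus): everything interpolated into one string.
	//   log.Infof("[ddl] update self version from %v to %v failed %v",
	//       usedSchemaVersion, latestSchemaVersion, err)
	// After (zap): a constant message plus machine-readable fields.
	logger.Info("update self version failed",
		zap.Int64("usedSchemaVersion", usedSchemaVersion),
		zap.Int64("latestSchemaVersion", latestSchemaVersion),
		zap.Error(err))

	// Durations get a typed field too, instead of %v formatting.
	logger.Warn("loading schema takes a long time",
		zap.Duration("take time", time.Since(startTime)))
}

Structured fields also make the "[ddl]"/"[stats]"/"[domain]" message prefixes redundant, which is why the new messages drop them.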
90 changes: 47 additions & 43 deletions domain/domain.go
@@ -47,7 +47,6 @@ import (
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/sqlexec"
-	log "github.com/sirupsen/logrus"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
@@ -96,24 +95,28 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
 	// Update self schema version to etcd.
 	defer func() {
 		if err != nil {
-			log.Info("[ddl] not update self schema version to etcd")
+			logutil.Logger(context.Background()).Info("cannot update self schema version to etcd")
 			return
 		}
 		err = do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), latestSchemaVersion)
 		if err != nil {
-			log.Infof("[ddl] update self version from %v to %v failed %v", usedSchemaVersion, latestSchemaVersion, err)
+			logutil.Logger(context.Background()).Info("update self version failed", zap.Int64("usedSchemaVersion", usedSchemaVersion),
+				zap.Int64("latestSchemaVersion", latestSchemaVersion), zap.Error(err))
 		}
 	}()

 	startTime := time.Now()
 	ok, tblIDs, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion)
 	if err != nil {
 		// We can fall back to full load, don't need to return the error.
-		log.Errorf("[ddl] failed to load schema diff err %v", err)
+		logutil.Logger(context.Background()).Error("failed to load schema diff", zap.Error(err))
 	}
 	if ok {
-		log.Infof("[ddl] diff load InfoSchema from version %d to %d in %v, tableIDs %v",
-			usedSchemaVersion, latestSchemaVersion, time.Since(startTime), tblIDs)
+		logutil.Logger(context.Background()).Info("diff load InfoSchema success",
+			zap.Int64("usedSchemaVersion", usedSchemaVersion),
+			zap.Int64("latestSchemaVersion", latestSchemaVersion),
+			zap.Duration("start time", time.Since(startTime)),
+			zap.Int64s("tblIDs", tblIDs))
 		return latestSchemaVersion, tblIDs, fullLoad, nil
 	}

@@ -127,8 +130,8 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
 	if err != nil {
 		return 0, nil, fullLoad, errors.Trace(err)
 	}
-	log.Infof("[ddl] full load InfoSchema from version %d to %d, in %v",
-		usedSchemaVersion, latestSchemaVersion, time.Since(startTime))
+	logutil.Logger(context.Background()).Info("full load InfoSchema success", zap.Int64("usedSchemaVersion", usedSchemaVersion),
+		zap.Int64("latestSchemaVersion", latestSchemaVersion), zap.Duration("start time", time.Since(startTime)))
 	newISBuilder.Build()
 	return latestSchemaVersion, nil, fullLoad, nil
 }
@@ -329,7 +332,7 @@ func (do *Domain) Reload() error {
 	metrics.LoadSchemaCounter.WithLabelValues("succ").Inc()

 	if fullLoad {
-		log.Info("[ddl] full load and reset schema validator.")
+		logutil.Logger(context.Background()).Info("full load and reset schema validator")
 		do.SchemaValidator.Reset()
 	}
 	do.SchemaValidator.Update(ver.Ver, schemaVersion, latestSchemaVersion, changedTableIDs)
@@ -339,7 +342,7 @@ func (do *Domain) Reload() error {
 	// Reload interval is lease / 2, if load schema time elapses more than this interval,
 	// some query maybe responded by ErrInfoSchemaExpired error.
 	if sub > (lease/2) && lease > 0 {
-		log.Warnf("[ddl] loading schema takes a long time %v", sub)
+		logutil.Logger(context.Background()).Warn("loading schema takes a long time", zap.Duration("take time", sub))
 	}

 	return nil
@@ -405,11 +408,11 @@ func (do *Domain) infoSyncerKeeper() {
 	for {
 		select {
 		case <-do.info.Done():
-			log.Info("[ddl] server info syncer need to restart")
+			logutil.Logger(context.Background()).Info("server info syncer need to restart")
			if err := do.info.Restart(context.Background()); err != nil {
-				log.Error(err)
+				logutil.Logger(context.Background()).Error("server restart failed", zap.Error(err))
			}
-			log.Info("[ddl] server info syncer restarted.")
+			logutil.Logger(context.Background()).Info("server info syncer restarted")
 		case <-do.exit:
 			return
 		}
@@ -430,21 +433,21 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
 		case <-ticker.C:
 			err := do.Reload()
 			if err != nil {
-				log.Errorf("[ddl] reload schema in loop err %v", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Error("reload schema in loop failed", zap.Error(err))
 			}
 		case _, ok := <-syncer.GlobalVersionCh():
 			err := do.Reload()
 			if err != nil {
-				log.Errorf("[ddl] reload schema in loop err %v", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Error("reload schema in loop failed", zap.Error(err))
 			}
 			if !ok {
-				log.Warn("[ddl] reload schema in loop, schema syncer need rewatch")
+				logutil.Logger(context.Background()).Warn("reload schema in loop, schema syncer need rewatch")
 				// Make sure the rewatch doesn't affect load schema, so we watch the global schema version asynchronously.
 				syncer.WatchGlobalSchemaVer(context.Background())
 			}
 		case <-syncer.Done():
 			// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
-			log.Info("[ddl] reload schema in loop, schema syncer need restart")
+			logutil.Logger(context.Background()).Info("reload schema in loop, schema syncer need restart")
 			// The etcd is responsible for schema synchronization, we should ensure there is at most two different schema version
 			// in the TiDB cluster, to make the data/schema be consistent. If we lost connection/session to etcd, the cluster
 			// will treats this TiDB as a down instance, and etcd will remove the key of `/tidb/ddl/all_schema_versions/tidb-id`.
@@ -454,18 +457,18 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
 			do.SchemaValidator.Stop()
 			err := do.mustRestartSyncer()
 			if err != nil {
-				log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Error("reload schema in loop, schema syncer restart failed", zap.Error(err))
 				break
 			}
 			// The schema maybe changed, must reload schema then the schema validator can restart.
 			exitLoop := do.mustReload()
 			if exitLoop {
 				// domain is closed.
-				log.Errorf("[ddl] domain is closed. exit loadSchemaInLoop")
+				logutil.Logger(context.Background()).Error("domain is closed, exit loadSchemaInLoop")
 				return
 			}
 			do.SchemaValidator.Restart()
-			log.Info("[ddl] schema syncer restarted.")
+			logutil.Logger(context.Background()).Info("schema syncer restarted")
 		case <-do.exit:
 			return
 		}
Expand All @@ -490,7 +493,7 @@ func (do *Domain) mustRestartSyncer() error {
default:
}
time.Sleep(time.Second)
log.Infof("[ddl] restart the schema syncer failed %v", err)
logutil.Logger(context.Background()).Info("restart the schema syncer failed", zap.Error(err))
}
}

@@ -500,15 +503,15 @@ func (do *Domain) mustReload() (exitLoop bool) {
 	for {
 		err := do.Reload()
 		if err == nil {
-			log.Infof("[ddl] mustReload succeed.")
+			logutil.Logger(context.Background()).Info("mustReload succeed")
 			return false
 		}

-		log.Infof("[ddl] reload the schema failed: %v", err)
+		logutil.Logger(context.Background()).Info("reload the schema failed", zap.Error(err))
 		// If the domain is closed, we returns immediately.
 		select {
 		case <-do.exit:
-			log.Infof("[ddl] domain is closed.")
+			logutil.Logger(context.Background()).Info("domain is closed")
 			return true
 		default:
 		}
@@ -532,7 +535,7 @@ func (do *Domain) Close() {
 	do.sysSessionPool.Close()
 	do.slowQuery.Close()
 	do.wg.Wait()
-	log.Info("[domain] close")
+	logutil.Logger(context.Background()).Info("domain closed")
 }

 type ddlCallback struct {
@@ -544,11 +547,11 @@ func (c *ddlCallback) OnChanged(err error) error {
 	if err != nil {
 		return err
 	}
-	log.Infof("[ddl] on DDL change, must reload")
+	logutil.Logger(context.Background()).Info("performing DDL change, must reload")

 	err = c.do.Reload()
 	if err != nil {
-		log.Errorf("[ddl] on DDL change reload err %v", err)
+		logutil.Logger(context.Background()).Error("performing DDL change failed", zap.Error(err))
 	}

 	return nil
@@ -748,7 +751,7 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
 			case <-time.After(duration):
 			}
 			if !ok {
-				log.Error("[domain] load privilege loop watch channel closed.")
+				logutil.Logger(context.Background()).Error("load privilege loop watch channel closed")
 				watchCh = do.etcdClient.Watch(context.Background(), privilegeKey)
 				count++
 				if count > 10 {
@@ -761,9 +764,9 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
 			err := do.privHandle.Update(ctx)
 			metrics.LoadPrivilegeCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
 			if err != nil {
-				log.Error("[domain] load privilege fail:", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Error("load privilege failed", zap.Error(err))
 			} else {
-				log.Debug("[domain] reload privilege success.")
+				logutil.Logger(context.Background()).Debug("reload privilege success")
 			}
 		}
 	}()
@@ -873,7 +876,7 @@ func (do *Domain) newStatsOwner() owner.Manager {
 	// TODO: Need to do something when err is not nil.
 	err := statsOwner.CampaignOwner(cancelCtx)
 	if err != nil {
-		log.Warnf("[stats] campaign owner fail: %s", errors.ErrorStack(err))
+		logutil.Logger(context.Background()).Warn("campaign owner failed", zap.Error(err))
 	}
 	return statsOwner
 }
@@ -898,9 +901,9 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
 	t := time.Now()
 	err := statsHandle.InitStats(do.InfoSchema())
 	if err != nil {
-		log.Debug("[stats] init stats info failed: ", errors.ErrorStack(err))
+		logutil.Logger(context.Background()).Debug("init stats info failed", zap.Error(err))
 	} else {
-		log.Info("[stats] init stats info takes ", time.Since(t))
+		logutil.Logger(context.Background()).Info("init stats info time", zap.Duration("take time", time.Since(t)))
 	}
 	defer func() {
 		do.SetStatsUpdating(false)
@@ -911,7 +914,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
 		case <-loadTicker.C:
 			err = statsHandle.Update(do.InfoSchema())
 			if err != nil {
-				log.Debug("[stats] update stats info fail: ", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Debug("update stats info failed", zap.Error(err))
 			}
 		case <-do.exit:
 			statsHandle.FlushStats()
@@ -920,18 +923,18 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
 		case t := <-statsHandle.DDLEventCh():
 			err = statsHandle.HandleDDLEvent(t)
 			if err != nil {
-				log.Debug("[stats] handle ddl event fail: ", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Debug("handle ddl event failed", zap.Error(err))
 			}
 		case <-deltaUpdateTicker.C:
 			err = statsHandle.DumpStatsDeltaToKV(statistics.DumpDelta)
 			if err != nil {
-				log.Debug("[stats] dump stats delta fail: ", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Debug("dump stats delta failed", zap.Error(err))
 			}
 			statsHandle.UpdateErrorRate(do.InfoSchema())
 		case <-loadHistogramTicker.C:
 			err = statsHandle.LoadNeededHistograms()
 			if err != nil {
-				log.Debug("[stats] load histograms fail: ", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Debug("load histograms failed", zap.Error(err))
 			}
 		case <-loadFeedbackTicker.C:
 			statsHandle.UpdateStatsByLocalFeedback(do.InfoSchema())
@@ -940,20 +943,20 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
 			}
 			err = statsHandle.HandleUpdateStats(do.InfoSchema())
 			if err != nil {
-				log.Debug("[stats] update stats using feedback fail: ", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Debug("update stats using feedback failed", zap.Error(err))
 			}
 		case <-dumpFeedbackTicker.C:
 			err = statsHandle.DumpStatsFeedbackToKV()
 			if err != nil {
-				log.Debug("[stats] dump stats feedback fail: ", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Debug("dump stats feedback failed", zap.Error(err))
 			}
 		case <-gcStatsTicker.C:
 			if !owner.IsOwner() {
 				continue
 			}
 			err = statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
 			if err != nil {
-				log.Debug("[stats] gc stats fail: ", errors.ErrorStack(err))
+				logutil.Logger(context.Background()).Debug("GC stats failed", zap.Error(err))
 			}
 		}
 	}
@@ -988,13 +991,13 @@ func (do *Domain) NotifyUpdatePrivilege(ctx sessionctx.Context) {
 		row := do.etcdClient.KV
 		_, err := row.Put(context.Background(), privilegeKey, "")
 		if err != nil {
-			log.Warn("notify update privilege failed:", err)
+			logutil.Logger(context.Background()).Warn("notify update privilege failed", zap.Error(err))
 		}
 	}
 	// update locally
 	_, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, `FLUSH PRIVILEGES`)
 	if err != nil {
-		log.Errorf("Unable to update privileges: %s", err)
+		logutil.Logger(context.Background()).Error("unable to update privileges", zap.Error(err))
 	}
 }
@@ -1004,7 +1007,8 @@ func recoverInDomain(funcName string, quit bool) {
 		return
 	}
 	buf := util.GetStack()
-	log.Errorf("%s, %v, %s", funcName, r, buf)
+	logutil.Logger(context.Background()).Error("recover in domain failed", zap.String("funcName", funcName),
+		zap.Any("error", r), zap.String("buffer", string(buf)))
 	metrics.PanicCounter.WithLabelValues(metrics.LabelDomain).Inc()
 	if quit {
 		// Wait for metrics to be pushed.
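
The recoverInDomain hunk is the one place the commit logs a value of unknown type: the recovered panic value goes through zap.Any. A stand-alone sketch of that recover-and-log shape (recoverLogged and debug.Stack are stand-ins for the TiDB helper and util.GetStack; the metrics counter and the quit path are omitted):

package main

import (
	"runtime/debug"

	"go.uber.org/zap"
)

// recoverLogged mirrors the pattern above: recover from a panic, capture the
// stack, and emit a single structured record.
func recoverLogged(logger *zap.Logger, funcName string) {
	r := recover()
	if r == nil {
		return
	}
	buf := debug.Stack() // util.GetStack in TiDB
	logger.Error("recover in domain failed",
		zap.String("funcName", funcName),
		zap.Any("error", r), // panic values can be any type, hence zap.Any
		zap.String("buffer", string(buf)))
}

func main() {
	logger := zap.NewExample()
	defer logger.Sync()

	func() {
		defer recoverLogged(logger, "loadSchemaInLoop")
		panic("boom")
	}()
}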
10 changes: 6 additions & 4 deletions domain/info.go
@@ -27,8 +27,9 @@ import (
 	"github.com/pingcap/tidb/ddl"
 	"github.com/pingcap/tidb/owner"
 	"github.com/pingcap/tidb/util/hack"
+	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/printer"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )

 const (
@@ -139,7 +140,7 @@ func (is *InfoSyncer) RemoveServerInfo() {
 	}
 	err := ddl.DeleteKeyFromEtcd(is.serverInfoPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
 	if err != nil {
-		log.Errorf("[info-syncer] remove server info failed %v", err)
+		logutil.Logger(context.Background()).Error("remove server info failed", zap.Error(err))
 	}
 }

@@ -188,15 +189,16 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt
 		resp, err = etcdCli.Get(childCtx, key, opts...)
 		cancel()
 		if err != nil {
-			log.Infof("[info-syncer] get %s failed %v, continue checking.", key, err)
+			logutil.Logger(context.Background()).Info("get key failed", zap.String("key", key), zap.Error(err))
 			time.Sleep(200 * time.Millisecond)
 			continue
 		}
 		for _, kv := range resp.Kvs {
 			info := &ServerInfo{}
 			err = json.Unmarshal(kv.Value, info)
 			if err != nil {
-				log.Infof("[info-syncer] get %s, json.Unmarshal %v failed %v.", kv.Key, kv.Value, err)
+				logutil.Logger(context.Background()).Info("get key failed", zap.String("key", string(kv.Key)), zap.ByteString("value", kv.Value),
+					zap.Error(err))
 				return nil, errors.Trace(err)
 			}
 			allInfo[info.ID] = info
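
In getInfo the two failure modes now log differently: a transient etcd read error becomes a plain zap.Error and a retry, while an undecodable payload is logged with its key and raw bytes (zap.ByteString) and aborts the call. A self-contained sketch of that shape, with the etcd client replaced by a hypothetical fetch callback and a trimmed-down ServerInfo (the ddl_id JSON tag is illustrative):

package main

import (
	"encoding/json"
	"errors"
	"time"

	"go.uber.org/zap"
)

// ServerInfo stands in for the struct decoded in getInfo above.
type ServerInfo struct {
	ID string `json:"ddl_id"`
}

// getAllServerInfo retries transient fetch errors with a short sleep, but
// treats an unmarshal failure as fatal after logging key, value, and error.
func getAllServerInfo(logger *zap.Logger, fetch func() (map[string][]byte, error), retryCnt int) (map[string]*ServerInfo, error) {
	allInfo := make(map[string]*ServerInfo)
	for i := 0; i < retryCnt; i++ {
		kvs, err := fetch()
		if err != nil {
			logger.Info("get key failed", zap.Error(err))
			time.Sleep(200 * time.Millisecond)
			continue
		}
		for key, value := range kvs {
			info := &ServerInfo{}
			if err = json.Unmarshal(value, info); err != nil {
				logger.Info("get key failed", zap.String("key", key),
					zap.ByteString("value", value), zap.Error(err))
				return nil, err
			}
			allInfo[info.ID] = info
		}
		return allInfo, nil
	}
	return nil, errors.New("retry count exhausted")
}

func main() {
	logger := zap.NewExample()
	defer logger.Sync()

	fetch := func() (map[string][]byte, error) {
		return map[string][]byte{"/tidb/server/info/uuid-1": []byte(`{"ddl_id":"uuid-1"}`)}, nil
	}
	if allInfo, err := getAllServerInfo(logger, fetch, 3); err == nil {
		logger.Info("got server info", zap.Int("count", len(allInfo)))
	}
}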
(Diffs for the remaining two changed files are not shown.)
