Skip to content

Commit

Permalink
store/copr: use a ttl duration to protect a new recovered tiflash nod…
Browse files Browse the repository at this point in the history
…e from processing mpp tasks. (pingcap#26793)
  • Loading branch information
hanfei1991 authored Aug 4, 2021
1 parent 853c41e commit 47514c2
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 38 deletions.
3 changes: 0 additions & 3 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,6 @@ func (s *tiflashTestSuite) TestDispatchTaskRetry(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "3*return(true)"), IsNil)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout", "3*return(true)"), IsNil)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout"), IsNil)
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
Expand Down
3 changes: 2 additions & 1 deletion kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

"github.com/pingcap/kvproto/pkg/mpp"
)
Expand Down Expand Up @@ -76,7 +77,7 @@ type MPPDispatchRequest struct {
type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest) ([]MPPTaskMeta, error)
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest) Response
Expand Down
8 changes: 7 additions & 1 deletion planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package core

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -346,7 +347,12 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic

func (e *mppTaskGenerator) constructMPPTasksForSinglePartitionTable(ctx context.Context, kvRanges []kv.KeyRange, tableID int64) ([]*kv.MPPTask, error) {
req := &kv.MPPBuildTasksRequest{KeyRanges: kvRanges}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req)
ttl, err := time.ParseDuration(e.ctx.GetSessionVars().MPPStoreFailTTL)
if err != nil {
logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
ttl = 30 * time.Second
}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
9 changes: 9 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,12 @@ type SessionVars struct {

// TemporaryTableData stores committed kv values for temporary table for current session.
TemporaryTableData kv.MemBuffer

// MPPStoreLastFailTime records the lastest fail time that a TiFlash store failed.
MPPStoreLastFailTime map[string]time.Time

// MPPStoreFailTTL indicates the duration that protect TiDB from sending task to a new recovered TiFlash.
MPPStoreFailTTL string
}

// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
Expand Down Expand Up @@ -1097,6 +1103,8 @@ func NewSessionVars() *SessionVars {
CTEMaxRecursionDepth: DefCTEMaxRecursionDepth,
TMPTableSize: DefTMPTableSize,
EnableGlobalTemporaryTable: DefTiDBEnableGlobalTemporaryTable,
MPPStoreLastFailTime: make(map[string]time.Time),
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
}
vars.KVVars = tikvstore.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down Expand Up @@ -1146,6 +1154,7 @@ func NewSessionVars() *SessionVars {
vars.allowMPPExecution = DefTiDBAllowMPPExecution
vars.HashExchangeWithNewCollation = DefTiDBHashExchangeWithNewCollation
vars.enforceMPPExecution = DefTiDBEnforceMPPExecution
vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,10 @@ var defaultSysVars = []*SysVar{
s.allowMPPExecution = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMPPStoreFailTTL, Type: TypeStr, Value: DefTiDBMPPStoreFailTTL, SetSession: func(s *SessionVars, val string) error {
s.MPPStoreFailTTL = val
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashExchangeWithNewCollation, Type: TypeBool, Value: BoolToOnOff(DefTiDBHashExchangeWithNewCollation), SetSession: func(s *SessionVars, val string) error {
s.HashExchangeWithNewCollation = TiDBOptOn(val)
return nil
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ const (
// Note if you want to set `tidb_enforce_mpp` to `true`, you must set `tidb_allow_mpp` to `true` first.
TiDBEnforceMPPExecution = "tidb_enforce_mpp"

// TiDBMPPStoreFailTTL is the unavailable time when a store is detected failed. During that time, tidb will not send any task to
// TiFlash even though the failed TiFlash node has been recovered.
TiDBMPPStoreFailTTL = "tidb_mpp_store_fail_ttl"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
TiDBInitChunkSize = "tidb_init_chunk_size"

Expand Down Expand Up @@ -668,6 +672,7 @@ const (
DefTiDBAllowMPPExecution = true
DefTiDBHashExchangeWithNewCollation = true
DefTiDBEnforceMPPExecution = false
DefTiDBMPPStoreFailTTL = "60s"
DefTiDBTxnMode = ""
DefTiDBRowFormatV1 = 1
DefTiDBRowFormatV2 = 2
Expand Down
57 changes: 35 additions & 22 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ func (rs *batchCopResponse) RespTime() time.Duration {
// 2. for the remaining regions:
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool) []*batchCopTask {
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask {
if len(originalTasks) <= 1 {
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
cache := kvStore.GetRegionCache()
storeTaskMap := make(map[uint64]*batchCopTask)
// storeCandidateRegionMap stores all the possible store->region map. Its content is
Expand All @@ -133,30 +134,42 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
cur := time.Now()
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]
aliveReq := tikvrpc.NewRequest(tikvrpc.CmdMPPAlive, &mpp.IsAliveRequest{}, kvrpcpb.Context{})
aliveReq.StoreTp = tikvrpc.TiFlash
alive := false
resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), aliveReq, 3*time.Second)
if err != nil {

var last time.Time
var ok bool
mu.Lock()
if last, ok = mppStoreLastFailTime[s.GetAddr()]; ok && cur.Sub(last) < 100*time.Millisecond {
// The interval time is so short that may happen in a same query, so we needn't to check again.
mu.Unlock()
return
}
mu.Unlock()

resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{
Type: tikvrpc.CmdMPPAlive,
StoreTp: tikvrpc.TiFlash,
Req: &mpp.IsAliveRequest{},
Context: kvrpcpb.Context{},
}, 2*time.Second)

if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available {
logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()), zap.String("err message", err.Error()))
} else {
rpcResp := resp.Resp.(*mpp.IsAliveResponse)
if rpcResp.Available {
alive = true
} else {
logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()))
}
mu.Lock()
mppStoreLastFailTime[s.GetAddr()] = time.Now()
mu.Unlock()
return
}
if !alive {

if cur.Sub(last) < ttl {
logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", last))
return
}

mu.Lock()
defer mu.Unlock()
storeTaskMap[s.StoreID()] = &batchCopTask{
storeAddr: s.GetAddr(),
cmdType: originalTasks[0].cmdType,
Expand Down Expand Up @@ -292,7 +305,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
return ret
}

func buildBatchCopTasks(bo *Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, isMPP bool) ([]*batchCopTask, error) {
func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) ([]*batchCopTask, error) {
cache := store.GetRegionCache()
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
Expand Down Expand Up @@ -367,7 +380,7 @@ func buildBatchCopTasks(bo *Backoffer, store *kvStore, ranges *KeyRanges, storeT
}
logutil.BgLogger().Debug(msg)
}
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP)
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, mppStoreLastFailTime, ttl)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
for _, task := range batchTasks {
Expand All @@ -394,7 +407,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.kvStore, ranges, req.StoreType, false)
tasks, err := buildBatchCopTasks(bo, c.store.kvStore, ranges, req.StoreType, nil, 0)
if err != nil {
return copErrorResponse{err}
}
Expand Down Expand Up @@ -528,19 +541,19 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *
}

// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []kv.KeyRange
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *kv.KeyRange) {
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false)
return buildBatchCopTasks(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0)
}

const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
Expand Down
16 changes: 5 additions & 11 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {
}

// ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
}
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store, ranges, kv.TiFlash, true)
tasks, err := buildBatchCopTasks(bo, c.store, ranges, kv.TiFlash, mppStoreLastFailTime, ttl)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -331,15 +331,9 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, readTimeoutUltraLong)

if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error, and retrying", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
err = bo.Backoff(tikv.BoTiFlashRPC(), err)
if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
m.sendError(derr.ErrTiFlashServerTimeout)
return
}
m.establishMPPConns(bo, req, taskMeta)
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
m.sendError(derr.ErrTiFlashServerTimeout)
return
}

Expand Down

0 comments on commit 47514c2

Please sign in to comment.