Skip to content

Commit

Permalink
mpp: Fix the crash or error when mpp generate empty task list. (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall authored Jan 14, 2022
1 parent 9e948bd commit f1f9230
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 11 deletions.
4 changes: 2 additions & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import (
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
2 changes: 1 addition & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
}
})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id)
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
5 changes: 1 addition & 4 deletions planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,14 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv
for _, r := range f.ExchangeReceivers {
childrenTasks = append(childrenTasks, r.Tasks...)
}
if f.singleton {
if f.singleton && len(childrenTasks) > 0 {
childrenTasks = childrenTasks[0:1]
}
tasks = e.constructMPPTasksByChildrenTasks(childrenTasks)
}
if err != nil {
return nil, errors.Trace(err)
}
if len(tasks) == 0 {
return nil, errors.New("cannot find mpp task")
}
for _, r := range f.ExchangeReceivers {
for _, frag := range r.frags {
frag.ExchangeSender.TargetTasks = append(frag.ExchangeSender.TargetTasks, tasks...)
Expand Down
5 changes: 4 additions & 1 deletion store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
// The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely).
// If balanceWithContinuity is true, the second balance strategy is enable.
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
// for mpp, we still need to detect the store availability
if len(originalTasks) <= 1 && !isMPP {
Expand Down Expand Up @@ -525,7 +529,6 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges
const cmdType = tikvrpc.CmdBatchCop
rangesLen := ranges.Len()
for {

locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
Expand Down
17 changes: 17 additions & 0 deletions store/copr/batch_coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"
"strconv"
"testing"
"time"

"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -115,6 +116,22 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
}
}

func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, nil, time.Second, false, 0)
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, nil, time.Second, false, 0)
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}

}

func TestDeepCopyStoreTaskMap(t *testing.T) {
storeTasks1 := buildStoreTaskMap(10)
for _, task := range storeTasks1 {
Expand Down
4 changes: 2 additions & 2 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
}

// DispatchMPPTasks dispatches all the mpp task and waits for the responses.
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, needTriggerFallback bool) kv.Response {
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, needTriggerFallback bool, startTs uint64) kv.Response {
vars := variables.(*tikv.Variables)
ctxChild, cancelFunc := context.WithCancel(ctx)
iter := &mppIterator{
Expand All @@ -500,7 +500,7 @@ func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{},
finishCh: make(chan struct{}),
cancelFunc: cancelFunc,
respChan: make(chan *mppResponse, 4096),
startTs: dispatchReqs[0].StartTs,
startTs: startTs,
vars: vars,
needTriggerFallback: needTriggerFallback,
enableCollectExecutionInfo: config.GetGlobalConfig().EnableCollectExecutionInfo,
Expand Down

0 comments on commit f1f9230

Please sign in to comment.