Skip to content

Commit

Permalink
ddl: remove useless code related to legacy job queue (pingcap#56689)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Nov 5, 2024
1 parent 1de6f3e commit aa83e4e
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 511 deletions.
78 changes: 0 additions & 78 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,84 +207,6 @@ func TestIgnorableSpec(t *testing.T) {
}
}

func TestBuildJobDependence(t *testing.T) {
store := createMockStore(t)
defer func() {
require.NoError(t, store.Close())
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
// Add some non-add-index jobs.
job1 := &model.Job{ID: 1, TableID: 1, Version: model.JobVersion1, Type: model.ActionAddColumn}
job2 := &model.Job{ID: 2, TableID: 1, Version: model.JobVersion1, Type: model.ActionCreateTable}
job3 := &model.Job{ID: 3, TableID: 2, Version: model.JobVersion1, Type: model.ActionDropColumn}
job6 := &model.Job{ID: 6, TableID: 1, Version: model.JobVersion1, Type: model.ActionDropTable}
job7 := &model.Job{ID: 7, TableID: 2, Version: model.JobVersion1, Type: model.ActionModifyColumn}
job9 := &model.Job{ID: 9, SchemaID: 111, Version: model.JobVersion1, Type: model.ActionDropSchema}
job11 := &model.Job{ID: 11, TableID: 2, Version: model.JobVersion1, Type: model.ActionRenameTable}
job11.FillArgs(&model.RenameTableArgs{
OldSchemaID: 111,
NewTableName: pmodel.NewCIStr("new table name"),
OldSchemaName: pmodel.NewCIStr("old db name"),
})
err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
require.NoError(t, m.EnQueueDDLJob(job1))
require.NoError(t, m.EnQueueDDLJob(job2))
require.NoError(t, m.EnQueueDDLJob(job3))
require.NoError(t, m.EnQueueDDLJob(job6))
require.NoError(t, m.EnQueueDDLJob(job7))
require.NoError(t, m.EnQueueDDLJob(job9))
require.NoError(t, m.EnQueueDDLJob(job11))
return nil
})
require.NoError(t, err)
job4 := &model.Job{ID: 4, TableID: 1, Type: model.ActionAddIndex}
err = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
err := buildJobDependence(m, job4)
require.NoError(t, err)
require.Equal(t, job4.DependencyID, int64(2))
return nil
})
require.NoError(t, err)
job5 := &model.Job{ID: 5, TableID: 2, Type: model.ActionAddIndex}
err = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
err := buildJobDependence(m, job5)
require.NoError(t, err)
require.Equal(t, job5.DependencyID, int64(3))
return nil
})
require.NoError(t, err)
job8 := &model.Job{ID: 8, TableID: 3, Type: model.ActionAddIndex}
err = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
err := buildJobDependence(m, job8)
require.NoError(t, err)
require.Equal(t, job8.DependencyID, int64(0))
return nil
})
require.NoError(t, err)
job10 := &model.Job{ID: 10, SchemaID: 111, TableID: 3, Type: model.ActionAddIndex}
err = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
err := buildJobDependence(m, job10)
require.NoError(t, err)
require.Equal(t, job10.DependencyID, int64(9))
return nil
})
require.NoError(t, err)
job12 := &model.Job{ID: 12, SchemaID: 112, TableID: 2, Type: model.ActionAddIndex}
err = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
err := buildJobDependence(m, job12)
require.NoError(t, err)
require.Equal(t, job12.DependencyID, int64(11))
return nil
})
require.NoError(t, err)
}

func TestError(t *testing.T) {
kvErrs := []*terror.Error{
dbterror.ErrDDLJobNotFound,
Expand Down
118 changes: 7 additions & 111 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,16 @@ func (s *JobSubmitter) addBatchDDLJobs(jobWs []*JobWrapper) {
err error
newWs []*JobWrapper
)
// DDLForce2Queue is a flag to tell DDL worker to always push the job to the DDL queue.
toTable := !variable.DDLForce2Queue.Load()
fastCreate := variable.EnableFastCreateTable.Load()
if toTable {
if fastCreate {
newWs, err = mergeCreateTableJobs(jobWs)
if err != nil {
logutil.DDLLogger().Warn("failed to merge create table jobs", zap.Error(err))
} else {
jobWs = newWs
}
if fastCreate {
newWs, err = mergeCreateTableJobs(jobWs)
if err != nil {
logutil.DDLLogger().Warn("failed to merge create table jobs", zap.Error(err))
} else {
jobWs = newWs
}
err = s.addBatchDDLJobs2Table(jobWs)
} else {
err = s.addBatchDDLJobs2Queue(jobWs)
}
err = s.addBatchDDLJobs2Table(jobWs)
var jobs string
for _, jobW := range jobWs {
if err == nil {
Expand All @@ -139,7 +133,6 @@ func (s *JobSubmitter) addBatchDDLJobs(jobWs []*JobWrapper) {
logutil.DDLLogger().Info("add DDL jobs",
zap.Int("batch count", len(jobWs)),
zap.String("jobs", jobs),
zap.Bool("table", toTable),
zap.Bool("fast_create", fastCreate))
}

Expand Down Expand Up @@ -302,12 +295,6 @@ func (s *JobSubmitter) addBatchDDLJobs2Table(jobWs []*JobWrapper) error {
}
startTS = txn.StartTS()

if variable.DDLForce2Queue.Load() {
if err := s.checkFlashbackJobInQueue(t); err != nil {
return err
}
}

return nil
})
if err != nil {
Expand Down Expand Up @@ -355,64 +342,6 @@ func (s *JobSubmitter) addBatchDDLJobs2Table(jobWs []*JobWrapper) error {
return nil
}

func (s *JobSubmitter) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
return kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMutator(txn)

for _, jobW := range jobWs {
intest.Assert(jobW.Version != 0, "Job version should not be zero")
}

count := getRequiredGIDCount(jobWs)
ids, err := t.GenGlobalIDs(count)
if err != nil {
return errors.Trace(err)
}
assignGIDsForJobs(jobWs, ids)

if err := s.checkFlashbackJobInQueue(t); err != nil {
return errors.Trace(err)
}

for _, jobW := range jobWs {
jobW.FillArgsWithSubJobs()
job := jobW.Job
job.StartTS = txn.StartTS()
setJobStateToQueueing(job)
if err = buildJobDependence(t, job); err != nil {
return errors.Trace(err)
}
jobListKey := meta.DefaultJobListKey
if job.MayNeedReorg() {
jobListKey = meta.AddIndexJobListKey
}
if err = t.EnQueueDDLJob(job, jobListKey); err != nil {
return errors.Trace(err)
}
}
failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
}
})
return nil
})
}

func (*JobSubmitter) checkFlashbackJobInQueue(t *meta.Mutator) error {
jobs, err := t.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
if err != nil {
return errors.Trace(err)
}
for _, job := range jobs {
if job.Type == model.ActionFlashbackCluster {
return errors.Errorf("Can't add ddl job, have flashback cluster job")
}
}
return nil
}

// GenGIDAndInsertJobsWithRetry generate job related global ID and inserts DDL jobs to the DDL job
// table with retry. job id allocation and job insertion are in the same transaction,
// as we want to make sure DDL jobs are inserted in id order, then we can query from
Expand Down Expand Up @@ -767,39 +696,6 @@ func setJobStateToQueueing(job *model.Job) {
job.State = model.JobStateQueueing
}

// buildJobDependence sets the curjob's dependency-ID.
// The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
func buildJobDependence(t *meta.Mutator, curJob *model.Job) error {
// Jobs in the same queue are ordered. If we want to find a job's dependency-job, we need to look for
// it from the other queue. So if the job is "ActionAddIndex" job, we need find its dependency-job from DefaultJobList.
jobListKey := meta.DefaultJobListKey
if !curJob.MayNeedReorg() {
jobListKey = meta.AddIndexJobListKey
}
jobs, err := t.GetAllDDLJobsInQueue(jobListKey)
if err != nil {
return errors.Trace(err)
}

for _, job := range jobs {
if curJob.ID < job.ID {
continue
}
isDependent, err := curJob.IsDependentOn(job)
if err != nil {
return errors.Trace(err)
}
if isDependent {
logutil.DDLLogger().Info("current DDL job depends on other job",
zap.Stringer("currentJob", curJob),
zap.Stringer("dependentJob", job))
curJob.DependencyID = job.ID
break
}
}
return nil
}

func (s *JobSubmitter) notifyNewJobSubmitted() {
if s.ownerManager.IsOwner() {
asyncNotify(s.ddlJobNotifyCh)
Expand Down
63 changes: 3 additions & 60 deletions pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,8 @@ type Option func(m *Mutator)

// Mutator is for handling meta information in a transaction.
type Mutator struct {
txn *structure.TxStructure
StartTS uint64 // StartTS is the txn's start TS.
jobListKey JobListKeyType
txn *structure.TxStructure
StartTS uint64 // StartTS is the txn's start TS.
}

var _ Reader = (*Mutator)(nil)
Expand All @@ -192,8 +191,7 @@ func NewMutator(txn kv.Transaction, options ...Option) *Mutator {
txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
t := structure.NewStructure(txn, txn, mMetaPrefix)
m := &Mutator{txn: t,
StartTS: txn.StartTS(),
jobListKey: DefaultJobListKey,
StartTS: txn.StartTS(),
}
for _, opt := range options {
opt(m)
Expand Down Expand Up @@ -1446,33 +1444,6 @@ var (
mDDLJobHistoryKey = []byte("DDLJobHistory")
)

var (
// DefaultJobListKey keeps all actions of DDL jobs except "add index".
// this and below list are always appended, so the order is the same as the
// job's creation order.
DefaultJobListKey JobListKeyType = mDDLJobListKey
// AddIndexJobListKey only keeps the action of adding index.
AddIndexJobListKey JobListKeyType = mDDLJobAddIdxList
)

func (m *Mutator) enQueueDDLJob(key []byte, job *model.Job) error {
b, err := job.Encode(true)
if err == nil {
err = m.txn.RPush(key, b)
}
return errors.Trace(err)
}

// EnQueueDDLJob adds a DDL job to the list.
func (m *Mutator) EnQueueDDLJob(job *model.Job, jobListKeys ...JobListKeyType) error {
listKey := m.jobListKey
if len(jobListKeys) != 0 {
listKey = jobListKeys[0]
}

return m.enQueueDDLJob(listKey, job)
}

// JobListKeyType is a key type of the DDL job queue.
type JobListKeyType []byte

Expand All @@ -1495,34 +1466,6 @@ func (m *Mutator) getDDLJob(key []byte, index int64) (*model.Job, error) {
return job, errors.Trace(err)
}

// GetAllDDLJobsInQueue gets all DDL Jobs in the current queue.
// The length of jobListKeys can only be 1 or 0.
// If its length is 1, we need to replace m.jobListKey with jobListKeys[0].
// Otherwise, we use m.jobListKey directly.
func (m *Mutator) GetAllDDLJobsInQueue(jobListKeys ...JobListKeyType) ([]*model.Job, error) {
listKey := m.jobListKey
if len(jobListKeys) != 0 {
listKey = jobListKeys[0]
}

values, err := m.txn.LGetAll(listKey)
if err != nil || values == nil {
return nil, errors.Trace(err)
}

jobs := make([]*model.Job, 0, len(values))
for _, val := range values {
job := &model.Job{}
err = job.Decode(val)
if err != nil {
return nil, errors.Trace(err)
}
jobs = append(jobs, job)
}

return jobs, nil
}

func (*Mutator) jobIDKey(id int64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(id))
Expand Down
Loading

0 comments on commit aa83e4e

Please sign in to comment.