Skip to content

Commit

Permalink
*: move ddl notifier as domain member and test pub/sub (pingcap#56776)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Oct 23, 2024
1 parent 64ecfa1 commit ba5823b
Show file tree
Hide file tree
Showing 19 changed files with 216 additions and 169 deletions.
5 changes: 4 additions & 1 deletion pkg/ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ const (
key idx_task_key(task_key),
key idx_state_update_time(state_update_time))`

// NotifierTableName is `tidb_ddl_notifier`.
NotifierTableName = "tidb_ddl_notifier"

// NotifierTableSQL is the CREATE TABLE SQL of `tidb_ddl_notifier`.
NotifierTableSQL = `CREATE TABLE tidb_ddl_notifier (
NotifierTableSQL = `CREATE TABLE ` + NotifierTableName + ` (
ddl_job_id BIGINT,
sub_job_id BIGINT COMMENT '-1 if the schema change does not belong to a multi-schema change DDL or a merged DDL. 0 or positive numbers representing the sub-job index of a multi-schema change DDL or a merged DDL',
schema_change LONGBLOB COMMENT 'SchemaChangeEvent at rest',
Expand Down
40 changes: 25 additions & 15 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ type ddl struct {
enableTiFlashPoll *atomicutil.Bool
sysTblMgr systable.Manager
minJobIDRefresher *systable.MinJobIDRefresher
eventPublishStore notifier.Store

executor *executor
jobSubmitter *JobSubmitter
Expand Down Expand Up @@ -572,14 +573,35 @@ func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *mo
return nil
}

if intest.InTest && notifier.DefaultStore != nil {
ch := jobCtx.oldDDLCtx.ddlEventCh
if ch != nil {
forLoop:
for i := 0; i < 10; i++ {
select {
case ch <- e:
break forLoop
default:
time.Sleep(time.Microsecond * 10)
}
}
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
}

if intest.InTest && jobCtx.eventPublishStore != nil {
failpoint.Inject("asyncNotifyEventError", func() {
failpoint.Return(errors.New("mock publish event error"))
})
if subJobID == noSubJob && job.MultiSchemaInfo != nil {
subJobID = int64(job.MultiSchemaInfo.Seq)
}
err := notifier.PubSchemaChange(jobCtx.ctx, sctx, job.ID, subJobID, e)
err := notifier.PubSchemeChangeToStore(
jobCtx.stepCtx,
sctx,
job.ID,
subJobID,
e,
jobCtx.eventPublishStore,
)
if err != nil {
logutil.DDLLogger().Error("Error publish schema change event",
zap.Int64("jobID", job.ID),
Expand All @@ -589,19 +611,6 @@ func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *mo
}
return nil
}

ch := jobCtx.oldDDLCtx.ddlEventCh
if ch != nil {
for i := 0; i < 10; i++ {
select {
case ch <- e:
return nil
default:
time.Sleep(time.Microsecond * 10)
}
}
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
}
return nil
}

Expand Down Expand Up @@ -665,6 +674,7 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
d := &ddl{
ddlCtx: ddlCtx,
enableTiFlashPoll: atomicutil.NewBool(true),
eventPublishStore: opt.EventPublishStore,
}

taskexecutor.RegisterTaskType(proto.Backfill,
Expand Down
9 changes: 6 additions & 3 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,14 +1095,17 @@ SwitchIndexState:
}
job.FillFinishedArgs(a)

addIndexEvent := notifier.NewAddIndexEvent(tblInfo, allIndexInfos)
err2 := asyncNotifyEvent(jobCtx, addIndexEvent, job, noSubJob, w.sess)
if err2 != nil {
return ver, errors.Trace(err2)
}

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
if !job.ReorgMeta.IsDistReorg && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
// TODO: store this event to the notifier.
// For now, it is not used and just for placeholder.
_ = notifier.NewAddIndexEvent(tblInfo, allIndexInfos)
logutil.DDLLogger().Info("run add index job done",
zap.String("charset", job.Charset),
zap.String("collation", job.Collate))
Expand Down
4 changes: 4 additions & 0 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/ddl/schemaver"
"github.com/pingcap/tidb/pkg/ddl/serverstate"
sess "github.com/pingcap/tidb/pkg/ddl/session"
Expand Down Expand Up @@ -108,6 +109,7 @@ func (l *ownerListener) OnBecomeOwner() {
unSyncedTracker: newUnSyncedJobTracker(),
schemaVerMgr: newSchemaVersionManager(l.ddl.store),
schemaVerSyncer: l.ddl.schemaVerSyncer,
eventPublishStore: l.ddl.eventPublishStore,

ddlCtx: l.ddl.ddlCtx,
ddlJobNotifyCh: l.jobSubmitter.ddlJobNotifyCh,
Expand Down Expand Up @@ -141,6 +143,7 @@ type jobScheduler struct {
unSyncedTracker *unSyncedJobTracker
schemaVerMgr *schemaVersionManager
schemaVerSyncer schemaver.Syncer
eventPublishStore notifier.Store

// those fields are created or initialized on start
reorgWorkerPool *workerPool
Expand Down Expand Up @@ -534,6 +537,7 @@ func (s *jobScheduler) getJobRunCtx(jobID int64, traceInfo *model.TraceInfo) *jo
autoidCli: s.autoidCli,
store: s.store,
schemaVerSyncer: s.schemaVerSyncer,
eventPublishStore: s.eventPublishStore,

notifyCh: ch,
logger: tidblogutil.LoggerWithTraceInfo(
Expand Down
12 changes: 7 additions & 5 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/ddl/schemaver"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/systable"
Expand Down Expand Up @@ -81,11 +82,12 @@ type jobContext struct {
*schemaVersionManager
// ctx is the context of job scheduler. When worker is running the job, it should
// use stepCtx instead.
ctx context.Context
infoCache *infoschema.InfoCache
autoidCli *autoid.ClientDiscover
store kv.Storage
schemaVerSyncer schemaver.Syncer
ctx context.Context
infoCache *infoschema.InfoCache
autoidCli *autoid.ClientDiscover
store kv.Storage
schemaVerSyncer schemaver.Syncer
eventPublishStore notifier.Store

// per job fields, they are not changed in the life cycle of this context.

Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/notifier/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/ddl/session",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/session/types",
"//pkg/sessionctx",
"//pkg/util/intest",
"//pkg/util/logutil",
Expand All @@ -33,6 +34,7 @@ go_test(
flaky = True,
shard_count = 6,
deps = [
"//pkg/ddl",
"//pkg/ddl/session",
"//pkg/meta/model",
"//pkg/parser/model",
Expand Down
29 changes: 5 additions & 24 deletions pkg/ddl/notifier/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,11 @@ import (
sess "github.com/pingcap/tidb/pkg/ddl/session"
)

// PubSchemaChange publishes schema changes to the cluster to notify other
// components. It stages changes in given `se` so they will be visible when `se`
// further commits. When the schema change is not from multi-schema change DDL,
// `multiSchemaChangeSeq` is -1. Otherwise, `multiSchemaChangeSeq` is the sub-job
// index of the multi-schema change DDL.
func PubSchemaChange(
ctx context.Context,
se *sess.Session,
ddlJobID int64,
multiSchemaChangeSeq int64,
event *SchemaChangeEvent,
) error {
return PubSchemeChangeToStore(
ctx,
se,
ddlJobID,
multiSchemaChangeSeq,
event,
DefaultStore,
)
}

// PubSchemeChangeToStore is exposed for testing. Caller should use
// PubSchemaChange instead.
// PubSchemeChangeToStore publishes schema changes to the store to notify
// subscribers on the Store. It stages changes in given `se` so they will be
// visible when `se` further commits. When the schema change is not from
// multi-schema change DDL, `multiSchemaChangeSeq` is -1. Otherwise,
// `multiSchemaChangeSeq` is the sub-job index of the multi-schema change DDL.
func PubSchemeChangeToStore(
ctx context.Context,
se *sess.Session,
Expand Down
3 changes: 0 additions & 3 deletions pkg/ddl/notifier/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ type Store interface {
List(ctx context.Context, se *sess.Session) ([]*schemaChange, error)
}

// DefaultStore is the system table store. Still WIP now.
var DefaultStore Store

type tableStore struct {
db string
table string
Expand Down
Loading

0 comments on commit ba5823b

Please sign in to comment.