Skip to content

Commit

Permalink
*: must wait 2 * lease schema lease time when running job.
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang committed Nov 18, 2015
1 parent 7cc8377 commit 9c6b278
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
2 changes: 1 addition & 1 deletion ddl/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (d *ddl) Stat() (map[string]interface{}, error) {
if job != nil {
m["ddl_job_id"] = job.ID
m["ddl_job_action"] = job.Type.String()
m["ddl_job_last_update_ts"] = job.LastUpdateTS
m["ddl_job_last_update_ts"] = job.LastUpdateTS / 1e9
m["ddl_job_state"] = job.State.String()
m["ddl_job_error"] = job.Error
m["ddl_job_schema_state"] = job.SchemaState.String()
Expand Down
32 changes: 25 additions & 7 deletions ddl/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ func (d *ddl) handleJobQueue() error {
return nil
}

waitTime := 2 * d.lease

var job *model.Job
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
Expand All @@ -193,6 +195,20 @@ func (d *ddl) handleJobQueue() error {
return errors.Trace(err)
}

if job.State == model.JobRunning {
// if we enter a new state, crash when waiting 2 * lease time, and restart quickly,
// we may run the job immediately again, but we don't wait enough 2 * lease time to
// let other servers update the schema.
// so here we must check the elapsed time from last update, if < 2 * lease, we must
// wait again.
elapsed := time.Duration(time.Now().UnixNano() - job.LastUpdateTS)
if elapsed > 0 && elapsed < waitTime {
log.Warnf("the elapsed time from last update is %s < %s, wait again", elapsed, waitTime)
waitTime -= elapsed
return nil
}
}

log.Warnf("run DDL job %v", job)

d.hook.OnJobRunBefore(job)
Expand Down Expand Up @@ -236,7 +252,7 @@ func (d *ddl) handleJobQueue() error {
// if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
if job.State == model.JobRunning || job.State == model.JobDone {
d.waitSchemaChanged()
d.waitSchemaChanged(waitTime)
}
}
}
Expand Down Expand Up @@ -322,11 +338,13 @@ func (d *ddl) runJob(t *meta.Meta, job *model.Job) {

// for every lease seconds, we will re-update the whole schema, so we will wait 2 * lease time
// to guarantee that all servers have already updated schema.
func (d *ddl) waitSchemaChanged() {
if d.lease > 0 {
select {
case <-time.After(d.lease * 2):
case <-d.quitCh:
}
func (d *ddl) waitSchemaChanged(waitTime time.Duration) {
if waitTime == 0 {
return
}

select {
case <-time.After(waitTime):
case <-d.quitCh:
}
}
2 changes: 1 addition & 1 deletion meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (m *Meta) GetDDLJob(index int64) (*model.Job, error) {
// UpdateDDLJob updates the DDL job with index.
func (m *Meta) UpdateDDLJob(index int64, job *model.Job) error {
// TODO: use timestamp allocated by TSO
job.LastUpdateTS = time.Now().Unix()
job.LastUpdateTS = time.Now().UnixNano()
b, err := job.Encode()
if err != nil {
return errors.Trace(err)
Expand Down
12 changes: 8 additions & 4 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ type Job struct {
RawArgs json.RawMessage `json:"raw_args"`
SchemaState SchemaState `json:"schema_state"`
// snapshot version for this job.
SnapshotVer uint64 `json:"snapshot_ver"`
LastUpdateTS int64 `json:"last_update_ts"`
SnapshotVer uint64 `json:"snapshot_ver"`
// unix nano seconds
// TODO: use timestamp allocated by TSO
LastUpdateTS int64 `json:"last_update_ts"`
}

// Encode encodes job with json format.
Expand Down Expand Up @@ -138,6 +140,8 @@ func (s JobState) String() string {

// Owner is for DDL Owner.
type Owner struct {
OwnerID string `json:"owner_id"`
LastUpdateTS int64 `json:"last_update_ts"`
OwnerID string `json:"owner_id"`
// unix nano seconds
// TODO: use timestamp allocated by TSO
LastUpdateTS int64 `json:"last_update_ts"`
}

0 comments on commit 9c6b278

Please sign in to comment.