Skip to content

Commit

Permalink
ddl: Add the txn entry limit (pingcap#4458)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and coocood committed Sep 7, 2017
1 parent 8cb0ceb commit 3e125a4
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 2 deletions.
18 changes: 18 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,24 @@ func (s *testDBSuite) TestUpdateMultipleTable(c *C) {
tk.MustQuery("select * from t1").Check(testkit.Rows("8 1 9", "8 2 9"))
}

func (s *testDBSuite) TestCreateTableTooLarge(c *C) {
defer testleak.AfterTest(c)
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use test")

sql := "create table t_too_large ("
cnt := 3000
for i := 1; i <= cnt; i++ {
sql += fmt.Sprintf("a%d double, b%d double, c%d double, d%d double", i, i, i, i)
if i != cnt {
sql += ","
}
}
sql += ");"
_, err := s.tk.Exec(sql)
c.Assert(kv.ErrEntryTooLarge.Equal(err), IsTrue, Commentf("sql:%v", sql))
}

func (s *testDBSuite) TestCreateTableWithLike(c *C) {
defer testleak.AfterTest(c)
store, err := tidb.NewStore("memory://create_table_like")
Expand Down
19 changes: 18 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ func (d *ddl) getFirstDDLJob(t *meta.Meta) (*model.Job, error) {
return job, errors.Trace(err)
}

// handleUpdateJobError handles the too large DDL job.
func (d *ddl) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) error {
if err == nil {
return nil
}
if kv.ErrEntryTooLarge.Equal(err) {
log.Warnf("[ddl] update DDL job %v failed %v", job, errors.ErrorStack(err))
// Reduce this txn entry size.
job.BinlogInfo.Clean()
job.Error = toTError(err)
job.SchemaState = model.StateNone
job.State = model.JobCancelled
err = d.finishDDLJob(t, job)
}
return errors.Trace(err)
}

// updateDDLJob updates the DDL job information.
// Every time we enter another state except final state, we must call this function.
func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job, updateTS uint64) error {
Expand Down Expand Up @@ -193,7 +210,7 @@ func (d *ddl) handleDDLJobQueue() error {
return errors.Trace(err)
}
err = d.updateDDLJob(t, job, txn.StartTS())
return errors.Trace(err)
return errors.Trace(d.handleUpdateJobError(t, job, err))
})
if err != nil {
return errors.Trace(err)
Expand Down
7 changes: 7 additions & 0 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func (h *HistoryInfo) AddTableInfo(schemaVer int64, tblInfo *TableInfo) {
h.TableInfo = tblInfo
}

// Clean cleans history information.
func (h *HistoryInfo) Clean() {
h.SchemaVersion = 0
h.DBInfo = nil
h.TableInfo = nil
}

// Job is for a DDL operation.
type Job struct {
ID int64 `json:"id"`
Expand Down
3 changes: 2 additions & 1 deletion model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,13 @@ func (*testModelSuite) TestJobCodec(c *C) {
c.Assert(a, DeepEquals, A{Name: ""})
c.Assert(len(newJob.String()), Greater, 0)

job.BinlogInfo.Clean()
b1, err := job.Encode(true)
c.Assert(err, IsNil)
newJob = &Job{}
err = newJob.Decode(b1)
c.Assert(err, IsNil)
c.Assert(newJob.BinlogInfo, DeepEquals, job.BinlogInfo)
c.Assert(newJob.BinlogInfo, DeepEquals, &HistoryInfo{})
name = CIStr{}
a = A{}
err = newJob.DecodeArgs(&name, &a)
Expand Down

0 comments on commit 3e125a4

Please sign in to comment.