Skip to content

Commit

Permalink
bindinfo: support drop bindings of dropped table (pingcap#14071)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and sre-bot committed Dec 19, 2019
1 parent 4cd837a commit 39a370c
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 53 deletions.
3 changes: 3 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,12 @@ func (s *testSuite) TestDropSingleBindings(c *C) {
rows = tk.MustQuery("show bindings").Rows()
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][1], Equals, "select * from t use index(idx_b)")
tk.MustExec("drop table t")
tk.MustExec("drop binding for select * from t using select * from t use index(idx_b)")
rows = tk.MustQuery("show bindings").Rows()
c.Assert(len(rows), Equals, 0)

tk.MustExec("create table t(a int, b int, c int, index idx_a(a), index idx_b(b))")
// Test drop global bindings.
tk.MustExec("create global binding for select * from t using select * from t use index(idx_a)")
tk.MustExec("create global binding for select * from t using select * from t use index(idx_b)")
Expand All @@ -535,6 +537,7 @@ func (s *testSuite) TestDropSingleBindings(c *C) {
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][1], Equals, "select * from t use index(idx_b)")
tk.MustExec("drop table t")
tk.MustExec("drop global binding for select * from t using select * from t use index(idx_b)")
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 0)
Expand Down
18 changes: 13 additions & 5 deletions bindinfo/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ type Binding struct {
id string
}

func (b *Binding) isSame(rb *Binding) bool {
if b.id != "" && rb.id != "" {
return b.id == rb.id
}
// Sometimes we cannot construct `id` because of the changed schema, so we need to compare by bind sql.
return b.BindSQL == rb.BindSQL
}

// cache is a k-v map, key is original sql, value is a slice of BindRecord.
type cache map[string][]*BindRecord

Expand Down Expand Up @@ -90,7 +98,7 @@ func (br *BindRecord) FindBinding(hint string) *Binding {
func (br *BindRecord) prepareHints(sctx sessionctx.Context, is infoschema.InfoSchema) error {
p := parser.New()
for i, bind := range br.Bindings {
if bind.Hint != nil || bind.id != "" {
if bind.Hint != nil || bind.id != "" || bind.Status == deleted {
continue
}
stmtNode, err := p.ParseOneStmt(bind.BindSQL, bind.Charset, bind.Collation)
Expand Down Expand Up @@ -119,7 +127,7 @@ func merge(lBindRecord, rBindRecord *BindRecord) *BindRecord {
for _, rbind := range rBindRecord.Bindings {
found := false
for j, lbind := range lBindRecord.Bindings {
if lbind.id == rbind.id {
if lbind.isSame(&rbind) {
found = true
if rbind.UpdateTime.Compare(lbind.UpdateTime) >= 0 {
result.Bindings[j] = rbind
Expand All @@ -142,7 +150,7 @@ func (br *BindRecord) remove(deleted *BindRecord) *BindRecord {
result := br.shallowCopy()
for _, deletedBind := range deleted.Bindings {
for i, bind := range result.Bindings {
if bind.id == deletedBind.id {
if bind.isSame(&deletedBind) {
result.Bindings = append(result.Bindings[:i], result.Bindings[i+1:]...)
break
}
Expand Down Expand Up @@ -205,8 +213,8 @@ func (br *BindRecord) metrics() ([]float64, []int) {
}

// size calculates the memory size of a bind info.
func (m *Binding) size() float64 {
res := len(m.BindSQL) + len(m.Status) + 2*int(unsafe.Sizeof(m.CreateTime)) + len(m.Charset) + len(m.Collation)
func (b *Binding) size() float64 {
res := len(b.BindSQL) + len(b.Status) + 2*int(unsafe.Sizeof(b.CreateTime)) + len(b.Charset) + len(b.Collation)
return float64(res)
}

Expand Down
49 changes: 18 additions & 31 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
handle.bindInfo.parser = parser.New()
handle.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
// We do not need the first two parameters because they are only use to generate hint,
// and we already have the hint.
return handle.DropBindRecord(nil, nil, record)
return handle.DropBindRecord(record.OriginalSQL, record.Db, &record.Bindings[0])
}
handle.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
Expand Down Expand Up @@ -248,13 +246,9 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
}

// DropBindRecord drops a BindRecord to the storage and BindRecord int the cache.
func (h *BindHandle) DropBindRecord(sctx sessionctx.Context, is infoschema.InfoSchema, record *BindRecord) (err error) {
err = record.prepareHints(sctx, is)
if err != nil {
return err
}
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (err error) {
h.sctx.Lock()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)

_, err = exec.Execute(context.TODO(), "BEGIN")
if err != nil {
Expand All @@ -276,7 +270,11 @@ func (h *BindHandle) DropBindRecord(sctx sessionctx.Context, is infoschema.InfoS
return
}

err = h.removeBindRecord(parser.DigestNormalized(record.OriginalSQL), record)
record := &BindRecord{OriginalSQL: originalSQL, Db: db}
if binding != nil {
record.Bindings = append(record.Bindings, *binding)
}
err = h.removeBindRecord(parser.DigestNormalized(originalSQL), record)
}()

txn, err1 := h.sctx.Context.Txn(true)
Expand All @@ -289,21 +287,13 @@ func (h *BindHandle) DropBindRecord(sctx sessionctx.Context, is infoschema.InfoS
Type: mysql.TypeDatetime,
Fsp: 3,
}
oldBindRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL), record.OriginalSQL, record.Db)
bindingSQLs := make([]string, 0, len(record.Bindings))
for i := range record.Bindings {
record.Bindings[i].Status = deleted
record.Bindings[i].UpdateTime = updateTs
if oldBindRecord == nil {
continue
}
binding := oldBindRecord.FindBinding(record.Bindings[i].id)
if binding != nil {
bindingSQLs = append(bindingSQLs, binding.BindSQL)
}

bindSQL := ""
if binding != nil {
bindSQL = binding.BindSQL
}

_, err = exec.Execute(context.TODO(), h.logicalDeleteBindInfoSQL(record.OriginalSQL, record.Db, updateTs, bindingSQLs))
_, err = exec.Execute(context.TODO(), h.logicalDeleteBindInfoSQL(originalSQL, db, updateTs, bindSQL))
return err
}

Expand Down Expand Up @@ -522,19 +512,16 @@ func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Bindin
)
}

func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQLs []string) string {
func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQL string) string {
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and default_db=%s`,
expression.Quote(deleted),
expression.Quote(updateTs.String()),
expression.Quote(originalSQL),
expression.Quote(db))
if len(bindingSQLs) == 0 {
if bindingSQL == "" {
return sql
}
for i, sql := range bindingSQLs {
bindingSQLs[i] = expression.Quote(sql)
}
return sql + fmt.Sprintf(` and bind_sql in (%s)`, strings.Join(bindingSQLs, ","))
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindingSQL))
}

// GenHintsFromSQL is used to generate hints from SQL.
Expand Down Expand Up @@ -744,7 +731,7 @@ func (h *BindHandle) HandleEvolvePlanTask(sctx sessionctx.Context) error {
// since it is still in the bind record. Now we just drop it and if it is actually retryable,
// we will hope for that we can capture this evolve task again.
if err != nil {
return h.DropBindRecord(sctx, sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema), &BindRecord{OriginalSQL: originalSQL, Db: db, Bindings: []Binding{binding}})
return h.DropBindRecord(originalSQL, db, &binding)
}
// If the accepted plan timeouts, it is hard to decide the timeout for verify plan.
// Currently we simply mark the verify plan as `using` if it could run successfully within maxTime.
Expand All @@ -754,7 +741,7 @@ func (h *BindHandle) HandleEvolvePlanTask(sctx sessionctx.Context) error {
sctx.GetSessionVars().UsePlanBaselines = false
verifyPlanTime, err := h.getRunningDuration(sctx, db, binding.BindSQL, maxTime)
if err != nil {
return h.DropBindRecord(sctx, sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema), &BindRecord{OriginalSQL: originalSQL, Db: db, Bindings: []Binding{binding}})
return h.DropBindRecord(originalSQL, db, &binding)
}
if verifyPlanTime < 0 {
binding.Status = Rejected
Expand Down
12 changes: 6 additions & 6 deletions bindinfo/session_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ func (h *SessionHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.Inf
}

// DropBindRecord drops a BindRecord in the cache.
func (h *SessionHandle) DropBindRecord(sctx sessionctx.Context, is infoschema.InfoSchema, record *BindRecord) error {
err := record.prepareHints(sctx, is)
if err != nil {
return err
}
oldRecord := h.GetBindRecord(record.OriginalSQL, record.Db)
func (h *SessionHandle) DropBindRecord(originalSQL, db string, binding *Binding) error {
oldRecord := h.GetBindRecord(originalSQL, db)
var newRecord *BindRecord
record := &BindRecord{OriginalSQL: originalSQL, Db: db}
if binding != nil {
record.Bindings = append(record.Bindings, *binding)
}
if oldRecord != nil {
newRecord = oldRecord.remove(record)
} else {
Expand Down
12 changes: 4 additions & 8 deletions executor/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,19 @@ func (e *SQLBindExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

func (e *SQLBindExec) dropSQLBind() error {
record := &bindinfo.BindRecord{
OriginalSQL: e.normdOrigSQL,
Db: e.ctx.GetSessionVars().CurrentDB,
}
var bindInfo *bindinfo.Binding
if e.bindSQL != "" {
bindInfo := bindinfo.Binding{
bindInfo = &bindinfo.Binding{
BindSQL: e.bindSQL,
Charset: e.charset,
Collation: e.collation,
}
record.Bindings = append(record.Bindings, bindInfo)
}
if !e.isGlobal {
handle := e.ctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
return handle.DropBindRecord(e.ctx, infoschema.GetInfoSchema(e.ctx), record)
return handle.DropBindRecord(e.normdOrigSQL, e.ctx.GetSessionVars().CurrentDB, bindInfo)
}
return domain.GetDomain(e.ctx).BindHandle().DropBindRecord(e.ctx, infoschema.GetInfoSchema(e.ctx), record)
return domain.GetDomain(e.ctx).BindHandle().DropBindRecord(e.normdOrigSQL, e.ctx.GetSessionVars().CurrentDB, bindInfo)
}

func (e *SQLBindExec) createSQLBind() error {
Expand Down
1 change: 1 addition & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
EraseLastSemicolon(node.HintedSel)
p.checkBindGrammar(node.OriginSel, node.HintedSel)
}
return in, true
case *ast.RecoverTableStmt, *ast.FlashBackTableStmt:
// The specified table in recover table statement maybe already been dropped.
// So skip check table name here, otherwise, recover table [table_name] syntax will return
Expand Down
4 changes: 1 addition & 3 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@ func getBindRecord(ctx sessionctx.Context, stmt ast.StmtNode) (*bindinfo.BindRec

func handleInvalidBindRecord(ctx context.Context, sctx sessionctx.Context, level string, bindRecord bindinfo.BindRecord) {
sessionHandle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
// The first two parameters are only used to generate hints, but since we already have the hints,
// we do not need to pass real values and the error won't happen too.
err := sessionHandle.DropBindRecord(nil, nil, &bindRecord)
err := sessionHandle.DropBindRecord(bindRecord.OriginalSQL, bindRecord.Db, &bindRecord.Bindings[0])
if err != nil {
logutil.Logger(ctx).Info("drop session bindings failed")
}
Expand Down

0 comments on commit 39a370c

Please sign in to comment.