Skip to content

Commit

Permalink
*: 1. remove EnableChunk 2. implement Next in baseExecutor to return …
Browse files Browse the repository at this point in the history
…nil (pingcap#5988)
  • Loading branch information
XuHuaiyu authored Mar 9, 2018
1 parent b6ad6a2 commit a3bf058
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 158 deletions.
2 changes: 0 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type Config struct {
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
EnableChunk bool `toml:"enable-chunk" json:"enable-chunk"`
OOMAction string `toml:"oom-action" json:"oom-action"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`

Expand Down Expand Up @@ -225,7 +224,6 @@ var defaultConf = Config{
SplitTable: true,
Lease: "10s",
TokenLimit: 1000,
EnableChunk: true,
OOMAction: "log",
EnableStreaming: false,
Log: Log{
Expand Down
23 changes: 3 additions & 20 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,26 +285,9 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.logSlowQuery(txnTS, err == nil)
}()

if sctx.GetSessionVars().EnableChunk {
err = e.NextChunk(ctx, e.newChunk())
if err != nil {
return nil, errors.Trace(err)
}
} else {
for {
var row Row
row, err = e.Next(ctx)
if err != nil {
return nil, errors.Trace(err)
}
// Even though there isn't any result set, the row is still used to indicate if there is
// more work to do.
// For example, the UPDATE statement updates a single row on a Next call, we keep calling Next until
// There is no more rows to update.
if row == nil {
return nil, nil
}
}
err = e.NextChunk(ctx, e.newChunk())
if err != nil {
return nil, errors.Trace(err)
}

return nil, nil
Expand Down
34 changes: 14 additions & 20 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func (e *baseExecutor) Open(ctx context.Context) error {
return nil
}

// Next implements interface Executor.
// To be removed in near future.
func (e *baseExecutor) Next(context.Context) (Row, error) {
return nil, nil
}

// Close closes all executors and release all resources.
func (e *baseExecutor) Close() error {
for _, child := range e.children {
Expand Down Expand Up @@ -656,32 +662,20 @@ func init() {
if err != nil {
return rows, errors.Trace(err)
}
if sctx.GetSessionVars().EnableChunk {
for {
chk := exec.newChunk()
err = exec.NextChunk(ctx, chk)
if err != nil {
return rows, errors.Trace(err)
}
if chk.NumRows() == 0 {
return rows, nil
}
iter := chunk.NewIterator4Chunk(chk)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
row := r.GetDatumRow(exec.retTypes())
rows = append(rows, row)
}
}
}
for {
row, err := exec.Next(ctx)
chk := exec.newChunk()
err = exec.NextChunk(ctx, chk)
if err != nil {
return rows, errors.Trace(err)
}
if row == nil {
if chk.NumRows() == 0 {
return rows, nil
}
rows = append(rows, row)
iter := chunk.NewIterator4Chunk(chk)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
row := r.GetDatumRow(exec.retTypes())
rows = append(rows, row)
}
}
}
tableMySQLErrCodes := map[terror.ErrCode]uint16{
Expand Down
10 changes: 3 additions & 7 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,13 +782,9 @@ func (s *testSuite) TestIndexLookupJoin(c *C) {
tk.MustExec("CREATE TABLE `s` (`a` int, `b` char (20))")
tk.MustExec("CREATE INDEX idx_s_a ON s(`a`)")
tk.MustExec("INSERT INTO s VALUES (-277544960, 'fpnndsjo') , (2, 'kfpnndsjof') , (2, 'vtdiockfpn'), (-277544960, 'fpnndsjo') , (2, 'kfpnndsjof') , (6, 'ckfp')")
for _, enableChk := range []bool{false, true} {
tk.Se.GetSessionVars().EnableChunk = enableChk
tk.MustQuery("select /*+ TIDB_INLJ(t, s) */ t.a from t join s on t.a = s.a").Check(testkit.Rows("-277544960", "-277544960"))
tk.MustQuery("select /*+ TIDB_INLJ(t, s) */ t.a from t left join s on t.a = s.a").Check(testkit.Rows("148307968", "-1327693824", "-277544960", "-277544960"))
tk.MustQuery("select /*+ TIDB_INLJ(t, s) */ t.a from t right join s on t.a = s.a").Check(testkit.Rows("-277544960", "<nil>", "<nil>", "-277544960", "<nil>", "<nil>"))

}
tk.MustQuery("select /*+ TIDB_INLJ(t, s) */ t.a from t join s on t.a = s.a").Check(testkit.Rows("-277544960", "-277544960"))
tk.MustQuery("select /*+ TIDB_INLJ(t, s) */ t.a from t left join s on t.a = s.a").Check(testkit.Rows("148307968", "-1327693824", "-277544960", "-277544960"))
tk.MustQuery("select /*+ TIDB_INLJ(t, s) */ t.a from t right join s on t.a = s.a").Check(testkit.Rows("-277544960", "<nil>", "<nil>", "-277544960", "<nil>", "<nil>"))
tk.MustExec("DROP TABLE IF EXISTS t;")
tk.MustExec("CREATE TABLE t(a BIGINT PRIMARY KEY, b BIGINT);")
tk.MustExec("INSERT INTO t VALUES(1, 2);")
Expand Down
20 changes: 5 additions & 15 deletions executor/merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,8 @@ func (s *testSuite) TestMergeJoin(c *C) {
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (c1 int)")
tk.MustExec("insert into t1 values (1), (1), (1)")
for _, enableChk := range []bool{false, true} {
tk.Se.GetSessionVars().EnableChunk = enableChk
result = tk.MustQuery("select/*+ TIDB_SMJ(t) */ * from t1 a join t1 b on a.c1 = b.c1;")
result.Check(testkit.Rows("1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1"))
}
result = tk.MustQuery("select/*+ TIDB_SMJ(t) */ * from t1 a join t1 b on a.c1 = b.c1;")
result.Check(testkit.Rows("1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1", "1 1"))

tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t1")
Expand All @@ -306,11 +303,8 @@ func (s *testSuite) TestMergeJoin(c *C) {
tk.MustExec("create table t1(c1 int unsigned)")
tk.MustExec("insert into t values (1)")
tk.MustExec("insert into t1 values (1)")
for _, enableChk := range []bool{false, true} {
tk.Se.GetSessionVars().EnableChunk = enableChk
result = tk.MustQuery("select /*+ TIDB_SMJ(t,t1) */ t.c1 from t , t1 where t.c1 = t1.c1")
result.Check(testkit.Rows("1"))
}
result = tk.MustQuery("select /*+ TIDB_SMJ(t,t1) */ t.c1 from t , t1 where t.c1 = t1.c1")
result.Check(testkit.Rows("1"))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index a(a), index b(b))")
Expand All @@ -323,11 +317,7 @@ func (s *testSuite) TestMergeJoin(c *C) {
tk.MustExec("insert into t value(1,1),(1,2),(1,3),(1,4)")
tk.MustExec("create table s(a int, primary key(a))")
tk.MustExec("insert into s value(1)")
for _, enableChk := range []bool{false, true} {
tk.Se.GetSessionVars().EnableChunk = enableChk
tk.MustQuery("select /*+ TIDB_SMJ(t, s) */ count(*) from t join s on t.a = s.a").Check(testkit.Rows("4"))
}

tk.MustQuery("select /*+ TIDB_SMJ(t, s) */ count(*) from t join s on t.a = s.a").Check(testkit.Rows("4"))
}

func (s *testSuite) Test3WaysMergeJoin(c *C) {
Expand Down
51 changes: 1 addition & 50 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,6 @@ func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse() error {
}
}
cc.ctx.SetSessionManager(cc.server)
if cc.server.cfg.EnableChunk {
cc.ctx.EnableChunk()
}
return nil
}

Expand Down Expand Up @@ -924,56 +921,10 @@ func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary b
buf = buf[:stackSize]
log.Errorf("query: %s:\n%s", cc.lastCmd, buf)
}()
if cc.server.cfg.EnableChunk {
err := cc.writeChunks(ctx, rs, binary, more)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(cc.flush())
}
// We need to call Next before we get columns.
// Otherwise, we will get incorrect columns info.
row, err := rs.Next(ctx)
err := cc.writeChunks(ctx, rs, binary, more)
if err != nil {
return errors.Trace(err)
}
columns := rs.Columns()
err = cc.writeColumnInfo(columns)
if err != nil {
return errors.Trace(err)
}
data := make([]byte, 4, 1024)
for {
if err != nil {
return errors.Trace(err)
}
if row == nil {
break
}
data = data[0:4]
if binary {
data, err = dumpBinaryRow(data, columns, row)
if err != nil {
return errors.Trace(err)
}
} else {
data, err = dumpTextRow(data, columns, row)
if err != nil {
return errors.Trace(err)
}
}

if err = cc.writePacket(data); err != nil {
return errors.Trace(err)
}
row, err = rs.Next(ctx)
}

err = cc.writeEOF(more)
if err != nil {
return errors.Trace(err)
}

return errors.Trace(cc.flush())
}

Expand Down
4 changes: 0 additions & 4 deletions server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ type QueryCtx interface {
ShowProcess() util.ProcessInfo

SetSessionManager(util.SessionManager)

// EnableChunk indicates whether the chunk execution model is enabled.
// TODO: remove this after tidb-server configuration "enable-chunk' removed.
EnableChunk()
}

// PreparedStatement is the interface to use a prepared statement.
Expand Down
5 changes: 0 additions & 5 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ func (qd *TiDBDriver) OpenCtx(connID uint64, capability uint32, collation uint8,
return tc, nil
}

// EnableChunk enables TiDBContext to use chunk.
func (tc *TiDBContext) EnableChunk() {
tc.session.GetSessionVars().EnableChunk = true
}

// Status implements QueryCtx Status method.
func (tc *TiDBContext) Status() uint16 {
return tc.session.Status()
Expand Down
2 changes: 0 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,8 +1044,6 @@ func CreateSession4Test(store kv.Storage) (Session, error) {
if err == nil {
// initialize session variables for test.
s.GetSessionVars().MaxChunkSize = 2
// enable chunk when test
s.GetSessionVars().EnableChunk = true
}
return s, errors.Trace(err)
}
Expand Down
4 changes: 0 additions & 4 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1680,7 +1680,6 @@ func (s *testSchemaSuite) TestInsertExecChunk(c *C) {
tk.MustExec("create table test2(a int)")

tk.Se.GetSessionVars().DistSQLScanConcurrency = 1
tk.Se.GetSessionVars().EnableChunk = true
tk.MustExec("insert into test2(a) select a from test1;")

rs, err := tk.Exec("select * from test2")
Expand Down Expand Up @@ -1713,7 +1712,6 @@ func (s *testSchemaSuite) TestUpdateExecChunk(c *C) {
}

tk.Se.GetSessionVars().DistSQLScanConcurrency = 1
tk.Se.GetSessionVars().EnableChunk = true
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("update chk set a = a + 100 where a = %d", i))
}
Expand Down Expand Up @@ -1749,7 +1747,6 @@ func (s *testSchemaSuite) TestDeleteExecChunk(c *C) {
}

tk.Se.GetSessionVars().DistSQLScanConcurrency = 1
tk.Se.GetSessionVars().EnableChunk = true

for i := 0; i < 99; i++ {
tk.MustExec(fmt.Sprintf("delete from chk where a = %d", i))
Expand Down Expand Up @@ -1782,7 +1779,6 @@ func (s *testSchemaSuite) TestDeleteMultiTableExecChunk(c *C) {
}

tk.Se.GetSessionVars().DistSQLScanConcurrency = 1
tk.Se.GetSessionVars().EnableChunk = true

tk.MustExec("delete chk1, chk2 from chk1 inner join chk2 where chk1.a = chk2.a")

Expand Down
4 changes: 0 additions & 4 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,6 @@ type SessionVars struct {
// MemQuotaTopn defines the memory quota for a top n executor.
MemQuotaTopn int64

// EnableChunk indicates whether the chunk execution model is enabled.
// TODO: remove this after tidb-server configuration "enable-chunk' removed.
EnableChunk bool

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
EnableStreaming bool
Expand Down
37 changes: 12 additions & 25 deletions tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,33 +217,20 @@ func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet
return nil, nil
}
var rows []types.Row
if sctx.GetSessionVars().EnableChunk {
for {
// Since we collect all the rows, we can not reuse the chunk.
chk := rs.NewChunk()
iter := chunk.NewIterator4Chunk(chk)
for {
// Since we collect all the rows, we can not reuse the chunk.
chk := rs.NewChunk()
iter := chunk.NewIterator4Chunk(chk)

err := rs.NextChunk(ctx, chk)
if err != nil {
return nil, errors.Trace(err)
}
if chk.NumRows() == 0 {
break
}

for row := iter.Begin(); row != iter.End(); row = iter.Next() {
rows = append(rows, row)
}
err := rs.NextChunk(ctx, chk)
if err != nil {
return nil, errors.Trace(err)
}
} else {
for {
row, err := rs.Next(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if row == nil {
break
}
if chk.NumRows() == 0 {
break
}

for row := iter.Begin(); row != iter.End(); row = iter.Next() {
rows = append(rows, row)
}
}
Expand Down

0 comments on commit a3bf058

Please sign in to comment.