Skip to content

Commit

Permalink
*: Remove deprecated streaming (pingcap#32765)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo authored Mar 10, 2022
1 parent 469bc71 commit 9a4ca3c
Show file tree
Hide file tree
Showing 15 changed files with 35 additions and 246 deletions.
9 changes: 3 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ type Config struct {
MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"`
// TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled
// If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
// Deprecated
EnableStreaming bool `toml:"-" json:"-"`
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches tikvcfg.TxnLocalLatches `toml:"-" json:"-"`
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches tikvcfg.TxnLocalLatches `toml:"-" json:"-"`
// Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html.
// TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive.
LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`
Expand Down Expand Up @@ -633,7 +631,6 @@ var defaultConf = Config{
TempStoragePath: tempStorageDirName,
OOMAction: OOMActionCancel,
MemQuotaQuery: 1 << 30,
EnableStreaming: false,
EnableBatchDML: false,
CheckMb4ValueInUTF8: *NewAtomicBool(true),
MaxIndexLength: 3072,
Expand Down
1 change: 0 additions & 1 deletion config/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ var (
"Log.QueryLogMaxLen": {},
"Log.ExpensiveThreshold": {},
"CheckMb4ValueInUTF8": {},
"EnableStreaming": {},
"TxnLocalLatches.Capacity": {},
"CompatibleKillQuery": {},
"TreatOldVersionUTF8AsUTF8MB4": {},
Expand Down
7 changes: 1 addition & 6 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
hook.(func(*kv.Request))(kvReq)
}

if !sctx.GetSessionVars().EnableStreaming {
kvReq.Streaming = false
}
kvReq.Streaming = false
enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
eventCb := func(event trxevents.TransactionEvent) {
Expand Down Expand Up @@ -219,9 +217,6 @@ func canUseChunkRPC(ctx sessionctx.Context) bool {
if !ctx.GetSessionVars().EnableChunkRPC {
return false
}
if ctx.GetSessionVars().EnableStreaming {
return false
}
if !checkAlignment() {
return false
}
Expand Down
71 changes: 0 additions & 71 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,36 +152,6 @@ func TestSelectResultRuntimeStats(t *testing.T) {
require.Equal(t, expect, s1.String())
}

func TestSelectStreaming(t *testing.T) {
response, colTypes := createSelectStreaming(t, 1, 2)
// Test Next.
chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err := response.Next(context.TODO(), chk)
require.NoError(t, err)
numAllRows += chk.NumRows()
if chk.NumRows() == 0 {
break
}
}
require.Equal(t, 2, numAllRows)
require.NoError(t, response.Close())
}

func TestSelectStreamingWithNextRaw(t *testing.T) {
response, _ := createSelectStreaming(t, 1, 2)
data, err := response.NextRaw(context.TODO())
require.NoError(t, err)
require.Len(t, data, 16)
}

func TestSelectStreamingChunkSize(t *testing.T) {
response, colTypes := createSelectStreaming(t, 100, 1000000)
testChunkSize(t, response, colTypes)
require.NoError(t, response.Close())
}

func TestAnalyze(t *testing.T) {
sctx := newMockSessionContext()
sctx.GetSessionVars().EnableChunkRPC = false
Expand Down Expand Up @@ -472,44 +442,3 @@ func createSelectNormal(t *testing.T, batch, totalRows int, planIDs []int, sctx

return result, colTypes
}

func createSelectStreaming(t *testing.T, batch, totalRows int) (*streamResult, []*types.FieldType) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetStreaming(true).
Build()
require.NoError(t, err)

// 4 int64 types.
colTypes := []*types.FieldType{
{
Tp: mysql.TypeLonglong,
Flen: mysql.MaxIntWidth,
Decimal: 0,
Flag: mysql.BinaryFlag,
Charset: charset.CharsetBin,
Collate: charset.CollationBin,
},
}
colTypes = append(colTypes, colTypes[0])
colTypes = append(colTypes, colTypes[0])
colTypes = append(colTypes, colTypes[0])

sctx := newMockSessionContext()
sctx.GetSessionVars().EnableStreaming = true

response, err := Select(context.TODO(), sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
require.NoError(t, err)
result, ok := response.(*streamResult)
require.True(t, ok)
require.Len(t, colTypes, result.rowLen)

resp, ok := result.resp.(*mockResponse)
require.True(t, ok)
resp.total = totalRows
resp.batch = batch
return result, colTypes
}
11 changes: 0 additions & 11 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,17 +1013,6 @@ func TestAnalyzeIncremental(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_analyze_version = 1")
tk.Session().GetSessionVars().EnableStreaming = false
testAnalyzeIncremental(tk, t, dom)
}

func TestAnalyzeIncrementalStreaming(t *testing.T) {
t.Skip("unistore hasn't support streaming yet.")
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.Session().GetSessionVars().EnableStreaming = true
testAnalyzeIncremental(tk, t, dom)
}

Expand Down
13 changes: 0 additions & 13 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4000,19 +4000,6 @@ func (s *testSuite) TestLimit(c *C) {
))
}

func (s *testSuite) TestCoprocessorStreamingWarning(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a double)")
tk.MustExec("insert into t value(1.2)")
tk.MustExec("set @@session.tidb_enable_streaming = 1")

result := tk.MustQuery("select * from t where a/0 > 1")
result.Check(testkit.Rows())
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1365|Division by 0"))
}

func (s *testSuite3) TestYearTypeDeleteIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 0 additions & 3 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,6 @@ func TestValidateSetVar(t *testing.T) {
tk.MustExec("set @@tidb_pprof_sql_cpu=0;")
tk.MustQuery("select @@tidb_pprof_sql_cpu;").Check(testkit.Rows("0"))

tk.MustExec("set @@tidb_enable_streaming=1;")
tk.MustQuery("select @@tidb_enable_streaming;").Check(testkit.Rows("1"))

err = tk.ExecToErr("set @@tidb_batch_delete=3;")
require.True(t, terror.ErrorEqual(err, variable.ErrWrongValueForVar), fmt.Sprintf("err %v", err))

Expand Down
3 changes: 0 additions & 3 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,9 +1591,6 @@ func TestShowVar(t *testing.T) {
// Test Hidden tx_read_ts
res = tk.MustQuery("show variables like '%tx_read_ts'")
require.Len(t, res.Rows(), 0)
// Test Hidden tidb_enable_streaming
res = tk.MustQuery("show variables like '%tidb_enable_streaming%';")
require.Len(t, res.Rows(), 0)

// Test versions' related variables
res = tk.MustQuery("show variables like 'version%'")
Expand Down
61 changes: 29 additions & 32 deletions infoschema/cluster_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,39 +229,36 @@ func TestSelectClusterTable(t *testing.T) {
slowLogFileName := "tidb-slow.log"
prepareSlowLogfile(t, slowLogFileName)
defer func() { require.NoError(t, os.Remove(slowLogFileName)) }()
for i := 0; i < 2; i++ {
tk.MustExec("use information_schema")
tk.MustExec(fmt.Sprintf("set @@tidb_enable_streaming=%d", i))
tk.MustExec("set @@global.tidb_enable_stmt_summary=1")
tk.MustExec("set time_zone = '+08:00';")
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("2"))
tk.MustQuery("select time from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testkit.RowsWithSep("|", "2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1"))
tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 <nil> Query 9223372036 %s <nil> 0 0 ", "")))
tk.MustQuery("select query_time, conn_id from `CLUSTER_SLOW_QUERY` order by time limit 1").Check(testkit.Rows("4.895492 6"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("1", "1"))
tk.MustQuery("select digest, count(*) from `CLUSTER_SLOW_QUERY` group by digest order by digest").Check(testkit.Rows("124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc 1", "42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 1"))
tk.MustQuery(`select length(query) as l,time from information_schema.cluster_slow_query where time > "2019-02-12 19:33:56" order by abs(l) desc limit 10;`).Check(testkit.Rows("21 2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` where time > now() group by digest").Check(testkit.Rows())
re := tk.MustQuery("select * from `CLUSTER_statements_summary`")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
// Test for TiDB issue 14915.
re = tk.MustQuery("select sum(exec_count*avg_mem) from cluster_statements_summary_history group by schema_name,digest,digest_text;")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
tk.MustQuery("select * from `CLUSTER_statements_summary_history`")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
tk.MustExec("set @@global.tidb_enable_stmt_summary=0")
re = tk.MustQuery("select * from `CLUSTER_statements_summary`")
require.NotNil(t, re)
require.Equal(t, 0, len(re.Rows()))
tk.MustQuery("select * from `CLUSTER_statements_summary_history`")
require.NotNil(t, re)
require.Equal(t, 0, len(re.Rows()))
}

tk.MustExec("use information_schema")
tk.MustExec("set @@global.tidb_enable_stmt_summary=1")
tk.MustExec("set time_zone = '+08:00';")
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("2"))
tk.MustQuery("select time from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testkit.RowsWithSep("|", "2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1"))
tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 <nil> Query 9223372036 %s <nil> 0 0 ", "")))
tk.MustQuery("select query_time, conn_id from `CLUSTER_SLOW_QUERY` order by time limit 1").Check(testkit.Rows("4.895492 6"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("1", "1"))
tk.MustQuery("select digest, count(*) from `CLUSTER_SLOW_QUERY` group by digest order by digest").Check(testkit.Rows("124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc 1", "42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 1"))
tk.MustQuery(`select length(query) as l,time from information_schema.cluster_slow_query where time > "2019-02-12 19:33:56" order by abs(l) desc limit 10;`).Check(testkit.Rows("21 2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` where time > now() group by digest").Check(testkit.Rows())
re := tk.MustQuery("select * from `CLUSTER_statements_summary`")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
// Test for TiDB issue 14915.
re = tk.MustQuery("select sum(exec_count*avg_mem) from cluster_statements_summary_history group by schema_name,digest,digest_text;")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
tk.MustQuery("select * from `CLUSTER_statements_summary_history`")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
tk.MustExec("set @@global.tidb_enable_stmt_summary=0")
re = tk.MustQuery("select * from `CLUSTER_statements_summary`")
require.NotNil(t, re)
require.Equal(t, 0, len(re.Rows()))
tk.MustQuery("select * from `CLUSTER_statements_summary_history`")
require.NotNil(t, re)
require.Equal(t, 0, len(re.Rows()))
}

func SubTestSelectClusterTablePrivilege(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/removed.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
tiDBEnableGlobalTemporaryTable = "tidb_enable_global_temporary_table"
tiDBSlowLogMasking = "tidb_slow_log_masking"
placementChecks = "placement_checks"
tiDBEnableStreaming = "tidb_enable_streaming"
tiDBOptBCJ = "tidb_opt_broadcast_join"
)

Expand All @@ -46,6 +47,7 @@ var removedSysVars = map[string]string{
tiDBMemQuotaTopn: "use tidb_mem_quota_query instead",
tiDBMemQuotaIndexLookupReader: "use tidb_mem_quota_query instead",
tiDBMemQuotaIndexLookupJoin: "use tidb_mem_quota_query instead",
tiDBEnableStreaming: "streaming is no longer supported",
tiDBOptBCJ: "tidb_opt_broadcast_join is removed and use tidb_allow_mpp instead",
}

Expand Down
12 changes: 0 additions & 12 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,6 @@ type SessionVars struct {
// WaitSplitRegionTimeout defines the split region timeout.
WaitSplitRegionTimeout uint64

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
EnableStreaming bool

// EnableChunkRPC indicates whether the coprocessor request can use chunk API.
EnableChunkRPC bool

Expand Down Expand Up @@ -1269,14 +1265,6 @@ func NewSessionVars() *SessionVars {
MaxChunkSize: DefMaxChunkSize,
}
vars.DMLBatchSize = DefDMLBatchSize
var enableStreaming string
if config.GetGlobalConfig().EnableStreaming {
enableStreaming = "1"
} else {
enableStreaming = "0"
}
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

vars.AllowBatchCop = DefTiDBAllowBatchCop
vars.allowMPPExecution = DefTiDBAllowMPPExecution
vars.HashExchangeWithNewCollation = DefTiDBHashExchangeWithNewCollation
Expand Down
5 changes: 0 additions & 5 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,6 @@ var defaultSysVars = []*SysVar{
s.MemQuotaQuery = TidbOptInt64(val, config.GetGlobalConfig().MemQuotaQuery)
return nil
}},
// Deprecated: tidb_enable_streaming
{Scope: ScopeSession, Name: TiDBEnableStreaming, Value: Off, Type: TypeBool, skipInit: true, Hidden: true, SetSession: func(s *SessionVars, val string) error {
s.EnableStreaming = TiDBOptOn(val)
return nil
}},
{Scope: ScopeSession, Name: TiDBEnableChunkRPC, Value: On, Type: TypeBool, skipInit: true, SetSession: func(s *SessionVars, val string) error {
s.EnableChunkRPC = TiDBOptOn(val)
return nil
Expand Down
3 changes: 0 additions & 3 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ const (
// TiDBDisableTxnAutoRetry disables transaction auto retry.
TiDBDisableTxnAutoRetry = "tidb_disable_txn_auto_retry"

// Deprecated: tidb_enable_streaming enables TiDB to use streaming API for coprocessor requests.
TiDBEnableStreaming = "tidb_enable_streaming"

// TiDBEnableChunkRPC enables TiDB to use Chunk format for coprocessor requests.
TiDBEnableChunkRPC = "tidb_enable_chunk_rpc"

Expand Down
13 changes: 0 additions & 13 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,6 @@ func TestVarsutil(t *testing.T) {
require.NoError(t, err)
require.Equal(t, config.HideConfig(string(bVal)), val)

err = SetSessionSystemVar(v, TiDBEnableStreaming, "1")
require.NoError(t, err)
val, err = GetSessionOrGlobalSystemVar(v, TiDBEnableStreaming)
require.NoError(t, err)
require.Equal(t, "ON", val)
require.True(t, v.EnableStreaming)
err = SetSessionSystemVar(v, TiDBEnableStreaming, "0")
require.NoError(t, err)
val, err = GetSessionOrGlobalSystemVar(v, TiDBEnableStreaming)
require.NoError(t, err)
require.Equal(t, "OFF", val)
require.False(t, v.EnableStreaming)

require.Equal(t, DefTiDBOptimizerSelectivityLevel, v.OptimizerSelectivityLevel)
err = SetSessionSystemVar(v, TiDBOptimizerSelectivityLevel, "1")
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 9a4ca3c

Please sign in to comment.