Skip to content

Commit

Permalink
executor, store: fixed daylight saving time issue (pingcap#6823)
Browse files Browse the repository at this point in the history
Thank you for working on TiDB! Please read TiDB's [CONTRIBUTING](https://github.com/pingcap/tidb/blob/master/CONTRIBUTING.md) document **BEFORE** filing this PR.

## What have you changed? (mandatory)

During coprocessor dag task,  it first uses timezone `name`, if non-empty, to get legitimate timezone variable. To achieve this, we need to push down such data into tikv which leads to change the logic of building pushdown request. The logic I mentioned mainly resides in `executor` package. 

I change `timeZoneOffset` to `zone` and add second return parameter `name string`.  The intentioned of doing this to adopt the convention of `time` package. 

For the same purpose, I change `GetTimeZone` to `Location`. As you can see, in `time` package, timezone was bind to `Location`. 


## What are the type of the changes (mandatory)?
- Bug fix (non-breaking change which fixes an issue)


## How has this PR been tested (mandatory)?
unit-test
integration-test will be added shortly
  • Loading branch information
zhexuany authored Jul 16, 2018
1 parent b729a60 commit 7c18d24
Show file tree
Hide file tree
Showing 19 changed files with 152 additions and 67 deletions.
2 changes: 1 addition & 1 deletion ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab
}
colMap := make(map[int64]*types.FieldType)
colMap[col.ID] = &col.FieldType
rowMap, err := tablecodec.DecodeRow(data, colMap, ctx.GetSessionVars().GetTimeZone())
rowMap, err := tablecodec.DecodeRow(data, colMap, ctx.GetSessionVars().Location())
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (r *selectResult) getSelectResp() error {
func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
rowsData := r.selectResp.Chunks[r.respChkIdx].RowsData
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().GetTimeZone())
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location())
for chk.NumRows() < maxChunkSize && len(rowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
rowsData, err = decoder.DecodeOne(rowsData, i, r.fieldTypes[i])
Expand Down
2 changes: 1 addition & 1 deletion distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (r *streamResult) readDataIfNecessary(ctx context.Context) error {
func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) {
remainRowsData := r.curr.RowsData
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().GetTimeZone())
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location())
for chk.NumRows() < maxChunkSize && len(remainRowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i])
Expand Down
6 changes: 3 additions & 3 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = e.ctx.Txn().StartTS()
dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.schema.Columns {
Expand Down Expand Up @@ -219,7 +219,7 @@ func (e *RecoverIndexExec) constructLimitPB(count uint64) *tipb.Executor {
func (e *RecoverIndexExec) buildDAGPB(txn kv.Transaction, limitCnt uint64) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.columns {
Expand Down Expand Up @@ -646,7 +646,7 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error {
func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.idxCols {
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
collectors[i].MergeSampleCollector(sc, statistics.SampleCollectorFromProto(rc))
}
}
timeZone := e.ctx.GetSessionVars().GetTimeZone()
timeZone := e.ctx.GetSessionVars().Location()
if e.pkInfo != nil {
pkHist.ID = e.pkInfo.ID
err = pkHist.DecodeTo(&e.pkInfo.FieldType, timeZone)
Expand Down
9 changes: 6 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,7 @@ func (b *executorBuilder) buildDelete(v *plan.Delete) Executor {
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plan.AnalyzeIndexTask) *AnalyzeIndexExec {
_, offset := zone(b.ctx)
e := &AnalyzeIndexExec{
ctx: b.ctx,
tblInfo: task.TableInfo,
Expand All @@ -1240,7 +1241,7 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plan.AnalyzeIndexTask)
Tp: tipb.AnalyzeType_TypeIndex,
StartTs: math.MaxUint64,
Flags: statementContextToFlags(b.ctx.GetSessionVars().StmtCtx),
TimeZoneOffset: timeZoneOffset(b.ctx),
TimeZoneOffset: offset,
},
}
e.analyzePB.IdxReq = &tipb.AnalyzeIndexReq{
Expand All @@ -1261,6 +1262,8 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa
keepOrder = true
cols = append([]*model.ColumnInfo{task.PKInfo}, cols...)
}

_, offset := zone(b.ctx)
e := &AnalyzeColumnsExec{
ctx: b.ctx,
tblInfo: task.TableInfo,
Expand All @@ -1272,7 +1275,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa
Tp: tipb.AnalyzeType_TypeColumn,
StartTs: math.MaxUint64,
Flags: statementContextToFlags(b.ctx.GetSessionVars().StmtCtx),
TimeZoneOffset: timeZoneOffset(b.ctx),
TimeZoneOffset: offset,
},
}
depth := int32(defaultCMSketchDepth)
Expand Down Expand Up @@ -1336,7 +1339,7 @@ func constructDistExec(sctx sessionctx.Context, plans []plan.PhysicalPlan) ([]*t
func (b *executorBuilder) constructDAGReq(plans []plan.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
dagReq = &tipb.DAGRequest{}
dagReq.StartTs = b.getStartTS()
dagReq.TimeZoneOffset = timeZoneOffset(b.ctx)
dagReq.TimeZoneName, dagReq.TimeZoneOffset = zone(b.ctx)
sc := b.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)
Expand Down
17 changes: 13 additions & 4 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,20 @@ func closeAll(objs ...Closeable) error {
return errors.Trace(err)
}

// timeZoneOffset returns the local time zone offset in seconds.
func timeZoneOffset(ctx sessionctx.Context) int64 {
loc := ctx.GetSessionVars().GetTimeZone()
// zone returns the current timezone name and timezone offset in seconds.
// In compatible with MySQL, we change `Local` to `System`.
// TODO: Golang team plan to return system timezone name intead of
// returning `Local` when `loc` is `time.Local`. We need keep an eye on this.
func zone(sctx sessionctx.Context) (string, int64) {
loc := sctx.GetSessionVars().Location()
_, offset := time.Now().In(loc).Zone()
return int64(offset)
var name string
name = loc.String()
if name == "Local" {
name = "System"
}

return name, int64(offset)
}

// statementContextToFlags converts StatementContext to tipb.SelectRequest.Flags.
Expand Down
1 change: 1 addition & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,7 @@ func (s *testSuite) TestTimestampTimeZone(c *C) {
r.Check(testkit.Rows("123381351 1734 2014-03-31 08:57:10 127.0.0.1")) // Cover IndexLookupExec

// For issue https://github.com/pingcap/tidb/issues/3485
tk.MustExec("set time_zone = 'Asia/Shanghai'")
tk.MustExec("drop table if exists t1")
tk.MustExec(`CREATE TABLE t1 (
id bigint(20) NOT NULL AUTO_INCREMENT,
Expand Down
2 changes: 1 addition & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func CompileExecutePreparedStmt(ctx sessionctx.Context, ID uint32, args ...inter
func ResetStmtCtx(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sessVars := ctx.GetSessionVars()
sc := new(stmtctx.StatementContext)
sc.TimeZone = sessVars.GetTimeZone()
sc.TimeZone = sessVars.Location()
sc.MemTracker = memory.NewTracker(s.Text(), sessVars.MemQuotaQuery)
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
Expand Down
14 changes: 7 additions & 7 deletions expression/builtin_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -1889,8 +1889,8 @@ func (b *builtinSysDateWithFspSig) evalTime(row types.Row) (d types.Time, isNull
return types.Time{}, isNull, errors.Trace(err)
}

tz := b.ctx.GetSessionVars().GetTimeZone()
now := time.Now().In(tz)
loc := b.ctx.GetSessionVars().Location()
now := time.Now().In(loc)
result, err := convertTimeToMysqlTime(now, int(fsp))
if err != nil {
return types.Time{}, true, errors.Trace(err)
Expand All @@ -1911,7 +1911,7 @@ func (b *builtinSysDateWithoutFspSig) Clone() builtinFunc {
// evalTime evals SYSDATE().
// See https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_sysdate
func (b *builtinSysDateWithoutFspSig) evalTime(row types.Row) (d types.Time, isNull bool, err error) {
tz := b.ctx.GetSessionVars().GetTimeZone()
tz := b.ctx.GetSessionVars().Location()
now := time.Now().In(tz)
result, err := convertTimeToMysqlTime(now, 0)
if err != nil {
Expand Down Expand Up @@ -1947,7 +1947,7 @@ func (b *builtinCurrentDateSig) Clone() builtinFunc {
// evalTime evals CURDATE().
// See https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_curdate
func (b *builtinCurrentDateSig) evalTime(row types.Row) (d types.Time, isNull bool, err error) {
tz := b.ctx.GetSessionVars().GetTimeZone()
tz := b.ctx.GetSessionVars().Location()
year, month, day := time.Now().In(tz).Date()
result := types.Time{
Time: types.FromDate(year, int(month), day, 0, 0, 0, 0),
Expand Down Expand Up @@ -2002,7 +2002,7 @@ func (b *builtinCurrentTime0ArgSig) Clone() builtinFunc {
}

func (b *builtinCurrentTime0ArgSig) evalDuration(row types.Row) (types.Duration, bool, error) {
tz := b.ctx.GetSessionVars().GetTimeZone()
tz := b.ctx.GetSessionVars().Location()
dur := time.Now().In(tz).Format(types.TimeFormat)
res, err := types.ParseDuration(dur, types.MinFsp)
if err != nil {
Expand All @@ -2026,7 +2026,7 @@ func (b *builtinCurrentTime1ArgSig) evalDuration(row types.Row) (types.Duration,
if err != nil {
return types.Duration{}, true, errors.Trace(err)
}
tz := b.ctx.GetSessionVars().GetTimeZone()
tz := b.ctx.GetSessionVars().Location()
dur := time.Now().In(tz).Format(types.TimeFSPFormat)
res, err := types.ParseDuration(dur, int(fsp))
if err != nil {
Expand Down Expand Up @@ -2310,7 +2310,7 @@ func evalNowWithFsp(ctx sessionctx.Context, fsp int) (types.Time, bool, error) {
return types.Time{}, true, errors.Trace(err)
}

err = result.ConvertTimeZone(time.Local, ctx.GetSessionVars().GetTimeZone())
err = result.ConvertTimeZone(time.Local, ctx.GetSessionVars().Location())
if err != nil {
return types.Time{}, true, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion expression/builtin_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2212,7 +2212,7 @@ func (s *testEvaluatorSuite) TestLastDay(c *C) {

func (s *testEvaluatorSuite) TestWithTimeZone(c *C) {
sv := s.ctx.GetSessionVars()
originTZ := sv.GetTimeZone()
originTZ := sv.Location()
sv.TimeZone, _ = time.LoadLocation("Asia/Tokyo")
defer func() {
sv.TimeZone = originTZ
Expand Down
2 changes: 1 addition & 1 deletion expression/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func GetTimeValue(ctx sessionctx.Context, v interface{}, tp byte, fsp int) (d ty
if upperX == strings.ToUpper(ast.CurrentTimestamp) {
value.Time = types.FromGoTime(defaultTime)
if tp == mysql.TypeTimestamp {
err = value.ConvertTimeZone(time.Local, ctx.GetSessionVars().GetTimeZone())
err = value.ConvertTimeZone(time.Local, ctx.GetSessionVars().Location())
if err != nil {
return d, errors.Trace(err)
}
Expand Down
19 changes: 17 additions & 2 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2663,7 +2663,7 @@ func (s *testIntegrationSuite) TestCompareBuiltin(c *C) {
tk.MustExec(`insert into t2 values(1, 1.1, "2017-08-01 12:01:01", "12:01:01", "abcdef", 0b10101)`)

result = tk.MustQuery("select coalesce(NULL, a), coalesce(NULL, b, a), coalesce(c, NULL, a, b), coalesce(d, NULL), coalesce(d, c), coalesce(NULL, NULL, e, 1), coalesce(f), coalesce(1, a, b, c, d, e, f) from t2")
result.Check(testkit.Rows(fmt.Sprintf("1 1.1 2017-08-01 12:01:01 12:01:01 %s 12:01:01 abcdef 21 1", time.Now().In(tk.Se.GetSessionVars().GetTimeZone()).Format("2006-01-02"))))
result.Check(testkit.Rows(fmt.Sprintf("1 1.1 2017-08-01 12:01:01 12:01:01 %s 12:01:01 abcdef 21 1", time.Now().In(tk.Se.GetSessionVars().Location()).Format("2006-01-02"))))

// nullif
result = tk.MustQuery(`SELECT NULLIF(NULL, 1), NULLIF(1, NULL), NULLIF(1, 1), NULLIF(NULL, NULL);`)
Expand Down Expand Up @@ -2727,9 +2727,24 @@ func (s *testIntegrationSuite) TestCompareBuiltin(c *C) {
result = tk.MustQuery(`select least(cast("2017-01-01" as datetime), "123", "234", cast("2018-01-01" as date)), least(cast("2017-01-01" as date), "123", null)`)
result.Check(testkit.Rows("123 <nil>"))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1292|invalid time format: '123'", "Warning|1292|invalid time format: '234'", "Warning|1292|invalid time format: '123'"))

tk.MustQuery(`select 1 < 17666000000000000000, 1 > 17666000000000000000, 1 = 17666000000000000000`).Check(testkit.Rows("1 0 0"))

tk.MustExec("drop table if exists t")
// insert value at utc timezone
tk.MustExec("set time_zone = '+00:00'")
tk.MustExec("create table t(a timestamp)")
tk.MustExec("insert into t value('1991-05-06 04:59:28')")
// check daylight saving time in Asia/Shanghai
tk.MustExec("set time_zone='Asia/Shanghai'")
tk.MustQuery("select * from t").Check(testkit.Rows("1991-05-06 13:59:28"))
// insert an nonexistent time
tk.MustExec("set time_zone = 'America/Los_Angeles'")
_, err := tk.Exec("insert into t value('2011-03-13 02:00:00')")
c.Assert(err, NotNil)
// reset timezone to a +8 offset
tk.MustExec("set time_zone = '+08:00'")
tk.MustQuery("select * from t").Check(testkit.Rows("1991-05-06 12:59:28"))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a bigint unsigned)")
tk.MustExec("insert into t value(17666000000000000000)")
Expand Down
4 changes: 2 additions & 2 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ func (s *SessionVars) GetNextPreparedStmtID() uint32 {
return s.preparedStmtID
}

// GetTimeZone returns the value of time_zone session variable.
func (s *SessionVars) GetTimeZone() *time.Location {
// Location returns the value of time_zone session variable. If it is nil, then return time.Local.
func (s *SessionVars) Location() *time.Location {
loc := s.TimeZone
if loc == nil {
loc = time.Local
Expand Down
58 changes: 57 additions & 1 deletion store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package mocktikv

import (
"bytes"
"fmt"
"io"
"sync"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -43,6 +45,50 @@ import (

var dummySlice = make([]byte, 0)

// locCache is a simple map with lock. It stores all used timezone during the lifetime of tidb instance.
// Talked with Golang team about whether they can have some forms of cache policy available for programmer,
// they suggests that only programmers knows which one is best for their use case.
// For detail, please refer to: https://github.com/golang/go/issues/26106
type locCache struct {
sync.RWMutex
// locMap stores locations used in past and can be retrieved by a timezone's name.
locMap map[string]*time.Location
}

// init initializes `locCache`.
func init() {
LocCache = &locCache{}
LocCache.locMap = make(map[string]*time.Location)
}

// LocCache is a simple cache policy to improve the performance of 'time.LoadLocation'.
var LocCache *locCache

// getLoc first trying to load location from a cache map. If nothing found in such map, then call
// `time.LocadLocation` to get a timezone location. After trying both way, an error wil be returned
// if valid Location is not found.
func (lm *locCache) getLoc(name string) (*time.Location, error) {
if name == "System" {
name = "Local"
}
lm.RLock()
if v, ok := lm.locMap[name]; ok {
lm.RUnlock()
return v, nil
}

if loc, err := time.LoadLocation(name); err == nil {
lm.RUnlock()
lm.Lock()
lm.locMap[name] = loc
lm.Unlock()
return loc, nil
}

lm.RUnlock()
return nil, errors.New(fmt.Sprintf("invalid name for timezone %s", name))
}

type dagContext struct {
dagReq *tipb.DAGRequest
keyRanges []*coprocessor.KeyRange
Expand Down Expand Up @@ -100,7 +146,17 @@ func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, ex
return nil, nil, nil, errors.Trace(err)
}
sc := flagsToStatementContext(dagReq.Flags)
sc.TimeZone = time.FixedZone("UTC", int(dagReq.TimeZoneOffset))

// retrieving timezone by name first. When name is set, it means we need
// consider daylight saving time. If it is not, we can use offset.
if dagReq.TimeZoneName != "" {
if sc.TimeZone, err = LocCache.getLoc(dagReq.TimeZoneName); err != nil {
return nil, nil, nil, errors.Trace(err)
}
dagReq.TimeZoneName = sc.TimeZone.String()
} else {
sc.TimeZone = time.FixedZone("UTC", int(dagReq.TimeZoneOffset))
}
ctx := &dagContext{
dagReq: dagReq,
keyRanges: req.Ranges,
Expand Down
Loading

0 comments on commit 7c18d24

Please sign in to comment.