Skip to content

Commit

Permalink
domain: move infoSyncer to util to avoid cycle import (pingcap#12544)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Oct 8, 2019
1 parent 4d81fba commit 52647f0
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 23 deletions.
9 changes: 5 additions & 4 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
domainutil "github.com/pingcap/tidb/domain/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/infoschema/perfschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -62,7 +63,7 @@ type Domain struct {
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *InfoSyncer
info *domainutil.InfoSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
Expand Down Expand Up @@ -290,7 +291,7 @@ func (do *Domain) DDL() ddl.DDL {
}

// InfoSyncer gets infoSyncer from domain.
func (do *Domain) InfoSyncer() *InfoSyncer {
func (do *Domain) InfoSyncer() *domainutil.InfoSyncer {
return do.info
}

Expand Down Expand Up @@ -420,7 +421,7 @@ func (do *Domain) topNSlowQueryLoop() {
func (do *Domain) infoSyncerKeeper() {
defer do.wg.Done()
defer recoverInDomain("infoSyncerKeeper", false)
ticker := time.NewTicker(time.Second * time.Duration(InfoSessionTTL) / 2)
ticker := time.NewTicker(time.Second * time.Duration(domainutil.InfoSessionTTL) / 2)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -660,7 +661,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
if err != nil {
return err
}
do.info = NewInfoSyncer(do.ddl.GetID(), do.etcdClient)
do.info = domainutil.NewInfoSyncer(do.ddl.GetID(), do.etcdClient)
err = do.info.Init(ctx)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (*testSuite) TestT(c *C) {
beforeTS := variable.GoTimeToTS(time.Now())
infoSyncer.ReportMinStartTS(dom.Store())
afterTS := variable.GoTimeToTS(time.Now())
c.Assert(infoSyncer.minStartTS > beforeTS && infoSyncer.minStartTS < afterTS, IsFalse)
c.Assert(infoSyncer.GetMinStartTS() > beforeTS && infoSyncer.GetMinStartTS() < afterTS, IsFalse)
lowerLimit := time.Now().Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond)
validTS := variable.GoTimeToTS(lowerLimit.Add(time.Minute))
sm.PS = []*util.ProcessInfo{
Expand All @@ -383,7 +383,7 @@ func (*testSuite) TestT(c *C) {
}
infoSyncer.SetSessionManager(sm)
infoSyncer.ReportMinStartTS(dom.Store())
c.Assert(infoSyncer.minStartTS == validTS, IsTrue)
c.Assert(infoSyncer.GetMinStartTS() == validTS, IsTrue)

err = store.Close()
c.Assert(err, IsNil)
Expand Down
8 changes: 7 additions & 1 deletion domain/info.go → domain/util/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package domain
package util

import (
"context"
Expand Down Expand Up @@ -160,6 +160,12 @@ func (is *InfoSyncer) RemoveServerInfo() {
}
}

// GetMinStartTS get min start timestamp.
// Export for testing.
func (is *InfoSyncer) GetMinStartTS() uint64 {
return is.minStartTS
}

// storeMinStartTS stores self server min start timestamp to etcd.
func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error {
if is.etcdCli == nil {
Expand Down
17 changes: 9 additions & 8 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -1447,7 +1448,7 @@ func (h *mvccTxnHandler) handleMvccGetByTxn(params map[string]string) (interface
// serverInfo is used to report the servers info when do http request.
type serverInfo struct {
IsOwner bool `json:"is_owner"`
*domain.ServerInfo
*util.ServerInfo
}

// ServeHTTP handles request of ddl server info.
Expand All @@ -1466,11 +1467,11 @@ func (h serverInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

// clusterServerInfo is used to report cluster servers info when do http request.
type clusterServerInfo struct {
ServersNum int `json:"servers_num,omitempty"`
OwnerID string `json:"owner_id"`
IsAllServerVersionConsistent bool `json:"is_all_server_version_consistent,omitempty"`
AllServersDiffVersions []domain.ServerVersionInfo `json:"all_servers_diff_versions,omitempty"`
AllServersInfo map[string]*domain.ServerInfo `json:"all_servers_info,omitempty"`
ServersNum int `json:"servers_num,omitempty"`
OwnerID string `json:"owner_id"`
IsAllServerVersionConsistent bool `json:"is_all_server_version_consistent,omitempty"`
AllServersDiffVersions []util.ServerVersionInfo `json:"all_servers_diff_versions,omitempty"`
AllServersInfo map[string]*util.ServerInfo `json:"all_servers_info,omitempty"`
}

// ServeHTTP handles request of all ddl servers info.
Expand All @@ -1496,8 +1497,8 @@ func (h allServerInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
log.Error(err)
return
}
allVersionsMap := map[domain.ServerVersionInfo]struct{}{}
allVersions := make([]domain.ServerVersionInfo, 0, len(allServersInfo))
allVersionsMap := map[util.ServerVersionInfo]struct{}{}
allVersions := make([]util.ServerVersionInfo, 0, len(allServersInfo))
for _, v := range allServersInfo {
if _, ok := allVersionsMap[v.ServerVersionInfo]; ok {
continue
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/parser/terror"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
domainutil "github.com/pingcap/tidb/domain/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/privilege"
Expand Down Expand Up @@ -323,7 +323,7 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) {

// calculateNewSafePoint uses the current global transaction min start timestamp to calculate the new safe point.
func (w *GCWorker) calSafePointByMinStartTS(safePoint time.Time) time.Time {
kvs, err := w.store.GetSafePointKV().GetWithPrefix(domain.ServerMinStartTSPath)
kvs, err := w.store.GetSafePointKV().GetWithPrefix(domainutil.ServerMinStartTSPath)
if err != nil {
logutil.BgLogger().Warn("get all minStartTS failed", zap.Error(err))
return safePoint
Expand Down
13 changes: 7 additions & 6 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
domainutil "github.com/pingcap/tidb/domain/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -191,28 +192,28 @@ func (s *testGCWorkerSuite) TestGetOracleTime(c *C) {

func (s *testGCWorkerSuite) TestMinStartTS(c *C) {
spkv := s.store.GetSafePointKV()
err := spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10))
err := spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10))
c.Assert(err, IsNil)
now := time.Now()
sp := s.gcWorker.calSafePointByMinStartTS(now)
c.Assert(sp.Second(), Equals, now.Second())
err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "a"), "0")
err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "a"), "0")
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(now)
zeroTime := time.Unix(0, oracle.ExtractPhysical(0)*1e6)
c.Assert(sp, Equals, zeroTime)

err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "a"), "0")
err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "a"), "0")
c.Assert(err, IsNil)
err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "b"), "1")
err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "b"), "1")
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(now)
c.Assert(sp, Equals, zeroTime)

err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "a"),
err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "a"),
strconv.FormatUint(variable.GoTimeToTS(now), 10))
c.Assert(err, IsNil)
err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "b"),
err = spkv.Put(fmt.Sprintf("%s/%s", domainutil.ServerMinStartTSPath, "b"),
strconv.FormatUint(variable.GoTimeToTS(now.Add(-20*time.Second)), 10))
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(now.Add(-10 * time.Second))
Expand Down

0 comments on commit 52647f0

Please sign in to comment.