Skip to content

Commit

Permalink
domain: fast new a etcd session when the session is stale in the sche…
Browse files Browse the repository at this point in the history
…maVersionSyncer (pingcap#7774)
  • Loading branch information
winkyao authored Sep 28, 2018
1 parent 0d979a2 commit 6a1e94f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
25 changes: 18 additions & 7 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"math"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
Expand Down Expand Up @@ -88,7 +90,7 @@ type SchemaSyncer interface {
type schemaVersionSyncer struct {
selfSchemaVerPath string
etcdCli *clientv3.Client
session *concurrency.Session
session unsafe.Pointer
mu struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
Expand Down Expand Up @@ -143,23 +145,32 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
return errors.Trace(err)
}
logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
s.session, err = owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
s.storeSession(session)

s.mu.Lock()
s.mu.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)
s.mu.Unlock()

err = PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))
return errors.Trace(err)
}

func (s *schemaVersionSyncer) loadSession() *concurrency.Session {
return (*concurrency.Session)(atomic.LoadPointer(&s.session))
}

func (s *schemaVersionSyncer) storeSession(session *concurrency.Session) {
atomic.StorePointer(&s.session, (unsafe.Pointer)(session))
}

// Done implements SchemaSyncer.Done interface.
func (s *schemaVersionSyncer) Done() <-chan struct{} {
return s.session.Done()
return s.loadSession().Done()
}

// Restart implements SchemaSyncer.Restart interface.
Expand All @@ -176,12 +187,12 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
s.session = session
s.storeSession(session)

childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
defer cancel()
err = PutKVToEtcd(childCtx, s.etcdCli, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

return errors.Trace(err)
}
Expand Down Expand Up @@ -219,7 +230,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int
startTime := time.Now()
ver := strconv.FormatInt(version, 10)
err := PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, s.selfSchemaVerPath, ver,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
Expand Down
13 changes: 11 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/ngaut/pools"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand All @@ -42,6 +43,7 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

// Domain represents a storage space. Different domains can use the same database name.
Expand Down Expand Up @@ -405,16 +407,16 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
case <-syncer.Done():
// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
log.Info("[ddl] reload schema in loop, schema syncer need restart")
do.SchemaValidator.Stop()
err := do.mustRestartSyncer()
if err != nil {
log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
break
}
do.SchemaValidator.Restart()
log.Info("[ddl] schema syncer restarted.")
case <-do.info.Done():
log.Info("[ddl] reload schema in loop, server info syncer need restart")
do.info.Restart(context.Background())
log.Info("[ddl] server info syncer restarted.")
case <-do.exit:
return
}
Expand Down Expand Up @@ -527,12 +529,19 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error {
if ebd, ok := do.store.(EtcdBackend); ok {
if addrs := ebd.EtcdAddrs(); addrs != nil {
cfg := config.GetGlobalConfig()
cli, err := clientv3.New(clientv3.Config{
Endpoints: addrs,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
PermitWithoutStream: true,
}),
},
TLS: ebd.TLSConfig(),
})
Expand Down

0 comments on commit 6a1e94f

Please sign in to comment.