Skip to content

Commit

Permalink
lightning: support inject external storage when as library (pingcap#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Mar 25, 2022
1 parent 5a14cc3 commit 96a507e
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 58 deletions.
2 changes: 1 addition & 1 deletion br/cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func main() {
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return err
}
return app.RunOnce(context.Background(), cfg, nil)
return app.RunOnceWithOptions(context.Background(), cfg)
}()

finished := true
Expand Down
39 changes: 28 additions & 11 deletions br/pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,24 +954,23 @@ type FileCheckpointsDB struct {
exStorage storage.ExternalStorage
}

func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) {
func newFileCheckpointsDB(
ctx context.Context,
path string,
exStorage storage.ExternalStorage,
fileName string,
) (*FileCheckpointsDB, error) {
cpdb := &FileCheckpointsDB{
path: path,
checkpoints: checkpointspb.CheckpointsModel{
TaskCheckpoint: &checkpointspb.TaskCheckpointModel{},
Checkpoints: map[string]*checkpointspb.TableCheckpointModel{},
},
ctx: ctx,
path: path,
fileName: fileName,
exStorage: exStorage,
}

// init ExternalStorage
s, fileName, err := createExstorageByCompletePath(ctx, path)
if err != nil {
return nil, errors.Trace(err)
}
cpdb.ctx = ctx
cpdb.fileName = fileName
cpdb.exStorage = s

if cpdb.fileName == "" {
return nil, errors.Errorf("the checkpoint DSN '%s' must not be a directory", path)
}
Expand Down Expand Up @@ -1013,6 +1012,24 @@ func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB,
return cpdb, nil
}

func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) {
// init ExternalStorage
s, fileName, err := createExstorageByCompletePath(ctx, path)
if err != nil {
return nil, errors.Trace(err)
}
return newFileCheckpointsDB(ctx, path, s, fileName)
}

func NewFileCheckpointsDBWithExstorageFileName(
ctx context.Context,
path string,
s storage.ExternalStorage,
fileName string,
) (*FileCheckpointsDB, error) {
return newFileCheckpointsDB(ctx, path, s, fileName)
}

// createExstorageByCompletePath create ExternalStorage by completePath and return fileName.
func createExstorageByCompletePath(ctx context.Context, completePath string) (storage.ExternalStorage, string, error) {
if completePath == "" {
Expand Down
86 changes: 74 additions & 12 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/import_sstpb"
Expand All @@ -49,8 +50,6 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/shurcooL/httpgzip"
"go.uber.org/zap"
Expand Down Expand Up @@ -188,6 +187,7 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
// use a default glue later.
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a
// caller implemented glue.
// deprecated: use RunOnceWithOptions instead.
func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glue glue.Glue) error {
if err := taskCfg.Adjust(taskCtx); err != nil {
return err
Expand All @@ -198,7 +198,7 @@ func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glu
taskCfg.TaskID = int64(val.(int))
})

return l.run(taskCtx, taskCfg, glue)
return l.run(taskCtx, taskCfg, &options{glue: glue})
}

func (l *Lightning) RunServer() error {
Expand All @@ -223,12 +223,60 @@ func (l *Lightning) RunServer() error {
}
}

// RunOnceWithOptions is used by binary lightning and host when using lightning as a library.
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may
// be used:
// - WithGlue: set a caller implemented glue. Otherwise, lightning will use a default glue later.
// - WithDumpFileStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a
// storage by config
// - WithCheckpointStorage: caller has opened an external storage for lightning and want to save checkpoint
// in it. Otherwise, lightning will save checkpoint by the Checkpoint.DSN in config
func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.Config, opts ...Option) error {
o := &options{}
for _, opt := range opts {
opt(o)
}

failpoint.Inject("setExtStorage", func(val failpoint.Value) {
path := val.(string)
b, err := storage.ParseBackend(path, nil)
if err != nil {
panic(err)
}
s, err := storage.New(context.Background(), b, &storage.ExternalStorageOptions{})
if err != nil {
panic(err)
}
o.dumpFileStorage = s
o.checkpointStorage = s
})
failpoint.Inject("setCheckpointName", func(val failpoint.Value) {
file := val.(string)
o.checkpointName = file
})

if o.dumpFileStorage != nil {
// we don't use it, set a value to pass Adjust
taskCfg.Mydumper.SourceDir = "noop://"
}

if err := taskCfg.Adjust(taskCtx); err != nil {
return err
}

taskCfg.TaskID = time.Now().UnixNano()

return l.run(taskCtx, taskCfg, o)
}

var (
taskRunNotifyKey = "taskRunNotifyKey"
taskCfgRecorderKey = "taskCfgRecorderKey"
)

func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.Glue) (err error) {
func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) {
build.LogInfo(build.Lightning)
log.L().Info("cfg", zap.Stringer("cfg", taskCfg))

Expand Down Expand Up @@ -279,6 +327,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.

// initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust
// and also put it here could avoid injecting another two SkipRunTask failpoint to caller
g := o.glue
if g == nil {
db, err := restore.DBFromConfig(ctx, taskCfg.TiDB)
if err != nil {
Expand All @@ -287,13 +336,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode)
}

u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil)
if err != nil {
return common.NormalizeError(err)
}
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return common.NormalizeError(err)
s := o.dumpFileStorage
if s == nil {
u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil)
if err != nil {
return common.NormalizeError(err)
}
s, err = storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return common.NormalizeError(err)
}
}

// return expectedErr means at least meet one file
Expand Down Expand Up @@ -333,7 +385,17 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.

var procedure *restore.Controller

procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g)
param := &restore.ControllerParam{
DBMetas: dbMetas,
Status: &l.status,
DumpFileStorage: s,
OwnExtStorage: o.dumpFileStorage == nil,
Glue: g,
CheckpointStorage: o.checkpointStorage,
CheckpointName: o.checkpointName,
}

procedure, err = restore.NewRestoreController(ctx, taskCfg, param)
if err != nil {
log.L().Error("restore failed", log.ShortError(err))
return errors.Trace(err)
Expand Down
7 changes: 4 additions & 3 deletions br/pkg/lightning/lightning_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ func TestRun(t *testing.T) {
cfg := config.NewConfig()
err := cfg.LoadFromGlobal(globalConfig)
require.NoError(t, err)
err = lightning.RunOnce(context.Background(), cfg, nil)
err = lightning.RunOnceWithOptions(context.Background(), cfg)
require.Error(t, err)
require.Regexp(t, "`mydumper.data-source-dir` does not exist$", err.Error())

path, _ := filepath.Abs(".")
ctx := context.Background()
invalidGlue := glue.NewExternalTiDBGlue(nil, 0)
o := &options{glue: invalidGlue}
err = lightning.run(ctx, &config.Config{
Mydumper: config.MydumperRuntime{
SourceDir: "file://" + filepath.ToSlash(path),
Expand All @@ -71,7 +72,7 @@ func TestRun(t *testing.T) {
Enable: true,
Driver: "invalid",
},
}, invalidGlue)
}, o)
require.EqualError(t, err, "[Lightning:Checkpoint:ErrUnknownCheckpointDriver]unknown checkpoint driver 'invalid'")

err = lightning.run(ctx, &config.Config{
Expand All @@ -84,7 +85,7 @@ func TestRun(t *testing.T) {
Driver: "file",
DSN: "any-file",
},
}, invalidGlue)
}, o)
require.Error(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/lightning_server_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestHTTPAPIOutsideServerMode(t *testing.T) {
err := cfg.LoadFromGlobal(s.lightning.globalCfg)
require.NoError(t, err)
go func() {
errCh <- s.lightning.RunOnce(s.lightning.ctx, cfg, nil)
errCh <- s.lightning.RunOnceWithOptions(s.lightning.ctx, cfg)
}()
time.Sleep(600 * time.Millisecond)

Expand Down
8 changes: 3 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -52,6 +48,9 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"
)

const (
Expand Down Expand Up @@ -348,7 +347,6 @@ func (rc *Controller) checkClusterRegion(ctx context.Context) error {
}

// StoragePermission checks whether Lightning has enough permission to storage.
// this test cannot be skipped.
func (rc *Controller) StoragePermission(ctx context.Context) error {
passed := true
message := "Lightning has the correct storage permission"
Expand Down
Loading

0 comments on commit 96a507e

Please sign in to comment.