Skip to content

Commit

Permalink
ebs br: provide fsr warmup to tikv data volumes (pingcap#47272)
Browse files Browse the repository at this point in the history
  • Loading branch information
BornChanger authored Oct 27, 2023
1 parent 9f88af5 commit 30288c7
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 12 deletions.
148 changes: 147 additions & 1 deletion br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,152 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) {
log.Info("delete snapshot end", zap.Int("need-to-del", len(snapIDMap)), zap.Int32("deleted", deletedCnt.Load()))
}

// EnableDataFSR enables FSR for data volume snapshots
func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string) (map[string][]*string, error) {
snapshotsIDsMap := fetchTargetSnapshots(meta, targetAZ)

if len(snapshotsIDsMap) == 0 {
return snapshotsIDsMap, errors.Errorf("empty backup meta")
}

eg, _ := errgroup.WithContext(context.Background())

for availableZone := range snapshotsIDsMap {
targetAZ := availableZone
eg.Go(func() error {
log.Info("enable fsr for snapshots", zap.String("available zone", targetAZ))
resp, err := e.ec2.EnableFastSnapshotRestores(&ec2.EnableFastSnapshotRestoresInput{
AvailabilityZones: []*string{&targetAZ},
SourceSnapshotIds: snapshotsIDsMap[targetAZ],
})

if err != nil {
return errors.Trace(err)
}

if len(resp.Unsuccessful) > 0 {
log.Warn("not all snapshots enabled FSR")
return errors.Errorf("Some snapshot fails to enable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
}

return e.waitDataFSREnabled(snapshotsIDsMap[targetAZ], targetAZ)
})
}
return snapshotsIDsMap, eg.Wait()
}

// waitDataFSREnabled waits FSR for data volume snapshots are all enabled
func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error {
// Create a map to store the strings as keys
pendingSnapshots := make(map[string]struct{})

// Populate the map with the strings from the array
for _, str := range snapShotIDs {
pendingSnapshots[*str] = struct{}{}
}

log.Info("starts check fsr pending snapshots", zap.Any("snapshots", pendingSnapshots), zap.String("available zone", targetAZ))
for {
if len(pendingSnapshots) == 0 {
log.Info("all snapshots fsr enablement is finished", zap.String("available zone", targetAZ))
return nil
}

// check pending snapshots every 1 minute
time.Sleep(1 * time.Minute)
log.Info("check snapshots not fsr enabled", zap.Int("count", len(pendingSnapshots)))
input := &ec2.DescribeFastSnapshotRestoresInput{
Filters: []*ec2.Filter{
{
Name: aws.String("state"),
Values: []*string{aws.String("disabled"), aws.String("disabling"), aws.String("enabling"), aws.String("optimizing")},
},
{
Name: aws.String("availability-zone"),
Values: []*string{aws.String(targetAZ)},
},
},
}

result, err := e.ec2.DescribeFastSnapshotRestores(input)
if err != nil {
return errors.Trace(err)
}

uncompletedSnapshots := make(map[string]struct{})
for _, fastRestore := range result.FastSnapshotRestores {
_, found := pendingSnapshots[*fastRestore.SnapshotId]
if found {
// Detect some conflict states
if strings.EqualFold(*fastRestore.State, "disabled") || strings.EqualFold(*fastRestore.State, "disabling") {
log.Error("detect conflict status", zap.String("snapshot", *fastRestore.SnapshotId), zap.String("status", *fastRestore.State))
return errors.Errorf("status of snapshot %s is %s ", *fastRestore.SnapshotId, *fastRestore.State)
}
uncompletedSnapshots[*fastRestore.SnapshotId] = struct{}{}
}
}
pendingSnapshots = uncompletedSnapshots
}
}

// DisableDataFSR disables FSR for data volume snapshots
func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error {
if len(snapshotsIDsMap) == 0 {
return nil
}

eg, _ := errgroup.WithContext(context.Background())

for availableZone := range snapshotsIDsMap {
targetAZ := availableZone
eg.Go(func() error {
resp, err := e.ec2.DisableFastSnapshotRestores(&ec2.DisableFastSnapshotRestoresInput{
AvailabilityZones: []*string{&targetAZ},
SourceSnapshotIds: snapshotsIDsMap[targetAZ],
})

if err != nil {
return errors.Trace(err)
}

if len(resp.Unsuccessful) > 0 {
log.Warn("not all snapshots disabled FSR", zap.String("available zone", targetAZ))
return errors.Errorf("Some snapshot fails to disable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
}

log.Info("Disable FSR issued", zap.String("available zone", targetAZ))

return nil
})
}
return eg.Wait()
}

func fetchTargetSnapshots(meta *config.EBSBasedBRMeta, specifiedAZ string) map[string][]*string {
var sourceSnapshotIDs = make(map[string][]*string)

if len(meta.TiKVComponent.Stores) == 0 {
return sourceSnapshotIDs
}

for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
for j := range store.Volumes {
oldVol := store.Volumes[j]
// Handle data volume snapshots only
if strings.Compare(oldVol.Type, "storage.data-dir") == 0 {
if specifiedAZ != "" {
sourceSnapshotIDs[specifiedAZ] = append(sourceSnapshotIDs[specifiedAZ], &oldVol.SnapshotID)
} else {
sourceSnapshotIDs[oldVol.VolumeAZ] = append(sourceSnapshotIDs[oldVol.VolumeAZ], &oldVol.SnapshotID)
}
}
}
}

return sourceSnapshotIDs
}

// CreateVolumes create volumes from snapshots
// if err happens in the middle, return half-done result
// returned map: store id -> old volume id -> new volume id
Expand Down Expand Up @@ -377,7 +523,7 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
for len(pendingVolumes) > 0 {
// check every 5 seconds
time.Sleep(5 * time.Second)
log.Info("check pending snapshots", zap.Int("count", len(pendingVolumes)))
log.Info("check pending volumes", zap.Int("count", len(pendingVolumes)))
resp, err := e.ec2.DescribeVolumes(&ec2.DescribeVolumesInput{
VolumeIds: pendingVolumes,
})
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/config/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ func (c *EBSBasedBRMeta) GetStoreCount() uint64 {
return uint64(len(c.TiKVComponent.Stores))
}

func (c *EBSBasedBRMeta) GetTiKVVolumeCount() uint64 {
if c.TiKVComponent == nil || len(c.TiKVComponent.Stores) == 0 {
return 0
}
// Assume TiKV nodes are symmetric
return uint64(len(c.TiKVComponent.Stores[0].Volumes))
}

func (c *EBSBasedBRMeta) String() string {
cfg, err := json.Marshal(c)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const (
flagDryRun = "dry-run"
// TODO used for local test, should be removed later
flagSkipAWS = "skip-aws"
flagUseFSR = "use-fsr"
flagCloudAPIConcurrency = "cloud-api-concurrency"
flagWithSysTable = "with-sys-table"
flagOperatorPausedGCAndSchedulers = "operator-paused-gc-and-scheduler"
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
"batch size for ddl to create a batch of tables once.")
flags.Bool(flagWithSysTable, false, "whether restore system privilege tables on default setting")
flags.StringArrayP(FlagResetSysUsers, "", []string{"cloud_admin", "root"}, "whether reset these users after restoration")
flags.Bool(flagUseFSR, false, "whether enable FSR for AWS snapshots")
_ = flags.MarkHidden(FlagResetSysUsers)
_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
_ = flags.MarkHidden(FlagMergeRegionKeyCount)
Expand Down Expand Up @@ -218,6 +219,7 @@ type RestoreConfig struct {
VolumeThroughput int64 `json:"volume-throughput" toml:"volume-throughput"`
ProgressFile string `json:"progress-file" toml:"progress-file"`
TargetAZ string `json:"target-az" toml:"target-az"`
UseFSR bool `json:"use-fsr" toml:"use-fsr"`
}

// DefineRestoreFlags defines common flags for the restore tidb command.
Expand Down Expand Up @@ -391,6 +393,11 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

cfg.UseFSR, err = flags.GetBool(flagUseFSR)
if err != nil {
return errors.Trace(err)
}

// iops: gp3 [3,000-16,000]; io1/io2 [100-32,000]
// throughput: gp3 [125, 1000]; io1/io2 cannot set throughput
// io1 and io2 volumes support up to 64,000 IOPS only on Instances built on the Nitro System.
Expand Down
10 changes: 2 additions & 8 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,17 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto

//TODO: restore volume type into origin type
//ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) by backupmeta
// this is used for cloud restoration

err = client.Init(g, mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
defer client.Close()
log.Info("start to clear system user for cloud")
err = client.ClearSystemUsers(ctx, cfg.ResetSysUsers)

if err != nil {
return errors.Trace(err)
}

// since we cannot reset tiflash automaticlly. so we should start it manually
if err = client.ResetTiFlashReplicas(ctx, g, mgr.GetStorage()); err != nil {
return errors.Trace(err)
}

progress.Close()
summary.CollectDuration("restore duration", time.Since(startAll))
summary.SetSuccessStatus(true)
Expand Down
22 changes: 19 additions & 3 deletions br/pkg/task/restore_ebs_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ func (h *restoreEBSMetaHelper) restore() error {
return errors.Trace(err)
}

storeCount := h.metaInfo.GetStoreCount()
progress := h.g.StartProgress(ctx, h.cmdName, int64(storeCount), !h.cfg.LogProgress)
volumeCount := h.metaInfo.GetStoreCount() * h.metaInfo.GetTiKVVolumeCount()
progress := h.g.StartProgress(ctx, h.cmdName, int64(volumeCount), !h.cfg.LogProgress)
defer progress.Close()
go progressFileWriterRoutine(ctx, progress, int64(storeCount), h.cfg.ProgressFile)
go progressFileWriterRoutine(ctx, progress, int64(volumeCount), h.cfg.ProgressFile)

resolvedTs = h.metaInfo.ClusterInfo.ResolvedTS
if totalSize, err = h.doRestore(ctx, progress); err != nil {
Expand Down Expand Up @@ -226,6 +226,8 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
volumeIDMap = make(map[string]string)
err error
totalSize int64
// a map whose key is available zone, and value is the snapshot id array
snapshotsIDsMap = make(map[string][]*string)
)
ec2Session, err = aws.NewEC2Session(h.cfg.CloudAPIConcurrency, h.cfg.S3.Region)
if err != nil {
Expand All @@ -236,7 +238,21 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
log.Error("failed to create all volumes, cleaning up created volume")
ec2Session.DeleteVolumes(volumeIDMap)
}

if h.cfg.UseFSR {
err = ec2Session.DisableDataFSR(snapshotsIDsMap)
log.Error("disable fsr failed", zap.Error(err))
}
}()

// Turn on FSR for TiKV data snapshots
if h.cfg.UseFSR {
snapshotsIDsMap, err = ec2Session.EnableDataFSR(h.metaInfo, h.cfg.TargetAZ)
if err != nil {
return nil, 0, errors.Trace(err)
}
}

volumeIDMap, err = ec2Session.CreateVolumes(h.metaInfo,
string(h.cfg.VolumeType), h.cfg.VolumeIOPS, h.cfg.VolumeThroughput, h.cfg.TargetAZ)
if err != nil {
Expand Down

0 comments on commit 30288c7

Please sign in to comment.