Skip to content

Commit

Permalink
feature: support new baseline files (erda-project#1360)
Browse files Browse the repository at this point in the history
* feature: support new baseline files

* feature: advance log
  • Loading branch information
dspo authored Aug 6, 2021
1 parent 8e89bf4 commit 1142c50
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 143 deletions.
194 changes: 55 additions & 139 deletions pkg/database/sqlparser/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package migrator
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

Expand Down Expand Up @@ -170,15 +169,6 @@ func (mig *Migrator) newInstallation() (err error) {
logrus.Infoln("OK")
}

// pre-migrate schema SQLs
if !mig.SkipPreMigrate() && !mig.SkipMigrate() {
//logrus.Infoln("DO PRE-MIGRATION...")
//if err = mig.preMigrate(ctx); err != nil {
// return err
//}
//logrus.Infoln("OK")
}

// migrate
if !mig.SkipMigrate() {
logrus.Infoln("DO MIGRATION...")
Expand Down Expand Up @@ -235,15 +225,6 @@ func (mig *Migrator) normalUpdate() (err error) {
logrus.Infoln("OK")
}

// pre migrate data
if !mig.SkipPreMigrate() && !mig.SkipMigrate() {
//logrus.Infoln("DO PRE-MIGRATION....")
//if err = mig.preMigrate(ctx); err != nil {
// return err
//}
//logrus.Infoln("PRE-MIGRATE OK")
}

// migrate data
if !mig.SkipMigrate() {
logrus.Infoln("DO MIGRATION....")
Expand Down Expand Up @@ -300,45 +281,24 @@ func (mig *Migrator) migrateSandbox(ctx context.Context) (err error) {
logrus.Infoln(tableName)
}

if mig.installingType == firstTimeUpdate {
reason, ok := mig.compareSchemas(mig.SandBox())
if mig.installingType != firstTimeInstall {
modules := mig.LocalScripts.FreshBaselineModules(mig.DB())
reason, ok := compareSchemas(mig.SandBox(), modules)
if !ok {
logrus.Warnf("local schema is not equal with cloud schema, try to resolve it:\n%s", reason)
if err := mig.patchBeforeMigrating(mig.SandBox(), []string{patchInit}); err != nil {
return errors.Wrap(err, "failed to patch init")
}
reason, ok := mig.compareSchemas(mig.SandBox())
reason, ok := compareSchemas(mig.SandBox(), modules)
if !ok {
return errors.Errorf("local base schema is not equal with cloud schema:\n%s", reason)
}
}

// record base
logrus.Infoln("RECORD BASE... ..")
new(HistoryModel).CreateTable(mig.SandBox())
now := time.Now()
for moduleName, module := range mig.LocalScripts.Services {
for i := range module.Scripts {
module.Scripts[i].Pending = true
if module.Scripts[i].IsBaseline() {
module.Scripts[i].Pending = false
record := HistoryModel{
ID: 0,
CreatedAt: now,
UpdatedAt: now,
ServiceName: moduleName,
Filename: filepath.Base(module.Scripts[i].GetName()),
Checksum: module.Scripts[i].Checksum(),
InstalledBy: "",
InstalledOn: "",
LanguageType: string(module.Scripts[i].Type),
Reversed: ddlreverser.ReverseCreateTableStmts(module.Scripts[i]),
}
if err := mig.SandBox().Create(&record).Error; err != nil {
return err
}
}
}
if err := recordModules(mig.SandBox(), modules); err != nil {
return errors.Wrapf(err, "failed to record base after comparing")
}
}

Expand All @@ -354,6 +314,9 @@ func (mig *Migrator) migrateSandbox(ctx context.Context) (err error) {
// install every module
for moduleName, module := range mig.LocalScripts.Services {
for _, script := range module.Scripts {
logrus.WithField("module", moduleName).WithField("script", script.GetName()).
WithField("to install", !script.Pending).
Infoln("[sandbox]")
if !script.Pending {
continue
}
Expand Down Expand Up @@ -383,107 +346,27 @@ func (mig *Migrator) migrateSandbox(ctx context.Context) (err error) {
return err
}

// pre migrate data SQLs, all applied in this runtime will be rollback
func (mig *Migrator) preMigrate(ctx context.Context) (err error) {
if err = mig.destructiveLint(); err != nil {
return err
}

// finally roll all DDL back
defer func() {
logrus.Infoln(" REVERSE PRE-MIGRATIONS: all schema migration")
if err := mig.reverse(mig.reversing, true); err != nil {
logrus.Fatalln(err)
}
}()

mig.reversing = nil

files, err := retrievePatchesFiles(ctx)
if err != nil {
return err
}
if err = mig.patchBeforeMigrating(mig.DB(), files); err != nil {
return errors.Wrapf(err, "failed to patch before migrating in pre-migration")
}

// install every module
for moduleName, module := range mig.LocalScripts.Services {
for _, script := range module.Scripts {
if !script.Pending {
continue
}

switch script.Type {
case ScriptTypeSQL:
after := func(tx *gorm.DB, err error) {
logrus.WithField("module name", moduleName).
WithField("script name", script.GetName()).
Infoln(" ROLLBACK PRE-MIGRATIONS: current script data migration")
tx.Rollback()
}
tx := mig.DB().Begin()
if err := mig.installSQL(script, mig.DB(), tx, after); err != nil {
return errors.Wrapf(err, "failed to pre-migrate, module name: %s, script name: %s, type: %s",
moduleName, script.GetName(), ScriptTypeSQL)
}
if err := mig.patchSQLAfterInstalling(script, mig.DB()); err != nil {
return errors.Wrapf(err, "failed to patch after pre-migreating, module name: %s, script name: %s, type: %s",
moduleName, script.GetName(), ScriptTypeSQL)
}
case ScriptTypePython:
if err := mig.installPy(script, module, mig.dbSettings, false); err != nil {
return errors.Wrapf(err, "failed to pre-migrate: %+v",
map[string]interface{}{"module name": moduleName, "script name": script.GetName(), "type": ScriptTypePython})
}
}
}
}

return nil
}

func (mig *Migrator) migrate(ctx context.Context) error {
now := time.Now()

if mig.installingType == firstTimeUpdate {
_, ok := mig.compareSchemas(mig.DB())
if mig.installingType != firstTimeInstall {
modules := mig.LocalScripts.FreshBaselineModules(mig.DB())
reason, ok := compareSchemas(mig.DB(), modules)
if !ok {
if err := mig.patchBeforeMigrating(mig.DB(), []string{patchInit}); err != nil {
logrus.Warnf("local schema is not equal with cloud schema, try to resolve it:\n%s", reason)
if err := mig.patchBeforeMigrating(mig.SandBox(), []string{patchInit}); err != nil {
return errors.Wrap(err, "failed to patch init")
}
reason, ok := mig.compareSchemas(mig.DB())
reason, ok := compareSchemas(mig.DB(), modules)
if !ok {
return errors.Errorf("local base schema is not equal with cloud schema: %s", reason)
return errors.Errorf("local base schema is not equal with cloud schema:\n%s", reason)
}
}

// record base
logrus.Infoln("RECORD BASE... ..")
new(HistoryModel).CreateTable(mig.DB())
now := time.Now()
for moduleName, module := range mig.LocalScripts.Services {
for i := range module.Scripts {
module.Scripts[i].Pending = true
if module.Scripts[i].IsBaseline() {
module.Scripts[i].Pending = false
record := HistoryModel{
ID: 0,
CreatedAt: now,
UpdatedAt: now,
ServiceName: moduleName,
Filename: filepath.Base(module.Scripts[i].GetName()),
Checksum: module.Scripts[i].Checksum(),
InstalledBy: "",
InstalledOn: "",
LanguageType: string(module.Scripts[i].Type),
Reversed: ddlreverser.ReverseCreateTableStmts(module.Scripts[i]),
}
if err := mig.DB().Create(&record).Error; err != nil {
return err
}
}
}
if err := recordModules(mig.DB(), modules); err != nil {
return errors.Wrapf(err, "failed to record base after comparing")
}
}

Expand Down Expand Up @@ -686,14 +569,19 @@ func (mig *Migrator) destructiveLint() error {
return nil
}

func (mig *Migrator) compareSchemas(db *gorm.DB) (string, bool) {
logrus.Infoln("compare local schema and cloud schema for every service...")
func compareSchemas(db *gorm.DB, modules map[string]*Module) (string, bool) {
logrus.Infoln("compare local schema and cloud schema for baseline ...")
if len(modules) == 0 {
logrus.Infoln("no new baseline file, exit comparing")
return "", true
}
logrus.Infoln("there are new baseline files in some module, to compare them")
var (
reasons string
eq = true
)
for modName, service := range mig.LocalScripts.Services {
equal := service.BaselineEqualCloud(db)
for modName, module := range modules {
equal := module.Schema().EqualWith(db)
if !equal.Equal() {
eq = false
reasons += fmt.Sprintf("module name: %s:\n%s", modName, equal.Reason())
Expand All @@ -702,6 +590,34 @@ func (mig *Migrator) compareSchemas(db *gorm.DB) (string, bool) {
return reasons, eq
}

func recordModules(db *gorm.DB, modules map[string]*Module) error {
new(HistoryModel).CreateTable(db)
now := time.Now()
for moduleName, module := range modules {
for i := 0; i < len(module.Scripts); i++ {
module.Scripts[i].Pending = false
record := HistoryModel{
ID: 0,
CreatedAt: now,
UpdatedAt: now,
ServiceName: moduleName,
Filename: module.Scripts[i].GetName(),
Checksum: module.Scripts[i].Checksum(),
InstalledBy: "",
InstalledOn: "",
LanguageType: string(module.Scripts[i].Type),
Reversed: ddlreverser.ReverseCreateTableStmts(module.Scripts[i]),
}
if err := db.Create(&record).Error; err != nil {
return errors.Wrapf(err, "failed to record module, module name: %s, script name: %s",
moduleName, module.Scripts[i].GetName())
}
}
}

return nil
}

func retrievePatchesFiles(ctx context.Context) ([]string, error) {
value := ctx.Value(patchesKey)
if value == nil {
Expand Down
37 changes: 37 additions & 0 deletions pkg/database/sqlparser/migrator/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package migrator
import (
"fmt"

"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"gorm.io/gorm"
)

// Schema is the set of TableDefinitions.
Expand Down Expand Up @@ -85,3 +87,38 @@ func (s *Schema) Equal(o *Schema) *Equal {

return &Equal{equal: eq, reason: reasons}
}

func (s *Schema) EqualWith(db *gorm.DB) *Equal {
if len(s.TableDefinitions) == 0 {
return &Equal{equal: true}
}

cloud := NewSchema()
for tableName := range s.TableDefinitions {
raw := "SHOW CREATE TABLE " + tableName
this := db.Raw(raw)
if err := this.Error; err != nil {
return &Equal{
equal: false,
reason: fmt.Sprintf("failed to exec %s", raw),
}
}
var _ig, create string
if err := this.Row().Scan(&_ig, &create); err != nil {
return &Equal{
equal: false,
reason: fmt.Sprintf("failed to Scan create table stmt, raw: %s", raw),
}
}
node, err := parser.New().ParseOneStmt(create, "", "")
if err != nil {
return &Equal{
equal: false,
reason: fmt.Sprintf("failed to ParseOneStmt: %s, raw: %s", create, raw),
}
}
node.Accept(cloud)
}

return s.Equal(cloud)
}
12 changes: 12 additions & 0 deletions pkg/database/sqlparser/migrator/scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (s *Scripts) InstalledChangesLint(ctx *context.Context, db *gorm.DB) error
logrus.Infof("found patch file and append it to the list, filename: %s", filename)
patchesList = append(patchesList, filename)
} else {
logrus.Errorf("missing path file, filename: %s", filename)
missingPatchesList = append(missingPatchesList, filepath.Join(moduleName, script.GetName()))
}
}
Expand Down Expand Up @@ -337,3 +338,14 @@ func (s *Scripts) HasDestructiveOperationInPending() (string, bool) {

return "", false
}

func (s *Scripts) FreshBaselineModules(db *gorm.DB) map[string]*Module {
var modules = make(map[string]*Module)
for name, mod := range s.Services {
mod := mod.FilterFreshBaseline(db)
if len(mod.Scripts) > 0 {
modules[name] = mod
}
}
return modules
}
33 changes: 30 additions & 3 deletions pkg/database/sqlparser/migrator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,16 @@ func (m *Module) BaselineTableNames() []string {

func (m *Module) Sort() {
sort.Slice(m.Scripts, func(i, j int) bool {
if m.Scripts[i].IsBaseline() && !m.Scripts[j].IsBaseline() {
return true
}
if !m.Scripts[i].IsBaseline() && m.Scripts[j].IsBaseline() {
return false
}

return strings.TrimSuffix(m.Scripts[i].GetName(), filepath.Ext(m.Scripts[i].GetName())) <
strings.TrimSuffix(m.Scripts[j].GetName(), filepath.Ext(m.Scripts[j].GetName()))
})
sort.Slice(m.Scripts, func(i, j int) bool {
return m.Scripts[i].IsBaseline() && !m.Scripts[j].IsBaseline()
})
}

func (m *Module) Filenames() []string {
Expand All @@ -150,3 +154,26 @@ func (m *Module) GetScriptByFilename(filename string) (*Script, bool) {
}
return nil, false
}

func (m *Module) FilterFreshBaseline(db *gorm.DB) *Module {
var mod Module
mod.Name = m.Name

for _, script := range m.Scripts {
// if the script is not baseline, skip
if !script.IsBaseline() {
continue
}

// if the script is not fresh, skip
var cnt int64
if db.Where(map[string]interface{}{"filename": script.GetName()}).
First(new(HistoryModel)).Count(&cnt); db.Error == nil && cnt > 0 {
continue
}

mod.Scripts = append(mod.Scripts, script)
}

return &mod
}
Loading

0 comments on commit 1142c50

Please sign in to comment.