Skip to content

Commit

Permalink
Add database schema version check for mysql (cadence-workflow#2771)
Browse files Browse the repository at this point in the history
* Add database schema version check for mysql
  • Loading branch information
yux0 authored Nov 6, 2019
1 parent 33321e2 commit 1046b5b
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 1 deletion.
8 changes: 7 additions & 1 deletion cmd/server/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/tools/cassandra"
"github.com/uber/cadence/tools/sql"
)

// validServices is the list of all valid cadence services
Expand Down Expand Up @@ -60,8 +61,13 @@ func startHandler(c *cli.Context) {
if err := cfg.Validate(); err != nil {
log.Fatalf("config validation failed: %v", err)
}
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence); err != nil {
log.Fatal("Incompatible versions: ", err)
log.Fatal("Incompatible cassandra versions: ", err)
}
// sql schema version validation
if err := sql.VerifyCompatibleVersion(cfg.Persistence); err != nil {
log.Fatal("Incompatible sql versions: ", err)
}

services := getServices(c)
Expand Down
44 changes: 44 additions & 0 deletions tools/sql/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,53 @@ import (

"github.com/urfave/cli"

"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/schema/mysql"
"github.com/uber/cadence/tools/common/schema"
)

func VerifyCompatibleVersion(
cfg config.Persistence,
) error {

ds, ok := cfg.DataStores[cfg.DefaultStore]
if ok && ds.SQL != nil {
err := checkCompatibleVersion(*ds.SQL, mysql.Version)
if err != nil {
return err
}
}
ds, ok = cfg.DataStores[cfg.VisibilityStore]
if ok && ds.SQL != nil {
err := checkCompatibleVersion(*ds.SQL, mysql.VisibilityVersion)
if err != nil {
return err
}
}
return nil
}

// checkCompatibleVersion check the version compatibility
func checkCompatibleVersion(
cfg config.SQL,
expectedVersion string,
) error {

connection, err := newConn(&sqlConnectParams{
host: cfg.ConnectAddr,
user: cfg.User,
password: cfg.Password,
driverName: cfg.DriverName,
database: cfg.DatabaseName,
})
if err != nil {
return fmt.Errorf("unable to create SQL connection: %v", err.Error())
}
defer connection.Close()

return schema.VerifyCompatibleVersion(connection, cfg.DatabaseName, expectedVersion)
}

// setupSchema executes the setupSchemaTask
// using the given command line arguments
// as input
Expand Down
4 changes: 4 additions & 0 deletions tools/sql/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package mysql

import (
"fmt"
"strings"

"github.com/iancoleman/strcase"
"github.com/jmoiron/sqlx"
Expand All @@ -37,6 +38,9 @@ const DriverName = "mysql"
// NewConnection returns a new connection to mysql database
func NewConnection(host string, port int, user string, passwd string, database string) (*sqlx.DB, error) {
addr := fmt.Sprintf("%v:%v", host, port)
if strings.Contains(host, ":") && port == 0 {
addr = host
}
db, err := sqlx.Connect(DriverName, fmt.Sprintf(dataSourceName, user, passwd, "tcp", addr, database))
if err != nil {
return nil, err
Expand Down
210 changes: 210 additions & 0 deletions tools/sql/version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package sql

import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path"
"runtime"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/environment"
"github.com/uber/cadence/tools/sql/mysql"
)

type (
VersionTestSuite struct {
*require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
suite.Suite
}
)

func TestVersionTestSuite(t *testing.T) {
suite.Run(t, new(VersionTestSuite))
}

func (s *VersionTestSuite) SetupTest() {
s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
}

func (s *VersionTestSuite) TestVerifyCompatibleVersion() {
database := "cadence_test"
visDatabase := "cadence_visibility_test"
_, filename, _, ok := runtime.Caller(0)
s.True(ok)
root := path.Dir(path.Dir(path.Dir(filename)))
sqlFile := path.Join(root, "schema/mysql/v57/cadence/schema.sql")
visSqlFile := path.Join(root, "schema/mysql/v57/visibility/schema.sql")

defer s.createDatabase(database)()
defer s.createDatabase(visDatabase)()
err := RunTool([]string{
"./tool",
"-ep", environment.GetMySQLAddress(),
"-p", strconv.Itoa(environment.GetMySQLPort()),
"-u", testUser,
"-pw", testPassword,
"-db", database,
"-dr", mysql.DriverName,
"-q",
"setup-schema",
"-f", sqlFile,
"-version", "10.0",
"-o",
})
s.NoError(err)
err = RunTool([]string{
"./tool",
"-ep", environment.GetMySQLAddress(),
"-p", strconv.Itoa(environment.GetMySQLPort()),
"-u", testUser,
"-pw", testPassword,
"-db", visDatabase,
"-dr", mysql.DriverName,
"-q",
"setup-schema",
"-f", visSqlFile,
"-version", "10.0",
"-o",
})
s.NoError(err)

defaultCfg := config.SQL{
ConnectAddr: fmt.Sprintf("%v:%v", environment.GetMySQLAddress(), environment.GetMySQLPort()),
User: testUser,
Password: testPassword,
DriverName: mysql.DriverName,
DatabaseName: database,
}
visibilityCfg := defaultCfg
visibilityCfg.DatabaseName = visDatabase
cfg := config.Persistence{
DefaultStore: "default",
VisibilityStore: "visibility",
DataStores: map[string]config.DataStore{
"default": {SQL: &defaultCfg},
"visibility": {SQL: &visibilityCfg},
},
TransactionSizeLimit: dynamicconfig.GetIntPropertyFn(common.DefaultTransactionSizeLimit),
}
s.NoError(VerifyCompatibleVersion(cfg))
}

func (s *VersionTestSuite) TestCheckCompatibleVersion() {
flags := []struct {
expectedVersion string
actualVersion string
errStr string
expectedFail bool
}{
{"2.0", "1.0", "version mismatch", false},
{"1.0", "1.0", "", false},
{"1.0", "2.0", "", false},
{"1.0", "abc", "unable to read cassandra schema version", false},
}
for _, flag := range flags {
s.runCheckCompatibleVersion(flag.expectedVersion, flag.actualVersion, flag.errStr, flag.expectedFail)
}
}

func (s *VersionTestSuite) createDatabase(database string) func() {
connection, err := newTestConn("")
s.NoError(err)
err = connection.CreateDatabase(database)
s.NoError(err)
return func() {
s.NoError(connection.DropDatabase(database))
connection.Close()
}
}

func (s *VersionTestSuite) runCheckCompatibleVersion(
expected string, actual string, errStr string, expectedFail bool,
) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
database := fmt.Sprintf("version_test_%v", r.Int63())
defer s.createDatabase(database)()

dir := "check_version"
tmpDir, err := ioutil.TempDir("", dir)
s.NoError(err)
defer os.RemoveAll(tmpDir)

subdir := tmpDir + "/" + database
s.NoError(os.Mkdir(subdir, os.FileMode(0744)))

s.createSchemaForVersion(subdir, actual)
if expected != actual {
s.createSchemaForVersion(subdir, expected)
}

sqlFile := subdir + "/v" + actual + "/tmp.sql"
RunTool([]string{
"./tool",
"-ep", environment.GetMySQLAddress(),
"-p", strconv.Itoa(environment.GetMySQLPort()),
"-u", testUser,
"-pw", testPassword,
"-db", database,
"-dr", mysql.DriverName,
"-q",
"setup-schema",
"-f", sqlFile,
"-version", actual,
"-o",
})
if expectedFail {
os.RemoveAll(subdir + "/v" + actual)
}

cfg := config.SQL{
ConnectAddr: fmt.Sprintf("%v:%v", environment.GetMySQLAddress(), environment.GetMySQLPort()),
User: testUser,
Password: testPassword,
DriverName: mysql.DriverName,
DatabaseName: database,
}
err = checkCompatibleVersion(cfg, expected)
if len(errStr) > 0 {
s.Error(err)
s.Contains(err.Error(), errStr)
} else {
s.NoError(err)
}
}

func (s *VersionTestSuite) createSchemaForVersion(subdir string, v string) {
vDir := subdir + "/v" + v
s.NoError(os.Mkdir(vDir, os.FileMode(0744)))
cqlFile := vDir + "/tmp.sql"
s.NoError(ioutil.WriteFile(cqlFile, []byte{}, os.FileMode(0644)))
}

0 comments on commit 1046b5b

Please sign in to comment.