Skip to content

Commit

Permalink
Add cassandra schema version check on startup (cadence-workflow#438)
Browse files Browse the repository at this point in the history
  • Loading branch information
madhuravi authored Dec 1, 2017
1 parent 172983f commit 9968ec6
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 8 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ brew install cassandra
* Setup the cassandra schema:
```bash
./cadence-cassandra-tool --ep 127.0.0.1 create -k "cadence" --rf 1
./cadence-cassandra-tool --ep 127.0.0.1 -k "cadence" setup-schema -d -f ./schema/cadence/schema.cql
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence update-schema -d ./schema/cadence/versioned
./cadence-cassandra-tool --ep 127.0.0.1 create -k "cadence_visibility" --rf 1
./cadence-cassandra-tool --ep 127.0.0.1 -k "cadence_visibility" setup-schema -d -f ./schema/visibility/schema.cql
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility update-schema -d ./schema/visibility/versioned
```

* Start the service:
Expand Down
18 changes: 16 additions & 2 deletions cmd/server/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
package main

import (
"github.com/uber/cadence/common/service/config"
"github.com/urfave/cli"
"log"
"os"
"strings"

"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/tools/cassandra"

"github.com/urfave/cli"
)

// validServices is the list of all valid cadence services
Expand All @@ -49,6 +52,17 @@ func startHandler(c *cli.Context) {
config.Load(env, configDir, zone, &cfg)
log.Printf("config=\n%v\n", cfg.String())

cassCfg := cfg.Cassandra
if err := cassandra.CheckCompatibleVersion(
cassCfg, cassCfg.Keyspace, "./schema/cadence/versioned",
); err != nil {
log.Fatalf("Incompatible versions", err)
}
if err := cassandra.CheckCompatibleVersion(
cassCfg, cassCfg.VisibilityKeyspace, "./schema/visibility/versioned",
); err != nil {
log.Fatalf("Incompatible versions", err)
}
for _, svc := range getServices(c) {
if _, ok := cfg.Services[svc]; !ok {
log.Fatalf("`%v` service missing config", svc)
Expand Down
4 changes: 3 additions & 1 deletion tools/cassandra/cqlclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func newCQLClient(hostsCsv string, port int, user, password, keyspace string) (C
return nil, errNoHosts
}
clusterCfg := gocql.NewCluster(hosts...)
clusterCfg.Port = port
if port > 0 {
clusterCfg.Port = port
}
if user != "" && password != "" {
clusterCfg.Authenticator = gocql.PasswordAuthenticator{
Username: user,
Expand Down
61 changes: 59 additions & 2 deletions tools/cassandra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
package cassandra

import (
"errors"
"fmt"
"io/ioutil"
"regexp"
"strconv"
"strings"

"github.com/uber/cadence/common/service/config"
)

// represents names of the form vx.x where x.x is a (major, minor) version pair
Expand Down Expand Up @@ -79,8 +83,7 @@ func parseVersion(ver string) (major int, minor int, err error) {
return
}

// parseValidteVersion validates that the given
// input conforms to either of vx.x or x.x and
// parseValidateVersion validates that the given input conforms to either of vx.x or x.x and
// returns x.x on success
func parseValidateVersion(ver string) (string, error) {
if len(ver) == 0 {
Expand All @@ -94,3 +97,57 @@ func parseValidateVersion(ver string) (string, error) {
}
return ver, nil
}

// getExpectedVersion gets the latest version from the schema directory
func getExpectedVersion(dir string) (string, error) {
subdirs, err := ioutil.ReadDir(dir)
if err != nil {
return "", err
}

var result string
for _, subdir := range subdirs {
if !subdir.IsDir() {
continue
}
dirname := subdir.Name()
if !versionStrRegex.MatchString(dirname) {
continue
}
ver := dirToVersion(dirname)
if len(result) == 0 || cmpVersion(ver, result) > 0 {
result = ver
}
}
if len(result) == 0 {
return "", errors.New(fmt.Sprintf("no valid schemas found in dir: %s", dir))
}
return result, nil
}

func CheckCompatibleVersion(cfg config.Cassandra, keyspace string, dirPath string) error {
cqlClient, err := newCQLClient(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, keyspace)
if err != nil {
return errors.New(fmt.Sprintf("unable to create CQL Client: %v", err.Error()))
}
defer cqlClient.Close()
version, err := cqlClient.ReadSchemaVersion()
if err != nil {
return errors.New(fmt.Sprintf("unable to read cassandra schema version: %v", err.Error()))
}
expectedVersion, err := getExpectedVersion(dirPath)
if err != nil {
return errors.New(fmt.Sprintf("unable to read expected schema version: %v", err.Error()))
}
// In most cases, the versions should match. However if after a schema upgrade there is a code
// rollback, the code version (expected version) would fall lower than the actual version in
// cassandra. This check is to allow such rollbacks since we only make backwards compatible schema
// changes
if cmpVersion(version, expectedVersion) < 0 {
return errors.New(fmt.Sprintf(
"version mismatch for keyspace: %q. Expected version: %s cannot be greater than "+
"Actual version: %s", keyspace, expectedVersion, version,
))
}
return nil
}
119 changes: 118 additions & 1 deletion tools/cassandra/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,17 @@
package cassandra

import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
"github.com/uber/cadence/common/service/config"
)

type (
Expand Down Expand Up @@ -117,3 +125,112 @@ func (s *VersionTestSuite) execParseTest(input string, expMajor int, expMinor in
s.Equal(expMajor, maj)
s.Equal(expMinor, min)
}

func (s *VersionTestSuite) TestGetExpectedVersion() {
s.T().Skip()
flags := []struct {
dirs []string
expected string
err string
}{
{[]string{"1.0"}, "1.0", ""},
{[]string{"1.0", "2.0"}, "2.0", ""},
{[]string{"abc"}, "", "no valid schemas"},
}
for _, flag := range flags {
s.expectedVersionTest(flag.expected, flag.dirs, flag.err)
}
}

func (s *VersionTestSuite) expectedVersionTest(expected string, dirs []string, errStr string) {
tmpDir, err := ioutil.TempDir("", "version_test")
s.NoError(err)
defer os.RemoveAll(tmpDir)

for _, dir := range dirs {
s.createSchemaForVersion(tmpDir, dir)
}
v, err := getExpectedVersion(tmpDir)
if len(errStr) == 0 {
s.Equal(expected, v)
} else {
s.Error(err)
s.Contains(err.Error(), errStr)
}
}

func (s *VersionTestSuite) TestCheckCompatibleVersion() {
flags := []struct {
expectedVersion string
actualVersion string
errStr string
expectedFail bool
}{
{"2.0", "1.0", "version mismatch", false},
{"1.0", "1.0", "", false},
{"1.0", "2.0", "", false},
{"1.0", "abc", "unable to read cassandra schema version", false},
{"abc", "1.0", "unable to read expected schema version", true},
}
for _, flag := range flags {
s.checkCompatibleVersion(flag.expectedVersion, flag.actualVersion, flag.errStr, flag.expectedFail)
}
}

func (s *VersionTestSuite) checkCompatibleVersion(
expected string, actual string, errStr string, expectedFail bool,
) {
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system")
s.NoError(err)
defer client.Close()

r := rand.New(rand.NewSource(time.Now().UnixNano()))
keyspace := fmt.Sprintf("version_test_%v", r.Int63())
err = client.CreateKeyspace(keyspace, 1)
if err != nil {
log.Fatalf("error creating keyspace, err=%v", err)
}
defer client.DropKeyspace(keyspace)

dir := "check_version"
tmpDir, err := ioutil.TempDir("", dir)
s.NoError(err)
defer os.RemoveAll(tmpDir)

subdir := tmpDir + "/" + keyspace
s.NoError(os.Mkdir(subdir, os.FileMode(0744)))

s.createSchemaForVersion(subdir, actual)
if expected != actual {
s.createSchemaForVersion(subdir, expected)
}

cqlFile := subdir + "/v" + actual + "/tmp.cql"
RunTool([]string{
"./tool", "-k", keyspace, "-q", "setup-schema", "-f", cqlFile, "-version", actual, "-o",
})
if expectedFail {
os.RemoveAll(subdir + "/v" + actual)
}

cfg := config.Cassandra{
Hosts: "127.0.0.1",
Port: defaultCassandraPort,
User: "",
Password: "",
}
err = CheckCompatibleVersion(cfg, keyspace, subdir)
if len(errStr) > 0 {
s.Error(err)
s.Contains(err.Error(), errStr)
} else {
s.NoError(err)
}
}

func (s *VersionTestSuite) createSchemaForVersion(subdir string, v string) {
vDir := subdir + "/v" + v
s.NoError(os.Mkdir(vDir, os.FileMode(0744)))
cqlFile := vDir + "/tmp.cql"
s.NoError(ioutil.WriteFile(cqlFile, []byte{}, os.FileMode(0644)))
}

0 comments on commit 9968ec6

Please sign in to comment.