Skip to content

Commit

Permalink
[CDNC-4831] Added option to pass consistency level in the cassandra s…
Browse files Browse the repository at this point in the history
…chema versio… (cadence-workflow#5327)

* Added option to pass consistency level in the cassandra schema version validation


---------
  • Loading branch information
agautam478 authored Jun 22, 2023
1 parent 41c8369 commit 714aec4
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 21 deletions.
4 changes: 3 additions & 1 deletion cmd/server/cadence/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"strings"
"syscall"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

"github.com/urfave/cli"

"github.com/uber/cadence/common"
Expand Down Expand Up @@ -69,7 +71,7 @@ func startHandler(c *cli.Context) {
log.Fatalf("config validation failed: %v", err)
}
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence); err != nil {
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence, gocql.Quorum); err != nil {
log.Fatal("cassandra schema version compatibility check failed: ", err)
}
// sql schema version validation
Expand Down
4 changes: 3 additions & 1 deletion cmd/server/cadence/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"testing"
"time"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

"github.com/uber/cadence/testflags"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -94,7 +96,7 @@ func (s *ServerSuite) TestServerStartup() {
log.Fatalf("config validation failed: %v", err)
}
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence); err != nil {
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence, gocql.All); err != nil {
log.Fatal("cassandra schema version compatibility check failed: ", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"testing"
"time"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -92,7 +94,7 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() {
TransactionSizeLimit: dynamicconfig.GetIntPropertyFn(common.DefaultTransactionSizeLimit),
ErrorInjectionRate: dynamicconfig.GetFloatPropertyFn(0),
}
s.NoError(cassandra.VerifyCompatibleVersion(cfg))
s.NoError(cassandra.VerifyCompatibleVersion(cfg, gocql.All))
}

func (s *VersionTestSuite) TestCheckCompatibleVersion() {
Expand Down Expand Up @@ -121,7 +123,7 @@ func (s *VersionTestSuite) createKeyspace(keyspace string) func() {
NumReplicas: 1,
ProtoVersion: environment.GetCassandraProtoVersion(),
}
client, err := cassandra.NewCQLClient(cfg)
client, err := cassandra.NewCQLClient(cfg, gocql.All)
s.NoError(err)

err = client.CreateKeyspace(keyspace)
Expand Down Expand Up @@ -167,7 +169,7 @@ func (s *VersionTestSuite) runCheckCompatibleVersion(
Password: environment.GetCassandraPassword(),
Keyspace: keyspace,
}
err := cassandra.CheckCompatibleVersion(cfg, expected)
err := cassandra.CheckCompatibleVersion(cfg, expected, gocql.All)
if len(errStr) > 0 {
s.Error(err)
s.Contains(err.Error(), errStr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package tests

import (
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/environment"
"github.com/uber/cadence/tools/cassandra"
)
Expand All @@ -39,7 +40,7 @@ func NewTestCQLClient(keyspace string) (cassandra.CqlClient, error) {
AllowedAuthenticators: environment.GetCassandraAllowedAuthenticators(),
NumReplicas: 1,
ProtoVersion: environment.GetCassandraProtoVersion(),
})
}, gocql.All)
}

func CreateTestCQLFileContent() string {
Expand Down
4 changes: 2 additions & 2 deletions tools/cassandra/cqlclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ const (
var _ schema.SchemaClient = (*CqlClientImpl)(nil)

// NewCQLClient returns a new instance of CQLClient
func NewCQLClient(cfg *CQLClientConfig) (CqlClient, error) {
func NewCQLClient(cfg *CQLClientConfig, expectedConsistency gocql.Consistency) (CqlClient, error) {
var err error

cqlClient := new(CqlClientImpl)
Expand All @@ -129,7 +129,7 @@ func NewCQLClient(cfg *CQLClientConfig) (CqlClient, error) {
Timeout: time.Duration(cfg.Timeout) * time.Second,
ConnectTimeout: time.Duration(cfg.ConnectTimeout) * time.Second,
ProtoVersion: cfg.ProtoVersion,
Consistency: gocql.All,
Consistency: expectedConsistency,
})
if err != nil {
return nil, err
Expand Down
28 changes: 16 additions & 12 deletions tools/cassandra/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"fmt"
"log"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

"github.com/urfave/cli"

"github.com/uber/cadence/common/config"
Expand All @@ -45,16 +47,17 @@ type SetupSchemaConfig struct {
// rollback, the code version (expected version) would fall lower than the actual version in
// cassandra.
func VerifyCompatibleVersion(
cfg config.Persistence,
cfg config.Persistence, expectedConsistency gocql.Consistency,
) error {

if ds, ok := cfg.DataStores[cfg.DefaultStore]; ok {
if err := verifyCompatibleVersion(ds, cassandra.Version); err != nil {
if err := verifyCompatibleVersion(ds, cassandra.Version, expectedConsistency); err != nil {
return err
}
}

if ds, ok := cfg.DataStores[cfg.VisibilityStore]; ok {
if err := verifyCompatibleVersion(ds, cassandra.VisibilityVersion); err != nil {
if err := verifyCompatibleVersion(ds, cassandra.VisibilityVersion, expectedConsistency); err != nil {
return err
}
}
Expand All @@ -64,14 +67,14 @@ func VerifyCompatibleVersion(

func verifyCompatibleVersion(
ds config.DataStore,
expectedCassandraVersion string,
expectedCassandraVersion string, expectedConsistency gocql.Consistency,
) error {
if ds.NoSQL != nil {
return verifyPluginVersion(ds.NoSQL, expectedCassandraVersion)
return verifyPluginVersion(ds.NoSQL, expectedCassandraVersion, expectedConsistency)
}
if ds.ShardedNoSQL != nil {
for shardName, connection := range ds.ShardedNoSQL.Connections {
err := verifyPluginVersion(connection.NoSQLPlugin, expectedCassandraVersion)
err := verifyPluginVersion(connection.NoSQLPlugin, expectedCassandraVersion, expectedConsistency)
if err != nil {
return fmt.Errorf("Failed to verify version for DB shard: %v. Error: %v", shardName, err.Error())
}
Expand All @@ -82,21 +85,22 @@ func verifyCompatibleVersion(
return nil
}

func verifyPluginVersion(plugin *config.NoSQL, expectedCassandraVersion string) error {
func verifyPluginVersion(plugin *config.NoSQL, expectedCassandraVersion string, expectedConsistency gocql.Consistency) error {
// Use hardcoded instead of constant because of cycle dependency issue.
// However, this file will be refactor to support NoSQL soon. After the refactoring, cycle dependency issue
// should be gone and we can use constant at that time
if plugin.PluginName != "cassandra" {
return fmt.Errorf("unknown NoSQL plugin name: %q", plugin.PluginName)
}

return CheckCompatibleVersion(*plugin, expectedCassandraVersion)
return CheckCompatibleVersion(*plugin, expectedCassandraVersion, expectedConsistency)
}

// CheckCompatibleVersion check the version compatibility
func CheckCompatibleVersion(
cfg config.Cassandra,
expectedVersion string,
expectedConsistency gocql.Consistency,
) error {

client, err := NewCQLClient(&CQLClientConfig{
Expand All @@ -110,7 +114,7 @@ func CheckCompatibleVersion(
ConnectTimeout: DefaultConnectTimeout,
TLS: cfg.TLS,
ProtoVersion: cfg.ProtoVersion,
})
}, expectedConsistency)
if err != nil {
return fmt.Errorf("creating CQL client: %w", err)
}
Expand All @@ -127,7 +131,7 @@ func setupSchema(cli *cli.Context) error {
if err != nil {
return handleErr(schema.NewConfigError(err.Error()))
}
client, err := NewCQLClient(config)
client, err := NewCQLClient(config, gocql.All)
if err != nil {
return handleErr(err)
}
Expand All @@ -145,7 +149,7 @@ func updateSchema(cli *cli.Context) error {
if err != nil {
return handleErr(schema.NewConfigError(err.Error()))
}
client, err := NewCQLClient(config)
client, err := NewCQLClient(config, gocql.All)
if err != nil {
return handleErr(err)
}
Expand Down Expand Up @@ -176,7 +180,7 @@ func createKeyspace(cli *cli.Context) error {

func doCreateKeyspace(cfg CQLClientConfig, name string, datacenter string) error {
cfg.Keyspace = SystemKeyspace
client, err := NewCQLClient(&cfg)
client, err := NewCQLClient(&cfg, gocql.All)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion tools/cassandra/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package cassandra
import (
"os"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

"github.com/urfave/cli"

"github.com/uber/cadence/tools/common/schema"
Expand All @@ -39,7 +41,7 @@ func SetupSchema(config *SetupSchemaConfig) error {
if err := validateCQLClientConfig(&config.CQLClientConfig); err != nil {
return err
}
db, err := NewCQLClient(&config.CQLClientConfig)
db, err := NewCQLClient(&config.CQLClientConfig, gocql.All)
if err != nil {
return err
}
Expand Down

0 comments on commit 714aec4

Please sign in to comment.