diff --git a/.gitignore b/.gitignore index 048087793db..4dea32f6a38 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,4 @@ test test.log # Executables produced by cadence repo -cadence -cadence-cassandra-tool - +/cadence* \ No newline at end of file diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index b978775b48c..88bc42d624a 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -719,7 +719,7 @@ func (s *CassandraTestCluster) setupTestCluster(keySpace string, dropKeySpace bo } s.createCluster(testWorkflowClusterHosts, testDatacenter, gocql.Consistency(1), keySpace) s.createKeyspace(1, dropKeySpace) - s.loadSchema([]string{"workflow_test.cql", "visibility_test.cql"}, schemaDir) + s.loadSchema([]string{"schema.cql"}, schemaDir) } func (s *CassandraTestCluster) tearDownTestCluster() { @@ -757,9 +757,9 @@ func (s *CassandraTestCluster) dropKeyspace() { } func (s *CassandraTestCluster) loadSchema(fileNames []string, schemaDir string) { - workflowSchemaDir := "./schema/" + workflowSchemaDir := "./schema/cadence" if schemaDir != "" { - workflowSchemaDir = schemaDir + "/schema/" + workflowSchemaDir = schemaDir + "/schema/cadence" } err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace) diff --git a/schema/README.md b/schema/README.md new file mode 100644 index 00000000000..c40df97b75d --- /dev/null +++ b/schema/README.md @@ -0,0 +1,46 @@ +What +---- +This directory contains the cassandra schema for every keyspace that cadence owns. The directory structure is as follows + +``` +./schema + - keyspace1/ + - keyspace2/ + - keyspace.cql -- Contains the keyspace definition + - schema.cql -- Contains the latest & greatest snapshot of the schema for the keyspace + - versioned + - v0.1/ + - v0.2/ -- One directory per schema version change + - v1.0/ + - manifest.json -- json file describing the change + - changes.cql -- changes in this version, only [CREATE, ALTER] commands are allowed +``` + +How +--- + +Q: How do I update existing schema ? +* Add your changes to schema.cql +* Create a new schema version directory under ./schema/keyspace/versioned/vx.x +** Add a manifest.json +** Add your changes in a cql file +* Update the unit test within ./tools/cassandra/updateTask_test.go `TestDryrun` with your version x.x +* Once you are done with these use the ./cadence-cassandra-tool to update the schema + +Q: What's the format of manifest.json + +Example below: +* MinCompatibleVersion is the minimum schema version that your code can handle +* SchemaUpdateCqlFiles are list of .cql files containg your create/alter commands + + +``` +{ + "CurrVersion": "0.1", + "MinCompatibleVersion": "0.1", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": [ + "base.cql" + ] +} +``` \ No newline at end of file diff --git a/schema/keyspace_test.cql b/schema/cadence/keyspace.cql similarity index 100% rename from schema/keyspace_test.cql rename to schema/cadence/keyspace.cql diff --git a/schema/cadence/schema.cql b/schema/cadence/schema.cql new file mode 100644 index 00000000000..0da33a0621b --- /dev/null +++ b/schema/cadence/schema.cql @@ -0,0 +1,210 @@ +CREATE TABLE shards ( + shard_id int, + PRIMARY KEY (shard_id) +); + +CREATE TYPE shard ( + shard_id int, + owner text, -- Host identifier processing the shard + -- Range identifier used for generating ack ids for tasks within shard. + -- Also used for optimistic concurrency and all writes to a shard are conditional on this value. + range_id bigint, + -- This field keeps track of number of times owner for a shard changes before updating range_id or ack_levels + stolen_since_renew int, + updated_at timestamp, + transfer_ack_level bigint, +); + +--- Workflow execution and mutable state --- +CREATE TYPE workflow_execution ( + domain_id uuid, + workflow_id text, + run_id uuid, + task_list text, + workflow_type_name text, + decision_task_timeout int, + execution_context blob, + state int, -- enum WorkflowState {Created, Running, Completed} + close_status int, -- enum WorkflowCloseStatus {None, Completed, Failed, Canceled, Terminated, ContinuedAsNew, TimedOut} + next_event_id bigint, + last_processed_event bigint, + start_time timestamp, + last_updated_time timestamp, + create_request_id uuid, + decision_schedule_id bigint, + decision_started_id bigint, + decision_request_id text, -- Identifier used by matching engine for retrying history service calls for recording task is started + decision_timeout int, +); + +-- TODO: Remove fields that are left over from activity and workflow tasks. +CREATE TYPE transfer_task ( + domain_id uuid, -- The domain ID that this transfer task belongs to + workflow_id text, -- The workflow ID that this transfer task belongs to + run_id uuid, -- The run ID that this transfer task belongs to + task_id bigint, + target_domain_id uuid, -- The external domain ID that this transfer task is doing work for. + target_workflow_id text, -- The external workflow ID that this transfer task is doing work for. + target_run_id uuid, -- The external run ID that this transfer task is doing work for. + task_list text, + type int, -- enum TaskType {ActivityTask, DecisionTask, DeleteExecution, CancelExecution} + schedule_id bigint, +); + +CREATE TYPE timer_task ( + domain_id uuid, + workflow_id text, + run_id uuid, + task_id bigint, + type int, -- enum TaskType {DecisionTaskTimeout, ActivityTaskTimeout, UserTimer} + timeout_type int, -- enum TimeoutType in IDL {START_TO_CLOSE, SCHEDULE_TO_START, SCHEDULE_TO_CLOSE, HEARTBEAT} + event_id bigint, -- Corresponds to event ID in history that is responsible for this timer. +); + +-- Workflow activity in progress mutable state +CREATE TYPE activity_info ( + schedule_id bigint, + scheduled_event blob, + started_id bigint, + started_event blob, + activity_id text, -- Client generated unique ID for the activity. + request_id text, -- Identifier used by matching engine for retrying history service calls for recording task is started + details blob, + schedule_to_start_timeout int, + schedule_to_close_timeout int, + start_to_close_timeout int, + heart_beat_timeout int, + cancel_requested boolean, -- If a cancel request is made to cancel the activity in progress. + cancel_request_id bigint, -- Event ID that identifies the cancel request. + last_hb_updated_time timestamp, -- Last time the heartbeat is received. +); + +-- User timer details +CREATE TYPE timer_info ( + timer_id text, -- User defined timer ID + started_id bigint, -- The event ID corresponding to timer started. + expiry_time timestamp, -- Timestamp at which this timer expires or fires + task_id bigint, -- The task ID if we have one created for this timer +); + +-- Activity or workflow task in a task list +CREATE TYPE task ( + domain_id uuid, + workflow_id text, + run_id uuid, + schedule_id bigint, +); + +CREATE TYPE task_list ( + domain_id uuid, + name text, + type int, -- enum TaskRowType {ActivityTask, DecisionTask} + ack_level bigint, -- task_id of the last acknowledged message +); + +CREATE TYPE domain ( + id uuid, + name text, + status int, -- enum DomainStatus {Registered, Deprecated, Deleted} + description text, + owner_email text, +); + +CREATE TYPE domain_config ( + retention int, + emit_metric boolean +); + +CREATE TABLE executions ( + shard_id int, + type int, -- enum RowType { Shard, Execution, TransferTask, TimerTask} + domain_id uuid, + workflow_id text, + run_id uuid, + current_run_id uuid, + task_id bigint, -- unique identifier for transfer and timer tasks for an execution + shard frozen, + execution frozen, + transfer frozen, + timer frozen, + next_event_id bigint, -- This is needed to make conditional updates on session history + range_id bigint static, -- Increasing sequence identifier for transfer queue, checkpointed into shard info + activity_map map>, + timer_map map>, + PRIMARY KEY (shard_id, type, domain_id, workflow_id, run_id, task_id) +); + +CREATE TABLE events ( + domain_id uuid, + workflow_id text, + run_id uuid, + -- We insert a batch of events with each append transaction. + -- This field stores the event id of first event in the batch. + first_event_id bigint, + range_id bigint, + tx_id bigint, + data blob, -- Batch of workflow execution history events as a blob + data_encoding text, -- Protocol used for history serialization + data_version int, -- history blob version + PRIMARY KEY ((domain_id, workflow_id, run_id), first_event_id) +); + +-- Stores activity or workflow tasks +CREATE TABLE tasks ( + domain_id uuid, + task_list_name text, + task_list_type int, -- enum TaskListType {ActivityTask, DecisionTask} + type int, -- enum rowType {Task, TaskList} + task_id bigint, -- unique identifier for tasks, monotonically increasing + range_id bigint static, -- Used to ensure that only one process can write to the table + task frozen, + task_list frozen, + PRIMARY KEY ((domain_id, task_list_name, task_list_type), type, task_id) +); + +CREATE TABLE domains ( + id uuid, + domain frozen, + config frozen, + PRIMARY KEY (id) +); + +CREATE TABLE domains_by_name ( + name text, + domain frozen, + config frozen, + PRIMARY KEY (name) +); + +-- Visiblity schema goes below + +CREATE TABLE open_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) +) WITH CLUSTERING ORDER BY (start_time DESC); + + +CREATE INDEX open_by_workflow_id ON open_executions (workflow_id); +CREATE INDEX open_by_type ON open_executions (workflow_type_name); + +CREATE TABLE closed_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + close_time timestamp, + status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT} + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) +) WITH CLUSTERING ORDER BY (start_time DESC); + +CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id); +CREATE INDEX closed_by_close_time ON closed_executions (close_time); +CREATE INDEX closed_by_type ON closed_executions (workflow_type_name); +CREATE INDEX closed_by_status ON closed_executions (status); \ No newline at end of file diff --git a/schema/workflow_test.cql b/schema/cadence/versioned/v0.1/base.cql similarity index 84% rename from schema/workflow_test.cql rename to schema/cadence/versioned/v0.1/base.cql index f3a158fe2e1..92de35cfe4c 100644 --- a/schema/workflow_test.cql +++ b/schema/cadence/versioned/v0.1/base.cql @@ -174,4 +174,35 @@ CREATE TABLE domains_by_name ( domain frozen, config frozen, PRIMARY KEY (name) -); \ No newline at end of file +); + +CREATE TABLE open_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) +) WITH CLUSTERING ORDER BY (start_time DESC); + + +CREATE INDEX open_by_workflow_id ON open_executions (workflow_id); +CREATE INDEX open_by_type ON open_executions (workflow_type_name); + +CREATE TABLE closed_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + close_time timestamp, + status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT} + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) +) WITH CLUSTERING ORDER BY (start_time DESC); + +CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id); +CREATE INDEX closed_by_close_time ON closed_executions (close_time); +CREATE INDEX closed_by_type ON closed_executions (workflow_type_name); +CREATE INDEX closed_by_status ON closed_executions (status); \ No newline at end of file diff --git a/schema/cadence/versioned/v0.1/manifest.json b/schema/cadence/versioned/v0.1/manifest.json new file mode 100644 index 00000000000..fe2f06baff2 --- /dev/null +++ b/schema/cadence/versioned/v0.1/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.1", + "MinCompatibleVersion": "0.1", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": [ + "base.cql" + ] +} diff --git a/schema/visibility_test.cql b/schema/visibility_test.cql deleted file mode 100644 index c6653604881..00000000000 --- a/schema/visibility_test.cql +++ /dev/null @@ -1,30 +0,0 @@ -CREATE TABLE open_executions ( - domain_id uuid, - domain_partition int, - workflow_id text, - run_id uuid, - start_time timestamp, - workflow_type_name text, - PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) -) WITH CLUSTERING ORDER BY (start_time DESC); - - -CREATE INDEX open_by_workflow_id ON open_executions (workflow_id); -CREATE INDEX open_by_type ON open_executions (workflow_type_name); - -CREATE TABLE closed_executions ( - domain_id uuid, - domain_partition int, - workflow_id text, - run_id uuid, - start_time timestamp, - close_time timestamp, - status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT} - workflow_type_name text, - PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) -) WITH CLUSTERING ORDER BY (start_time DESC); - -CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id); -CREATE INDEX closed_by_close_time ON closed_executions (close_time); -CREATE INDEX closed_by_type ON closed_executions (workflow_type_name); -CREATE INDEX closed_by_status ON closed_executions (status); \ No newline at end of file diff --git a/tools/cassandra/README.md b/tools/cassandra/README.md new file mode 100644 index 00000000000..e37119a3700 --- /dev/null +++ b/tools/cassandra/README.md @@ -0,0 +1,24 @@ +What +---- +This package contains the tooling for cadence cassandra operations. + +How +--- +- Run make bins +- You should see an executable `cadence-cassandra-tool` + +Setting up initial cassandra schema on a new cluster +---------------------------------------------------- +``` +./cadence-cassandra-tool -ep 127.0.0.1 -k cadence setup-schema -v 0.0 -- this sets up just the schema version tables with initial version of 0.0 +./cadence-cassandra-tool -ep 127.0.0.1 -k cadence update-schema -d ./schema/cadence -- upgrades your schema to the latest version +``` + +Updating schema on an existing cluster +-------------------------------------- +You can only upgrade to a new version after the initial setup done above. + +``` +./cadence-cassandra-tool -ep 127.0.0.1 -k cadence update-schema -d ./schema/cadence -v x.x -y -- executes a dryrun of upgrade to version x.x +./cadence-cassandra-tool -ep 127.0.0.1 -k cadence update-schema -d ./schema/cadence -v x.x -- actually executes the upgrade to version x.x +``` \ No newline at end of file diff --git a/tools/cassandra/config.go b/tools/cassandra/config.go index 99ead66752a..741c022f5f6 100644 --- a/tools/cassandra/config.go +++ b/tools/cassandra/config.go @@ -20,7 +20,10 @@ package cassandra -import "fmt" +import ( + "fmt" + "regexp" +) type ( // baseConfig is the common config @@ -35,7 +38,7 @@ type ( // params for executing a UpdateSchemaTask UpdateSchemaConfig struct { BaseConfig - TargetVersion int + TargetVersion string SchemaDir string IsDryRun bool } @@ -45,7 +48,7 @@ type ( SetupSchemaConfig struct { BaseConfig SchemaFilePath string - InitialVersion int + InitialVersion string Overwrite bool // overwrite previous data DisableVersioning bool // do not use schema versioning } @@ -64,6 +67,9 @@ const ( cliOptSchemaFile = "schema-file" cliOptOverwrite = "overwrite" cliOptDisableVersioning = "disable-versioning" + cliOptTargetVersion = "version" + cliOptDryrun = "dryrun" + cliOptSchemaDir = "schema-dir" cliFlagEndpoint = cliOptEndpoint + ", ep" cliFlagKeyspace = cliOptKeyspace + ", k" @@ -71,8 +77,13 @@ const ( cliFlagSchemaFile = cliOptSchemaFile + ", f" cliFlagOverwrite = cliOptOverwrite + ", o" cliFlagDisableVersioning = cliOptDisableVersioning + ", d" + cliFlagTargetVersion = cliOptTargetVersion + ", v" + cliFlagDryrun = cliOptDryrun + ", y" + cliFlagSchemaDir = cliOptSchemaDir + ", d" ) +var rmspaceRegex = regexp.MustCompile("\\s+") + func newConfigError(msg string) error { return &ConfigError{msg: msg} } diff --git a/tools/cassandra/cqlclient.go b/tools/cassandra/cqlclient.go index 18fabf90564..a682dbac7d7 100644 --- a/tools/cassandra/cqlclient.go +++ b/tools/cassandra/cqlclient.go @@ -30,6 +30,7 @@ import ( "fmt" "github.com/gocql/gocql" + "log" ) type ( @@ -49,14 +50,18 @@ type ( DropTable(name string) error // DropType drops a user defined type from keyspace DropType(name string) error + // DropKeyspace drops a keyspace + DropKeyspace(keyspace string) error // CreateSchemaVersionTables sets up the schema version tables CreateSchemaVersionTables() error // ReadSchemaVersion returns the current schema version for the keyspace - ReadSchemaVersion() (int64, error) + ReadSchemaVersion() (string, error) // UpdateSchemaVersion updates the schema version for the keyspace - UpdateSchemaVersion(newVersion int64, minCompatibleVersion int64) error + UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error // WriteSchemaUpdateLog adds an entry to the schema update history table - WriteSchemaUpdateLog(oldVersion int64, newVersion int64, manifestMD5 string, desc string) error + WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error + // Close gracefully closes the client object + Close() } cqlClient struct { session *gocql.Session @@ -83,8 +88,8 @@ const ( createSchemaVersionTableCQL = `CREATE TABLE schema_version(keyspace_name text PRIMARY KEY, ` + `creation_time timestamp, ` + - `curr_version bigint, ` + - `min_compatible_version bigint);` + `curr_version text, ` + + `min_compatible_version text);` createSchemaUpdateHistoryTableCQL = `CREATE TABLE schema_update_history(` + `year int, ` + @@ -92,8 +97,8 @@ const ( `update_time timestamp, ` + `description text, ` + `manifest_md5 text, ` + - `new_version bigint, ` + - `old_version bigint, ` + + `new_version text, ` + + `old_version text, ` + `PRIMARY KEY ((year, month), update_time));` createKeyspaceCQL = `CREATE KEYSPACE IF NOT EXISTS %v ` + @@ -166,6 +171,11 @@ func (client *cqlClient) DropType(name string) error { return client.Exec(fmt.Sprintf("DROP TYPE %v", name)) } +// DropKeyspace drops a keyspace +func (client *cqlClient) DropKeyspace(keyspace string) error { + return client.Exec(fmt.Sprintf("DROP KEYSPACE %v", keyspace)) +} + // CreateSchemaVersionTables sets up the schema version tables func (client *cqlClient) CreateSchemaVersionTables() error { if err := client.Exec(createSchemaVersionTableCQL); err != nil { @@ -178,28 +188,28 @@ func (client *cqlClient) CreateSchemaVersionTables() error { } // ReadSchemaVersion returns the current schema version for the keyspace -func (client *cqlClient) ReadSchemaVersion() (int64, error) { +func (client *cqlClient) ReadSchemaVersion() (string, error) { query := client.session.Query(readSchemaVersionCQL, client.clusterConfig.Keyspace) iter := query.Iter() - var version int64 + var version string if !iter.Scan(&version) { iter.Close() - return 0, errGetSchemaVersion + return "", errGetSchemaVersion } if err := iter.Close(); err != nil { - return 0, err + return "", err } return version, nil } // UpdateShemaVersion updates the schema version for the keyspace -func (client *cqlClient) UpdateSchemaVersion(newVersion int64, minCompatibleVersion int64) error { +func (client *cqlClient) UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error { query := client.session.Query(writeSchemaVersionCQL, client.clusterConfig.Keyspace, time.Now(), newVersion, minCompatibleVersion) return query.Exec() } // WriteSchemaUpdateLog adds an entry to the schema update history table -func (client *cqlClient) WriteSchemaUpdateLog(oldVersion int64, newVersion int64, manifestMD5 string, desc string) error { +func (client *cqlClient) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error { now := time.Now().UTC() query := client.session.Query(writeSchemaUpdateHistoryCQL) query.Bind(now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, desc) @@ -211,6 +221,13 @@ func (client *cqlClient) Exec(stmt string) error { return client.session.Query(stmt).Exec() } +// Close closes the cql client +func (client *cqlClient) Close() { + if client.session != nil { + client.session.Close() + } +} + func parseHosts(input string) []string { var hosts = make([]string, 0) for _, h := range strings.Split(input, ",") { @@ -266,29 +283,29 @@ func ParseCQLFile(filePath string) ([]string, error) { return nil, err } -// dropKeyspace deletes all tables/types in the +// dropAllTablesTypes deletes all tables/types in the // keyspace without deleting the keyspace -func dropKeyspace(client CQLClient) { +func dropAllTablesTypes(client CQLClient) { tables, err := client.ListTables() if err != nil { return } - fmt.Printf("Dropping following tables: %v\n", tables) + log.Printf("Dropping following tables: %v\n", tables) for _, table := range tables { err1 := client.DropTable(table) if err1 != nil { - fmt.Printf("Error dropping table %v, err=%v\n", table, err1) + log.Printf("Error dropping table %v, err=%v\n", table, err1) } } types, err := client.ListTypes() if err != nil { return } - fmt.Printf("Dropping following types: %v\n", types) + log.Printf("Dropping following types: %v\n", types) for _, t := range types { err1 := client.DropType(t) if err1 != nil { - fmt.Printf("Error dropping type %v, err=%v\n", t, err1) + log.Printf("Error dropping type %v, err=%v\n", t, err1) } } } diff --git a/tools/cassandra/cqlclient_test.go b/tools/cassandra/cqlclient_test.go index 02182d8ed9f..5c087ae2404 100644 --- a/tools/cassandra/cqlclient_test.go +++ b/tools/cassandra/cqlclient_test.go @@ -74,6 +74,7 @@ func (s *CQLClientTestSuite) SetupSuite() { func (s *CQLClientTestSuite) TearDownSuite() { s.client.Exec("DROP keyspace " + s.keyspace) + s.client.Close() } func (s *CQLClientTestSuite) TestParseCQLFile() { @@ -93,19 +94,19 @@ func (s *CQLClientTestSuite) TestParseCQLFile() { func (s *CQLClientTestSuite) testUpdate(client CQLClient) { // Update / Read schema version test - err := client.UpdateSchemaVersion(10, 5) + err := client.UpdateSchemaVersion("10.0", "5.0") s.Nil(err) - err = client.WriteSchemaUpdateLog(9, 10, "abc", "test") + err = client.WriteSchemaUpdateLog("9.0", "10.0", "abc", "test") s.Nil(err) ver, err := client.ReadSchemaVersion() s.Nil(err) - s.Equal(10, int(ver)) + s.Equal("10.0", ver) - err = client.UpdateSchemaVersion(12, 5) + err = client.UpdateSchemaVersion("12.0", "5.0") ver, err = client.ReadSchemaVersion() s.Nil(err) - s.Equal(12, int(ver)) + s.Equal("12.0", ver) } func (s *CQLClientTestSuite) testDrop(client CQLClient) { diff --git a/tools/cassandra/handler.go b/tools/cassandra/handler.go index f476ef51332..d6c2e9d5c43 100644 --- a/tools/cassandra/handler.go +++ b/tools/cassandra/handler.go @@ -23,6 +23,7 @@ package cassandra import ( "fmt" "github.com/urfave/cli" + "log" ) // setupSchema executes the setupSchemaTask @@ -31,10 +32,40 @@ import ( func setupSchema(cli *cli.Context) error { config, err := newSetupSchemaConfig(cli) if err != nil { - return newConfigError(err.Error()) + err = newConfigError(err.Error()) + log.Println(err) + return err } if err := handleSetupSchema(config); err != nil { - fmt.Println(err) + log.Println(err) + return err + } + return nil +} + +// updateSchema executes the updateSchemaTask +// using the given command lien args as input +func updateSchema(cli *cli.Context) error { + config, err := newUpdateSchemaConfig(cli) + if err != nil { + err = newConfigError(err.Error()) + log.Println(err) + return err + } + if err := handleUpdateSchema(config); err != nil { + log.Println(err) + return err + } + return nil +} + +func handleUpdateSchema(config *UpdateSchemaConfig) error { + task, err := NewUpdateSchemaTask(config) + if err != nil { + return fmt.Errorf("Error creating task, err=%v\n", err) + } + if err := task.run(); err != nil { + return fmt.Errorf("Error setting up schema, err=%v\n", err) } return nil } @@ -52,17 +83,25 @@ func handleSetupSchema(config *SetupSchemaConfig) error { func validateSetupSchemaConfig(config *SetupSchemaConfig) error { if len(config.CassHosts) == 0 { - return newConfigError("missing cassandra host") + return newConfigError("missing cassandra endpoint argument " + flag(cliOptEndpoint)) } if len(config.CassKeyspace) == 0 { - return newConfigError("missing keyspace") + return newConfigError("missing " + flag(cliOptKeyspace) + " argument ") + } + if len(config.SchemaFilePath) == 0 && config.DisableVersioning { + return newConfigError("missing schemaFilePath " + flag(cliOptSchemaFile)) } - if len(config.SchemaFilePath) == 0 { - return newConfigError("missing schemaFilePath") + if (config.DisableVersioning && len(config.InitialVersion) > 0) || + (!config.DisableVersioning && len(config.InitialVersion) == 0) { + return newConfigError("either " + flag(cliOptDisableVersioning) + " or " + + flag(cliOptVersion) + " but not both must be specified") } - if (config.DisableVersioning && config.InitialVersion > 0) || - (!config.DisableVersioning && config.InitialVersion == 0) { - return newConfigError("either disableVersioning or initialVersion must be specified") + if !config.DisableVersioning { + ver, err := parseValidateVersion(config.InitialVersion) + if err != nil { + return newConfigError("invalid " + flag(cliOptVersion) + " argument:" + err.Error()) + } + config.InitialVersion = ver } return nil } @@ -73,25 +112,54 @@ func newSetupSchemaConfig(cli *cli.Context) (*SetupSchemaConfig, error) { config.CassHosts = cli.GlobalString(cliOptEndpoint) config.CassKeyspace = cli.GlobalString(cliOptKeyspace) config.SchemaFilePath = cli.String(cliOptSchemaFile) - config.InitialVersion = cli.Int(cliOptVersion) + config.InitialVersion = cli.String(cliOptVersion) config.DisableVersioning = cli.Bool(cliOptDisableVersioning) config.Overwrite = cli.Bool(cliOptOverwrite) + if err := validateSetupSchemaConfig(config); err != nil { + return nil, err + } + + return config, nil +} + +func validateUpdateSchemaConfig(config *UpdateSchemaConfig) error { + if len(config.CassHosts) == 0 { - return nil, fmt.Errorf("'%v' flag cannot be empty\n", cliOptEndpoint) + return newConfigError("missing cassandra endpoint argument " + flag(cliOptEndpoint)) } if len(config.CassKeyspace) == 0 { - return nil, fmt.Errorf("'%v' flag cannot be empty\n", cliOptKeyspace) + return newConfigError("missing " + flag(cliOptKeyspace) + " argument ") } - if len(config.SchemaFilePath) == 0 { - return nil, fmt.Errorf("'%v' flag cannot be empty\n", cliOptSchemaFile) + if len(config.SchemaDir) == 0 { + return newConfigError("missing " + flag(cliOptSchemaDir) + " argument ") } - if config.DisableVersioning && config.InitialVersion > 0 { - return nil, fmt.Errorf("either specify '%v' or '%v', but not both", cliOptDisableVersioning, cliOptVersion) + if len(config.TargetVersion) > 0 { + ver, err := parseValidateVersion(config.TargetVersion) + if err != nil { + return newConfigError("invalid " + flag(cliOptTargetVersion) + " argument:" + err.Error()) + } + config.TargetVersion = ver } - if !config.DisableVersioning && config.InitialVersion == 0 { - return nil, fmt.Errorf("must specify a value for either '%v' or '%v'\n", cliOptDisableVersioning, cliOptVersion) + return nil +} + +func newUpdateSchemaConfig(cli *cli.Context) (*UpdateSchemaConfig, error) { + + config := new(UpdateSchemaConfig) + config.CassHosts = cli.GlobalString(cliOptEndpoint) + config.CassKeyspace = cli.GlobalString(cliOptKeyspace) + config.SchemaDir = cli.String(cliOptSchemaDir) + config.IsDryRun = cli.Bool(cliOptDryrun) + config.TargetVersion = cli.String(cliOptTargetVersion) + + if err := validateUpdateSchemaConfig(config); err != nil { + return nil, err } return config, nil } + +func flag(opt string) string { + return "(-" + opt + ")" +} diff --git a/tools/cassandra/handler_test.go b/tools/cassandra/handler_test.go new file mode 100644 index 00000000000..3e3487aebfe --- /dev/null +++ b/tools/cassandra/handler_test.go @@ -0,0 +1,117 @@ +package cassandra + +import ( + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "testing" +) + +type ( + HandlerTestSuite struct { + *require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error + suite.Suite + } +) + +func TestHandlerTestSuite(t *testing.T) { + suite.Run(t, new(HandlerTestSuite)) +} + +func (s *HandlerTestSuite) SetupTest() { + s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil +} + +func (s *HandlerTestSuite) TestValidateSetupSchemaConfig() { + + config := new(SetupSchemaConfig) + s.assertValidateSetupFails(config) + + config.CassHosts = "127.0.0.1" + s.assertValidateSetupFails(config) + + config.CassKeyspace = "test-keyspace" + s.assertValidateSetupFails(config) + + config.InitialVersion = "0.1" + config.DisableVersioning = true + config.SchemaFilePath = "" + s.assertValidateSetupFails(config) + + config.InitialVersion = "0.1" + config.DisableVersioning = true + config.SchemaFilePath = "/tmp/foo.cql" + s.assertValidateSetupFails(config) + + config.InitialVersion = "" + config.DisableVersioning = true + config.SchemaFilePath = "" + s.assertValidateSetupFails(config) + + config.InitialVersion = "0.1" + config.DisableVersioning = false + config.SchemaFilePath = "/tmp/foo.cql" + s.assertValidateSetupSucceeds(config) + + config.InitialVersion = "0.1" + config.DisableVersioning = false + config.SchemaFilePath = "" + s.assertValidateSetupSucceeds(config) + + config.InitialVersion = "" + config.DisableVersioning = true + config.SchemaFilePath = "/tmp/foo.cql" + s.assertValidateSetupSucceeds(config) +} + +func (s *HandlerTestSuite) TestValidateUpdateSchemaConfig() { + + config := new(UpdateSchemaConfig) + s.assertValidateUpdateFails(config) + + config.CassHosts = "127.0.0.1" + s.assertValidateUpdateFails(config) + + config.CassKeyspace = "test-keyspace" + s.assertValidateUpdateFails(config) + + config.SchemaDir = "/tmp" + config.TargetVersion = "abc" + s.assertValidateUpdateFails(config) + + config.SchemaDir = "/tmp" + config.TargetVersion = "" + s.assertValidateUpdateSucceeds(config) + + config.SchemaDir = "/tmp" + config.TargetVersion = "1.2" + s.assertValidateUpdateSucceeds(config) + + config.SchemaDir = "/tmp" + config.TargetVersion = "v1.2" + s.assertValidateUpdateSucceeds(config) + s.Equal("1.2", config.TargetVersion) +} + +func (s *HandlerTestSuite) assertValidateSetupSucceeds(input *SetupSchemaConfig) { + err := validateSetupSchemaConfig(input) + s.Nil(err) +} + +func (s *HandlerTestSuite) assertValidateSetupFails(input *SetupSchemaConfig) { + err := validateSetupSchemaConfig(input) + s.NotNil(err) + _, ok := err.(*ConfigError) + s.True(ok) +} + +func (s *HandlerTestSuite) assertValidateUpdateSucceeds(input *UpdateSchemaConfig) { + err := validateUpdateSchemaConfig(input) + s.Nil(err) +} + +func (s *HandlerTestSuite) assertValidateUpdateFails(input *UpdateSchemaConfig) { + err := validateUpdateSchemaConfig(input) + s.NotNil(err) + _, ok := err.(*ConfigError) + s.True(ok) +} diff --git a/tools/cassandra/main.go b/tools/cassandra/main.go index b8e319ec47f..37e4aa7c6cb 100644 --- a/tools/cassandra/main.go +++ b/tools/cassandra/main.go @@ -66,13 +66,13 @@ func buildCLIOptions() *cli.App { Aliases: []string{"setup"}, Usage: "setup initial version of cassandra schema", Flags: []cli.Flag{ - cli.IntFlag{ + cli.StringFlag{ Name: cliFlagVersion, Usage: "initial version of the schema, cannot be used with disable-versioning", }, cli.StringFlag{ Name: cliFlagSchemaFile, - Usage: "path to the .cql schema file", + Usage: "path to the .cql schema file; if un-specified, will just setup versioning tables", }, cli.BoolFlag{ Name: cliFlagDisableVersioning, @@ -87,6 +87,28 @@ func buildCLIOptions() *cli.App { setupSchema(c) }, }, + { + Name: "update-schema", + Aliases: []string{"update"}, + Usage: "update cassandra schema to a specific version", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: cliFlagTargetVersion, + Usage: "target version for the schema update, defaults to latest", + }, + cli.StringFlag{ + Name: cliFlagSchemaDir, + Usage: "path to directory containing versioned schema", + }, + cli.BoolFlag{ + Name: cliFlagDryrun, + Usage: "do a dryrun", + }, + }, + Action: func(c *cli.Context) { + updateSchema(c) + }, + }, } return app diff --git a/tools/cassandra/setupTask.go b/tools/cassandra/setupTask.go index 4c8f14bd3cc..7ce499e8ac2 100644 --- a/tools/cassandra/setupTask.go +++ b/tools/cassandra/setupTask.go @@ -21,8 +21,7 @@ package cassandra import ( - "fmt" - "regexp" + "log" ) // SetupSchemaTask represents a task @@ -49,49 +48,53 @@ func (task *SetupSchemaTask) run() error { config := task.config - fmt.Printf("Starting schema setup, config=%+v\n", config) + defer func() { + task.client.Close() + }() + + log.Printf("Starting schema setup, config=%+v\n", config) if config.Overwrite { - dropKeyspace(task.client) + dropAllTablesTypes(task.client) } if !config.DisableVersioning { - fmt.Printf("Setting up version tables\n") + log.Printf("Setting up version tables\n") if err := task.client.CreateSchemaVersionTables(); err != nil { return err } } - stmts, err := ParseCQLFile(config.SchemaFilePath) - if err != nil { - return err - } - - re := regexp.MustCompile("\\s+") - - fmt.Println("----- Creating types and tables -----") - for _, stmt := range stmts { - fmt.Println(re.ReplaceAllString(stmt, " ")) - if err := task.client.Exec(stmt); err != nil { + if len(config.SchemaFilePath) > 0 { + stmts, err := ParseCQLFile(config.SchemaFilePath) + if err != nil { return err } + + log.Println("----- Creating types and tables -----") + for _, stmt := range stmts { + log.Println(rmspaceRegex.ReplaceAllString(stmt, " ")) + if err := task.client.Exec(stmt); err != nil { + return err + } + } + log.Println("----- Done -----") } - fmt.Println("----- Done -----") if !config.DisableVersioning { - fmt.Printf("Setting initial schema version to %v\n", config.InitialVersion) - err := task.client.UpdateSchemaVersion(int64(config.InitialVersion), int64(config.InitialVersion)) + log.Printf("Setting initial schema version to %v\n", config.InitialVersion) + err := task.client.UpdateSchemaVersion(config.InitialVersion, config.InitialVersion) if err != nil { return err } - fmt.Printf("Updating schema update log\n") - err = task.client.WriteSchemaUpdateLog(int64(0), int64(config.InitialVersion), "", "initial version") + log.Printf("Updating schema update log\n") + err = task.client.WriteSchemaUpdateLog("0", config.InitialVersion, "", "initial version") if err != nil { return err } } - fmt.Println("Schema setup complete") + log.Println("Schema setup complete") return nil } diff --git a/tools/cassandra/setupTask_test.go b/tools/cassandra/setupTask_test.go index 27b523c4c79..683f9fc1d72 100644 --- a/tools/cassandra/setupTask_test.go +++ b/tools/cassandra/setupTask_test.go @@ -55,7 +55,8 @@ func (s *SetupSchemaTestSuite) SetupSuite() { } func (s *SetupSchemaTestSuite) TearDownSuite() { - s.client.Exec("DROP keyspace " + s.keyspace) + s.client.DropKeyspace(s.keyspace) + s.client.Close() } func (s *SetupSchemaTestSuite) TestSetupSchema() { @@ -87,12 +88,12 @@ func (s *SetupSchemaTestSuite) TestSetupSchema() { for i := 0; i < 4; i++ { - ver := int(s.rand.Int31()) + ver := strconv.Itoa(int(s.rand.Int31())) versioningEnabled := (i%2 == 0) // test overwrite with versioning works if versioningEnabled { - RunTool([]string{"./tool", "-k", s.keyspace, "setup-schema", "-f", cqlFile.Name(), "-version", strconv.Itoa(ver), "-o"}) + RunTool([]string{"./tool", "-k", s.keyspace, "setup-schema", "-f", cqlFile.Name(), "-version", ver, "-o"}) } else { RunTool([]string{"./tool", "-k", s.keyspace, "setup-schema", "-f", cqlFile.Name(), "-d", "-o"}) } @@ -112,7 +113,7 @@ func (s *SetupSchemaTestSuite) TestSetupSchema() { gotVer, err := client.ReadSchemaVersion() if versioningEnabled { s.Nil(err) - s.Equal(ver, int(gotVer)) + s.Equal(ver, gotVer) } else { s.NotNil(err) } diff --git a/tools/cassandra/updateTask.go b/tools/cassandra/updateTask.go new file mode 100644 index 00000000000..d930a8c1eeb --- /dev/null +++ b/tools/cassandra/updateTask.go @@ -0,0 +1,405 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cassandra + +import ( + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "sort" + "strings" +) + +type ( + // UpdateSchemaTask represents a task + // that executes a cassandra schema upgrade + UpdateSchemaTask struct { + client CQLClient + config *UpdateSchemaConfig + } + + // manifest is a value type that represents + // the deserialized manifest.json file within + // a schema version directory + manifest struct { + CurrVersion string + MinCompatibleVersion string + Description string + SchemaUpdateCqlFiles []string + md5 string + } + + // changeSet represents all the changes + // corresponding to a single schema version + changeSet struct { + version string + manifest *manifest + cqlStmts []string + } + + // byVersion is a comparator type + // for sorting a set of version + // strings + byVersion []string +) + +const ( + dryrunKeyspace = "dryrun_" + systemKeyspace = "system" + manifestFileName = "manifest.json" +) + +var ( + whitelistedCQLPrefixes = [2]string{"CREATE", "ALTER"} +) + +// NewUpdateSchemaTask returns a new instance of UpdateSchemaTask +func NewUpdateSchemaTask(config *UpdateSchemaConfig) (*UpdateSchemaTask, error) { + + keyspace := config.CassKeyspace + if config.IsDryRun { + keyspace = dryrunKeyspace + err := setupDryrunKeyspace(config) + if err != nil { + return nil, fmt.Errorf("error creating dryrun keyspace:%v", err.Error()) + } + } + + client, err := newCQLClient(config.CassHosts, keyspace) + if err != nil { + return nil, err + } + + return &UpdateSchemaTask{ + client: client, + config: config, + }, nil +} + +// run executes the task +func (task *UpdateSchemaTask) run() error { + + config := task.config + + defer func() { + if config.IsDryRun { + task.client.DropKeyspace(dryrunKeyspace) + } + task.client.Close() + }() + + log.Printf("UpdateSchemeTask started, config=%+v\n", config) + + currVer, err := task.client.ReadSchemaVersion() + if err != nil { + return fmt.Errorf("error reading current schema version:%v", err.Error()) + } + + updates, err := task.buildChangeSet(currVer) + if err != nil { + return err + } + + err = task.executeUpdates(currVer, updates) + if err != nil { + return err + } + + log.Printf("UpdateSchemeTask done\n") + + return nil +} + +func (task *UpdateSchemaTask) executeUpdates(currVer string, updates []changeSet) error { + + for _, cs := range updates { + + err := task.execCQLStmts(cs.version, cs.cqlStmts) + if err != nil { + return err + } + err = task.updateSchemaVersion(currVer, &cs) + if err != nil { + return err + } + + log.Printf("Schema updated from %v to %v\n", currVer, cs.version) + currVer = cs.version + } + + return nil +} + +func (task *UpdateSchemaTask) execCQLStmts(ver string, stmts []string) error { + log.Printf("---- Executing updates for version %v ----\n", ver) + for _, stmt := range stmts { + log.Println(rmspaceRegex.ReplaceAllString(stmt, " ")) + e := task.client.Exec(stmt) + if e != nil { + return fmt.Errorf("error executing CQL statement:%v", e) + } + } + log.Printf("---- Done ----\n") + return nil +} + +func (task *UpdateSchemaTask) updateSchemaVersion(oldVer string, cs *changeSet) error { + + err := task.client.UpdateSchemaVersion(cs.version, cs.manifest.MinCompatibleVersion) + if err != nil { + return fmt.Errorf("failed to update schema_version table, err=%v", err.Error()) + } + + err = task.client.WriteSchemaUpdateLog(oldVer, cs.manifest.CurrVersion, cs.manifest.md5, cs.manifest.Description) + if err != nil { + return fmt.Errorf("failed to add entry to schema_update_history, err=%v\n", err.Error()) + } + + return nil +} + +func (task *UpdateSchemaTask) buildChangeSet(currVer string) ([]changeSet, error) { + + config := task.config + + verDirs, err := readSchemaDir(config.SchemaDir, currVer, config.TargetVersion) + if err != nil { + return nil, fmt.Errorf("error listing schema dir:%v", err.Error()) + } + if len(verDirs) == 0 { + return nil, fmt.Errorf("no schema dirs in version range [%v-%v]", currVer, config.TargetVersion) + } + + var result []changeSet + + for _, vd := range verDirs { + + dirPath := config.SchemaDir + "/" + vd + + m, e := readManifest(dirPath) + if e != nil { + return nil, fmt.Errorf("error processing manifest for version %v:%v", vd, e.Error()) + } + + if m.CurrVersion != dirToVersion(vd) { + return nil, fmt.Errorf("manifest version doesn't match with dirname, dir=%v,manifest.version=%v", + vd, m.CurrVersion) + } + + stmts, e := parseCQLStmts(dirPath, m) + if e != nil { + return nil, e + } + + e = validateCQLStmts(stmts) + if e != nil { + return nil, fmt.Errorf("error processing version %v:%v", vd, e.Error()) + } + + cs := changeSet{} + cs.manifest = m + cs.cqlStmts = stmts + cs.version = m.CurrVersion + result = append(result, cs) + } + + return result, nil +} + +func parseCQLStmts(dir string, manifest *manifest) ([]string, error) { + + result := make([]string, 0, 4) + + for _, file := range manifest.SchemaUpdateCqlFiles { + path := dir + "/" + file + stmts, err := ParseCQLFile(path) + if err != nil { + return nil, fmt.Errorf("error parsing file %v, err=%v", path, err) + } + result = append(result, stmts...) + } + + if len(result) == 0 { + return nil, fmt.Errorf("found 0 updates in dir %v", dir) + } + + return result, nil +} + +func validateCQLStmts(stmts []string) error { + for _, stmt := range stmts { + valid := false + for _, prefix := range whitelistedCQLPrefixes { + if strings.HasPrefix(stmt, prefix) { + valid = true + break + } + } + if !valid { + return fmt.Errorf("CQL prefix not in whitelist, stmt=%v", stmt) + } + } + return nil +} + +func readManifest(dirPath string) (*manifest, error) { + + filePath := dirPath + "/" + manifestFileName + jsonStr, err := ioutil.ReadFile(filePath) + if err != nil { + return nil, err + } + + jsonBlob := []byte(jsonStr) + + var manifest manifest + err = json.Unmarshal(jsonBlob, &manifest) + if err != nil { + return nil, err + } + + currVer, err := parseValidateVersion(manifest.CurrVersion) + if err != nil { + return nil, fmt.Errorf("invalid CurrVersion in manifest") + } + manifest.CurrVersion = currVer + + minVer, err := parseValidateVersion(manifest.MinCompatibleVersion) + if len(manifest.MinCompatibleVersion) == 0 { + return nil, fmt.Errorf("invalid MinCompatibleVersion in manifest") + } + manifest.MinCompatibleVersion = minVer + + if len(manifest.SchemaUpdateCqlFiles) == 0 { + return nil, fmt.Errorf("manifest missing SchemaUpdateCqlFiles") + } + + md5Bytes := md5.Sum(jsonBlob) + manifest.md5 = hex.EncodeToString(md5Bytes[:]) + + return &manifest, nil +} + +// readSchemaDir returns a sorted list of subdir names that hold +// the schema changes for versions in the range [startVer - endVer] +// this method has an assumption that the subdirs containing the +// schema changes will be of the form vx.x, where x.x is the version +func readSchemaDir(dir string, startVer string, endVer string) ([]string, error) { + + subdirs, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + + var endFound bool + var result []string + + hasEndVer := len(endVer) > 0 + + for _, dir := range subdirs { + + if !dir.IsDir() { + continue + } + + dirname := dir.Name() + + if !versionStrRegex.MatchString(dirname) { + continue + } + + ver := dirToVersion(dirname) + + highcmp := 0 + lowcmp := cmpVersion(ver, startVer) + if hasEndVer { + highcmp = cmpVersion(ver, endVer) + } + + if lowcmp <= 0 || highcmp > 0 { + continue // out of range + } + + endFound = endFound || (highcmp == 0) + result = append(result, dirname) + } + + if !endFound { + return nil, fmt.Errorf("version dir not found for target version %v", endVer) + } + + sort.Sort(byVersion(result)) + + return result, nil +} + +// sets up a temporary dryrun keyspace for +// executing the cassandra schema update +func setupDryrunKeyspace(config *UpdateSchemaConfig) error { + + client, err := newCQLClient(config.CassHosts, systemKeyspace) + if err != nil { + return err + } + defer client.Close() + + err = client.CreateKeyspace(dryrunKeyspace, 1) + if err != nil { + return err + } + + setupConfig := &SetupSchemaConfig{ + BaseConfig: BaseConfig{ + CassHosts: config.CassHosts, + CassKeyspace: dryrunKeyspace, + }, + Overwrite: true, + InitialVersion: "0.0", + } + + setupTask, err := newSetupSchemaTask(setupConfig) + if err != nil { + return err + } + + return setupTask.run() +} + +func dirToVersion(dir string) string { + return dir[1:] +} + +func (v byVersion) Len() int { + return len(v) +} + +func (v byVersion) Less(i, j int) bool { + v1 := dirToVersion(v[i]) + v2 := dirToVersion(v[j]) + return cmpVersion(v1, v2) < 0 +} + +func (v byVersion) Swap(i, j int) { + v[i], v[j] = v[j], v[i] +} diff --git a/tools/cassandra/updateTask_test.go b/tools/cassandra/updateTask_test.go new file mode 100644 index 00000000000..c13f891ed56 --- /dev/null +++ b/tools/cassandra/updateTask_test.go @@ -0,0 +1,273 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cassandra + +import ( + "fmt" + log "github.com/Sirupsen/logrus" + "github.com/gocql/gocql" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" + "io/ioutil" + "math/rand" + "os" + "testing" + "time" +) + +type ( + UpdateSchemaTestSuite struct { + *require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error + suite.Suite + rand *rand.Rand + keyspace string + session *gocql.Session + client CQLClient + log bark.Logger + } +) + +func TestUpdateSchemaTestSuite(t *testing.T) { + suite.Run(t, new(UpdateSchemaTestSuite)) +} + +func (s *UpdateSchemaTestSuite) SetupTest() { + s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil +} + +func (s *UpdateSchemaTestSuite) SetupSuite() { + + s.log = bark.NewLoggerFromLogrus(log.New()) + s.rand = rand.New(rand.NewSource(time.Now().UnixNano())) + s.keyspace = fmt.Sprintf("update_schema_test_%v", s.rand.Int63()) + + client, err := newCQLClient("127.0.0.1", "system") + if err != nil { + s.log.Fatal("Error creating CQLClient") + } + + err = client.CreateKeyspace(s.keyspace, 1) + if err != nil { + log.Fatalf("error creating keyspace, err=%v", err) + } + + s.client = client +} + +func (s *UpdateSchemaTestSuite) TearDownSuite() { + s.client.DropKeyspace(s.keyspace) + s.client.Close() +} + +func (s *UpdateSchemaTestSuite) TestUpdateSchema() { + + client, err := newCQLClient("127.0.0.1", s.keyspace) + s.Nil(err) + defer client.Close() + + tmpDir, err := ioutil.TempDir("", "update_schema_test") + s.Nil(err) + defer os.RemoveAll(tmpDir) + + s.makeSchemaVersionDirs(tmpDir) + + RunTool([]string{"./tool", "-k", s.keyspace, "setup-schema", "-v", "0.0"}) + RunTool([]string{"./tool", "-k", s.keyspace, "update-schema", "-d", tmpDir, "-v", "2.0"}) + + expected := getExpectedTables(true) + expected["domains"] = struct{}{} + + ver, err := client.ReadSchemaVersion() + s.Nil(err) + s.Equal("2.0", ver) + + tables, err := client.ListTables() + s.Nil(err) + s.Equal(len(expected), len(tables)) + + for _, t := range tables { + _, ok := expected[t] + s.True(ok) + delete(expected, t) + } + + s.Equal(0, len(expected)) + + dropAllTablesTypes(client) +} + +func (s *UpdateSchemaTestSuite) TestDryrun() { + + client, err := newCQLClient("127.0.0.1", s.keyspace) + s.Nil(err) + defer client.Close() + + dir := "../../schema/cadence/versioned" + RunTool([]string{"./tool", "-k", s.keyspace, "setup-schema", "-v", "0.0"}) + RunTool([]string{"./tool", "-k", s.keyspace, "update-schema", "-d", dir}) + + ver, err := client.ReadSchemaVersion() + s.Nil(err) + // update the version to the latest + s.Equal(0, cmpVersion(ver, "0.1")) + + dropAllTablesTypes(client) +} + +func (s *UpdateSchemaTestSuite) makeSchemaVersionDirs(rootDir string) { + + mData := `{ + "CurrVersion": "1.0", + "MinCompatibleVersion": "1.0", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": ["base.cql"] + }` + + dir := rootDir + "/v1.0" + os.Mkdir(rootDir+"/v1.0", os.FileMode(0700)) + err := ioutil.WriteFile(dir+"/manifest.json", []byte(mData), os.FileMode(0600)) + s.Nil(err) + err = ioutil.WriteFile(dir+"/base.cql", []byte(createTestCQLFileContent()), os.FileMode(0600)) + s.Nil(err) + + mData = `{ + "CurrVersion": "2.0", + "MinCompatibleVersion": "1.0", + "Description": "v2 of schema", + "SchemaUpdateCqlFiles": ["domain.cql"] + }` + + domain := `CREATE TABLE domains( + id uuid, + domain text, + config text, + PRIMARY KEY (id) + );` + + dir = rootDir + "/v2.0" + os.Mkdir(rootDir+"/v2.0", os.FileMode(0700)) + err = ioutil.WriteFile(dir+"/manifest.json", []byte(mData), os.FileMode(0600)) + s.Nil(err) + err = ioutil.WriteFile(dir+"/domain.cql", []byte(domain), os.FileMode(0600)) + s.Nil(err) +} + +func (s *UpdateSchemaTestSuite) TestReadManifest() { + + tmpDir, err := ioutil.TempDir("", "update_schema_test") + s.Nil(err) + defer os.RemoveAll(tmpDir) + + input := `{ + "CurrVersion": "0.4", + "MinCompatibleVersion": "0.1", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": ["base1.cql", "base2.cql", "base3.cql"] + }` + files := []string{"base1.cql", "base2.cql", "base3.cql"} + s.runReadManifestTest(tmpDir, input, "0.4", "0.1", "base version of schema", files, false) + + errInputs := []string{ + `{ + "MinCompatibleVersion": "0.1", + "Description": "base", + "SchemaUpdateCqlFiles": ["base1.cql"] + }`, + `{ + "CurrVersion": "0.4", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": ["base1.cql", "base2.cql", "base3.cql"] + }`, + `{ + "CurrVersion": "0.4", + "MinCompatibleVersion": "0.1", + "Description": "base version of schema", + }`, + `{ + "CurrVersion": "", + "MinCompatibleVersion": "0.1", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": ["base1.cql", "base2.cql", "base3.cql"] + }`, + `{ + "CurrVersion": "0.4", + "MinCompatibleVersion": "", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": ["base1.cql", "base2.cql", "base3.cql"] + }`, + `{ + "CurrVersion": "", + "MinCompatibleVersion": "0.1", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": [] + }`, + } + + for _, in := range errInputs { + s.runReadManifestTest(tmpDir, in, "", "", "", nil, true) + } +} + +func (s *UpdateSchemaTestSuite) TestReadSchemaDir() { + + tmpDir, err := ioutil.TempDir("", "update_schema_test") + s.Nil(err) + defer os.RemoveAll(tmpDir) + + subDirs := []string{"v0.5", "v1.5", "v2.5", "v3.5", "v10.2", "abc", "2.0", "3.0"} + for _, d := range subDirs { + os.Mkdir(tmpDir+"/"+d, os.FileMode(0444)) + } + + _, err = readSchemaDir(tmpDir, "11.0", "11.2") + s.NotNil(err) + _, err = readSchemaDir(tmpDir, "0.5", "10.3") + s.NotNil(err) + + ans, err := readSchemaDir(tmpDir, "0.4", "10.2") + s.Nil(err) + s.Equal([]string{"v0.5", "v1.5", "v2.5", "v3.5", "v10.2"}, ans) + + ans, err = readSchemaDir(tmpDir, "0.5", "3.5") + s.Nil(err) + s.Equal([]string{"v1.5", "v2.5", "v3.5"}, ans) +} + +func (s *UpdateSchemaTestSuite) runReadManifestTest(dir, input, currVer, minVer, desc string, + files []string, isErr bool) { + + file := dir + "/manifest.json" + err := ioutil.WriteFile(file, []byte(input), os.FileMode(0644)) + s.Nil(err) + + m, err := readManifest(dir) + if isErr { + s.NotNil(err) + return + } + s.Nil(err) + s.Equal(currVer, m.CurrVersion) + s.Equal(minVer, m.MinCompatibleVersion) + s.Equal(desc, m.Description) + s.True(len(m.md5) > 0) + s.Equal(files, m.SchemaUpdateCqlFiles) +} diff --git a/tools/cassandra/version.go b/tools/cassandra/version.go new file mode 100644 index 00000000000..5260df8e9c5 --- /dev/null +++ b/tools/cassandra/version.go @@ -0,0 +1,96 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cassandra + +import ( + "fmt" + "regexp" + "strconv" + "strings" +) + +// represents names of the form vx.x where x.x is a (major, minor) version pair +var versionStrRegex = regexp.MustCompile("^v\\d+(\\.\\d)?$") + +// represents names of the form x.x where minor version is always single digit +var versionNumRegex = regexp.MustCompile("^\\d+(\\.\\d)?$") + +// cmpVersion compares two version strings +// returns 0 if a == b +// returns < 0 if a < b +// returns > 0 if a > b +func cmpVersion(a, b string) int { + + aMajor, aMinor, _ := parseVersion(a) + bMajor, bMinor, _ := parseVersion(b) + + if aMajor != bMajor { + return aMajor - bMajor + } + + return aMinor - bMinor +} + +// parseVersion parses a version string and +// returns the major, minor version pair +func parseVersion(ver string) (major int, minor int, err error) { + + if len(ver) == 0 { + return + } + + vals := strings.Split(ver, ".") + if len(vals) == 0 { // Split returns slice of size=1 on empty string + return major, minor, nil + } + + if len(vals) > 0 { + major, err = strconv.Atoi(vals[0]) + if err != nil { + return + } + } + + if len(vals) > 1 { + minor, err = strconv.Atoi(vals[1]) + if err != nil { + return + } + } + + return +} + +// parseValidteVersion validates that the given +// input conforms to either of vx.x or x.x and +// returns x.x on success +func parseValidateVersion(ver string) (string, error) { + if len(ver) == 0 { + return "", fmt.Errorf("version is empty") + } + if versionStrRegex.MatchString(ver) { + return ver[1:], nil + } + if !versionNumRegex.MatchString(ver) { + return "", fmt.Errorf("invalid version, expected format is x.x") + } + return ver, nil +} diff --git a/tools/cassandra/version_test.go b/tools/cassandra/version_test.go new file mode 100644 index 00000000000..c2803b823d2 --- /dev/null +++ b/tools/cassandra/version_test.go @@ -0,0 +1,119 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cassandra + +import ( + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "testing" +) + +type ( + VersionTestSuite struct { + *require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error + suite.Suite + } +) + +func TestVersionTestSuite(t *testing.T) { + suite.Run(t, new(VersionTestSuite)) +} + +func (s *VersionTestSuite) SetupTest() { + s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil +} + +func (s *VersionTestSuite) TestParseVersion() { + s.execParseTest("", 0, 0, false) + s.execParseTest("0", 0, 0, false) + s.execParseTest("99", 99, 0, false) + s.execParseTest("0.0", 0, 0, false) + s.execParseTest("0.9", 0, 9, false) + s.execParseTest("1.0", 1, 0, false) + s.execParseTest("9999.0", 9999, 0, false) + s.execParseTest("999.999", 999, 999, false) + s.execParseTest("88.88.88", 88, 88, false) + s.execParseTest("a.b", 0, 0, true) + s.execParseTest("1.5a", 0, 0, true) + s.execParseTest("5.b", 0, 0, true) + s.execParseTest("golang", 0, 0, true) +} + +func (s *VersionTestSuite) TestCmpVersion() { + + s.Equal(0, cmpVersion("0", "0")) + s.Equal(0, cmpVersion("999", "999")) + s.Equal(0, cmpVersion("0.0", "0.0")) + s.Equal(0, cmpVersion("0.999", "0.999")) + s.Equal(0, cmpVersion("99.888", "99.888")) + + s.True(cmpVersion("0.1", "0") > 0) + s.True(cmpVersion("0.5", "0.1") > 0) + s.True(cmpVersion("1.1", "0.1") > 0) + s.True(cmpVersion("1.1", "0.9") > 0) + s.True(cmpVersion("1.1", "1.0") > 0) + + s.True(cmpVersion("0", "0.1") < 0) + s.True(cmpVersion("0.1", "0.5") < 0) + s.True(cmpVersion("0.1", "1.1") < 0) + s.True(cmpVersion("0.9", "1.1") < 0) + s.True(cmpVersion("1.0", "1.1") < 0) + + s.True(cmpVersion("0.1a", "0.5") < 0) + s.True(cmpVersion("0.1", "0.5a") > 0) + s.True(cmpVersion("ab", "cd") == 0) +} + +func (s *VersionTestSuite) TestParseValidateVersion() { + + inputs := []string{"0", "1000", "9999", "0.1", "0.9", "99.9", "100.8"} + for _, in := range inputs { + s.execParseValidateTest(in, in, false) + s.execParseValidateTest("v"+in, in, false) + } + + errInputs := []string{"1.2a", "ab", "0.88", "5.11"} + for _, in := range errInputs { + s.execParseValidateTest(in, "", true) + s.execParseValidateTest("v"+in, "", true) + } +} + +func (s *VersionTestSuite) execParseValidateTest(input string, output string, isErr bool) { + ver, err := parseValidateVersion(input) + if isErr { + s.NotNil(err) + return + } + s.Nil(err) + s.Equal(output, ver) +} + +func (s *VersionTestSuite) execParseTest(input string, expMajor int, expMinor int, isErr bool) { + maj, min, err := parseVersion(input) + if isErr { + s.NotNil(err) + return + } + s.Nil(err) + s.Equal(expMajor, maj) + s.Equal(expMinor, min) +}