Skip to content

Commit

Permalink
tooling: support for cassandra update-schema (temporalio#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored May 15, 2017
1 parent 437a8a9 commit ef260bf
Show file tree
Hide file tree
Showing 21 changed files with 1,530 additions and 110 deletions.
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,4 @@ test
test.log

# Executables produced by cadence repo
cadence
cadence-cassandra-tool

/cadence*
6 changes: 3 additions & 3 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ func (s *CassandraTestCluster) setupTestCluster(keySpace string, dropKeySpace bo
}
s.createCluster(testWorkflowClusterHosts, testDatacenter, gocql.Consistency(1), keySpace)
s.createKeyspace(1, dropKeySpace)
s.loadSchema([]string{"workflow_test.cql", "visibility_test.cql"}, schemaDir)
s.loadSchema([]string{"schema.cql"}, schemaDir)
}

func (s *CassandraTestCluster) tearDownTestCluster() {
Expand Down Expand Up @@ -757,9 +757,9 @@ func (s *CassandraTestCluster) dropKeyspace() {
}

func (s *CassandraTestCluster) loadSchema(fileNames []string, schemaDir string) {
workflowSchemaDir := "./schema/"
workflowSchemaDir := "./schema/cadence"
if schemaDir != "" {
workflowSchemaDir = schemaDir + "/schema/"
workflowSchemaDir = schemaDir + "/schema/cadence"
}

err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace)
Expand Down
46 changes: 46 additions & 0 deletions schema/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
What
----
This directory contains the cassandra schema for every keyspace that cadence owns. The directory structure is as follows

```
./schema
- keyspace1/
- keyspace2/
- keyspace.cql -- Contains the keyspace definition
- schema.cql -- Contains the latest & greatest snapshot of the schema for the keyspace
- versioned
- v0.1/
- v0.2/ -- One directory per schema version change
- v1.0/
- manifest.json -- json file describing the change
- changes.cql -- changes in this version, only [CREATE, ALTER] commands are allowed
```

How
---

Q: How do I update existing schema ?
* Add your changes to schema.cql
* Create a new schema version directory under ./schema/keyspace/versioned/vx.x
** Add a manifest.json
** Add your changes in a cql file
* Update the unit test within ./tools/cassandra/updateTask_test.go `TestDryrun` with your version x.x
* Once you are done with these use the ./cadence-cassandra-tool to update the schema

Q: What's the format of manifest.json

Example below:
* MinCompatibleVersion is the minimum schema version that your code can handle
* SchemaUpdateCqlFiles are list of .cql files containg your create/alter commands


```
{
"CurrVersion": "0.1",
"MinCompatibleVersion": "0.1",
"Description": "base version of schema",
"SchemaUpdateCqlFiles": [
"base.cql"
]
}
```
File renamed without changes.
210 changes: 210 additions & 0 deletions schema/cadence/schema.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
CREATE TABLE shards (
shard_id int,
PRIMARY KEY (shard_id)
);

CREATE TYPE shard (
shard_id int,
owner text, -- Host identifier processing the shard
-- Range identifier used for generating ack ids for tasks within shard.
-- Also used for optimistic concurrency and all writes to a shard are conditional on this value.
range_id bigint,
-- This field keeps track of number of times owner for a shard changes before updating range_id or ack_levels
stolen_since_renew int,
updated_at timestamp,
transfer_ack_level bigint,
);

--- Workflow execution and mutable state ---
CREATE TYPE workflow_execution (
domain_id uuid,
workflow_id text,
run_id uuid,
task_list text,
workflow_type_name text,
decision_task_timeout int,
execution_context blob,
state int, -- enum WorkflowState {Created, Running, Completed}
close_status int, -- enum WorkflowCloseStatus {None, Completed, Failed, Canceled, Terminated, ContinuedAsNew, TimedOut}
next_event_id bigint,
last_processed_event bigint,
start_time timestamp,
last_updated_time timestamp,
create_request_id uuid,
decision_schedule_id bigint,
decision_started_id bigint,
decision_request_id text, -- Identifier used by matching engine for retrying history service calls for recording task is started
decision_timeout int,
);

-- TODO: Remove fields that are left over from activity and workflow tasks.
CREATE TYPE transfer_task (
domain_id uuid, -- The domain ID that this transfer task belongs to
workflow_id text, -- The workflow ID that this transfer task belongs to
run_id uuid, -- The run ID that this transfer task belongs to
task_id bigint,
target_domain_id uuid, -- The external domain ID that this transfer task is doing work for.
target_workflow_id text, -- The external workflow ID that this transfer task is doing work for.
target_run_id uuid, -- The external run ID that this transfer task is doing work for.
task_list text,
type int, -- enum TaskType {ActivityTask, DecisionTask, DeleteExecution, CancelExecution}
schedule_id bigint,
);

CREATE TYPE timer_task (
domain_id uuid,
workflow_id text,
run_id uuid,
task_id bigint,
type int, -- enum TaskType {DecisionTaskTimeout, ActivityTaskTimeout, UserTimer}
timeout_type int, -- enum TimeoutType in IDL {START_TO_CLOSE, SCHEDULE_TO_START, SCHEDULE_TO_CLOSE, HEARTBEAT}
event_id bigint, -- Corresponds to event ID in history that is responsible for this timer.
);

-- Workflow activity in progress mutable state
CREATE TYPE activity_info (
schedule_id bigint,
scheduled_event blob,
started_id bigint,
started_event blob,
activity_id text, -- Client generated unique ID for the activity.
request_id text, -- Identifier used by matching engine for retrying history service calls for recording task is started
details blob,
schedule_to_start_timeout int,
schedule_to_close_timeout int,
start_to_close_timeout int,
heart_beat_timeout int,
cancel_requested boolean, -- If a cancel request is made to cancel the activity in progress.
cancel_request_id bigint, -- Event ID that identifies the cancel request.
last_hb_updated_time timestamp, -- Last time the heartbeat is received.
);

-- User timer details
CREATE TYPE timer_info (
timer_id text, -- User defined timer ID
started_id bigint, -- The event ID corresponding to timer started.
expiry_time timestamp, -- Timestamp at which this timer expires or fires
task_id bigint, -- The task ID if we have one created for this timer
);

-- Activity or workflow task in a task list
CREATE TYPE task (
domain_id uuid,
workflow_id text,
run_id uuid,
schedule_id bigint,
);

CREATE TYPE task_list (
domain_id uuid,
name text,
type int, -- enum TaskRowType {ActivityTask, DecisionTask}
ack_level bigint, -- task_id of the last acknowledged message
);

CREATE TYPE domain (
id uuid,
name text,
status int, -- enum DomainStatus {Registered, Deprecated, Deleted}
description text,
owner_email text,
);

CREATE TYPE domain_config (
retention int,
emit_metric boolean
);

CREATE TABLE executions (
shard_id int,
type int, -- enum RowType { Shard, Execution, TransferTask, TimerTask}
domain_id uuid,
workflow_id text,
run_id uuid,
current_run_id uuid,
task_id bigint, -- unique identifier for transfer and timer tasks for an execution
shard frozen<shard>,
execution frozen<workflow_execution>,
transfer frozen<transfer_task>,
timer frozen<timer_task>,
next_event_id bigint, -- This is needed to make conditional updates on session history
range_id bigint static, -- Increasing sequence identifier for transfer queue, checkpointed into shard info
activity_map map<bigint, frozen<activity_info>>,
timer_map map<text, frozen<timer_info>>,
PRIMARY KEY (shard_id, type, domain_id, workflow_id, run_id, task_id)
);

CREATE TABLE events (
domain_id uuid,
workflow_id text,
run_id uuid,
-- We insert a batch of events with each append transaction.
-- This field stores the event id of first event in the batch.
first_event_id bigint,
range_id bigint,
tx_id bigint,
data blob, -- Batch of workflow execution history events as a blob
data_encoding text, -- Protocol used for history serialization
data_version int, -- history blob version
PRIMARY KEY ((domain_id, workflow_id, run_id), first_event_id)
);

-- Stores activity or workflow tasks
CREATE TABLE tasks (
domain_id uuid,
task_list_name text,
task_list_type int, -- enum TaskListType {ActivityTask, DecisionTask}
type int, -- enum rowType {Task, TaskList}
task_id bigint, -- unique identifier for tasks, monotonically increasing
range_id bigint static, -- Used to ensure that only one process can write to the table
task frozen<task>,
task_list frozen<task_list>,
PRIMARY KEY ((domain_id, task_list_name, task_list_type), type, task_id)
);

CREATE TABLE domains (
id uuid,
domain frozen<domain>,
config frozen<domain_config>,
PRIMARY KEY (id)
);

CREATE TABLE domains_by_name (
name text,
domain frozen<domain>,
config frozen<domain_config>,
PRIMARY KEY (name)
);

-- Visiblity schema goes below

CREATE TABLE open_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC);


CREATE INDEX open_by_workflow_id ON open_executions (workflow_id);
CREATE INDEX open_by_type ON open_executions (workflow_type_name);

CREATE TABLE closed_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
close_time timestamp,
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC);

CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id);
CREATE INDEX closed_by_close_time ON closed_executions (close_time);
CREATE INDEX closed_by_type ON closed_executions (workflow_type_name);
CREATE INDEX closed_by_status ON closed_executions (status);
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,35 @@ CREATE TABLE domains_by_name (
domain frozen<domain>,
config frozen<domain_config>,
PRIMARY KEY (name)
);
);

CREATE TABLE open_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC);


CREATE INDEX open_by_workflow_id ON open_executions (workflow_id);
CREATE INDEX open_by_type ON open_executions (workflow_type_name);

CREATE TABLE closed_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
close_time timestamp,
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC);

CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id);
CREATE INDEX closed_by_close_time ON closed_executions (close_time);
CREATE INDEX closed_by_type ON closed_executions (workflow_type_name);
CREATE INDEX closed_by_status ON closed_executions (status);
8 changes: 8 additions & 0 deletions schema/cadence/versioned/v0.1/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.1",
"MinCompatibleVersion": "0.1",
"Description": "base version of schema",
"SchemaUpdateCqlFiles": [
"base.cql"
]
}
30 changes: 0 additions & 30 deletions schema/visibility_test.cql

This file was deleted.

24 changes: 24 additions & 0 deletions tools/cassandra/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
What
----
This package contains the tooling for cadence cassandra operations.

How
---
- Run make bins
- You should see an executable `cadence-cassandra-tool`

Setting up initial cassandra schema on a new cluster
----------------------------------------------------
```
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence setup-schema -v 0.0 -- this sets up just the schema version tables with initial version of 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence update-schema -d ./schema/cadence -- upgrades your schema to the latest version
```

Updating schema on an existing cluster
--------------------------------------
You can only upgrade to a new version after the initial setup done above.

```
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence update-schema -d ./schema/cadence -v x.x -y -- executes a dryrun of upgrade to version x.x
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence update-schema -d ./schema/cadence -v x.x -- actually executes the upgrade to version x.x
```
Loading

0 comments on commit ef260bf

Please sign in to comment.