From cb971cad6463645606a7d710378b4bbbabc1144e Mon Sep 17 00:00:00 2001 From: Tamer Eldeeb Date: Mon, 5 Jun 2017 12:15:35 -0700 Subject: [PATCH] Move visibility store to its own keyspace (#217) Issue #181 --- README.md | 6 ++- common/cassandra_helpers.go | 9 ++-- common/persistence/persistenceTestBase.go | 15 ++++++- common/service/config/config.go | 2 + config/development.yaml | 1 + schema/cadence/schema.cql | 41 +------------------ schema/visibility/keyspace.cql | 1 + schema/visibility/schema.cql | 38 +++++++++++++++++ schema/visibility/versioned/v0.1/base.cql | 38 +++++++++++++++++ .../visibility/versioned/v0.1/manifest.json | 8 ++++ service/history/service.go | 2 +- 11 files changed, 113 insertions(+), 48 deletions(-) create mode 100644 schema/visibility/keyspace.cql create mode 100644 schema/visibility/schema.cql create mode 100644 schema/visibility/versioned/v0.1/base.cql create mode 100644 schema/visibility/versioned/v0.1/manifest.json diff --git a/README.md b/README.md index 103d967adba..a979da8d122 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Cadence +# Cadence Cadence is a distributed, scalable, durable, and highly available orchestration engine we developed at Uber Engineering to execute asynchronous long-running business logic in a scalable and resilient way. @@ -27,6 +27,8 @@ brew install cassandra ```bash ./cadence-cassandra-tool --ep 127.0.0.1 create -k "cadence" --rf 1 ./cadence-cassandra-tool --ep 127.0.0.1 -k "cadence" setup-schema -d -f ./schema/cadence/schema.cql +./cadence-cassandra-tool --ep 127.0.0.1 create -k "cadence_visibility" --rf 1 +./cadence-cassandra-tool --ep 127.0.0.1 -k "cadence_visibility" setup-schema -d -f ./schema/visibility/schema.cql ``` * Start the service: @@ -36,7 +38,7 @@ brew install cassandra ### Using Docker -You can also [build and run](docker/README.md) the service using Docker. +You can also [build and run](docker/README.md) the service using Docker. ## Contributing We'd love your help in making Cadence great. Please review our [instructions](CONTRIBUTING.md). diff --git a/common/cassandra_helpers.go b/common/cassandra_helpers.go index 25b37eaf46c..55554e9fe48 100644 --- a/common/cassandra_helpers.go +++ b/common/cassandra_helpers.go @@ -26,11 +26,12 @@ import ( "github.com/uber/cadence/common/logging" + "io/ioutil" + "os" + log "github.com/Sirupsen/logrus" "github.com/gocql/gocql" "github.com/uber/cadence/tools/cassandra" - "io/ioutil" - "os" ) // NewCassandraCluster creates a cassandra cluster given comma separated list of clusterHosts @@ -79,7 +80,7 @@ func DropCassandraKeyspace(s *gocql.Session, keyspace string) (err error) { } // LoadCassandraSchema loads the schema from the given .cql files on this keyspace -func LoadCassandraSchema(dir string, fileNames []string, keyspace string) (err error) { +func LoadCassandraSchema(dir string, fileNames []string, keyspace string, override bool) (err error) { tmpFile, err := ioutil.TempFile("", "_cadence_") if err != nil { @@ -104,7 +105,7 @@ func LoadCassandraSchema(dir string, fileNames []string, keyspace string) (err e CassKeyspace: keyspace, }, SchemaFilePath: tmpFile.Name(), - Overwrite: true, + Overwrite: override, DisableVersioning: true, } diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index 2bc01afc51d..097576877e8 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -800,6 +800,7 @@ func (s *CassandraTestCluster) setupTestCluster(keySpace string, dropKeySpace bo s.createCluster(testWorkflowClusterHosts, testDatacenter, gocql.Consistency(1), keySpace) s.createKeyspace(1, dropKeySpace) s.loadSchema([]string{"schema.cql"}, schemaDir) + s.loadVisibilitySchema([]string{"schema.cql"}, schemaDir) } func (s *CassandraTestCluster) tearDownTestCluster() { @@ -842,7 +843,19 @@ func (s *CassandraTestCluster) loadSchema(fileNames []string, schemaDir string) workflowSchemaDir = schemaDir + "/schema/cadence" } - err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace) + err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace, true) + if err != nil && !strings.Contains(err.Error(), "AlreadyExists") { + log.Fatal(err) + } +} + +func (s *CassandraTestCluster) loadVisibilitySchema(fileNames []string, schemaDir string) { + workflowSchemaDir := "./schema/visibility" + if schemaDir != "" { + workflowSchemaDir = schemaDir + "/schema/visibility" + } + + err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace, false) if err != nil && !strings.Contains(err.Error(), "AlreadyExists") { log.Fatal(err) } diff --git a/common/service/config/config.go b/common/service/config/config.go index ff2c9aa7303..f2f2dcc662e 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -82,6 +82,8 @@ type ( Hosts string `yaml:"hosts" validate:"nonzero"` // Keyspace is the cassandra keyspace Keyspace string `yaml:"keyspace" validate:"nonzero"` + // VisibilityKeyspace is the cassandra keyspace for visibility store + VisibilityKeyspace string `yaml:"visibilityKeyspace" validate:"nonzero"` // Consistency is the default cassandra consistency level Consistency string `yaml:"consistency"` // Datacenter is the data center filter arg for cassandra diff --git a/config/development.yaml b/config/development.yaml index 97116fc8135..2b7e06e6384 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -1,6 +1,7 @@ cassandra: hosts: "127.0.0.1" keyspace: "cadence" + visibilityKeyspace: "cadence_visibility" consistency: "One" numHistoryShards: 4 diff --git a/schema/cadence/schema.cql b/schema/cadence/schema.cql index db5fb397491..76225c312b4 100644 --- a/schema/cadence/schema.cql +++ b/schema/cadence/schema.cql @@ -204,43 +204,4 @@ CREATE TABLE domains_by_name ( ) WITH COMPACTION = { 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' } - AND GC_GRACE_SECONDS = 172800; - -CREATE TABLE open_executions ( - domain_id uuid, - domain_partition int, - workflow_id text, - run_id uuid, - start_time timestamp, - workflow_type_name text, - PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND COMPACTION = { - 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' - } - AND GC_GRACE_SECONDS = 172800; - - -CREATE INDEX open_by_workflow_id ON open_executions (workflow_id); -CREATE INDEX open_by_type ON open_executions (workflow_type_name); - -CREATE TABLE closed_executions ( - domain_id uuid, - domain_partition int, - workflow_id text, - run_id uuid, - start_time timestamp, - close_time timestamp, - status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT} - workflow_type_name text, - PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND COMPACTION = { - 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' - } - AND GC_GRACE_SECONDS = 172800; - -CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id); -CREATE INDEX closed_by_close_time ON closed_executions (close_time); -CREATE INDEX closed_by_type ON closed_executions (workflow_type_name); -CREATE INDEX closed_by_status ON closed_executions (status); \ No newline at end of file + AND GC_GRACE_SECONDS = 172800; \ No newline at end of file diff --git a/schema/visibility/keyspace.cql b/schema/visibility/keyspace.cql new file mode 100644 index 00000000000..50d6e6a7a5d --- /dev/null +++ b/schema/visibility/keyspace.cql @@ -0,0 +1 @@ +CREATE KEYSPACE IF NOT EXISTS cadence_visibility WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1}; diff --git a/schema/visibility/schema.cql b/schema/visibility/schema.cql new file mode 100644 index 00000000000..9518c6cad2a --- /dev/null +++ b/schema/visibility/schema.cql @@ -0,0 +1,38 @@ +CREATE TABLE open_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND COMPACTION = { + 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' + } + AND GC_GRACE_SECONDS = 172800; + + +CREATE INDEX open_by_workflow_id ON open_executions (workflow_id); +CREATE INDEX open_by_type ON open_executions (workflow_type_name); + +CREATE TABLE closed_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + close_time timestamp, + status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT} + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND COMPACTION = { + 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' + } + AND GC_GRACE_SECONDS = 172800; + +CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id); +CREATE INDEX closed_by_close_time ON closed_executions (close_time); +CREATE INDEX closed_by_type ON closed_executions (workflow_type_name); +CREATE INDEX closed_by_status ON closed_executions (status); \ No newline at end of file diff --git a/schema/visibility/versioned/v0.1/base.cql b/schema/visibility/versioned/v0.1/base.cql new file mode 100644 index 00000000000..9518c6cad2a --- /dev/null +++ b/schema/visibility/versioned/v0.1/base.cql @@ -0,0 +1,38 @@ +CREATE TABLE open_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND COMPACTION = { + 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' + } + AND GC_GRACE_SECONDS = 172800; + + +CREATE INDEX open_by_workflow_id ON open_executions (workflow_id); +CREATE INDEX open_by_type ON open_executions (workflow_type_name); + +CREATE TABLE closed_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + close_time timestamp, + status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT} + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), start_time, run_id) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND COMPACTION = { + 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' + } + AND GC_GRACE_SECONDS = 172800; + +CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id); +CREATE INDEX closed_by_close_time ON closed_executions (close_time); +CREATE INDEX closed_by_type ON closed_executions (workflow_type_name); +CREATE INDEX closed_by_status ON closed_executions (status); \ No newline at end of file diff --git a/schema/visibility/versioned/v0.1/manifest.json b/schema/visibility/versioned/v0.1/manifest.json new file mode 100644 index 00000000000..fb9620bd7af --- /dev/null +++ b/schema/visibility/versioned/v0.1/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.1", + "MinCompatibleVersion": "0.1", + "Description": "base version of visibility schema", + "SchemaUpdateCqlFiles": [ + "base.cql" + ] +} diff --git a/service/history/service.go b/service/history/service.go index b3c834e5bf5..790bb710639 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -87,7 +87,7 @@ func (s *Service) Start() { visibility, err := persistence.NewCassandraVisibilityPersistence(p.CassandraConfig.Hosts, p.CassandraConfig.Datacenter, - p.CassandraConfig.Keyspace, + p.CassandraConfig.VisibilityKeyspace, p.Logger) if err != nil {