Skip to content

Commit

Permalink
Move visibility store to its own keyspace (cadence-workflow#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tamer Eldeeb authored Jun 5, 2017
1 parent f48b5cf commit cb971ca
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 48 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Cadence
# Cadence

Cadence is a distributed, scalable, durable, and highly available orchestration engine we developed at Uber Engineering to execute asynchronous long-running business logic in a scalable and resilient way.

Expand Down Expand Up @@ -27,6 +27,8 @@ brew install cassandra
```bash
./cadence-cassandra-tool --ep 127.0.0.1 create -k "cadence" --rf 1
./cadence-cassandra-tool --ep 127.0.0.1 -k "cadence" setup-schema -d -f ./schema/cadence/schema.cql
./cadence-cassandra-tool --ep 127.0.0.1 create -k "cadence_visibility" --rf 1
./cadence-cassandra-tool --ep 127.0.0.1 -k "cadence_visibility" setup-schema -d -f ./schema/visibility/schema.cql
```

* Start the service:
Expand All @@ -36,7 +38,7 @@ brew install cassandra

### Using Docker

You can also [build and run](docker/README.md) the service using Docker.
You can also [build and run](docker/README.md) the service using Docker.

## Contributing
We'd love your help in making Cadence great. Please review our [instructions](CONTRIBUTING.md).
Expand Down
9 changes: 5 additions & 4 deletions common/cassandra_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (

"github.com/uber/cadence/common/logging"

"io/ioutil"
"os"

log "github.com/Sirupsen/logrus"
"github.com/gocql/gocql"
"github.com/uber/cadence/tools/cassandra"
"io/ioutil"
"os"
)

// NewCassandraCluster creates a cassandra cluster given comma separated list of clusterHosts
Expand Down Expand Up @@ -79,7 +80,7 @@ func DropCassandraKeyspace(s *gocql.Session, keyspace string) (err error) {
}

// LoadCassandraSchema loads the schema from the given .cql files on this keyspace
func LoadCassandraSchema(dir string, fileNames []string, keyspace string) (err error) {
func LoadCassandraSchema(dir string, fileNames []string, keyspace string, override bool) (err error) {

tmpFile, err := ioutil.TempFile("", "_cadence_")
if err != nil {
Expand All @@ -104,7 +105,7 @@ func LoadCassandraSchema(dir string, fileNames []string, keyspace string) (err e
CassKeyspace: keyspace,
},
SchemaFilePath: tmpFile.Name(),
Overwrite: true,
Overwrite: override,
DisableVersioning: true,
}

Expand Down
15 changes: 14 additions & 1 deletion common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,7 @@ func (s *CassandraTestCluster) setupTestCluster(keySpace string, dropKeySpace bo
s.createCluster(testWorkflowClusterHosts, testDatacenter, gocql.Consistency(1), keySpace)
s.createKeyspace(1, dropKeySpace)
s.loadSchema([]string{"schema.cql"}, schemaDir)
s.loadVisibilitySchema([]string{"schema.cql"}, schemaDir)
}

func (s *CassandraTestCluster) tearDownTestCluster() {
Expand Down Expand Up @@ -842,7 +843,19 @@ func (s *CassandraTestCluster) loadSchema(fileNames []string, schemaDir string)
workflowSchemaDir = schemaDir + "/schema/cadence"
}

err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace)
err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace, true)
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
log.Fatal(err)
}
}

func (s *CassandraTestCluster) loadVisibilitySchema(fileNames []string, schemaDir string) {
workflowSchemaDir := "./schema/visibility"
if schemaDir != "" {
workflowSchemaDir = schemaDir + "/schema/visibility"
}

err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace, false)
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
log.Fatal(err)
}
Expand Down
2 changes: 2 additions & 0 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type (
Hosts string `yaml:"hosts" validate:"nonzero"`
// Keyspace is the cassandra keyspace
Keyspace string `yaml:"keyspace" validate:"nonzero"`
// VisibilityKeyspace is the cassandra keyspace for visibility store
VisibilityKeyspace string `yaml:"visibilityKeyspace" validate:"nonzero"`
// Consistency is the default cassandra consistency level
Consistency string `yaml:"consistency"`
// Datacenter is the data center filter arg for cassandra
Expand Down
1 change: 1 addition & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cassandra:
hosts: "127.0.0.1"
keyspace: "cadence"
visibilityKeyspace: "cadence_visibility"
consistency: "One"
numHistoryShards: 4

Expand Down
41 changes: 1 addition & 40 deletions schema/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -204,43 +204,4 @@ CREATE TABLE domains_by_name (
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
}
AND GC_GRACE_SECONDS = 172800;

CREATE TABLE open_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
}
AND GC_GRACE_SECONDS = 172800;


CREATE INDEX open_by_workflow_id ON open_executions (workflow_id);
CREATE INDEX open_by_type ON open_executions (workflow_type_name);

CREATE TABLE closed_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
close_time timestamp,
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
}
AND GC_GRACE_SECONDS = 172800;

CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id);
CREATE INDEX closed_by_close_time ON closed_executions (close_time);
CREATE INDEX closed_by_type ON closed_executions (workflow_type_name);
CREATE INDEX closed_by_status ON closed_executions (status);
AND GC_GRACE_SECONDS = 172800;
1 change: 1 addition & 0 deletions schema/visibility/keyspace.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE KEYSPACE IF NOT EXISTS cadence_visibility WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1};
38 changes: 38 additions & 0 deletions schema/visibility/schema.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
CREATE TABLE open_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
}
AND GC_GRACE_SECONDS = 172800;


CREATE INDEX open_by_workflow_id ON open_executions (workflow_id);
CREATE INDEX open_by_type ON open_executions (workflow_type_name);

CREATE TABLE closed_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
close_time timestamp,
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
}
AND GC_GRACE_SECONDS = 172800;

CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id);
CREATE INDEX closed_by_close_time ON closed_executions (close_time);
CREATE INDEX closed_by_type ON closed_executions (workflow_type_name);
CREATE INDEX closed_by_status ON closed_executions (status);
38 changes: 38 additions & 0 deletions schema/visibility/versioned/v0.1/base.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
CREATE TABLE open_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
}
AND GC_GRACE_SECONDS = 172800;


CREATE INDEX open_by_workflow_id ON open_executions (workflow_id);
CREATE INDEX open_by_type ON open_executions (workflow_type_name);

CREATE TABLE closed_executions (
domain_id uuid,
domain_partition int,
workflow_id text,
run_id uuid,
start_time timestamp,
close_time timestamp,
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
workflow_type_name text,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
}
AND GC_GRACE_SECONDS = 172800;

CREATE INDEX closed_by_workflow_id ON closed_executions (workflow_id);
CREATE INDEX closed_by_close_time ON closed_executions (close_time);
CREATE INDEX closed_by_type ON closed_executions (workflow_type_name);
CREATE INDEX closed_by_status ON closed_executions (status);
8 changes: 8 additions & 0 deletions schema/visibility/versioned/v0.1/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.1",
"MinCompatibleVersion": "0.1",
"Description": "base version of visibility schema",
"SchemaUpdateCqlFiles": [
"base.cql"
]
}
2 changes: 1 addition & 1 deletion service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Service) Start() {

visibility, err := persistence.NewCassandraVisibilityPersistence(p.CassandraConfig.Hosts,
p.CassandraConfig.Datacenter,
p.CassandraConfig.Keyspace,
p.CassandraConfig.VisibilityKeyspace,
p.Logger)

if err != nil {
Expand Down

0 comments on commit cb971ca

Please sign in to comment.