Skip to content

Commit

Permalink
Implement sharded SQL driver to support using multiple SQL databases (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Oct 3, 2021
1 parent 844181f commit f5ce7cb
Show file tree
Hide file tree
Showing 24 changed files with 750 additions and 56 deletions.
65 changes: 54 additions & 11 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Also use `docker-compose -f ./docker/dev/cassandra.yml down` to stop and clean u
* Alternatively, use `./docker/dev/mysql.yml` for MySQL dependency
* Alternatively, use `./docker/dev/postgres.yml` for PostgreSQL dependency
* Alternatively, use `./docker/dev/cassandra-esv7-kafka.yml` for Cassandra, ElasticSearch(v7) and Kafka/ZooKeeper dependencies
* Alternatively, use `./docker/dev/mysql-esv7-kafka.yml` for MySQL, ElasticSearch(v7) and Kafka/ZooKeeper dependencies
* Alternatively, use `./docker/dev/cassandra-opensearch-kafka.yml` for Cassandra, OpenSearch(compatible with ElasticSearch v7) and Kafka/ZooKeeper dependencies

### 3. Schema installation
Expand All @@ -87,6 +88,9 @@ Based on the above dependency setup, you also need to install the schemas.
* If you use `cassandra-opensearch-kafka.yml` then run `make install-schema && make install-schema-opensearch` to install Casandra & ElasticSearch schemas
* If you use `mysql.yml` then run `install-schema-mysql` to install MySQL schemas
* If you use `postgres.yml` then run `install-schema-postgres` to install Postgres schemas
* `mysql-esv7-kafka.yml` can be used for single MySQL + ElasticSearch or multiple MySQL + ElasticSearch mode
* for single MySQL: run `install-schema-mysql && make install-schema-es-v7`
* for multiple MySQL: run `make install-schema-multiple-mysql` which will install schemas for 4 mysql databases and ElasticSearch

:warning: Note:
>If you use `cassandra-esv7-kafka.yml` and start server before `make install-schema-es-v7`, ElasticSearch may create a wrong index on demand.
Expand All @@ -98,22 +102,61 @@ curl -X DELETE "http://127.0.0.1:9200/cadence-visibility-dev"
### 4. Run
Once you have done all above, try running the local binaries:

* If you use `cassandra.yml` for above steps:
Then you will be able to run a basic local Cadence server for development:
```bash
./cadence-server start
```
* If you use `mysql.yml` then run `./cadence-server --zone mysql start`
* If you use `postgres.yml` then run `./cadence-server --zone postgres start`
* If you use `cassandra-esv7-kafka.yml` then run `./cadence-server --zone es_v7 start`
* If you use `cassandra-opensearch-kafka.yml` then run `./cadence-server --zone es_opensearch start`


Then you will be able to run a basic local Cadence server for development.

* If you use `cassandra.yml`, then run `./cadence-server start`, which will load `config/development.yaml` as config
* If you use `mysql.yml` then run `./cadence-server --zone mysql start`, which will load `config/development.yaml` + `config/development_mysql.yaml` as config
* If you use `postgres.yml` then run `./cadence-server --zone postgres start` , which will load `config/development.yaml` + `config/development_postgres.yaml` as config
* If you use `cassandra-esv7-kafka.yml` then run `./cadence-server --zone es_v7 start`, which will load `config/development.yaml` + `config/development_es_v7.yaml` as config
* If you use `cassandra-opensearch-kafka.yml` then run `./cadence-server --zone es_opensearch start` , which will load `config/development.yaml` + `config/development_es_opensearch.yaml` as config
* If you use `mysql-esv7-kafka.yaml`
* To run with multiple MySQL : `./cadence-server --zone multiple_mysql start`, which will load `config/development.yaml` + `config/development_multiple_mysql.yaml` as config

Then register a domain:
```
./cadence --do samples-domain domain register
```

Then run a helloworld from [Go Client Sample](https://github.com/uber-common/cadence-samples/) or [Java Client Sample](https://github.com/uber/cadence-java-samples)
Then run a helloworld from [Go Client Sample](https://github.com/uber-common/cadence-samples/) or [Java Client Sample](https://github.com/uber/cadence-java-samples)

```
make bins
```
will build all the samples.

Then
```
./bin/helloworld -m worker &
```
will start a worker for helloworld workflow.
You will see like:
```
$./bin/helloworld -m worker &
[1] 16520
2021-09-24T21:07:03.242-0700 INFO common/sample_helper.go:109 Logger created.
2021-09-24T21:07:03.243-0700 DEBUG common/factory.go:151 Creating RPC dispatcher outbound {"ServiceName": "cadence-frontend", "HostPort": "127.0.0.1:7933"}
2021-09-24T21:07:03.250-0700 INFO common/sample_helper.go:161 Domain successfully registered. {"Domain": "samples-domain"}
2021-09-24T21:07:03.291-0700 INFO internal/internal_worker.go:833 Started Workflow Worker {"Domain": "samples-domain", "TaskList": "helloWorldGroup", "WorkerID": "16520@IT-USA-25920@helloWorldGroup"}
2021-09-24T21:07:03.300-0700 INFO internal/internal_worker.go:858 Started Activity Worker {"Domain": "samples-domain", "TaskList": "helloWorldGroup", "WorkerID": "16520@IT-USA-25920@helloWorldGroup"}
```
Then
```
./bin/helloworld
```
to start a helloworld workflow.
You will see the result like :
```
$./bin/helloworld
2021-09-24T21:07:06.220-0700 INFO common/sample_helper.go:109 Logger created.
2021-09-24T21:07:06.220-0700 DEBUG common/factory.go:151 Creating RPC dispatcher outbound {"ServiceName": "cadence-frontend", "HostPort": "127.0.0.1:7933"}
2021-09-24T21:07:06.226-0700 INFO common/sample_helper.go:161 Domain successfully registered. {"Domain": "samples-domain"}
2021-09-24T21:07:06.272-0700 INFO common/sample_helper.go:195 Started Workflow {"WorkflowID": "helloworld_75cf142b-c0de-407e-9115-1d33e9b7551a", "RunID": "98a229b8-8fdd-4d1f-bf41-df00fb06f441"}
2021-09-24T21:07:06.347-0700 INFO helloworld/helloworld_workflow.go:31 helloworld workflow started {"Domain": "samples-domain", "TaskList": "helloWorldGroup", "WorkerID": "16520@IT-USA-25920@helloWorldGroup", "WorkflowType": "helloWorldWorkflow", "WorkflowID": "helloworld_75cf142b-c0de-407e-9115-1d33e9b7551a", "RunID": "98a229b8-8fdd-4d1f-bf41-df00fb06f441"}
2021-09-24T21:07:06.347-0700 DEBUG internal/internal_event_handlers.go:489 ExecuteActivity {"Domain": "samples-domain", "TaskList": "helloWorldGroup", "WorkerID": "16520@IT-USA-25920@helloWorldGroup", "WorkflowType": "helloWorldWorkflow", "WorkflowID": "helloworld_75cf142b-c0de-407e-9115-1d33e9b7551a", "RunID": "98a229b8-8fdd-4d1f-bf41-df00fb06f441", "ActivityID": "0", "ActivityType": "main.helloWorldActivity"}
2021-09-24T21:07:06.437-0700 INFO helloworld/helloworld_workflow.go:62 helloworld activity started {"Domain": "samples-domain", "TaskList": "helloWorldGroup", "WorkerID": "16520@IT-USA-25920@helloWorldGroup", "ActivityID": "0", "ActivityType": "main.helloWorldActivity", "WorkflowType": "helloWorldWorkflow", "WorkflowID": "helloworld_75cf142b-c0de-407e-9115-1d33e9b7551a", "RunID": "98a229b8-8fdd-4d1f-bf41-df00fb06f441"}
2021-09-24T21:07:06.513-0700 INFO helloworld/helloworld_workflow.go:55 Workflow completed. {"Domain": "samples-domain", "TaskList": "helloWorldGroup", "WorkerID": "16520@IT-USA-25920@helloWorldGroup", "WorkflowType": "helloWorldWorkflow", "WorkflowID": "helloworld_75cf142b-c0de-407e-9115-1d33e9b7551a", "RunID": "98a229b8-8fdd-4d1f-bf41-df00fb06f441", "Result": "Hello Cadence!"}
```

See [instructions](service/worker/README.md) for setting up replication(XDC).

Expand Down
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,20 @@ install-schema-mysql: cadence-sql-tool
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence_visibility setup-schema -v 0.0
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence_visibility update-schema -d ./schema/mysql/v57/visibility/versioned

install-schema-multiple-mysql: cadence-sql-tool install-schema-es-v7
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence create --db cadence0
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence0 setup-schema -v 0.0
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence0 update-schema -d ./schema/mysql/v57/cadence/versioned
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence create --db cadence1
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence1 setup-schema -v 0.0
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence1 update-schema -d ./schema/mysql/v57/cadence/versioned
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence create --db cadence2
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence2 setup-schema -v 0.0
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence2 update-schema -d ./schema/mysql/v57/cadence/versioned
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence create --db cadence3
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence3 setup-schema -v 0.0
./cadence-sql-tool --ep 127.0.0.1 --user root --pw cadence --db cadence3 update-schema -d ./schema/mysql/v57/cadence/versioned

install-schema-postgres: cadence-sql-tool
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --pl postgres create --db cadence
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --pl postgres --db cadence setup -v 0.0
Expand Down
31 changes: 29 additions & 2 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,21 @@ type (
// SQL is the configuration for connecting to a SQL backed datastore
SQL struct {
// User is the username to be used for the conn
// If useMultipleDatabases, must be empty and provide it via multipleDatabasesConfig instead
User string `yaml:"user"`
// Password is the password corresponding to the user name
// If useMultipleDatabases, must be empty and provide it via multipleDatabasesConfig instead
Password string `yaml:"password"`
// PluginName is the name of SQL plugin
PluginName string `yaml:"pluginName" validate:"nonzero"`
// DatabaseName is the name of SQL database to connect to
DatabaseName string `yaml:"databaseName" validate:"nonzero"`
// If useMultipleDatabases, must be empty and provide it via multipleDatabasesConfig instead
// Required if not useMultipleDatabases
DatabaseName string `yaml:"databaseName"`
// ConnectAddr is the remote addr of the database
ConnectAddr string `yaml:"connectAddr" validate:"nonzero"`
// If useMultipleDatabases, must be empty and provide it via multipleDatabasesConfig instead
// Required if not useMultipleDatabases
ConnectAddr string `yaml:"connectAddr"`
// ConnectProtocol is the protocol that goes with the ConnectAddr ex - tcp, unix
ConnectProtocol string `yaml:"connectProtocol" validate:"nonzero"`
// ConnectAttributes is a set of key-value attributes to be sent as part of connect data_source_name url
Expand All @@ -275,9 +281,29 @@ type (
// DecodingTypes is the configuration for all the sql blob decoding types which need to be supported
// DecodingTypes should not be removed unless there are no blobs in database with the encoding type
DecodingTypes []string `yaml:"decodingTypes"`
// UseMultipleDatabases enables using multiple databases as a sharding SQL database, default is false
// When enabled, connection will be established using MultipleDatabasesConfig in favor of single values
// of User, Password, DatabaseName, ConnectAddr.
UseMultipleDatabases bool `yaml:"useMultipleDatabases"`
// Required when UseMultipleDatabases is true
// the length of the list should be exactly the same as NumShards
MultipleDatabasesConfig []MultipleDatabasesConfigEntry `yaml:"multipleDatabasesConfig"`
}

// MultipleDatabasesConfigEntry is an entry for MultipleDatabasesConfig to connect to a single SQL database
MultipleDatabasesConfigEntry struct {
// User is the username to be used for the conn
User string `yaml:"user"`
// Password is the password corresponding to the user name
Password string `yaml:"password"`
// DatabaseName is the name of SQL database to connect to
DatabaseName string `yaml:"databaseName" validate:"nonzero"`
// ConnectAddr is the remote addr of the database
ConnectAddr string `yaml:"connectAddr" validate:"nonzero"`
}

// CustomDatastoreConfig is the configuration for connecting to a custom datastore that is not supported by cadence core
// TODO can we remove it?
CustomDatastoreConfig struct {
// Name of the custom datastore
Name string `yaml:"name"`
Expand All @@ -286,6 +312,7 @@ type (
}

// Replicator describes the configuration of replicator
// TODO can we remove it?
Replicator struct{}

// Logger contains the config items for logger
Expand Down
106 changes: 106 additions & 0 deletions common/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package config

import (
"net/url"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -53,6 +54,111 @@ func TestFillingDefaultSQLEncodingDecodingTypes(t *testing.T) {
assert.Equal(t, []string{string(common.EncodingTypeThriftRW)}, cfg.Persistence.DataStores["sql"].SQL.DecodingTypes)
}

func getValidMultipleDatabasseConfig() *Config {
metadata := validClusterGroupMetadata()
cfg := &Config{
ClusterGroupMetadata: metadata,
Persistence: Persistence{
DefaultStore: "default",
AdvancedVisibilityStore: "esv7",
DataStores: map[string]DataStore{
"default": {
SQL: &SQL{
PluginName: "fake",
ConnectProtocol: "tcp",
NumShards: 2,
UseMultipleDatabases: true,
MultipleDatabasesConfig: []MultipleDatabasesConfigEntry{
{
DatabaseName: "db1",
ConnectAddr: "192.168.0.1:3306",
},
{
DatabaseName: "db2",
ConnectAddr: "192.168.0.2:3306",
},
},
},
},
"esv7": {
ElasticSearch: &ElasticSearchConfig{
Version: "v7",
URL: url.URL{Scheme: "http",
Host: "127.0.0.1:9200",
},
Indices: map[string]string{
"visibility": "cadence-visibility-dev",
},
},
// no sql or nosql, should be populated from cassandra
},
},
},
}
return cfg
}

func TestValidMultipleDatabaseConfig(t *testing.T) {
cfg := getValidMultipleDatabasseConfig()
err := cfg.ValidateAndFillDefaults()
require.NoError(t, err)
}

func TestInvalidMultipleDatabaseConfig_useBasicVisibility(t *testing.T) {
cfg := getValidMultipleDatabasseConfig()
cfg.Persistence.VisibilityStore = "basic"
cfg.Persistence.DataStores["basic"] = DataStore{
SQL: &SQL{},
}
err := cfg.ValidateAndFillDefaults()
require.EqualError(t, err, "sql persistence config: multipleSQLDatabases can only be used with advanced visibility only")
}

func TestInvalidMultipleDatabaseConfig_wrongNumDBShards(t *testing.T) {
cfg := getValidMultipleDatabasseConfig()
sqlds := cfg.Persistence.DataStores["default"]
sqlds.SQL.NumShards = 3
cfg.Persistence.DataStores["default"] = sqlds
err := cfg.ValidateAndFillDefaults()
require.EqualError(t, err, "sql persistence config: nShards must be greater than one and equal to the length of multipleDatabasesConfig")
}

func TestInvalidMultipleDatabaseConfig_nonEmptySQLUser(t *testing.T) {
cfg := getValidMultipleDatabasseConfig()
sqlds := cfg.Persistence.DataStores["default"]
sqlds.SQL.User = "user"
cfg.Persistence.DataStores["default"] = sqlds
err := cfg.ValidateAndFillDefaults()
require.EqualError(t, err, "sql persistence config: user can only be configured in multipleDatabasesConfig when UseMultipleDatabases is true")
}

func TestInvalidMultipleDatabaseConfig_nonEmptySQLPassword(t *testing.T) {
cfg := getValidMultipleDatabasseConfig()
sqlds := cfg.Persistence.DataStores["default"]
sqlds.SQL.Password = "pw"
cfg.Persistence.DataStores["default"] = sqlds
err := cfg.ValidateAndFillDefaults()
require.EqualError(t, err, "sql persistence config: password can only be configured in multipleDatabasesConfig when UseMultipleDatabases is true")
}

func TestInvalidMultipleDatabaseConfig_nonEmptySQLDatabaseName(t *testing.T) {
cfg := getValidMultipleDatabasseConfig()
sqlds := cfg.Persistence.DataStores["default"]
sqlds.SQL.DatabaseName = "db"
cfg.Persistence.DataStores["default"] = sqlds
err := cfg.ValidateAndFillDefaults()
require.EqualError(t, err, "sql persistence config: databaseName can only be configured in multipleDatabasesConfig when UseMultipleDatabases is true")
}

func TestInvalidMultipleDatabaseConfig_nonEmptySQLConnAddr(t *testing.T) {
cfg := getValidMultipleDatabasseConfig()
sqlds := cfg.Persistence.DataStores["default"]
sqlds.SQL.ConnectAddr = "127.0.0.1:3306"
cfg.Persistence.DataStores["default"] = sqlds
err := cfg.ValidateAndFillDefaults()
require.EqualError(t, err, "sql persistence config: connectAddr can only be configured in multipleDatabasesConfig when UseMultipleDatabases is true")
}

func TestConfigFallbacks(t *testing.T) {
metadata := validClusterGroupMetadata()
cfg := &Config{
Expand Down
39 changes: 39 additions & 0 deletions common/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ func (c *Persistence) FillDefaults() {
func (c *Persistence) Validate() error {
dbStoreKeys := []string{c.DefaultStore}

useAdvancedVisibilityOnly := false
if _, ok := c.DataStores[c.VisibilityStore]; ok {
dbStoreKeys = append(dbStoreKeys, c.VisibilityStore)
} else {
if _, ok := c.DataStores[c.AdvancedVisibilityStore]; !ok {
return fmt.Errorf("must provide one of VisibilityStore and AdvancedVisibilityStore")
}
useAdvancedVisibilityOnly = true
}

for _, st := range dbStoreKeys {
Expand All @@ -97,6 +99,43 @@ func (c *Persistence) Validate() error {
if ds.SQL != nil && ds.NoSQL != nil {
return fmt.Errorf("persistence config: datastore %v: only one of SQL or NoSQL can be specified", st)
}
if ds.SQL != nil {
if ds.SQL.UseMultipleDatabases {
if !useAdvancedVisibilityOnly {
return fmt.Errorf("sql persistence config: multipleSQLDatabases can only be used with advanced visibility only")
}
if ds.SQL.DatabaseName != "" {
return fmt.Errorf("sql persistence config: databaseName can only be configured in multipleDatabasesConfig when UseMultipleDatabases is true")
}
if ds.SQL.ConnectAddr != "" {
return fmt.Errorf("sql persistence config: connectAddr can only be configured in multipleDatabasesConfig when UseMultipleDatabases is true")
}
if ds.SQL.User != "" {
return fmt.Errorf("sql persistence config: user can only be configured in multipleDatabasesConfig when UseMultipleDatabases is true")
}
if ds.SQL.Password != "" {
return fmt.Errorf("sql persistence config: password can only be configured in multipleDatabasesConfig when UseMultipleDatabases is true")
}
if ds.SQL.NumShards <= 1 || len(ds.SQL.MultipleDatabasesConfig) != ds.SQL.NumShards {
return fmt.Errorf("sql persistence config: nShards must be greater than one and equal to the length of multipleDatabasesConfig")
}
for _, entry := range ds.SQL.MultipleDatabasesConfig {
if entry.DatabaseName == "" {
return fmt.Errorf("sql multipleDatabasesConfig persistence config: databaseName can not be empty")
}
if entry.ConnectAddr == "" {
return fmt.Errorf("sql multipleDatabasesConfig persistence config: connectAddr can not be empty")
}
}
} else {
if ds.SQL.DatabaseName == "" {
return fmt.Errorf("sql persistence config: databaseName can not be empty")
}
if ds.SQL.ConnectAddr == "" {
return fmt.Errorf("sql persistence config: connectAddr can not be empty")
}
}
}
}

return nil
Expand Down
1 change: 1 addition & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,7 @@ const (
// Allowed filters: N/A
ScannerBatchSizeForTasklistHandler
// EnableCleaningOrphanTaskInTasklistScavenger indicates if enabling the scanner to clean up orphan tasks
// Only implemented for single SQL database. TODO https://github.com/uber/cadence/issues/4064 for supporting multiple/sharded SQL database and NoSQL
// KeyName: worker.enableCleaningOrphanTaskInTasklistScavenger
// Value type: Bool
// Default value: FALSE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *VersionTestSuite) TestCheckCompatibleVersion() {
{"2.0", "1.0", "version mismatch", false},
{"1.0", "1.0", "", false},
{"1.0", "2.0", "", false},
{"1.0", "abc", "unable to read cassandra schema version", false},
{"1.0", "abc", "unable to read schema version keyspace/database", false},
}
for _, flag := range flags {
s.runCheckCompatibleVersion(flag.expectedVersion, flag.actualVersion, flag.errStr, flag.expectedFail)
Expand Down
Loading

0 comments on commit f5ce7cb

Please sign in to comment.