Skip to content

Commit

Permalink
Physical sharding for NoSQL-based persistence (cadence-workflow#5187)
Browse files Browse the repository at this point in the history
What changed?
Added a sharding layer to the NoSQL persistence stack so that Cadence can use multiple Cassandra clusters at once in a physically sharded manner.

Cadence is a heavily storage-bound system, so the load that a single Cadence cluster can handle is strictly limited by the underlying storage system. Given the massive adoption of Cadence at Uber, this scale limitation forces us to create more Cadence clusters than we want to operate. This capability will let us run Cadence clusters that are one or two orders of magnitude larger than the ones we have today.

Note that this feature only enables bootstrapping a brand-new cluster with multiple databases behind it. Resharding is designed but not implemented yet.

Why?
So that a Cadence cluster can be bootstrapped with multiple Cassandra clusters powering it.

How did you test it?
Added unit tests. Ran samples and tested bench tests in a staging environment.

Potential risks
Since this change significantly changes the low-level persistence logic, it can cause data loss if something goes terribly wrong.

Release notes
The change is backward compatible. Existing Cadence cluster configurations can be updated, if desired, to use the sharded NoSQL config format. However, they must continue having a single shard since Cadence still doesn't have the ability to reshard data.

Documentation Changes
There is a sample config file included in this PR that shows how to make use of the feature in a new cluster.
  • Loading branch information
emrahs authored Apr 21, 2023
1 parent b18be27 commit 45129f1
Show file tree
Hide file tree
Showing 23 changed files with 1,367 additions and 163 deletions.
60 changes: 60 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ type (
SQL *SQL `yaml:"sql"`
// NoSQL contains the config for a NoSQL based datastore
NoSQL *NoSQL `yaml:"nosql"`
// ShardedNoSQL contains the config for a collection of NoSQL datastores that are used as a single datastore
ShardedNoSQL *ShardedNoSQL `yaml:"shardedNosql"`
// ElasticSearch contains the config for a ElasticSearch datastore
ElasticSearch *ElasticSearchConfig `yaml:"elasticsearch"`
}
Expand Down Expand Up @@ -239,6 +241,47 @@ type (
ConnectAttributes map[string]string `yaml:"connectAttributes"`
}

// ShardedNoSQL contains configuration to connect to a set of NoSQL Database clusters in a sharded manner
ShardedNoSQL struct {
// DefaultShard is the DB shard where the non-sharded tables (ie. cluster metadata) are stored.
// It must be non-empty and must be one of the keys of Connections.
DefaultShard string `yaml:"defaultShard"`
// ShardingPolicy is the configuration for the sharding strategy used
ShardingPolicy ShardingPolicy `yaml:"shardingPolicy"`
// Connections maps each DB shard name to the NoSQL plugin config that is used to connect to that shard
Connections map[string]DBShardConnection `yaml:"connections"`
}

// ShardingPolicy contains configuration for physical DB sharding
ShardingPolicy struct {
// HistoryShardMapping defines the ranges of history shards stored by each DB shard. Ranges listed here *MUST*
// be continuous and non-overlapping: the first range in the list starts from history shard 0, each following
// range's Start equals the previous range's End (End is exclusive), and the last range's End equals the
// configured number of history shards (NumHistoryShards).
HistoryShardMapping []HistoryShardRange `yaml:"historyShardMapping"`
// TaskListHashing defines the parameters needed for shard ownership calculation based on hashing
TaskListHashing TasklistHashing `yaml:"taskListHashing"`
}

// HistoryShardRange maps one contiguous range of history shards, [Start, End), to the DB shard that stores it
HistoryShardRange struct {
// Start defines the inclusive lower bound for the history shard range
Start int `yaml:"start"`
// End defines the exclusive upper bound for the history shard range
End int `yaml:"end"`
// Shard is the name of the DB shard that owns this range; it must be one of the keys of ShardedNoSQL.Connections
Shard string `yaml:"shard"`
}

// TasklistHashing contains configuration for assigning tasklists to DB shards based on hashing
TasklistHashing struct {
// ShardOrder defines the order of shards to be used when hashing tasklists to shards.
// Every name listed here must be one of the keys of ShardedNoSQL.Connections.
ShardOrder []string `yaml:"shardOrder"`
}

// DBShardConnection contains the connection configuration for one NoSQL DB shard
DBShardConnection struct {
// NoSQLPlugin is the NoSQL plugin config used for connecting to the DB shard
NoSQLPlugin *NoSQL `yaml:"nosqlPlugin"`
}

// SQL is the configuration for connecting to a SQL backed datastore
SQL struct {
// User is the username to be used for the conn
Expand Down Expand Up @@ -517,6 +560,23 @@ type (
}
)

const (
// NonShardedStoreName is the shard name used for singular (non-sharded) stores
NonShardedStoreName = "NonShardedStore"
)

// ConvertToShardedNoSQLConfig wraps this NoSQL config in a ShardedNoSQL config that has exactly
// one connection, registered under NonShardedStoreName, which is also set as the default shard.
// The sharding policy is left at its zero value.
func (n *NoSQL) ConvertToShardedNoSQLConfig() *ShardedNoSQL {
	return &ShardedNoSQL{
		DefaultShard: NonShardedStoreName,
		Connections: map[string]DBShardConnection{
			NonShardedStoreName: {NoSQLPlugin: n},
		},
	}
}

// ValidateAndFillDefaults validates this config and fills default values if needed
func (c *Config) ValidateAndFillDefaults() error {
c.fillDefaults()
Expand Down
159 changes: 159 additions & 0 deletions common/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,162 @@ func TestGetServiceConfig(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, svc)
}

// getValidShardedNoSQLConfig builds a new Config whose "default" datastore is a ShardedNoSQL store
// with two DB shards and whose "visibility" datastore is ElasticSearch. History shard 0 is mapped
// to shard-1 and history shard 1 to shard-2. Every call returns freshly allocated maps and slices,
// so callers can mutate the result.
func getValidShardedNoSQLConfig() *Config {
	const (
		firstShard  = "shard-1"
		secondShard = "shard-2"
	)

	shardedStore := &ShardedNoSQL{
		DefaultShard: firstShard,
		ShardingPolicy: ShardingPolicy{
			HistoryShardMapping: []HistoryShardRange{
				{Start: 0, End: 1, Shard: firstShard},
				{Start: 1, End: 2, Shard: secondShard},
			},
			TaskListHashing: TasklistHashing{
				ShardOrder: []string{firstShard, secondShard},
			},
		},
		Connections: map[string]DBShardConnection{
			firstShard: {
				NoSQLPlugin: &NoSQL{
					PluginName: "cassandra",
					Hosts:      "127.0.0.1",
					Keyspace:   "unit-test",
					Port:       1234,
				},
			},
			secondShard: {
				NoSQLPlugin: &NoSQL{
					PluginName: "cassandra",
					Hosts:      "127.0.0.1",
					Keyspace:   "unit-test",
					Port:       5678,
				},
			},
		},
	}

	visibilityStore := &ElasticSearchConfig{
		Version: "v7",
		URL: url.URL{
			Scheme: "http",
			Host:   "127.0.0.1:9200",
		},
		Indices: map[string]string{
			"visibility": "cadence-visibility-dev",
		},
	}

	return &Config{
		ClusterGroupMetadata: validClusterGroupMetadata(),
		Persistence: Persistence{
			NumHistoryShards:        2,
			DefaultStore:            "default",
			AdvancedVisibilityStore: "visibility",
			DataStores: map[string]DataStore{
				"default":    {ShardedNoSQL: shardedStore},
				"visibility": {ElasticSearch: visibilityStore},
			},
		},
	}
}

func TestValidShardedNoSQLConfig(t *testing.T) {
cfg := getValidShardedNoSQLConfig()
err := cfg.ValidateAndFillDefaults()
require.NoError(t, err)
}

// TestInvalidShardedNoSQLConfig_MultipleConfigTypes verifies that a datastore which declares both a
// NoSQL and a ShardedNoSQL config is rejected.
func TestInvalidShardedNoSQLConfig_MultipleConfigTypes(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	// DataStores holds DataStore values, so the modified copy has to be written back into the map.
	ds := cfg.Persistence.DataStores["default"]
	ds.NoSQL = &NoSQL{}
	cfg.Persistence.DataStores["default"] = ds

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "must provide exactly one type of config, but provided 2")
}

// TestInvalidShardedNoSQLConfig_MissingDefaultShard verifies that an empty defaultShard is rejected.
func TestInvalidShardedNoSQLConfig_MissingDefaultShard(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	// ShardedNoSQL is a pointer field, so mutating through it changes cfg without a map write-back.
	sharded := cfg.Persistence.DataStores["default"].ShardedNoSQL
	sharded.DefaultShard = ""

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "defaultShard can not be empty")
}

// TestInvalidShardedNoSQLConfig_UnknownDefaultShard verifies that a defaultShard without a matching
// entry in the connections map is rejected.
func TestInvalidShardedNoSQLConfig_UnknownDefaultShard(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	// Remove the connection that the default shard name points to.
	sharded := cfg.Persistence.DataStores["default"].ShardedNoSQL
	delete(sharded.Connections, sharded.DefaultShard)

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "defaultShard (shard-1) is not defined in connections list")
}

// TestInvalidShardedNoSQLConfig_HistoryShardingUnordered verifies that history shard ranges listed
// out of order are rejected.
func TestInvalidShardedNoSQLConfig_HistoryShardingUnordered(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	// The slice shares its backing array with the config, so these writes swap the two ranges in cfg.
	mapping := cfg.Persistence.DataStores["default"].ShardedNoSQL.ShardingPolicy.HistoryShardMapping
	mapping[0].Start, mapping[0].End = 1, 2
	mapping[1].Start, mapping[1].End = 0, 1

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "Non-continuous history shard range")
}

// TestInvalidShardedNoSQLConfig_HistoryShardingOverlapping verifies that overlapping history shard
// ranges are rejected.
func TestInvalidShardedNoSQLConfig_HistoryShardingOverlapping(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	mapping := cfg.Persistence.DataStores["default"].ShardedNoSQL.ShardingPolicy.HistoryShardMapping
	mapping[0].End = 2 // 0-2 overlaps with 1-2

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "Non-continuous history shard range")
}

// TestInvalidShardedNoSQLConfig_HistoryShardingMissingFirstShard verifies that a mapping which does
// not start at history shard 0 is rejected.
func TestInvalidShardedNoSQLConfig_HistoryShardingMissingFirstShard(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	// Drop the first range so that the mapping begins at history shard 1.
	sharded := cfg.Persistence.DataStores["default"].ShardedNoSQL
	sharded.ShardingPolicy.HistoryShardMapping = sharded.ShardingPolicy.HistoryShardMapping[1:]

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "Non-continuous history shard range")
}

// TestInvalidShardedNoSQLConfig_HistoryShardingMissingLastShard verifies that a mapping which stops
// before the last history shard is rejected.
func TestInvalidShardedNoSQLConfig_HistoryShardingMissingLastShard(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	// The mapping only covers history shards 0 and 1, so declaring three shards leaves shard 2 unmapped.
	cfg.Persistence.NumHistoryShards = 3

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "Last history shard found in the config is 1 while the max is 2")
}

// TestInvalidShardedNoSQLConfig_HistoryShardingRefersToUnknownConnection verifies that a history
// shard range naming a DB shard without a connection entry is rejected.
func TestInvalidShardedNoSQLConfig_HistoryShardingRefersToUnknownConnection(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	mapping := cfg.Persistence.DataStores["default"].ShardedNoSQL.ShardingPolicy.HistoryShardMapping
	mapping[0].Shard = "unknown-shard-name"

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "Unknown history shard name")
}

// TestInvalidShardedNoSQLConfig_TasklistShardingRefersToUnknownConnection verifies that a tasklist
// shard order naming a DB shard without a connection entry is rejected.
func TestInvalidShardedNoSQLConfig_TasklistShardingRefersToUnknownConnection(t *testing.T) {
	cfg := getValidShardedNoSQLConfig()

	order := cfg.Persistence.DataStores["default"].ShardedNoSQL.ShardingPolicy.TaskListHashing.ShardOrder
	order[1] = "unknown-shard-name"

	require.ErrorContains(t, cfg.ValidateAndFillDefaults(), "Unknown tasklist shard name")
}
59 changes: 55 additions & 4 deletions common/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,18 @@ func (c *Persistence) Validate() error {
if ds.Cassandra != nil && ds.NoSQL != nil && ds.Cassandra != ds.NoSQL {
return fmt.Errorf("persistence config: datastore %v: only one of Cassandra or NoSQL can be specified", st)
}
if ds.SQL == nil && ds.NoSQL == nil {
return fmt.Errorf("persistence config: datastore %v: must provide config for one of SQL or NoSQL stores", st)
configCount := 0
if ds.NoSQL != nil {
configCount++
}
if ds.SQL != nil && ds.NoSQL != nil {
return fmt.Errorf("persistence config: datastore %v: only one of SQL or NoSQL can be specified", st)
if ds.ShardedNoSQL != nil {
configCount++
}
if ds.SQL != nil {
configCount++
}
if configCount != 1 {
return fmt.Errorf("persistence config: datastore %v: must provide exactly one type of config, but provided %d", st, configCount)
}
if ds.SQL != nil {
if ds.SQL.UseMultipleDatabases {
Expand Down Expand Up @@ -136,6 +143,50 @@ func (c *Persistence) Validate() error {
}
}
}
if ds.ShardedNoSQL != nil {
cfg := ds.ShardedNoSQL
connections := cfg.Connections

// validate default shard
if cfg.DefaultShard == "" {
return fmt.Errorf("ShardedNosql config: defaultShard can not be empty")
}
if _, found := connections[cfg.DefaultShard]; !found {
return fmt.Errorf(
"ShardedNosql config: defaultShard (%v) is not defined in connections list", cfg.DefaultShard)
}

// validate history sharding
historyShardMapping := cfg.ShardingPolicy.HistoryShardMapping
currentShardID := 0
for _, shardRange := range historyShardMapping {
if _, found := connections[shardRange.Shard]; !found {
return fmt.Errorf("ShardedNosql config: Unknown history shard name: %v", shardRange.Shard)
}
if shardRange.Start != currentShardID {
return fmt.Errorf("ShardedNosql config: Non-continuous history shard range %v (%v) found while expecting %v",
shardRange.Start,
shardRange.Shard,
currentShardID,
)
}
currentShardID = shardRange.End
}
if currentShardID != c.NumHistoryShards {
return fmt.Errorf("ShardedNosql config: Last history shard found in the config is %v while the max is %v",
currentShardID-1,
c.NumHistoryShards-1,
)
}

// validate tasklist sharding
tasklistShards := cfg.ShardingPolicy.TaskListHashing.ShardOrder
for _, shardName := range tasklistShards {
if _, found := connections[shardName]; !found {
return fmt.Errorf("ShardedNosql config: Unknown tasklist shard name: %v", shardName)
}
}
}
}

return nil
Expand Down
16 changes: 12 additions & 4 deletions common/dynamicconfig/configstore/config_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,24 @@ func NewConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.Pe
return nil, errors.New("default persistence config missing")
}

if store.NoSQL == nil {
return nil, errors.New("NoSQL struct is nil")
if store.NoSQL == nil && store.ShardedNoSQL == nil {
return nil, errors.New("NoSQL and ShardedNoSQL structs are nil")
}

if err := validateClientConfig(clientCfg); err != nil {
logger.Warn("invalid ClientConfig values, using default values")
clientCfg = defaultConfigValues
}

client, err := newConfigStoreClient(clientCfg, store.NoSQL, logger, doneCh)
ds := persistenceCfg.DataStores[persistenceCfg.DefaultStore]
var dsConfig *config.ShardedNoSQL
if ds.ShardedNoSQL != nil {
dsConfig = ds.ShardedNoSQL
} else {
dsConfig = ds.NoSQL.ConvertToShardedNoSQLConfig()
}

client, err := newConfigStoreClient(clientCfg, dsConfig, logger, doneCh)
if err != nil {
return nil, err
}
Expand All @@ -98,7 +106,7 @@ func NewConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.Pe
return client, nil
}

func newConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.NoSQL, logger log.Logger, doneCh chan struct{}) (*configStoreClient, error) {
func newConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.ShardedNoSQL, logger log.Logger, doneCh chan struct{}) (*configStoreClient, error) {
store, err := nosql.NewNoSQLConfigStore(*persistenceCfg, logger, nil)
if err != nil {
return nil, err
Expand Down
15 changes: 12 additions & 3 deletions common/dynamicconfig/configstore/config_store_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,17 @@ func (s *configStoreClientSuite) SetupTest() {
},
}

connections := make(map[string]config.DBShardConnection)
connections[config.NonShardedStoreName] = config.DBShardConnection{
NoSQLPlugin: &config.NoSQL{
PluginName: "cassandra",
},
}
dbConfig := config.ShardedNoSQL{
DefaultShard: config.NonShardedStoreName,
Connections: connections,
}

var err error
s.client, err = newConfigStoreClient(
&c.ClientConfig{
Expand All @@ -306,9 +317,7 @@ func (s *configStoreClientSuite) SetupTest() {
FetchTimeout: time.Second * 1,
UpdateTimeout: time.Second * 1,
},
&config.NoSQL{
PluginName: "cassandra",
}, log.NewNoop(), s.doneCh)
&dbConfig, log.NewNoop(), s.doneCh)
s.Require().NoError(err)

s.mockManager = p.NewMockConfigStoreManager(s.mockController)
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ func StoreError(storeErr error) Tag {
return newErrorTag("store-error", storeErr)
}

// StoreShard returns a Tag with the key "store-shard" whose value is the given store shard name.
func StoreShard(storeShard string) Tag {
return newPredefinedStringTag("store-shard", storeShard)
}

// ClientError returns a Tag with the key "client-error" that carries the given client error.
func ClientError(clientErr error) Tag {
return newErrorTag("client-error", clientErr)
}
Loading

0 comments on commit 45129f1

Please sign in to comment.