Skip to content

Commit

Permalink
Introduce a dynamic config for cassandra all consistency level delete (
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Sep 26, 2022
1 parent d9296f6 commit 6952083
Show file tree
Hide file tree
Showing 23 changed files with 160 additions and 81 deletions.
2 changes: 1 addition & 1 deletion common/dynamicconfig/configstore/config_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.Pe
}

func newConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.NoSQL, logger log.Logger, doneCh chan struct{}) (*configStoreClient, error) {
store, err := nosql.NewNoSQLConfigStore(*persistenceCfg, logger)
store, err := nosql.NewNoSQLConfigStore(*persistenceCfg, logger, nil)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *configStoreClientSuite) SetupSuite() {

mockPlugin := nosqlplugin.NewMockPlugin(s.mockController)
mockPlugin.EXPECT().
CreateDB(gomock.Any(), gomock.Any()).
CreateDB(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil).AnyTimes()
nosql.RegisterPlugin("cassandra", mockPlugin)
}
Expand Down
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,8 @@ const (
// Default value: false
EnablePendingActivityValidation

EnableCassandraAllConsistencyLevelDelete

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -3730,6 +3732,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Enables pending activity count limiting/validation",
DefaultValue: false,
},
EnableCassandraAllConsistencyLevelDelete: DynamicBool{
KeyName: "system.enableCassandraAllConsistencyLevelDelete",
Description: "Uses all consistency level for Cassandra delete operations",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]}
switch {
case defaultCfg.NoSQL != nil:
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.NoSQL, clusterName, f.logger)
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.NoSQL, clusterName, f.logger, f.dc)
case defaultCfg.SQL != nil:
if defaultCfg.SQL.EncodingType == "" {
defaultCfg.SQL.EncodingType = string(common.EncodingTypeThriftRW)
Expand Down Expand Up @@ -454,7 +454,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]}
switch {
case visibilityCfg.NoSQL != nil:
visibilityDataStore.factory = nosql.NewFactory(*visibilityCfg.NoSQL, clusterName, f.logger)
visibilityDataStore.factory = nosql.NewFactory(*visibilityCfg.NoSQL, clusterName, f.logger, f.dc)
case visibilityCfg.SQL != nil:
var decodingTypes []common.EncodingType
for _, dt := range visibilityCfg.SQL.DecodingTypes {
Expand Down
6 changes: 4 additions & 2 deletions common/persistence/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
type (
// DynamicConfiguration represents dynamic configuration for persistence layer
DynamicConfiguration struct {
EnableSQLAsyncTransaction dynamicconfig.BoolPropertyFn
EnableSQLAsyncTransaction dynamicconfig.BoolPropertyFn
EnableCassandraAllConsistencyLevelDelete dynamicconfig.BoolPropertyFn
}
)

// NewDynamicConfiguration returns new config with default values
func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration {
return &DynamicConfiguration{
EnableSQLAsyncTransaction: dc.GetBoolProperty(dynamicconfig.EnableSQLAsyncTransaction),
EnableSQLAsyncTransaction: dc.GetBoolProperty(dynamicconfig.EnableSQLAsyncTransaction),
EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicconfig.EnableCassandraAllConsistencyLevelDelete),
}
}
23 changes: 13 additions & 10 deletions common/persistence/nosql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type (
clusterName string
logger log.Logger
execStoreFactory *executionStoreFactory
dc *p.DynamicConfiguration
}

executionStoreFactory struct {
Expand All @@ -47,32 +48,33 @@ type (

// NewFactory returns an instance of a factory object which can be used to create
// datastores that are backed by cassandra
func NewFactory(cfg config.Cassandra, clusterName string, logger log.Logger) *Factory {
func NewFactory(cfg config.Cassandra, clusterName string, logger log.Logger, dc *p.DynamicConfiguration) *Factory {
return &Factory{
cfg: cfg,
clusterName: clusterName,
logger: logger,
dc: dc,
}
}

// NewTaskStore returns a new task store
func (f *Factory) NewTaskStore() (p.TaskStore, error) {
return newNoSQLTaskStore(f.cfg, f.logger)
return newNoSQLTaskStore(f.cfg, f.logger, f.dc)
}

// NewShardStore returns a new shard store
func (f *Factory) NewShardStore() (p.ShardStore, error) {
return newNoSQLShardStore(f.cfg, f.clusterName, f.logger)
return newNoSQLShardStore(f.cfg, f.clusterName, f.logger, f.dc)
}

// NewHistoryStore returns a new history store
func (f *Factory) NewHistoryStore() (p.HistoryStore, error) {
return newNoSQLHistoryStore(f.cfg, f.logger)
return newNoSQLHistoryStore(f.cfg, f.logger, f.dc)
}

// NewDomainStore returns a metadata store that understands only v2
func (f *Factory) NewDomainStore() (p.DomainStore, error) {
return newNoSQLDomainStore(f.cfg, f.clusterName, f.logger)
return newNoSQLDomainStore(f.cfg, f.clusterName, f.logger, f.dc)
}

// NewExecutionStore returns an ExecutionStore for a given shardID
Expand All @@ -86,17 +88,17 @@ func (f *Factory) NewExecutionStore(shardID int) (p.ExecutionStore, error) {

// NewVisibilityStore returns a visibility store
func (f *Factory) NewVisibilityStore(sortByCloseTime bool) (p.VisibilityStore, error) {
return newNoSQLVisibilityStore(sortByCloseTime, f.cfg, f.logger)
return newNoSQLVisibilityStore(sortByCloseTime, f.cfg, f.logger, f.dc)
}

// NewQueue returns a new queue backed by cassandra
func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error) {
return newNoSQLQueueStore(f.cfg, f.logger, queueType)
return newNoSQLQueueStore(f.cfg, f.logger, queueType, f.dc)
}

// NewConfigStore returns a new config store
func (f *Factory) NewConfigStore() (p.ConfigStore, error) {
return NewNoSQLConfigStore(f.cfg, f.logger)
return NewNoSQLConfigStore(f.cfg, f.logger, f.dc)
}

// Close closes the factory
Expand All @@ -121,7 +123,7 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
return f.execStoreFactory, nil
}

factory, err := newExecutionStoreFactory(f.cfg, f.logger)
factory, err := newExecutionStoreFactory(f.cfg, f.logger, f.dc)
if err != nil {
return nil, err
}
Expand All @@ -133,9 +135,10 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
func newExecutionStoreFactory(
cfg config.Cassandra,
logger log.Logger,
dc *p.DynamicConfiguration,
) (*executionStoreFactory, error) {

db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlConfigStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type (
func NewNoSQLConfigStore(
cfg config.NoSQL,
logger log.Logger,
dc *persistence.DynamicConfiguration,
) (persistence.ConfigStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlDomainStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func newNoSQLDomainStore(
cfg config.NoSQL,
currentClusterName string,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.DomainStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlHistoryStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func NewNoSQLHistoryStoreFromSession(
func newNoSQLHistoryStore(
cfg config.NoSQL,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.HistoryStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/nosql/nosqlPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/persistence-tests/testcluster"
)

Expand Down Expand Up @@ -78,7 +79,7 @@ func (s *testCluster) Config() config.Persistence {

// SetupTestDatabase from PersistenceTestCluster interface
func (s *testCluster) SetupTestDatabase() {
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger())
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger(), &persistence.DynamicConfiguration{})

if err != nil {
log.Fatal(err)
Expand All @@ -91,7 +92,7 @@ func (s *testCluster) SetupTestDatabase() {

// TearDownTestDatabase from PersistenceTestCluster interface
func (s *testCluster) TearDownTestDatabase() {
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger())
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger(), &persistence.DynamicConfiguration{})
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion common/persistence/nosql/nosqlQueueStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/types"
)
Expand All @@ -46,8 +47,9 @@ func newNoSQLQueueStore(
cfg config.NoSQL,
logger log.Logger,
queueType persistence.QueueType,
dc *p.DynamicConfiguration,
) (persistence.Queue, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlShardStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func newNoSQLShardStore(
cfg config.NoSQL,
clusterName string,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.ShardStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlTaskStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ var _ p.TaskStore = (*nosqlTaskStore)(nil)
func newNoSQLTaskStore(
cfg config.NoSQL,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.TaskStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ func newNoSQLVisibilityStore(
listClosedOrderingByCloseTime bool,
cfg config.NoSQL,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.VisibilityStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
35 changes: 22 additions & 13 deletions common/persistence/nosql/nosqlplugin/cassandra/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
)
Expand All @@ -34,17 +35,19 @@ type cdb struct {
client gocql.Client
session gocql.Session
cfg *config.NoSQL
dc *persistence.DynamicConfiguration
}

var _ nosqlplugin.DB = (*cdb)(nil)

// newCassandraDBFromSession returns a DB from a session
func newCassandraDBFromSession(cfg *config.NoSQL, session gocql.Session, logger log.Logger) *cdb {
func newCassandraDBFromSession(cfg *config.NoSQL, session gocql.Session, logger log.Logger, dc *persistence.DynamicConfiguration) *cdb {
return &cdb{
client: gocql.GetRegisteredClient(),
session: session,
logger: logger,
cfg: cfg,
dc: dc,
}
}

Expand Down Expand Up @@ -79,23 +82,29 @@ func (db *cdb) isCassandraConsistencyError(err error) bool {
}

func (db *cdb) executeWithConsistencyAll(q gocql.Query) error {
if err := q.Consistency(cassandraAllConslevel).Exec(); err != nil {
if db.isCassandraConsistencyError(err) {
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
return q.Consistency(cassandraDefaultConsLevel).Exec()
if db.dc != nil && db.dc.EnableCassandraAllConsistencyLevelDelete() {
if err := q.Consistency(cassandraAllConslevel).Exec(); err != nil {
if db.isCassandraConsistencyError(err) {
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
return q.Consistency(cassandraDefaultConsLevel).Exec()
}
return err
}
return err
return nil
}
return nil
return q.Exec()
}

func (db *cdb) executeBatchWithConsistencyAll(b gocql.Batch) error {
if err := db.session.ExecuteBatch(b.Consistency(cassandraAllConslevel)); err != nil {
if db.isCassandraConsistencyError(err) {
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
return db.session.ExecuteBatch(b.Consistency(cassandraDefaultConsLevel))
if db.dc != nil && db.dc.EnableCassandraAllConsistencyLevelDelete() {
if err := db.session.ExecuteBatch(b.Consistency(cassandraAllConslevel)); err != nil {
if db.isCassandraConsistencyError(err) {
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
return db.session.ExecuteBatch(b.Consistency(cassandraDefaultConsLevel))
}
return err
}
return err
return nil
}
return nil
return db.session.ExecuteBatch(b)
}
13 changes: 7 additions & 6 deletions common/persistence/nosql/nosqlplugin/cassandra/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
Expand All @@ -47,12 +48,12 @@ func init() {
}

// CreateDB initialize the db object
func (p *plugin) CreateDB(cfg *config.NoSQL, logger log.Logger) (nosqlplugin.DB, error) {
return p.doCreateDB(cfg, logger)
func (p *plugin) CreateDB(cfg *config.NoSQL, logger log.Logger, dc *persistence.DynamicConfiguration) (nosqlplugin.DB, error) {
return p.doCreateDB(cfg, logger, dc)
}

// CreateAdminDB initialize the AdminDB object
func (p *plugin) CreateAdminDB(cfg *config.NoSQL, logger log.Logger) (nosqlplugin.AdminDB, error) {
func (p *plugin) CreateAdminDB(cfg *config.NoSQL, logger log.Logger, dc *persistence.DynamicConfiguration) (nosqlplugin.AdminDB, error) {
// the keyspace is not created yet, so use empty and let the Cassandra connect
keyspace := cfg.Keyspace
cfg.Keyspace = ""
Expand All @@ -61,15 +62,15 @@ func (p *plugin) CreateAdminDB(cfg *config.NoSQL, logger log.Logger) (nosqlplugi
cfg.Keyspace = keyspace
}()

return p.doCreateDB(cfg, logger)
return p.doCreateDB(cfg, logger, dc)
}

func (p *plugin) doCreateDB(cfg *config.NoSQL, logger log.Logger) (*cdb, error) {
func (p *plugin) doCreateDB(cfg *config.NoSQL, logger log.Logger, dc *persistence.DynamicConfiguration) (*cdb, error) {
session, err := gocql.GetRegisteredClient().CreateSession(toGoCqlConfig(cfg))
if err != nil {
return nil, err
}
db := newCassandraDBFromSession(cfg, session, logger)
db := newCassandraDBFromSession(cfg, session, logger, dc)
return db, nil
}

Expand Down
Loading

0 comments on commit 6952083

Please sign in to comment.