Skip to content

Commit

Permalink
Adds Dynamic-config type (cadence-workflow#5261)
Browse files Browse the repository at this point in the history
- Refactors the config-store library to include a new dimension called 'configType' which allows for multiple stores.
- Refactors the config-store slightly to reflect the fact that it is indeed actually a daemon

These changes are tested and part of a larger set of changes of the 'zonal-isolation' feature
  • Loading branch information
davidporter-id-au authored May 9, 2023
1 parent e3e9fa7 commit edf534c
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 60 deletions.
4 changes: 3 additions & 1 deletion cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"log"
"time"

"github.com/uber/cadence/common/persistence"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/compatibility"

Expand Down Expand Up @@ -126,7 +128,7 @@ func (s *server) startService() common.Daemon {
&s.cfg.DynamicConfig.ConfigStore,
&s.cfg.Persistence,
params.Logger,
s.doneC,
persistence.DynamicConfig,
)
case dynamicconfig.FileBasedClient:
params.Logger.Info("initialising File Based dynamic config client")
Expand Down
53 changes: 48 additions & 5 deletions common/dynamicconfig/configstore/config_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"sync/atomic"
"time"

"github.com/uber/cadence/common"

"github.com/uber/cadence/common/config"
dc "github.com/uber/cadence/common/dynamicconfig"
csc "github.com/uber/cadence/common/dynamicconfig/configstore/config"
Expand All @@ -41,6 +43,12 @@ import (

var _ dc.Client = (*configStoreClient)(nil)

// Client is a stateful config store
type Client interface {
common.Daemon
dc.Client
}

const (
configStoreMinPollInterval = time.Second * 2
)
Expand All @@ -53,6 +61,8 @@ var defaultConfigValues = &csc.ClientConfig{
}

type configStoreClient struct {
status int32
configStoreType persistence.ConfigType
values atomic.Value
lastUpdatedTime time.Time
config *csc.ClientConfig
Expand All @@ -68,7 +78,11 @@ type cacheEntry struct {
}

// NewConfigStoreClient creates a config store client
func NewConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.Persistence, logger log.Logger, doneCh chan struct{}) (dc.Client, error) {
func NewConfigStoreClient(clientCfg *csc.ClientConfig,
persistenceCfg *config.Persistence,
logger log.Logger,
configType persistence.ConfigType,
) (Client, error) {
if persistenceCfg == nil {
return nil, errors.New("persistence cfg is nil")
}
Expand All @@ -95,7 +109,7 @@ func NewConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.Pe
dsConfig = ds.NoSQL.ConvertToShardedNoSQLConfig()
}

client, err := newConfigStoreClient(clientCfg, dsConfig, logger, doneCh)
client, err := newConfigStoreClient(clientCfg, dsConfig, logger, configType)
if err != nil {
return nil, err
}
Expand All @@ -106,17 +120,25 @@ func NewConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.Pe
return client, nil
}

func newConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.ShardedNoSQL, logger log.Logger, doneCh chan struct{}) (*configStoreClient, error) {
func newConfigStoreClient(
clientCfg *csc.ClientConfig,
persistenceCfg *config.ShardedNoSQL,
logger log.Logger,
configType persistence.ConfigType,
) (*configStoreClient, error) {
store, err := nosql.NewNoSQLConfigStore(*persistenceCfg, logger, nil)
if err != nil {
return nil, err
}

doneCh := make(chan struct{})
client := &configStoreClient{
status: common.DaemonStatusStarted,
config: clientCfg,
doneCh: doneCh,
configStoreManager: persistence.NewConfigStoreManagerImpl(store, logger),
logger: logger,
configStoreType: configType,
}

return client, nil
Expand Down Expand Up @@ -337,6 +359,23 @@ func (csc *configStoreClient) ListValue(name dc.Key) ([]*types.DynamicConfigEntr
return resList, nil
}

func (csc *configStoreClient) Stop() {
if !atomic.CompareAndSwapInt32(&csc.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}
close(csc.doneCh)
}

func (csc *configStoreClient) Start() {
err := csc.startUpdate()
if err != nil {
csc.logger.Fatal("could not start config store", tag.Error(err))
}
if !atomic.CompareAndSwapInt32(&csc.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}
}

func (csc *configStoreClient) updateValue(name dc.Key, dcValues []*types.DynamicConfigValue, retryAttempts int) error {
//since values are not unique, no way to know if you are trying to update a specific value
//or if you want to add another of the same value with different filters.
Expand Down Expand Up @@ -413,7 +452,7 @@ func (csc *configStoreClient) updateValue(name dc.Key, dcValues []*types.Dynamic
ctx,
&persistence.UpdateDynamicConfigRequest{
Snapshot: newSnapshot,
},
}, csc.configStoreType,
)

select {
Expand Down Expand Up @@ -506,7 +545,7 @@ func (csc *configStoreClient) update() error {
ctx, cancel := context.WithTimeout(context.Background(), csc.config.FetchTimeout)
defer cancel()

res, err := csc.configStoreManager.FetchDynamicConfig(ctx)
res, err := csc.configStoreManager.FetchDynamicConfig(ctx, csc.configStoreType)

select {
case <-ctx.Done():
Expand Down Expand Up @@ -672,6 +711,10 @@ func validateKeyDataBlobPair(key dc.Key, blob *types.DataBlob) error {
if _, ok := value.(map[string]interface{}); !ok {
return err
}
case dc.ListKey:
if _, ok := value.([]interface{}); !ok {
return err
}
default:
return fmt.Errorf("unknown key type: %T", key)
}
Expand Down
52 changes: 26 additions & 26 deletions common/dynamicconfig/configstore/config_store_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,6 @@ func (s *configStoreClientSuite) SetupTest() {
PluginName: "cassandra",
},
}
dbConfig := config.ShardedNoSQL{
DefaultShard: config.NonShardedStoreName,
Connections: connections,
}

var err error
s.client, err = newConfigStoreClient(
Expand All @@ -317,7 +313,11 @@ func (s *configStoreClientSuite) SetupTest() {
FetchTimeout: time.Second * 1,
UpdateTimeout: time.Second * 1,
},
&dbConfig, log.NewNoop(), s.doneCh)

&config.ShardedNoSQL{
DefaultShard: config.NonShardedStoreName,
Connections: connections,
}, log.NewNoop(), p.DynamicConfig)
s.Require().NoError(err)

s.mockManager = p.NewMockConfigStoreManager(s.mockController)
Expand All @@ -326,7 +326,7 @@ func (s *configStoreClientSuite) SetupTest() {

func defaultTestSetup(s *configStoreClientSuite) {
s.mockManager.EXPECT().
FetchDynamicConfig(gomock.Any()).
FetchDynamicConfig(gomock.Any(), p.DynamicConfig).
Return(&p.FetchDynamicConfigResponse{
Snapshot: snapshot1,
}, nil).
Expand Down Expand Up @@ -693,8 +693,8 @@ func (s *configStoreClientSuite) TestUpdateValue_NilOverwrite() {
defaultTestSetup(s)

s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest) error {
UpdateDynamicConfig(gomock.Any(), gomock.Any(), p.DynamicConfig).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest, cfgType p.ConfigType) error {
if request.Snapshot.Values.Entries[0].Name != dc.TestGetBoolPropertyKey.String() {
return nil
}
Expand All @@ -709,7 +709,7 @@ func (s *configStoreClientSuite) TestUpdateValue_NoRetrySuccess() {
defaultTestSetup(s)

s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), EqSnapshotVersion(2)).
UpdateDynamicConfig(gomock.Any(), EqSnapshotVersion(2), p.DynamicConfig).
Return(nil).MaxTimes(1)

values := []*types.DynamicConfigValue{
Expand All @@ -728,7 +728,7 @@ func (s *configStoreClientSuite) TestUpdateValue_NoRetrySuccess() {
snapshot2 := snapshot1
snapshot2.Values.Entries[0].Values = values
s.mockManager.EXPECT().
FetchDynamicConfig(gomock.Any()).
FetchDynamicConfig(gomock.Any(), p.DynamicConfig).
Return(&p.FetchDynamicConfigResponse{
Snapshot: snapshot2,
}, nil).MaxTimes(1)
Expand All @@ -753,7 +753,7 @@ func (s *configStoreClientSuite) TestUpdateValue_SuccessNewKey() {
}

s.mockManager.EXPECT().
FetchDynamicConfig(gomock.Any()).
FetchDynamicConfig(gomock.Any(), p.DynamicConfig).
Return(&p.FetchDynamicConfigResponse{
Snapshot: &p.DynamicConfigSnapshot{
Version: 1,
Expand All @@ -766,8 +766,8 @@ func (s *configStoreClientSuite) TestUpdateValue_SuccessNewKey() {
AnyTimes()

s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest) error {
UpdateDynamicConfig(gomock.Any(), gomock.Any(), p.DynamicConfig).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest, cfgType p.ConfigType) error {
s.Equal(1, len(request.Snapshot.Values.Entries))
s.Equal(request.Snapshot.Values.Entries[0].Values, values)
return nil
Expand All @@ -780,16 +780,16 @@ func (s *configStoreClientSuite) TestUpdateValue_SuccessNewKey() {

func (s *configStoreClientSuite) TestUpdateValue_RetrySuccess() {
s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), EqSnapshotVersion(2)).
UpdateDynamicConfig(gomock.Any(), EqSnapshotVersion(2), p.DynamicConfig).
Return(&p.ConditionFailedError{}).AnyTimes()

s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), EqSnapshotVersion(3)).
UpdateDynamicConfig(gomock.Any(), EqSnapshotVersion(3), p.DynamicConfig).
Return(nil).AnyTimes()

snapshot1.Version = 2
s.mockManager.EXPECT().
FetchDynamicConfig(gomock.Any()).
FetchDynamicConfig(gomock.Any(), p.DynamicConfig).
Return(&p.FetchDynamicConfigResponse{
Snapshot: snapshot1,
}, nil).AnyTimes()
Expand All @@ -804,7 +804,7 @@ func (s *configStoreClientSuite) TestUpdateValue_RetryFailure() {
defaultTestSetup(s)

s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), gomock.Any()).
UpdateDynamicConfig(gomock.Any(), gomock.Any(), p.DynamicConfig).
Return(&p.ConditionFailedError{}).MaxTimes(retryAttempts + 1)

err := s.client.UpdateValue(dc.TestGetFloat64PropertyKey, []*types.DynamicConfigValue{})
Expand All @@ -814,8 +814,8 @@ func (s *configStoreClientSuite) TestUpdateValue_RetryFailure() {
func (s *configStoreClientSuite) TestUpdateValue_Timeout() {
defaultTestSetup(s)
s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, _ *p.UpdateDynamicConfigRequest) error {
UpdateDynamicConfig(gomock.Any(), gomock.Any(), p.DynamicConfig).
DoAndReturn(func(_ context.Context, _ *p.UpdateDynamicConfigRequest, cfgType p.ConfigType) error {
time.Sleep(2 * time.Second)
return nil
}).AnyTimes()
Expand All @@ -827,8 +827,8 @@ func (s *configStoreClientSuite) TestUpdateValue_Timeout() {
func (s *configStoreClientSuite) TestRestoreValue_NoFilter() {
defaultTestSetup(s)
s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest) error {
UpdateDynamicConfig(gomock.Any(), gomock.Any(), p.DynamicConfig).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest, cfgType p.ConfigType) error {
for _, entry := range request.Snapshot.Values.Entries {
if entry.Name == dc.TestGetBoolPropertyKey.String() {
for _, value := range entry.Values {
Expand All @@ -850,8 +850,8 @@ func (s *configStoreClientSuite) TestRestoreValue_FilterNoMatch() {
defaultTestSetup(s)

s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest) error {
UpdateDynamicConfig(gomock.Any(), gomock.Any(), p.DynamicConfig).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest, cfgType p.ConfigType) error {
for _, resEntry := range request.Snapshot.Values.Entries {
for _, oriEntry := range snapshot1.Values.Entries {
if oriEntry.Name == resEntry.Name {
Expand All @@ -873,8 +873,8 @@ func (s *configStoreClientSuite) TestRestoreValue_FilterNoMatch() {
func (s *configStoreClientSuite) TestRestoreValue_FilterMatch() {
defaultTestSetup(s)
s.mockManager.EXPECT().
UpdateDynamicConfig(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest) error {
UpdateDynamicConfig(gomock.Any(), gomock.Any(), p.DynamicConfig).
DoAndReturn(func(_ context.Context, request *p.UpdateDynamicConfigRequest, cfgType p.ConfigType) error {
for _, resEntry := range request.Snapshot.Values.Entries {
if resEntry.Name == dc.TestGetBoolPropertyKey.String() {
s.Equal(2, len(resEntry.Values))
Expand Down Expand Up @@ -906,7 +906,7 @@ func (s *configStoreClientSuite) TestListValues() {

func (s *configStoreClientSuite) TestListValues_EmptyCache() {
s.mockManager.EXPECT().
FetchDynamicConfig(gomock.Any()).
FetchDynamicConfig(gomock.Any(), p.DynamicConfig).
Return(&p.FetchDynamicConfigResponse{
Snapshot: &p.DynamicConfigSnapshot{
Version: 1,
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/configStoreManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func (m *configStoreManagerImpl) Close() {
m.persistence.Close()
}

func (m *configStoreManagerImpl) FetchDynamicConfig(ctx context.Context) (*FetchDynamicConfigResponse, error) {
values, err := m.persistence.FetchConfig(ctx, DynamicConfig)
func (m *configStoreManagerImpl) FetchDynamicConfig(ctx context.Context, cfgType ConfigType) (*FetchDynamicConfigResponse, error) {
values, err := m.persistence.FetchConfig(ctx, cfgType)
if err != nil || values == nil {
return nil, err
}
Expand All @@ -70,14 +70,14 @@ func (m *configStoreManagerImpl) FetchDynamicConfig(ctx context.Context) (*Fetch
}}, nil
}

func (m *configStoreManagerImpl) UpdateDynamicConfig(ctx context.Context, request *UpdateDynamicConfigRequest) error {
func (m *configStoreManagerImpl) UpdateDynamicConfig(ctx context.Context, request *UpdateDynamicConfigRequest, cfgType ConfigType) error {
blob, err := m.serializer.SerializeDynamicConfigBlob(request.Snapshot.Values, common.EncodingTypeThriftRW)
if err != nil {
return err
}

entry := &InternalConfigStoreEntry{
RowType: int(DynamicConfig),
RowType: int(cfgType),
Version: request.Snapshot.Version,
Timestamp: time.Now(),
Values: blob,
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1866,8 +1866,8 @@ type (

ConfigStoreManager interface {
Closeable
FetchDynamicConfig(ctx context.Context) (*FetchDynamicConfigResponse, error)
UpdateDynamicConfig(ctx context.Context, request *UpdateDynamicConfigRequest) error
FetchDynamicConfig(ctx context.Context, cfgType ConfigType) (*FetchDynamicConfigResponse, error)
UpdateDynamicConfig(ctx context.Context, request *UpdateDynamicConfigRequest, cfgType ConfigType) error
//can add functions for config types other than dynamic config
}
)
Expand Down
16 changes: 8 additions & 8 deletions common/persistence/dataManagerInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit edf534c

Please sign in to comment.