Skip to content

Commit

Permalink
Delete archival config from cluster metadata (cadence-workflow#2282)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jul 26, 2019
1 parent 2cc59b5 commit f6bbaf8
Show file tree
Hide file tree
Showing 49 changed files with 453 additions and 317 deletions.
25 changes: 19 additions & 6 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/elasticsearch"
Expand Down Expand Up @@ -131,14 +132,11 @@ func (s *server) startService() common.Daemon {
clusterMetadata := s.cfg.ClusterMetadata
params.ClusterMetadata = cluster.NewMetadata(
params.Logger,
dc,
s.cfg.ClusterMetadata.EnableGlobalDomain,
dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, clusterMetadata.EnableGlobalDomain),
clusterMetadata.FailoverVersionIncrement,
clusterMetadata.MasterClusterName,
clusterMetadata.CurrentClusterName,
clusterMetadata.ClusterInformation,
s.cfg.Archival,
s.cfg.DomainDefaults.Archival,
)

if s.cfg.PublicClient.HostPort != "" {
Expand Down Expand Up @@ -177,13 +175,28 @@ func (s *server) startService() common.Daemon {
}
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))

configuredForHistoryArchival := params.ClusterMetadata.HistoryArchivalConfig().ClusterConfiguredForArchival()
// Dynamic archival config is accessed once on cluster startup than never accessed again.
// This is done so as to keep archival status and and the initialization of archiver.Provider in sync.
// TODO: Once archival pause is implemented archival config can be made truly dynamic.
clusterHistoryArchivalStatus := dc.GetStringProperty(dynamicconfig.HistoryArchivalStatus, s.cfg.Archival.History.Status)()
enableReadFromHistoryArchival := dc.GetBoolProperty(dynamicconfig.EnableReadFromHistoryArchival, s.cfg.Archival.History.EnableRead)()
clusterVisibilityArchivalStatus := dc.GetStringProperty(dynamicconfig.VisibilityArchivalStatus, s.cfg.Archival.Visibility.Status)()
enableReadFromVisibilityArchival := dc.GetBoolProperty(dynamicconfig.EnableReadFromVisibilityArchival, s.cfg.Archival.Visibility.EnableRead)()
params.ArchivalMetadata = archiver.NewArchivalMetadata(
clusterHistoryArchivalStatus,
enableReadFromHistoryArchival,
clusterVisibilityArchivalStatus,
enableReadFromVisibilityArchival,
&s.cfg.DomainDefaults.Archival,
)

configuredForHistoryArchival := params.ArchivalMetadata.GetHistoryConfig().ClusterConfiguredForArchival()
historyArchiverProviderCfg := s.cfg.Archival.History.Provider
if (configuredForHistoryArchival && historyArchiverProviderCfg == nil) || (!configuredForHistoryArchival && historyArchiverProviderCfg != nil) {
log.Fatalf("invalid history archival config")
}

configuredForVisibilityArchival := params.ClusterMetadata.VisibilityArchivalConfig().ClusterConfiguredForArchival()
configuredForVisibilityArchival := params.ArchivalMetadata.GetVisibilityConfig().ClusterConfiguredForArchival()
visibilityArchiverProviderCfg := s.cfg.Archival.Visibility.Provider
if (configuredForVisibilityArchival && visibilityArchiverProviderCfg == nil) || (!configuredForVisibilityArchival && visibilityArchiverProviderCfg != nil) {
log.Fatalf("invalid visibility archival config")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand All @@ -18,27 +18,47 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cluster
package archiver

import (
"fmt"
"strings"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/service/config"
)

type (
// ArchivalStatus represents the archival status of the cluster
ArchivalStatus int
// ArchivalMetadata provides cluster level archival information
ArchivalMetadata interface {
GetHistoryConfig() ArchivalConfig
GetVisibilityConfig() ArchivalConfig
}

// ArchivalConfig is an immutable representation of the archival configuration of the cluster
// This config is determined at cluster startup time
ArchivalConfig struct {
ClusterStatus ArchivalStatus
EnableRead bool
DomainDefaultStatus shared.ArchivalStatus
DomainDefaultURI string
ArchivalConfig interface {
ClusterConfiguredForArchival() bool
GetClusterStatus() ArchivalStatus
ReadEnabled() bool
GetDomainDefaultStatus() shared.ArchivalStatus
GetDomainDefaultURI() string
}

archivalMetadata struct {
historyConfig ArchivalConfig
visibilityConfig ArchivalConfig
}

archivalConfig struct {
clusterStatus ArchivalStatus
enableRead bool
domainDefaultStatus shared.ArchivalStatus
domainDefaultURI string
}

// ArchivalStatus represents the archival status of the cluster
ArchivalStatus int
)

const (
Expand All @@ -51,18 +71,59 @@ const (
ArchivalEnabled
)

// NewArchivalMetadata constructs a new ArchivalMetadata
func NewArchivalMetadata(
historyStatus string,
historyReadEnabled bool,
visibilityStatus string,
visibilityReadEnabled bool,
domainDefaults *config.ArchivalDomainDefaults,
) ArchivalMetadata {
return &archivalMetadata{
historyConfig: NewArchivalConfig(
historyStatus,
historyReadEnabled,
domainDefaults.History.Status,
domainDefaults.History.URI,
),
visibilityConfig: NewArchivalConfig(
visibilityStatus,
visibilityReadEnabled,
domainDefaults.Visibility.Status,
domainDefaults.Visibility.URI,
),
}
}

func (metadata *archivalMetadata) GetHistoryConfig() ArchivalConfig {
return metadata.historyConfig
}

func (metadata *archivalMetadata) GetVisibilityConfig() ArchivalConfig {
return metadata.visibilityConfig
}

// NewArchivalConfig constructs a new valid ArchivalConfig
func NewArchivalConfig(
clusterStatus ArchivalStatus,
clusterStatusStr string,
enableRead bool,
domainDefaultStatus shared.ArchivalStatus,
domainDefaultStatusStr string,
domainDefaultURI string,
) *ArchivalConfig {
ac := &ArchivalConfig{
ClusterStatus: clusterStatus,
EnableRead: enableRead,
DomainDefaultStatus: domainDefaultStatus,
DomainDefaultURI: domainDefaultURI,
) ArchivalConfig {
clusterStatus, err := getClusterArchivalStatus(clusterStatusStr)
if err != nil {
panic(err)
}
domainDefaultStatus, err := getDomainArchivalStatus(domainDefaultStatusStr)
if err != nil {
panic(err)
}

ac := &archivalConfig{
clusterStatus: clusterStatus,
enableRead: enableRead,
domainDefaultStatus: domainDefaultStatus,
domainDefaultURI: domainDefaultURI,
}
if !ac.isValid() {
panic("invalid cluster level archival configuration")
Expand All @@ -71,12 +132,28 @@ func NewArchivalConfig(
}

// ClusterConfiguredForArchival returns true if cluster is configured to handle archival, false otherwise
func (a *ArchivalConfig) ClusterConfiguredForArchival() bool {
return a.ClusterStatus == ArchivalEnabled
func (a *archivalConfig) ClusterConfiguredForArchival() bool {
return a.clusterStatus == ArchivalEnabled
}

func (a *archivalConfig) GetClusterStatus() ArchivalStatus {
return a.clusterStatus
}

func (a *archivalConfig) ReadEnabled() bool {
return a.enableRead
}

func (a *archivalConfig) GetDomainDefaultStatus() shared.ArchivalStatus {
return a.domainDefaultStatus
}

func (a *archivalConfig) GetDomainDefaultURI() string {
return a.domainDefaultURI
}

func (a *ArchivalConfig) isValid() bool {
URISet := len(a.DomainDefaultURI) != 0
func (a *archivalConfig) isValid() bool {
URISet := len(a.domainDefaultURI) != 0
return (URISet && a.ClusterConfiguredForArchival()) || (!URISet && !a.ClusterConfiguredForArchival())
}

Expand Down
62 changes: 62 additions & 0 deletions common/archiver/archivalMetadata_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"github.com/stretchr/testify/mock"
)

// MockArchivalMetadata is an autogenerated mock type for the ArchivalMetadata type
type MockArchivalMetadata struct {
mock.Mock
}

// GetHistoryConfig provides a mock function with given fields:
func (_m *MockArchivalMetadata) GetHistoryConfig() ArchivalConfig {
ret := _m.Called()

var r0 ArchivalConfig
if rf, ok := ret.Get(0).(func() ArchivalConfig); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(ArchivalConfig)
}
}

return r0
}

// GetVisibilityConfig provides a mock function with given fields:
func (_m *MockArchivalMetadata) GetVisibilityConfig() ArchivalConfig {
ret := _m.Called()

var r0 ArchivalConfig
if rf, ok := ret.Get(0).(func() ArchivalConfig); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(ArchivalConfig)
}
}

return r0
}
16 changes: 9 additions & 7 deletions common/archiver/provider/provider_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f6bbaf8

Please sign in to comment.