Skip to content

Commit

Permalink
worker: auto-register system domain (cadence-workflow#1761)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored May 1, 2019
1 parent b7552d0 commit 5971e43
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 27 deletions.
3 changes: 3 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ const (

const (
// SystemDomainName is domain name for all cadence system workflows
SystemDomainName = "cadence-system"
SystemDomainName = "cadence-system"
SystemDomainID = "32049b68-7872-4094-8e63-d0dd59896a83"
SystemDomainRetentionDays = 7
)

const (
Expand Down
101 changes: 75 additions & 26 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@
package worker

import (
"context"
"sync/atomic"
"time"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
persistencefactory "github.com/uber/cadence/common/persistence/persistence-factory"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/config"
Expand All @@ -40,8 +41,6 @@ import (
"github.com/uber/cadence/service/worker/indexer"
"github.com/uber/cadence/service/worker/replicator"
"github.com/uber/cadence/service/worker/scanner"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/client"
)

type (
Expand All @@ -68,6 +67,8 @@ type (
}
)

const domainRefreshInterval = time.Second * 30

// NewService builds a new cadence-worker service
func NewService(params *service.BootstrapParams) common.Daemon {
config := NewConfig(params)
Expand Down Expand Up @@ -124,21 +125,32 @@ func (s *Service) Start() {
s.metricsClient = base.GetMetricsClient()
s.logger.Info("service starting", tag.ComponentWorker)

pConfig := s.params.PersistenceConfig
pConfig.SetMaxQPS(pConfig.DefaultStore, s.config.ReplicationCfg.PersistenceMaxQPS())
pFactory := persistencefactory.New(&pConfig, s.params.ClusterMetadata.GetCurrentClusterName(), s.metricsClient, s.logger)

if base.GetClusterMetadata().IsGlobalDomainEnabled() {
s.startReplicator(base, pFactory)
}
if base.GetClusterMetadata().ArchivalConfig().ConfiguredForArchival() {
s.startArchiver(base, pFactory)
}
if s.params.ESConfig.Enable {
s.startIndexer(base)
}

s.startScanner(base)
replicatorEnabled := base.GetClusterMetadata().IsGlobalDomainEnabled()
archiverEnabled := base.GetClusterMetadata().ArchivalConfig().ConfiguredForArchival()
scannerEnabled := s.config.ScannerCfg.Persistence.DefaultStoreType() == config.StoreTypeSQL

if replicatorEnabled || archiverEnabled || scannerEnabled {
pConfig := s.params.PersistenceConfig
pConfig.SetMaxQPS(pConfig.DefaultStore, s.config.ReplicationCfg.PersistenceMaxQPS())
pFactory := persistencefactory.New(&pConfig, s.params.ClusterMetadata.GetCurrentClusterName(), s.metricsClient, s.logger)

if archiverEnabled || scannerEnabled {
s.ensureSystemDomainExists(pFactory, base.GetClusterMetadata().GetCurrentClusterName())
}
if replicatorEnabled {
s.startReplicator(base, pFactory)
}
if archiverEnabled {
s.startArchiver(base, pFactory)
}
if scannerEnabled {
s.startScanner(base)
}
}

s.logger.Info("service started", tag.ComponentWorker)
<-s.stopC
Expand All @@ -155,11 +167,6 @@ func (s *Service) Stop() {
}

func (s *Service) startScanner(base service.Service) {
storeType := s.config.ScannerCfg.Persistence.DefaultStoreType()
if storeType != config.StoreTypeSQL {
s.logger.Info("Scanner not started: incompatible persistence store type", tag.StoreType(storeType))
return
}
params := &scanner.BootstrapParams{
Config: *s.config.ScannerCfg,
SDKClient: s.params.PublicClient,
Expand Down Expand Up @@ -212,7 +219,6 @@ func (s *Service) startIndexer(base service.Service) {

func (s *Service) startArchiver(base service.Service, pFactory persistencefactory.Factory) {
publicClient := s.params.PublicClient
s.ensureSystemDomainExists(publicClient)

historyManager, err := pFactory.NewHistoryManager()
if err != nil {
Expand Down Expand Up @@ -252,12 +258,55 @@ func (s *Service) startArchiver(base service.Service, pFactory persistencefactor
}
}

func (s *Service) ensureSystemDomainExists(publicClient workflowserviceclient.Interface) {
domainClient := client.NewDomainClient(publicClient, nil)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := domainClient.Describe(ctx, common.SystemDomainName)
func (s *Service) ensureSystemDomainExists(pFactory persistencefactory.Factory, clusterName string) {
metadataProxy, err := pFactory.NewMetadataManager(persistencefactory.MetadataV1V2)
if err != nil {
s.logger.Fatal("error creating metadataMgr proxy", tag.Error(err))
}
defer metadataProxy.Close()
_, err = metadataProxy.GetDomain(&persistence.GetDomainRequest{Name: common.SystemDomainName})
switch err.(type) {
case nil:
return
case *shared.EntityNotExistsError:
s.logger.Info("cadence-system domain does not exist, attempting to register domain")
s.registerSystemDomain(pFactory, clusterName)
default:
s.logger.Fatal("failed to verify if cadence system domain exists", tag.Error(err))
}
}

func (s *Service) registerSystemDomain(pFactory persistencefactory.Factory, clusterName string) {
metadataV2, err := pFactory.NewMetadataManager(persistencefactory.MetadataV2)
if err != nil {
s.logger.Fatal("error creating metadataV2Mgr", tag.Error(err))
}
defer metadataV2.Close()
_, err = metadataV2.CreateDomain(&persistence.CreateDomainRequest{
Info: &persistence.DomainInfo{
ID: common.SystemDomainID,
Name: common.SystemDomainName,
Description: "Cadence internal system domain",
},
Config: &persistence.DomainConfig{
Retention: common.SystemDomainRetentionDays,
EmitMetric: true,
},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: clusterName,
Clusters: persistence.GetOrUseDefaultClusters(clusterName, nil),
},
IsGlobalDomain: false,
FailoverVersion: common.EmptyVersion,
})
if err != nil {
s.logger.Fatal("failed to verify that cadence system domain exists", tag.Error(err))
if _, ok := err.(*shared.DomainAlreadyExistsError); ok {
return
}
s.logger.Fatal("failed to register system domain", tag.Error(err))
}
// this is needed because frontend domainCache will take about 10s to load the
// domain after its created first time. Archiver/Scanner cannot start their cadence
// workers until this refresh happens
time.Sleep(domainRefreshInterval)
}

0 comments on commit 5971e43

Please sign in to comment.