Pass mutex as reference (cadence-workflow#1617)
* Pass mutex as reference

* Use waitgroup to control start up sequence and get rid of mutex
meiliang86 authored Mar 29, 2019
1 parent fa5e9e9 commit d3ad960
Showing 1 changed file with 22 additions and 26 deletions.
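Background on the first commit-message bullet: Go's sync.Mutex only works if every user shares the same instance. In the old code each start* goroutine received its own copy of ringpopInitLock, so the p.initLock.Lock() in CreateRingpop serialized nothing across services; go vet's copylocks check flags this pattern. Below is a minimal, self-contained sketch of the by-value vs by-pointer difference, with hypothetical function names rather than code from this PR:

package main

import (
    "fmt"
    "sync"
)

// lockByValue receives a copy of the caller's mutex, so concurrent callers are
// not actually mutually excluded; `go vet` reports "passes lock by value" here.
func lockByValue(mu sync.Mutex) {
    mu.Lock()
    defer mu.Unlock()
    // work here is not protected from other callers, only from this copy
}

// lockByPointer locks the caller's own mutex, so all callers serialize on it.
func lockByPointer(mu *sync.Mutex) {
    mu.Lock()
    defer mu.Unlock()
    // genuinely exclusive critical section
}

func main() {
    var mu sync.Mutex
    lockByValue(mu)    // compiles, but each call locks a private copy of mu
    lockByPointer(&mu) // every call contends on the same mutex
    fmt.Println("done")
}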
48 changes: 22 additions & 26 deletions host/onebox.go
@@ -147,7 +147,7 @@ type (
 
     ringpopFactoryImpl struct {
         rpHosts  []string
-        initLock sync.Mutex
+        initLock *sync.Mutex
     }
 )

@@ -184,27 +184,27 @@ func (c *cadenceImpl) enableWorker() bool {
 
 func (c *cadenceImpl) Start() error {
     var rpHosts []string
-    rpHosts = append(rpHosts, c.FrontendAddress())
-    rpHosts = append(rpHosts, c.MatchingServiceAddress())
     rpHosts = append(rpHosts, c.HistoryServiceAddress()...)
-    if c.enableWorker() {
-        rpHosts = append(rpHosts, c.WorkerServiceAddress())
-    }
 
     var startWG sync.WaitGroup
-    var ringpopInitLock sync.Mutex
-    startWG.Add(2)
-    go c.startHistory(rpHosts, &startWG, c.enableEventsV2, ringpopInitLock)
-    go c.startMatching(rpHosts, &startWG, ringpopInitLock)
+    startWG.Add(1)
+    go c.startHistory(rpHosts, &startWG, c.enableEventsV2)
     startWG.Wait()
 
+    rpHosts = append(rpHosts, c.MatchingServiceAddress())
     startWG.Add(1)
-    go c.startFrontend(rpHosts, &startWG, ringpopInitLock)
+    go c.startMatching(rpHosts, &startWG)
     startWG.Wait()
 
+    rpHosts = append(rpHosts, c.FrontendAddress())
+    startWG.Add(1)
+    go c.startFrontend(rpHosts, &startWG)
+    startWG.Wait()
+
     if c.enableWorker() {
+        rpHosts = append(rpHosts, c.WorkerServiceAddress())
         startWG.Add(1)
-        go c.startWorker(rpHosts, &startWG, ringpopInitLock)
+        go c.startWorker(rpHosts, &startWG)
         startWG.Wait()
     }
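The rewritten Start() above drops the shared lock in favour of strictly sequential startup: each service is launched in its own goroutine, Start() blocks on the WaitGroup until that service signals readiness, and only then is the next service's address appended to rpHosts and the next goroutine launched. A standalone sketch of that pattern follows; the startService helper and the service names are illustrative stand-ins, not the onebox API:

package main

import (
    "fmt"
    "sync"
    "time"
)

// startService simulates a service that does its bootstrap work (e.g. joining
// the membership ring) and then signals readiness exactly once via startWG.
func startService(name string, startWG *sync.WaitGroup) {
    time.Sleep(10 * time.Millisecond) // stand-in for bootstrap work
    fmt.Println(name, "ready")
    startWG.Done() // unblock the caller so it can start the next service
    // a real service would keep serving until shutdown
}

func main() {
    var startWG sync.WaitGroup
    for _, name := range []string{"history", "matching", "frontend", "worker"} {
        startWG.Add(1)
        go startService(name, &startWG)
        startWG.Wait() // next service starts only after this one is ready
    }
    fmt.Println("all services started")
}

With bootstrap serialized this way, only one service creates its ringpop instance at a time, which appears to be why the later hunks can drop the ringpopInitLock parameter and the lock/unlock pair in CreateRingpop.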

@@ -316,7 +316,7 @@ func (c *cadenceImpl) GetFrontendService() service.Service {
     return c.frontEndService
 }
 
-func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup, ringpopInitLock sync.Mutex) {
+func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) {
     params := new(service.BootstrapParams)
     params.DCRedirectionPolicy = config.DCRedirectionPolicy{}
     params.Name = common.FrontendServiceName
@@ -325,7 +325,7 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup, ringpopInitLock sync.Mutex) {
     params.PProfInitializer = newPProfInitializerImpl(c.logger, c.FrontendPProfPort())
     params.RPCFactory = newRPCFactoryImpl(common.FrontendServiceName, c.FrontendAddress(), c.logger)
     params.MetricScope = tally.NewTestScope(common.FrontendServiceName, make(map[string]string))
-    params.RingpopFactory = newRingpopFactory(rpHosts, ringpopInitLock)
+    params.RingpopFactory = newRingpopFactory(rpHosts)
     params.ClusterMetadata = c.clusterMetadata
     params.DispatcherProvider = c.dispatcherProvider
     params.MessagingClient = c.messagingClient
@@ -373,7 +373,7 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup, ringpopInitLock sync.Mutex) {
     c.shutdownWG.Done()
 }
 
-func (c *cadenceImpl) startHistory(rpHosts []string, startWG *sync.WaitGroup, enableEventsV2 bool, ringpopInitLock sync.Mutex) {
+func (c *cadenceImpl) startHistory(rpHosts []string, startWG *sync.WaitGroup, enableEventsV2 bool) {
 
     pprofPorts := c.HistoryPProfPort()
     for i, hostport := range c.HistoryServiceAddress() {
@@ -384,7 +384,7 @@ func (c *cadenceImpl) startHistory(rpHosts []string, startWG *sync.WaitGroup, enableEventsV2 bool, ringpopInitLock sync.Mutex) {
         params.PProfInitializer = newPProfInitializerImpl(c.logger, pprofPorts[i])
         params.RPCFactory = newRPCFactoryImpl(common.HistoryServiceName, hostport, c.logger)
         params.MetricScope = tally.NewTestScope(common.HistoryServiceName, make(map[string]string))
-        params.RingpopFactory = newRingpopFactory(rpHosts, ringpopInitLock)
+        params.RingpopFactory = newRingpopFactory(rpHosts)
         params.ClusterMetadata = c.clusterMetadata
         params.DispatcherProvider = c.dispatcherProvider
         params.MessagingClient = c.messagingClient
@@ -412,7 +412,7 @@ func (c *cadenceImpl) startHistory(rpHosts []string, startWG *sync.WaitGroup, enableEventsV2 bool, ringpopInitLock sync.Mutex) {
     c.shutdownWG.Done()
 }
 
-func (c *cadenceImpl) startMatching(rpHosts []string, startWG *sync.WaitGroup, ringpopInitLock sync.Mutex) {
+func (c *cadenceImpl) startMatching(rpHosts []string, startWG *sync.WaitGroup) {
 
     params := new(service.BootstrapParams)
     params.Name = common.MatchingServiceName
@@ -421,7 +421,7 @@ func (c *cadenceImpl) startMatching(rpHosts []string, startWG *sync.WaitGroup, ringpopInitLock sync.Mutex) {
     params.PProfInitializer = newPProfInitializerImpl(c.logger, c.MatchingPProfPort())
     params.RPCFactory = newRPCFactoryImpl(common.MatchingServiceName, c.MatchingServiceAddress(), c.logger)
     params.MetricScope = tally.NewTestScope(common.MatchingServiceName, make(map[string]string))
-    params.RingpopFactory = newRingpopFactory(rpHosts, ringpopInitLock)
+    params.RingpopFactory = newRingpopFactory(rpHosts)
     params.ClusterMetadata = c.clusterMetadata
     params.DispatcherProvider = c.dispatcherProvider
     cassandraConfig := config.Cassandra{Hosts: "127.0.0.1"}
@@ -443,14 +443,14 @@ func (c *cadenceImpl) startMatching(rpHosts []string, startWG *sync.WaitGroup, ringpopInitLock sync.Mutex) {
     c.shutdownWG.Done()
 }
 
-func (c *cadenceImpl) startWorker(rpHosts []string, startWG *sync.WaitGroup, ringpopInitLock sync.Mutex) {
+func (c *cadenceImpl) startWorker(rpHosts []string, startWG *sync.WaitGroup) {
     params := new(service.BootstrapParams)
     params.Name = common.WorkerServiceName
     params.Logger = c.logger
     params.PProfInitializer = newPProfInitializerImpl(c.logger, c.WorkerPProfPort())
     params.RPCFactory = newRPCFactoryImpl(common.WorkerServiceName, c.WorkerServiceAddress(), c.logger)
     params.MetricScope = tally.NewTestScope(common.WorkerServiceName, make(map[string]string))
-    params.RingpopFactory = newRingpopFactory(rpHosts, ringpopInitLock)
+    params.RingpopFactory = newRingpopFactory(rpHosts)
     params.ClusterMetadata = c.clusterMetadata
     params.DispatcherProvider = c.dispatcherProvider
     cassandraConfig := config.Cassandra{Hosts: "127.0.0.1"}
@@ -531,17 +531,13 @@ func (c *cadenceImpl) startWorkerClientWorker(params *service.BootstrapParams, s
     }
 }
 
-func newRingpopFactory(rpHosts []string, initLock sync.Mutex) service.RingpopFactory {
+func newRingpopFactory(rpHosts []string) service.RingpopFactory {
     return &ringpopFactoryImpl{
-        rpHosts:  rpHosts,
-        initLock: initLock,
+        rpHosts: rpHosts,
     }
 }
 
 func (p *ringpopFactoryImpl) CreateRingpop(dispatcher *yarpc.Dispatcher) (*ringpop.Ringpop, error) {
-    p.initLock.Lock()
-    defer p.initLock.Unlock()
-
     var ch *tcg.Channel
     var err error
     if ch, err = p.getChannel(dispatcher); err != nil {
