diff --git a/service/worker/fx.go b/service/worker/fx.go new file mode 100644 index 00000000000..c34da41543e --- /dev/null +++ b/service/worker/fx.go @@ -0,0 +1,72 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package worker + +import ( + "context" + + sdkclient "go.temporal.io/sdk/client" + "go.temporal.io/server/common" + esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" + "go.temporal.io/server/common/resource" + "go.uber.org/fx" +) + +var Module = fx.Options( + resource.Module, + fx.Provide(ParamsExpandProvider), // BootstrapParams should be deprecated + fx.Provide(NewService), + fx.Invoke(ServiceLifetimeHooks), +) + +func ParamsExpandProvider(params *resource.BootstrapParams) (sdkclient.Client, esclient.Client) { + return params.SdkClient, + params.ESClient +} + +func ServiceLifetimeHooks( + lc fx.Lifecycle, + svcStoppedCh chan struct{}, + svc *Service, +) { + lc.Append( + fx.Hook{ + OnStart: func(context.Context) error { + go func(svc common.Daemon, svcStoppedCh chan<- struct{}) { + // Start is blocked until Stop() is called. + svc.Start() + close(svcStoppedCh) + }(svc, svcStoppedCh) + + return nil + }, + OnStop: func(ctx context.Context) error { + svc.Stop() + return nil + }, + }, + ) + +} diff --git a/service/worker/service.go b/service/worker/service.go index b3371084418..5988c0f110c 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -75,32 +75,20 @@ type ( } ) -// NewService builds a new worker service func NewService( - params *resource.BootstrapParams, -) (*Service, error) { - - serviceConfig := NewConfig(params) - - serviceResource, err := resource.New( - params, - common.WorkerServiceName, - serviceConfig.PersistenceMaxQPS, - serviceConfig.PersistenceGlobalMaxQPS, - serviceConfig.ThrottledLogRPS, - ) - if err != nil { - return nil, err - } - + serviceResource resource.Resource, + serviceConfig *Config, + sdkClient sdkclient.Client, + esClient esclient.Client, +) *Service { return &Service{ Resource: serviceResource, status: common.DaemonStatusInitialized, config: serviceConfig, - sdkClient: params.SdkClient, - esClient: params.ESClient, + sdkClient: sdkClient, + esClient: esClient, stopC: make(chan struct{}), - }, nil + } } // NewConfig builds the new Config for worker service diff --git a/temporal/server.go b/temporal/server.go index 71ebfd63ca1..75df78cfcc0 100644 --- a/temporal/server.go +++ b/temporal/server.go @@ -60,7 +60,6 @@ import ( "go.temporal.io/server/service/frontend" "go.temporal.io/server/service/history" "go.temporal.io/server/service/matching" - "go.temporal.io/server/service/worker" ) const ( @@ -283,7 +282,27 @@ func (s *Server) Start() error { } continue case primitives.WorkerService: - svc, err = worker.NewService(params) + // todo: generalize this custom case logic as other services onboard fx + workerApp := fx.New( + fx.Supply( + params, + s.serviceStoppedChs[svcName], + ), + matching.Module) + err = workerApp.Err() + if err != nil { + close(s.serviceStoppedChs[svcName]) + return fmt.Errorf("unable to construct service %q: %w", svcName, err) + } + s.serviceApps[svcName] = workerApp + timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), serviceStartTimeout) + err = workerApp.Start(timeoutCtx) + cancelFunc() + if err != nil { + close(s.serviceStoppedChs[svcName]) + return fmt.Errorf("unable to start service %q: %w", svcName, err) + } + continue default: return fmt.Errorf("unknown service %q", svcName) }