Skip to content

Commit

Permalink
Created GRPC handlers and internal clients (cadence-workflow#4057)
Browse files Browse the repository at this point in the history
* Created GRPC handlers and internal clients

* GRPC port config for remaining config files

* Fix metric tags for grpc handlers

* Proper mapping for WorkflowExecutionCloseStatus

* Return yarpcerrors.Status with no match

* Updated changelog

* Fix after merge

* Do not fatal on outbound dispatcher failure

* Updated integration tests

* Minor rename

* Fix after merge
  • Loading branch information
vytautas-karpavicius authored Apr 15, 2021
1 parent 8d3519b commit d0a8f7e
Show file tree
Hide file tree
Showing 33 changed files with 1,748 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
You can find a list of previous releases on the [github releases](https://github.com/uber/cadence/releases) page.

## [Unreleased]
### Added
- Added GRPC support. Cadence server will accept requests on both TChannel and GRPC. With dynamic config flag `system.enableGRPCOutbound` it will also switch to GRPC communication internally between server components.

### Fixed
- Fixed a bug where an error message is always displayed in Cadence UI `persistence max qps reached for list operations` on the workflow list screen (#3958)

Expand Down
129 changes: 129 additions & 0 deletions client/admin/grpcClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package admin

import (
"context"

"go.uber.org/yarpc"

adminv1 "github.com/uber/cadence/.gen/proto/admin/v1"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/proto"
)

type grpcClient struct {
c adminv1.AdminAPIYARPCClient
}

func NewGRPCClient(c adminv1.AdminAPIYARPCClient) Client {
return grpcClient{c}
}

func (g grpcClient) AddSearchAttribute(ctx context.Context, request *types.AddSearchAttributeRequest, opts ...yarpc.CallOption) error {
_, err := g.c.AddSearchAttribute(ctx, proto.FromAdminAddSearchAttributeRequest(request), opts...)
return proto.ToError(err)
}

func (g grpcClient) CloseShard(ctx context.Context, request *types.CloseShardRequest, opts ...yarpc.CallOption) error {
_, err := g.c.CloseShard(ctx, proto.FromAdminCloseShardRequest(request), opts...)
return proto.ToError(err)
}

func (g grpcClient) DescribeCluster(ctx context.Context, opts ...yarpc.CallOption) (*types.DescribeClusterResponse, error) {
response, err := g.c.DescribeCluster(ctx, &adminv1.DescribeClusterRequest{}, opts...)
return proto.ToAdminDescribeClusterResponse(response), proto.ToError(err)
}

func (g grpcClient) DescribeHistoryHost(ctx context.Context, request *types.DescribeHistoryHostRequest, opts ...yarpc.CallOption) (*types.DescribeHistoryHostResponse, error) {
response, err := g.c.DescribeHistoryHost(ctx, proto.FromAdminDescribeHistoryHostRequest(request), opts...)
return proto.ToAdminDescribeHistoryHostResponse(response), proto.ToError(err)
}

func (g grpcClient) DescribeQueue(ctx context.Context, request *types.DescribeQueueRequest, opts ...yarpc.CallOption) (*types.DescribeQueueResponse, error) {
response, err := g.c.DescribeQueue(ctx, proto.FromAdminDescribeQueueRequest(request), opts...)
return proto.ToAdminDescribeQueueResponse(response), proto.ToError(err)
}

func (g grpcClient) DescribeWorkflowExecution(ctx context.Context, request *types.AdminDescribeWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.AdminDescribeWorkflowExecutionResponse, error) {
response, err := g.c.DescribeWorkflowExecution(ctx, proto.FromAdminDescribeWorkflowExecutionRequest(request), opts...)
return proto.ToAdminDescribeWorkflowExecutionResponse(response), proto.ToError(err)
}

func (g grpcClient) GetDLQReplicationMessages(ctx context.Context, request *types.GetDLQReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetDLQReplicationMessagesResponse, error) {
response, err := g.c.GetDLQReplicationMessages(ctx, proto.FromAdminGetDLQReplicationMessagesRequest(request), opts...)
return proto.ToAdminGetDLQReplicationMessagesResponse(response), proto.ToError(err)
}

func (g grpcClient) GetDomainReplicationMessages(ctx context.Context, request *types.GetDomainReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetDomainReplicationMessagesResponse, error) {
response, err := g.c.GetDomainReplicationMessages(ctx, proto.FromAdminGetDomainReplicationMessagesRequest(request), opts...)
return proto.ToAdminGetDomainReplicationMessagesResponse(response), proto.ToError(err)
}

func (g grpcClient) GetReplicationMessages(ctx context.Context, request *types.GetReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetReplicationMessagesResponse, error) {
response, err := g.c.GetReplicationMessages(ctx, proto.FromAdminGetReplicationMessagesRequest(request), opts...)
return proto.ToAdminGetReplicationMessagesResponse(response), proto.ToError(err)
}

func (g grpcClient) GetWorkflowExecutionRawHistoryV2(ctx context.Context, request *types.GetWorkflowExecutionRawHistoryV2Request, opts ...yarpc.CallOption) (*types.GetWorkflowExecutionRawHistoryV2Response, error) {
response, err := g.c.GetWorkflowExecutionRawHistoryV2(ctx, proto.FromAdminGetWorkflowExecutionRawHistoryV2Request(request), opts...)
return proto.ToAdminGetWorkflowExecutionRawHistoryV2Response(response), proto.ToError(err)
}

func (g grpcClient) MergeDLQMessages(ctx context.Context, request *types.MergeDLQMessagesRequest, opts ...yarpc.CallOption) (*types.MergeDLQMessagesResponse, error) {
response, err := g.c.MergeDLQMessages(ctx, proto.FromAdminMergeDLQMessagesRequest(request), opts...)
return proto.ToAdminMergeDLQMessagesResponse(response), proto.ToError(err)
}

func (g grpcClient) PurgeDLQMessages(ctx context.Context, request *types.PurgeDLQMessagesRequest, opts ...yarpc.CallOption) error {
_, err := g.c.PurgeDLQMessages(ctx, proto.FromAdminPurgeDLQMessagesRequest(request), opts...)
return proto.ToError(err)
}

func (g grpcClient) ReadDLQMessages(ctx context.Context, request *types.ReadDLQMessagesRequest, opts ...yarpc.CallOption) (*types.ReadDLQMessagesResponse, error) {
response, err := g.c.ReadDLQMessages(ctx, proto.FromAdminReadDLQMessagesRequest(request), opts...)
return proto.ToAdminReadDLQMessagesResponse(response), proto.ToError(err)
}

func (g grpcClient) ReapplyEvents(ctx context.Context, request *types.ReapplyEventsRequest, opts ...yarpc.CallOption) error {
_, err := g.c.ReapplyEvents(ctx, proto.FromAdminReapplyEventsRequest(request), opts...)
return proto.ToError(err)
}

func (g grpcClient) RefreshWorkflowTasks(ctx context.Context, request *types.RefreshWorkflowTasksRequest, opts ...yarpc.CallOption) error {
_, err := g.c.RefreshWorkflowTasks(ctx, proto.FromAdminRefreshWorkflowTasksRequest(request), opts...)
return proto.ToError(err)
}

func (g grpcClient) RemoveTask(ctx context.Context, request *types.RemoveTaskRequest, opts ...yarpc.CallOption) error {
_, err := g.c.RemoveTask(ctx, proto.FromAdminRemoveTaskRequest(request), opts...)
return proto.ToError(err)
}

func (g grpcClient) ResendReplicationTasks(ctx context.Context, request *types.ResendReplicationTasksRequest, opts ...yarpc.CallOption) error {
_, err := g.c.ResendReplicationTasks(ctx, proto.FromAdminResendReplicationTasksRequest(request), opts...)
return proto.ToError(err)
}

func (g grpcClient) ResetQueue(ctx context.Context, request *types.ResetQueueRequest, opts ...yarpc.CallOption) error {
_, err := g.c.ResetQueue(ctx, proto.FromAdminResetQueueRequest(request), opts...)
return proto.ToError(err)
}
80 changes: 74 additions & 6 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/.gen/go/history/historyserviceclient"
"github.com/uber/cadence/.gen/go/matching/matchingserviceclient"

apiv1 "github.com/uber/cadence/.gen/proto/api/v1"
historyv1 "github.com/uber/cadence/.gen/proto/history/v1"
matchingv1 "github.com/uber/cadence/.gen/proto/matching/v1"

"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
Expand Down Expand Up @@ -76,6 +81,7 @@ type (
dynConfig *dynamicconfig.Collection
numberOfHistoryShards int
logger log.Logger
enableGRPCOutbound bool
}
)

Expand All @@ -88,13 +94,15 @@ func NewRPCClientFactory(
numberOfHistoryShards int,
logger log.Logger,
) Factory {
enableGRPCOutbound := dc.GetBoolProperty(dynamicconfig.EnableGRPCOutbound, false)()
return &rpcClientFactory{
rpcFactory: rpcFactory,
monitor: monitor,
metricsClient: metricsClient,
dynConfig: dc,
numberOfHistoryShards: numberOfHistoryShards,
logger: logger,
enableGRPCOutbound: enableGRPCOutbound,
}
}

Expand Down Expand Up @@ -125,8 +133,10 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
}

clientProvider := func(clientKey string) (interface{}, error) {
dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(historyCaller, common.HistoryServiceName, clientKey)
return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(common.HistoryServiceName))), nil
if cf.enableGRPCOutbound {
return cf.newHistoryGRPCClient(clientKey)
}
return cf.newHistoryThriftClient(clientKey)
}

client := history.NewClient(
Expand Down Expand Up @@ -163,8 +173,10 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
}

clientProvider := func(clientKey string) (interface{}, error) {
dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, common.MatchingServiceName, clientKey)
return matching.NewThriftClient(matchingserviceclient.New(dispatcher.ClientConfig(common.MatchingServiceName))), nil
if cf.enableGRPCOutbound {
return cf.newMatchingGRPCClient(clientKey)
}
return cf.newMatchingThriftClient(clientKey)
}

client := matching.NewClient(
Expand Down Expand Up @@ -202,8 +214,10 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout(
}

clientProvider := func(clientKey string) (interface{}, error) {
dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(frontendCaller, common.FrontendServiceName, clientKey)
return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))), nil
if cf.enableGRPCOutbound {
return cf.newFrontendGRPCClient(clientKey)
}
return cf.newFrontendThriftClient(clientKey)
}

client := frontend.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider))
Expand Down Expand Up @@ -263,3 +277,57 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndDispatcher(
}
return client, nil
}

func (cf *rpcClientFactory) newHistoryThriftClient(hostAddress string) (history.Client, error) {
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(historyCaller, common.HistoryServiceName, hostAddress)
if err != nil {
return nil, err
}
return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(common.HistoryServiceName))), nil
}

func (cf *rpcClientFactory) newMatchingThriftClient(hostAddress string) (matching.Client, error) {
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, common.MatchingServiceName, hostAddress)
if err != nil {
return nil, err
}
return matching.NewThriftClient(matchingserviceclient.New(dispatcher.ClientConfig(common.MatchingServiceName))), nil
}

func (cf *rpcClientFactory) newFrontendThriftClient(hostAddress string) (frontend.Client, error) {
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(frontendCaller, common.FrontendServiceName, hostAddress)
if err != nil {
return nil, err
}
return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))), nil
}

func (cf *rpcClientFactory) newHistoryGRPCClient(hostAddress string) (history.Client, error) {
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(historyCaller, common.HistoryServiceName, hostAddress)
if err != nil {
return nil, err
}
return history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(dispatcher.ClientConfig(common.HistoryServiceName))), nil
}

func (cf *rpcClientFactory) newMatchingGRPCClient(hostAddress string) (matching.Client, error) {
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(matchingCaller, common.MatchingServiceName, hostAddress)
if err != nil {
return nil, err
}
return matching.NewGRPCClient(matchingv1.NewMatchingAPIYARPCClient(dispatcher.ClientConfig(common.MatchingServiceName))), nil
}

func (cf *rpcClientFactory) newFrontendGRPCClient(hostAddress string) (frontend.Client, error) {
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(frontendCaller, common.FrontendServiceName, hostAddress)
if err != nil {
return nil, err
}
config := dispatcher.ClientConfig(common.FrontendServiceName)
return frontend.NewGRPCClient(
apiv1.NewDomainAPIYARPCClient(config),
apiv1.NewWorkflowAPIYARPCClient(config),
apiv1.NewWorkerAPIYARPCClient(config),
apiv1.NewVisibilityAPIYARPCClient(config),
), nil
}
Loading

0 comments on commit d0a8f7e

Please sign in to comment.