Skip to content

Commit

Permalink
Add size checker when replication messages return (cadence-workflow#4521
Browse files Browse the repository at this point in the history
)

* Add size checker when replication messages return
  • Loading branch information
yux0 authored Oct 1, 2021
1 parent 9ba3b99 commit 844181f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 26 deletions.
11 changes: 11 additions & 0 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package client

import (
"fmt"
"time"

"go.uber.org/yarpc"
Expand Down Expand Up @@ -156,8 +157,18 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
return cf.newHistoryThriftClient(clientKey)
}

supportedMessageSize := cf.rpcFactory.GetMaxMessageSize()
maxSizeConfig := cf.dynConfig.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte, supportedMessageSize)
if maxSizeConfig() > supportedMessageSize {
return nil, fmt.Errorf(
"GRPCMaxSizeInByte dynamic config value %v is larger than supported value %v",
maxSizeConfig(),
supportedMessageSize,
)
}
client := history.NewClient(
cf.numberOfHistoryShards,
maxSizeConfig,
timeout,
common.NewClientCache(keyResolver, clientProvider),
cf.logger,
Expand Down
75 changes: 49 additions & 26 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
"go.uber.org/yarpc"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/future"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/rpc"
"github.com/uber/cadence/common/types"
)

Expand All @@ -41,27 +43,37 @@ const (
DefaultTimeout = time.Second * 30
)

type clientImpl struct {
numberOfShards int
tokenSerializer common.TaskTokenSerializer
timeout time.Duration
clients common.ClientCache
logger log.Logger
}
type (
clientImpl struct {
numberOfShards int
rpcMaxSizeInBytes dynamicconfig.IntPropertyFn // This value currently only used in GetReplicationMessage API
tokenSerializer common.TaskTokenSerializer
timeout time.Duration
clients common.ClientCache
logger log.Logger
}

getReplicationMessagesWithSize struct {
response *types.GetReplicationMessagesResponse
size int
}
)

// NewClient creates a new history service TChannel client
func NewClient(
numberOfShards int,
rpcMaxSizeInBytes dynamicconfig.IntPropertyFn,
timeout time.Duration,
clients common.ClientCache,
logger log.Logger,
) Client {
return &clientImpl{
numberOfShards: numberOfShards,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
timeout: timeout,
clients: clients,
logger: logger,
numberOfShards: numberOfShards,
rpcMaxSizeInBytes: rpcMaxSizeInBytes,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
timeout: timeout,
clients: clients,
logger: logger,
}
}

Expand Down Expand Up @@ -822,19 +834,18 @@ func (c *clientImpl) GetReplicationMessages(
req.Tokens = append(req.Tokens, token)
}

// preserve 5% timeout to return partial of the result if context is timing out
requestContext, cancel := common.CreateChildContext(ctx, 0.05)
defer cancel()

var wg sync.WaitGroup
wg.Add(len(requestsByClient))
respChan := make(chan *types.GetReplicationMessagesResponse, len(requestsByClient))
respChan := make(chan *getReplicationMessagesWithSize, len(requestsByClient))
errChan := make(chan error, 1)

for client, req := range requestsByClient {
go func(ctx context.Context, client Client, request *types.GetReplicationMessagesRequest) {
defer wg.Done()
resp, err := client.GetReplicationMessages(ctx, request, opts...)
requestContext, cancel := common.CreateChildContext(ctx, 0.05)
defer cancel()
requestContext, responseInfo := rpc.ContextWithResponseInfo(requestContext)
resp, err := client.GetReplicationMessages(requestContext, request, opts...)
if err != nil {
c.logger.Warn("Failed to get replication tasks from client", tag.Error(err))
// Returns service busy error to notify replication
Expand All @@ -846,25 +857,37 @@ func (c *clientImpl) GetReplicationMessages(
}
return
}
respChan <- resp
}(requestContext, client, req)
respChan <- &getReplicationMessagesWithSize{
response: resp,
size: responseInfo.Size,
}
}(ctx, client, req)
}

wg.Wait()
close(respChan)
close(errChan)

if len(errChan) > 0 {
err := <-errChan
return nil, err
}

response := &types.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*types.ReplicationMessages)}
responseTotalSize := 0

for resp := range respChan {
for shardID, tasks := range resp.MessagesByShard {
// return partial response if the response size exceeded supported max size
responseTotalSize += resp.size
if responseTotalSize >= c.rpcMaxSizeInBytes() {
return response, nil
}

for shardID, tasks := range resp.response.GetMessagesByShard() {
response.MessagesByShard[shardID] = tasks
}
}
var err error
if len(errChan) > 0 {
err = <-errChan
}
return response, err
return response, nil
}

func (c *clientImpl) GetDLQReplicationMessages(
Expand Down
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ const (
// Default value: TRUE
// Allowed filters: N/A
EnableGRPCOutbound
// GRPCMaxSizeInByte is the key for config GRPC response size
// KeyName: system.grpcMaxSizeInByte
// Value type: Int
// Default value: 4*1024*1024
// Allowed filters: N/A
GRPCMaxSizeInByte
// BlobSizeLimitError is the per event blob size limit
// KeyName: limit.blobSize.error
// Value type: Int
Expand Down Expand Up @@ -2020,6 +2026,7 @@ var Keys = map[Key]string{
EnableDebugMode: "system.enableDebugMode",
RequiredDomainDataKeys: "system.requiredDomainDataKeys",
EnableGRPCOutbound: "system.enableGRPCOutbound",
GRPCMaxSizeInByte: "system.grpcMaxSizeInByte",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
Expand Down
1 change: 1 addition & 0 deletions common/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type (
CreateDispatcherForOutbound(callerName, serviceName, hostName string) (*yarpc.Dispatcher, error)
CreateGRPCDispatcherForOutbound(callerName, serviceName, hostName string) (*yarpc.Dispatcher, error)
ReplaceGRPCPort(serviceName, hostAddress string) (string, error)
GetMaxMessageSize() int
}
)

Expand Down
12 changes: 12 additions & 0 deletions common/rpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ import (
"github.com/uber/cadence/common/log/tag"
)

const defaultGRPCSizeLimit = 4 * 1024 * 1024

// Factory is an implementation of common.RPCFactory interface
type Factory struct {
maxMessageSize int

logger log.Logger
hostAddressMapper HostAddressMapper
tchannel *tchannel.Transport
Expand Down Expand Up @@ -97,6 +101,7 @@ func NewFactory(logger log.Logger, p Params) *Factory {
})

return &Factory{
maxMessageSize: p.GRPCMaxMsgSize,
logger: logger,
hostAddressMapper: p.HostAddressMapper,
tchannel: tchannel,
Expand Down Expand Up @@ -133,6 +138,13 @@ func (d *Factory) ReplaceGRPCPort(serviceName, hostAddress string) (string, erro
return d.hostAddressMapper.GetGRPCAddress(serviceName, hostAddress)
}

func (d *Factory) GetMaxMessageSize() int {
if d.maxMessageSize == 0 {
return defaultGRPCSizeLimit
}
return d.maxMessageSize
}

func (d *Factory) createOutboundDispatcher(
callerName string,
serviceName string,
Expand Down

0 comments on commit 844181f

Please sign in to comment.