Skip to content

Commit

Permalink
Add auto config hint to poll response (cadence-workflow#6542)
Browse files Browse the repository at this point in the history
* Add auto config hint to poll response

* Use defer func to set auto config hint when poll for tasks
  • Loading branch information
neil-xie authored Dec 7, 2024
1 parent d33ce74 commit cd52fed
Show file tree
Hide file tree
Showing 21 changed files with 562 additions and 302 deletions.
428 changes: 277 additions & 151 deletions .gen/proto/matching/v1/service.pb.go

Large diffs are not rendered by default.

300 changes: 151 additions & 149 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2077,6 +2077,13 @@ const (
// Allowed filters: DomainName
EnableStrongIdempotencySanityCheck

// MatchingEnableClientAutoConfig enables auto config for domain and tasklist on client (worker) side
// KeyName: matching.enableClientAutoConfig
// Value type: Bool
// Default value: false
// Allowed filters: DomainName,TasklistName,TasklistType
MatchingEnableClientAutoConfig

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4413,6 +4420,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableStrongIdempotencySanityCheck enables sanity check for strong idempotency",
DefaultValue: false,
},
MatchingEnableClientAutoConfig: {
KeyName: "matching.enableClientAutoConfig",
Filters: []Filter{DomainName, TaskListName, TaskType},
Description: "MatchingEnableClientAutoConfig is to enable auto config on worker side",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
20 changes: 20 additions & 0 deletions common/types/mapper/proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6095,3 +6095,23 @@ func ToAPITaskListPartitionConfig(t *apiv1.TaskListPartitionConfig) *types.TaskL
NumWritePartitions: t.NumWritePartitions,
}
}

// FromAutoConfigHint converts internal AutoConfigHint type to proto.
// A nil input yields a nil result.
func FromAutoConfigHint(t *types.AutoConfigHint) *apiv1.AutoConfigHint {
	if t != nil {
		hint := &apiv1.AutoConfigHint{}
		hint.EnableAutoConfig = t.EnableAutoConfig
		hint.PollerWaitTimeInMs = t.PollerWaitTimeInMs
		return hint
	}
	return nil
}

// ToAutoConfigHint converts proto AutoConfigHint type to internal.
// A nil input yields a nil result.
func ToAutoConfigHint(t *apiv1.AutoConfigHint) *types.AutoConfigHint {
	if t != nil {
		hint := &types.AutoConfigHint{}
		hint.EnableAutoConfig = t.EnableAutoConfig
		hint.PollerWaitTimeInMs = t.PollerWaitTimeInMs
		return hint
	}
	return nil
}
4 changes: 4 additions & 0 deletions common/types/mapper/proto/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ func FromMatchingPollForActivityTaskResponse(t *types.MatchingPollForActivityTas
Header: FromHeader(t.Header),
PartitionConfig: FromTaskListPartitionConfig(t.PartitionConfig),
LoadBalancerHints: FromLoadBalancerHints(t.LoadBalancerHints),
AutoConfigHint: FromAutoConfigHint(t.AutoConfigHint),
}
}

Expand All @@ -433,6 +434,7 @@ func ToMatchingPollForActivityTaskResponse(t *matchingv1.PollForActivityTaskResp
Header: ToHeader(t.Header),
PartitionConfig: ToTaskListPartitionConfig(t.PartitionConfig),
LoadBalancerHints: ToLoadBalancerHints(t.LoadBalancerHints),
AutoConfigHint: ToAutoConfigHint(t.AutoConfigHint),
}
}

Expand Down Expand Up @@ -487,6 +489,7 @@ func FromMatchingPollForDecisionTaskResponse(t *types.MatchingPollForDecisionTas
TotalHistoryBytes: t.TotalHistoryBytes,
PartitionConfig: FromTaskListPartitionConfig(t.PartitionConfig),
LoadBalancerHints: FromLoadBalancerHints(t.LoadBalancerHints),
AutoConfigHint: FromAutoConfigHint(t.AutoConfigHint),
}
}

Expand Down Expand Up @@ -515,6 +518,7 @@ func ToMatchingPollForDecisionTaskResponse(t *matchingv1.PollForDecisionTaskResp
TotalHistoryBytes: t.TotalHistoryBytes,
PartitionConfig: ToTaskListPartitionConfig(t.PartitionConfig),
LoadBalancerHints: ToLoadBalancerHints(t.LoadBalancerHints),
AutoConfigHint: ToAutoConfigHint(t.AutoConfigHint),
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func FromMatchingPollForDecisionTaskResponse(t *types.MatchingPollForDecisionTas
StartedTimestamp: t.StartedTimestamp,
Queries: FromWorkflowQueryMap(t.Queries),
TotalHistoryBytes: &t.TotalHistoryBytes,
AutoConfigHint: FromAutoConfigHint(t.AutoConfigHint),
}
}

Expand Down Expand Up @@ -320,6 +321,7 @@ func ToMatchingPollForDecisionTaskResponse(t *matching.PollForDecisionTaskRespon
StartedTimestamp: t.StartedTimestamp,
Queries: ToWorkflowQueryMap(t.Queries),
TotalHistoryBytes: t.GetTotalHistoryBytes(),
AutoConfigHint: ToAutoConfigHint(t.AutoConfigHint),
}
}

Expand Down
22 changes: 22 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -8155,3 +8155,25 @@ func FromAny(t *types.Any) *shared.Any {
Value: t.Value,
}
}

// FromAutoConfigHint converts internal AutoConfigHint type to thrift.
// A nil input yields a nil result.
func FromAutoConfigHint(t *types.AutoConfigHint) *shared.AutoConfigHint {
	if t != nil {
		hint := &shared.AutoConfigHint{}
		// The thrift fields are pointers; they point at the fields of t itself.
		hint.EnableAutoConfig = &t.EnableAutoConfig
		hint.PollerWaitTimeInMs = &t.PollerWaitTimeInMs
		return hint
	}
	return nil
}

// ToAutoConfigHint converts thrift AutoConfigHint type to internal.
// A nil input yields a nil result. Thrift fields that are unset (nil) are
// mapped to the zero value of the corresponding internal field.
func ToAutoConfigHint(t *shared.AutoConfigHint) *types.AutoConfigHint {
	if t == nil {
		return nil
	}
	hint := &types.AutoConfigHint{}
	// The thrift fields are pointers and may be nil; dereferencing them
	// unconditionally would panic on a hint with an unset field.
	if t.PollerWaitTimeInMs != nil {
		hint.PollerWaitTimeInMs = *t.PollerWaitTimeInMs
	}
	if t.EnableAutoConfig != nil {
		hint.EnableAutoConfig = *t.EnableAutoConfig
	}
	return hint
}
2 changes: 2 additions & 0 deletions common/types/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ type MatchingPollForDecisionTaskResponse struct {
TotalHistoryBytes int64 `json:"currentHistorySize,omitempty"`
PartitionConfig *TaskListPartitionConfig
LoadBalancerHints *LoadBalancerHints
AutoConfigHint *AutoConfigHint
}

// GetWorkflowExecution is an internal getter (TBD...)
Expand Down Expand Up @@ -516,6 +517,7 @@ type MatchingPollForActivityTaskResponse struct {
BacklogCountHint int64 `json:"backlogCountHint,omitempty"`
PartitionConfig *TaskListPartitionConfig
LoadBalancerHints *LoadBalancerHints
AutoConfigHint *AutoConfigHint
}

// MatchingQueryWorkflowRequest is an internal type (TBD...)
Expand Down
8 changes: 8 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3906,6 +3906,7 @@ type PollForActivityTaskResponse struct {
WorkflowType *WorkflowType `json:"workflowType,omitempty"`
WorkflowDomain string `json:"workflowDomain,omitempty"`
Header *Header `json:"header,omitempty"`
AutoConfigHint *AutoConfigHint `json:"autoConfigHint,omitempty"`
}

// GetActivityID is an internal getter (TBD...)
Expand Down Expand Up @@ -3974,6 +3975,7 @@ type PollForDecisionTaskResponse struct {
Queries map[string]*WorkflowQuery `json:"queries,omitempty"`
NextEventID int64 `json:"nextEventId,omitempty"`
TotalHistoryBytes int64 `json:"historySize,omitempty"`
AutoConfigHint *AutoConfigHint `json:"autoConfigHint,omitempty"`
}

// GetTaskToken is an internal getter (TBD...)
Expand Down Expand Up @@ -8101,3 +8103,9 @@ type Any struct {
// To interpret, you MUST check ValueType.
Value []byte `json:"value"`
}

// AutoConfigHint is an internal type carried on poll-for-task responses.
// It bundles the client auto config flag with the poller wait time so that
// both can be returned to the polling worker.
type AutoConfigHint struct {
// EnableAutoConfig reports whether client (worker) side auto config is enabled.
EnableAutoConfig bool `json:"enableAutoConfig"`
// PollerWaitTimeInMs is the poller wait time, expressed in milliseconds.
PollerWaitTimeInMs int64 `json:"pollerWaitTimeInMs"`
}
4 changes: 4 additions & 0 deletions common/types/testdata/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,8 @@ var (
OriginalScheduledTimestamp: &Timestamp3,
ScheduleID: ScheduleID,
}
AutoConfigHint = types.AutoConfigHint{
EnableAutoConfig: false,
PollerWaitTimeInMs: 10,
}
)
1 change: 1 addition & 0 deletions common/types/testdata/service_matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ var (
Queries: WorkflowQueryMap,
PartitionConfig: &TaskListPartitionConfig,
LoadBalancerHints: &LoadBalancerHints,
AutoConfigHint: &AutoConfigHint,
}
MatchingQueryWorkflowRequest = types.MatchingQueryWorkflowRequest{
DomainUUID: DomainID,
Expand Down
2 changes: 2 additions & 0 deletions proto/internal/uber/cadence/matching/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ message PollForDecisionTaskResponse {
int64 total_history_bytes = 18;
TaskListPartitionConfig partition_config = 19;
LoadBalancerHints load_balancer_hints = 20;
api.v1.AutoConfigHint auto_config_hint = 21;
}

message PollForActivityTaskRequest {
Expand Down Expand Up @@ -166,6 +167,7 @@ message PollForActivityTaskResponse {
api.v1.Header header = 16;
LoadBalancerHints load_balancer_hints = 17;
TaskListPartitionConfig partition_config = 19;
api.v1.AutoConfigHint auto_config_hint = 20;
}

message AddDecisionTaskRequest {
Expand Down
4 changes: 4 additions & 0 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ func (wh *WorkflowHandler) PollForActivityTask(
// Must be cancellation error. Doesn't matter what we return here. Client already went away.
return nil, nil
}

return &types.PollForActivityTaskResponse{
TaskToken: matchingResp.TaskToken,
WorkflowExecution: matchingResp.WorkflowExecution,
Expand All @@ -385,6 +386,7 @@ func (wh *WorkflowHandler) PollForActivityTask(
WorkflowType: matchingResp.WorkflowType,
WorkflowDomain: matchingResp.WorkflowDomain,
Header: matchingResp.Header,
AutoConfigHint: matchingResp.AutoConfigHint,
}, nil
}

Expand Down Expand Up @@ -514,6 +516,7 @@ func (wh *WorkflowHandler) PollForDecisionTask(
if err != nil {
return nil, err
}

return resp, nil
}

Expand Down Expand Up @@ -2986,6 +2989,7 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
Queries: matchingResp.Queries,
NextEventID: matchingResp.NextEventID,
TotalHistoryBytes: matchingResp.TotalHistoryBytes,
AutoConfigHint: matchingResp.AutoConfigHint,
}

return resp, nil
Expand Down
3 changes: 3 additions & 0 deletions service/matching/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type (
AdaptiveScalerUpdateInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
EnableAdaptiveScaler dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
EnableStandbyTaskCompletion dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
EnableClientAutoConfig dynamicconfig.BoolPropertyFnWithTaskListInfoFilters

// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
Expand Down Expand Up @@ -141,6 +142,7 @@ type (
MaxTimeBetweenTaskDeletes time.Duration
// standby task completion configuration
EnableStandbyTaskCompletion func() bool
EnableClientAutoConfig func() bool
}
)

Expand Down Expand Up @@ -193,5 +195,6 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string, getIsolationGroups
MaxTimeBetweenTaskDeletes: time.Second,
AllIsolationGroups: getIsolationGroups,
EnableStandbyTaskCompletion: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableStandbyTaskCompletion),
EnableClientAutoConfig: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableClientAutoConfig),
}
}
1 change: 1 addition & 0 deletions service/matching/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestNewConfig(t *testing.T) {
"AdaptiveScalerUpdateInterval": {dynamicconfig.MatchingAdaptiveScalerUpdateInterval, time.Duration(34)},
"EnableAdaptiveScaler": {dynamicconfig.MatchingEnableAdaptiveScaler, true},
"EnableStandbyTaskCompletion": {dynamicconfig.MatchingEnableStandbyTaskCompletion, false},
"EnableClientAutoConfig": {dynamicconfig.MatchingEnableClientAutoConfig, false},
}
client := dynamicconfig.NewInMemoryClient()
for fieldName, expected := range fields {
Expand Down
4 changes: 4 additions & 0 deletions service/matching/handler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ pollLoop:
resp := task.PollForDecisionResponse()
resp.PartitionConfig = tlMgr.TaskListPartitionConfig()
resp.LoadBalancerHints = tlMgr.LoadBalancerHints()
resp.AutoConfigHint = task.AutoConfigHint
return resp, nil
// TODO: Maybe add history expose here?
}
Expand Down Expand Up @@ -743,6 +744,7 @@ pollLoop:
resp := task.PollForActivityResponse()
resp.PartitionConfig = tlMgr.TaskListPartitionConfig()
resp.LoadBalancerHints = tlMgr.LoadBalancerHints()
resp.AutoConfigHint = task.AutoConfigHint
return resp, nil
}
e.emitForwardedFromStats(hCtx.scope, task.IsForwarded(), req.GetForwardedFrom())
Expand Down Expand Up @@ -1196,6 +1198,7 @@ func (e *matchingEngineImpl) createPollForDecisionTaskResponse(
response.BacklogCountHint = task.BacklogCountHint
response.PartitionConfig = partitionConfig
response.LoadBalancerHints = loadBalancerHints
response.AutoConfigHint = task.AutoConfigHint
return response
}

Expand Down Expand Up @@ -1251,6 +1254,7 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(
response.WorkflowDomain = historyResponse.WorkflowDomain
response.PartitionConfig = partitionConfig
response.LoadBalancerHints = loadBalancerHints
response.AutoConfigHint = task.AutoConfigHint
return response
}

Expand Down
6 changes: 6 additions & 0 deletions service/matching/handler/engine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ func (s *matchingEngineSuite) PollForDecisionTasksResultTest() {
Name: tl,
Kind: &tlKind,
},
AutoConfigHint: &types.AutoConfigHint{
EnableAutoConfig: false,
PollerWaitTimeInMs: 0,
},
}

s.Nil(err)
Expand Down Expand Up @@ -1597,6 +1601,7 @@ type pollTaskResponse struct {
EventStoreVersion int32
BranchToken []byte
Queries map[string]*types.WorkflowQuery
AutoConfigHint *types.AutoConfigHint
}

func pollTask(engine *matchingEngineImpl, hCtx *handlerContext, request *pollTaskRequest) (*pollTaskResponse, error) {
Expand Down Expand Up @@ -1680,6 +1685,7 @@ func pollTask(engine *matchingEngineImpl, hCtx *handlerContext, request *pollTas
ScheduledTimestamp: resp.ScheduledTimestamp,
StartedTimestamp: resp.StartedTimestamp,
Queries: resp.Queries,
AutoConfigHint: resp.AutoConfigHint,
}, nil
}

Expand Down
16 changes: 14 additions & 2 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,19 @@ func (tm *taskMatcherImpl) Poll(ctx context.Context, isolationGroup string) (*In
ctxWithCancelPropagation, stopFn := ctxutils.WithPropagatedContextCancel(ctx, tm.cancelCtx)
defer stopFn()

var task *InternalTask
var err error
defer func() {
if task != nil {
task.AutoConfigHint = &types.AutoConfigHint{
EnableAutoConfig: tm.config.EnableClientAutoConfig(),
PollerWaitTimeInMs: time.Since(startT).Milliseconds(),
}
}
}()

// try local match first without blocking until context timeout
if task, err := tm.pollNonBlocking(ctxWithCancelPropagation, isolatedTaskC, tm.taskC, tm.queryTaskC); err == nil {
if task, err = tm.pollNonBlocking(ctxWithCancelPropagation, isolatedTaskC, tm.taskC, tm.queryTaskC); err == nil {
tm.scope.RecordTimer(metrics.PollLocalMatchLatencyPerTaskList, time.Since(startT))
return task, nil
}
Expand All @@ -429,7 +440,8 @@ func (tm *taskMatcherImpl) Poll(ctx context.Context, isolationGroup string) (*In
TaskListKind: tm.tasklistKind.Ptr(),
EventName: "Matcher Falling Back to Non-Local Polling",
})
return tm.pollOrForward(ctxWithCancelPropagation, startT, isolationGroup, isolatedTaskC, tm.taskC, tm.queryTaskC)
task, err = tm.pollOrForward(ctxWithCancelPropagation, startT, isolationGroup, isolatedTaskC, tm.taskC, tm.queryTaskC)
return task, err
}

// PollForQuery blocks until a *query* task is found or context deadline is exceeded
Expand Down
20 changes: 20 additions & 0 deletions service/matching/tasklist/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,3 +1638,23 @@ func ensureAsyncReady(ctxTimeout time.Duration, cb func(ctx context.Context)) (w
<-closed
}
}

// Test_LocalSyncMatch_AutoConfigHint verifies that a task returned by Poll
// after a local sync match carries an AutoConfigHint, and that the hint
// reports auto config as disabled.
func (t *MatcherTestSuite) Test_LocalSyncMatch_AutoConfigHint() {
	t.disableRemoteForwarding("")

	wait := ensureAsyncReady(time.Second, func(ctx context.Context) {
		task, err := t.matcher.Poll(ctx, "")
		// Check the error and the task before touching any field: a failed
		// poll should fail the test with a clear message instead of
		// panicking on a nil task.
		if !t.NoError(err) || !t.NotNil(task) {
			return
		}
		// Guard the dereference so a missing hint is reported as a test
		// failure rather than a nil pointer panic.
		if t.NotNil(task.AutoConfigHint) {
			t.Equal(false, task.AutoConfigHint.EnableAutoConfig) // disabled by default
		}
		task.Finish(nil)
	})

	task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", true, nil, "")
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err := t.matcher.Offer(ctx, task)
	cancel()
	wait()
	t.NoError(err)
}
1 change: 1 addition & 0 deletions service/matching/tasklist/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type (
ResponseC chan error // non-nil only where there is a caller waiting for response (sync-match)
BacklogCountHint int64
ActivityTaskDispatchInfo *types.ActivityTaskDispatchInfo
AutoConfigHint *types.AutoConfigHint // worker auto-scaler hint, which includes enable auto config flag and poller wait time on the matching engine
}
)

Expand Down
3 changes: 3 additions & 0 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,9 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c
EnableStandbyTaskCompletion: func() bool {
return cfg.EnableStandbyTaskCompletion(domainName, taskListName, taskType)
},
EnableClientAutoConfig: func() bool {
return cfg.EnableClientAutoConfig(domainName, taskListName, taskType)
},
}
}

Expand Down

0 comments on commit cd52fed

Please sign in to comment.