Skip to content

Commit

Permalink
Introduce new type MatchingPollForActivityTaskResponse (cadence-workf…
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Oct 4, 2024
1 parent f8d70de commit d4884cc
Show file tree
Hide file tree
Showing 26 changed files with 193 additions and 44 deletions.
2 changes: 1 addition & 1 deletion client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *clientImpl) PollForActivityTask(
ctx context.Context,
request *types.MatchingPollForActivityTaskRequest,
opts ...yarpc.CallOption,
) (*types.PollForActivityTaskResponse, error) {
) (*types.MatchingPollForActivityTaskResponse, error) {
partition := c.loadBalancer.PickReadPartition(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
Expand Down
4 changes: 2 additions & 2 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ func TestClient_withResponse(t *testing.T) {
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.PollForActivityTaskResponse{}, nil)
c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForActivityTaskResponse{}, nil)
},
want: &types.PollForActivityTaskResponse{},
want: &types.MatchingPollForActivityTaskResponse{},
},
{
name: "PollForActivityTask - Error in resolving peer",
Expand Down
2 changes: 1 addition & 1 deletion client/matching/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Client interface {
DescribeTaskList(context.Context, *types.MatchingDescribeTaskListRequest, ...yarpc.CallOption) (*types.DescribeTaskListResponse, error)
ListTaskListPartitions(context.Context, *types.MatchingListTaskListPartitionsRequest, ...yarpc.CallOption) (*types.ListTaskListPartitionsResponse, error)
GetTaskListsByDomain(context.Context, *types.GetTaskListsByDomainRequest, ...yarpc.CallOption) (*types.GetTaskListsByDomainResponse, error)
PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest, ...yarpc.CallOption) (*types.PollForActivityTaskResponse, error)
PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest, ...yarpc.CallOption) (*types.MatchingPollForActivityTaskResponse, error)
PollForDecisionTask(context.Context, *types.MatchingPollForDecisionTaskRequest, ...yarpc.CallOption) (*types.MatchingPollForDecisionTaskResponse, error)
QueryWorkflow(context.Context, *types.MatchingQueryWorkflowRequest, ...yarpc.CallOption) (*types.QueryWorkflowResponse, error)
RespondQueryTaskCompleted(context.Context, *types.MatchingRespondQueryTaskCompletedRequest, ...yarpc.CallOption) error
Expand Down
4 changes: 2 additions & 2 deletions client/matching/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions client/wrappers/errorinjectors/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/wrappers/grpc/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions client/wrappers/metered/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions client/wrappers/retryable/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/wrappers/thrift/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/wrappers/timeout/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions common/types/mapper/proto/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func ToMatchingPollForActivityTaskRequest(t *matchingv1.PollForActivityTaskReque
}
}

func FromMatchingPollForActivityTaskResponse(t *types.PollForActivityTaskResponse) *matchingv1.PollForActivityTaskResponse {
func FromMatchingPollForActivityTaskResponse(t *types.MatchingPollForActivityTaskResponse) *matchingv1.PollForActivityTaskResponse {
if t == nil {
return nil
}
Expand All @@ -348,11 +348,11 @@ func FromMatchingPollForActivityTaskResponse(t *types.PollForActivityTaskRespons
}
}

func ToMatchingPollForActivityTaskResponse(t *matchingv1.PollForActivityTaskResponse) *types.PollForActivityTaskResponse {
func ToMatchingPollForActivityTaskResponse(t *matchingv1.PollForActivityTaskResponse) *types.MatchingPollForActivityTaskResponse {
if t == nil {
return nil
}
return &types.PollForActivityTaskResponse{
return &types.MatchingPollForActivityTaskResponse{
TaskToken: t.TaskToken,
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
ActivityID: t.ActivityId,
Expand Down
2 changes: 1 addition & 1 deletion common/types/mapper/proto/matching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestMatchingPollForActivityTaskRequest(t *testing.T) {
}

func TestMatchingPollForActivityTaskResponse(t *testing.T) {
for _, item := range []*types.PollForActivityTaskResponse{nil, {}, &testdata.MatchingPollForActivityTaskResponse} {
for _, item := range []*types.MatchingPollForActivityTaskResponse{nil, {}, &testdata.MatchingPollForActivityTaskResponse} {
assert.Equal(t, item, ToMatchingPollForActivityTaskResponse(FromMatchingPollForActivityTaskResponse(item)))
}
}
Expand Down
51 changes: 49 additions & 2 deletions common/types/mapper/thrift/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package thrift

import (
"github.com/uber/cadence/.gen/go/matching"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/types"
)
Expand All @@ -35,8 +36,6 @@ var (
ToMatchingGetTaskListsByDomainResponse = ToGetTaskListsByDomainResponse
FromMatchingListTaskListPartitionsResponse = FromListTaskListPartitionsResponse
ToMatchingListTaskListPartitionsResponse = ToListTaskListPartitionsResponse
FromMatchingPollForActivityTaskResponse = FromPollForActivityTaskResponse
ToMatchingPollForActivityTaskResponse = ToPollForActivityTaskResponse
FromMatchingQueryWorkflowResponse = FromQueryWorkflowResponse
ToMatchingQueryWorkflowResponse = ToQueryWorkflowResponse
)
Expand Down Expand Up @@ -324,6 +323,54 @@ func ToMatchingPollForDecisionTaskResponse(t *matching.PollForDecisionTaskRespon
}
}

func FromMatchingPollForActivityTaskResponse(t *types.MatchingPollForActivityTaskResponse) *shared.PollForActivityTaskResponse {
if t == nil {
return nil
}
return &shared.PollForActivityTaskResponse{
TaskToken: t.TaskToken,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
ActivityId: &t.ActivityID,
ActivityType: FromActivityType(t.ActivityType),
Input: t.Input,
ScheduledTimestamp: t.ScheduledTimestamp,
ScheduleToCloseTimeoutSeconds: t.ScheduleToCloseTimeoutSeconds,
StartedTimestamp: t.StartedTimestamp,
StartToCloseTimeoutSeconds: t.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: t.HeartbeatTimeoutSeconds,
Attempt: &t.Attempt,
ScheduledTimestampOfThisAttempt: t.ScheduledTimestampOfThisAttempt,
HeartbeatDetails: t.HeartbeatDetails,
WorkflowType: FromWorkflowType(t.WorkflowType),
WorkflowDomain: &t.WorkflowDomain,
Header: FromHeader(t.Header),
}
}

func ToMatchingPollForActivityTaskResponse(t *shared.PollForActivityTaskResponse) *types.MatchingPollForActivityTaskResponse {
if t == nil {
return nil
}
return &types.MatchingPollForActivityTaskResponse{
TaskToken: t.TaskToken,
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
ActivityID: t.GetActivityId(),
ActivityType: ToActivityType(t.ActivityType),
Input: t.Input,
ScheduledTimestamp: t.ScheduledTimestamp,
ScheduleToCloseTimeoutSeconds: t.ScheduleToCloseTimeoutSeconds,
StartedTimestamp: t.StartedTimestamp,
StartToCloseTimeoutSeconds: t.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: t.HeartbeatTimeoutSeconds,
Attempt: t.GetAttempt(),
ScheduledTimestampOfThisAttempt: t.ScheduledTimestampOfThisAttempt,
HeartbeatDetails: t.HeartbeatDetails,
WorkflowType: ToWorkflowType(t.WorkflowType),
WorkflowDomain: t.GetWorkflowDomain(),
Header: ToHeader(t.Header),
}
}

// FromMatchingQueryWorkflowRequest converts internal QueryWorkflowRequest type to thrift
func FromMatchingQueryWorkflowRequest(t *types.MatchingQueryWorkflowRequest) *matching.QueryWorkflowRequest {
if t == nil {
Expand Down
25 changes: 25 additions & 0 deletions common/types/mapper/thrift/matching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,31 @@ func TestMatchingPollForDecisionResponse(t *testing.T) {
}
}

func TestMatchingPollForActivityTaskResponse(t *testing.T) {
testCases := []struct {
desc string
input *types.MatchingPollForActivityTaskResponse
}{
{
desc: "non-nil input test",
input: &testdata.MatchingPollForActivityTaskResponse,
},
{
desc: "empty input test",
input: &types.MatchingPollForActivityTaskResponse{},
},
{
desc: "nil input test",
input: nil,
},
}
for _, tc := range testCases {
thriftObj := FromMatchingPollForActivityTaskResponse(tc.input)
roundTripObj := ToMatchingPollForActivityTaskResponse(thriftObj)
assert.Equal(t, tc.input, roundTripObj)
}
}

func TestMatchingQueryWorkflowRequest(t *testing.T) {
testCases := []struct {
desc string
Expand Down
20 changes: 20 additions & 0 deletions common/types/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,26 @@ func (v *MatchingPollForDecisionTaskResponse) GetTotalHistoryBytes() (o int64) {
return
}

type MatchingPollForActivityTaskResponse struct {
TaskToken []byte `json:"taskToken,omitempty"`
WorkflowExecution *WorkflowExecution `json:"workflowExecution,omitempty"`
ActivityID string `json:"activityId,omitempty"`
ActivityType *ActivityType `json:"activityType,omitempty"`
Input []byte `json:"input,omitempty"`
ScheduledTimestamp *int64 `json:"scheduledTimestamp,omitempty"`
ScheduleToCloseTimeoutSeconds *int32 `json:"scheduleToCloseTimeoutSeconds,omitempty"`
StartedTimestamp *int64 `json:"startedTimestamp,omitempty"`
StartToCloseTimeoutSeconds *int32 `json:"startToCloseTimeoutSeconds,omitempty"`
HeartbeatTimeoutSeconds *int32 `json:"heartbeatTimeoutSeconds,omitempty"`
Attempt int32 `json:"attempt,omitempty"`
ScheduledTimestampOfThisAttempt *int64 `json:"scheduledTimestampOfThisAttempt,omitempty"`
HeartbeatDetails []byte `json:"heartbeatDetails,omitempty"`
WorkflowType *WorkflowType `json:"workflowType,omitempty"`
WorkflowDomain string `json:"workflowDomain,omitempty"`
Header *Header `json:"header,omitempty"`
BacklogCountHint int64 `json:"backlogCountHint,omitempty"`
}

// MatchingQueryWorkflowRequest is an internal type (TBD...)
type MatchingQueryWorkflowRequest struct {
DomainUUID string `json:"domainUUID,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion common/types/testdata/service_matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var (
ForwardedFrom: ForwardedFrom,
IsolationGroup: IsolationGroup,
}
MatchingPollForActivityTaskResponse = types.PollForActivityTaskResponse{
MatchingPollForActivityTaskResponse = types.MatchingPollForActivityTaskResponse{
TaskToken: TaskToken,
WorkflowExecution: &WorkflowExecution,
ActivityID: ActivityID,
Expand Down
22 changes: 20 additions & 2 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,9 @@ func (wh *WorkflowHandler) PollForActivityTask(
return &types.PollForActivityTaskResponse{}, nil
}
pollerID := uuid.New().String()
var matchingResp *types.MatchingPollForActivityTaskResponse
op := func() error {
resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
matchingResp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
DomainUUID: domainID,
PollerID: pollerID,
PollRequest: pollRequest,
Expand All @@ -581,7 +582,24 @@ func (wh *WorkflowHandler) PollForActivityTask(
return nil, err
}
}
return resp, nil
return &types.PollForActivityTaskResponse{
TaskToken: matchingResp.TaskToken,
WorkflowExecution: matchingResp.WorkflowExecution,
ActivityID: matchingResp.ActivityID,
ActivityType: matchingResp.ActivityType,
Input: matchingResp.Input,
ScheduledTimestamp: matchingResp.ScheduledTimestamp,
ScheduleToCloseTimeoutSeconds: matchingResp.ScheduleToCloseTimeoutSeconds,
StartedTimestamp: matchingResp.StartedTimestamp,
StartToCloseTimeoutSeconds: matchingResp.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: matchingResp.HeartbeatTimeoutSeconds,
Attempt: matchingResp.Attempt,
ScheduledTimestampOfThisAttempt: matchingResp.ScheduledTimestampOfThisAttempt,
HeartbeatDetails: matchingResp.HeartbeatDetails,
WorkflowType: matchingResp.WorkflowType,
WorkflowDomain: matchingResp.WorkflowDomain,
Header: matchingResp.Header,
}, nil
}

// PollForDecisionTask - Poll for a decision task.
Expand Down
39 changes: 39 additions & 0 deletions service/frontend/api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,45 @@ func (s *workflowHandlerSuite) TestPollForActivityTask_IsolationGroupDrained() {
s.Equal(&types.PollForActivityTaskResponse{}, resp)
}

func (s *workflowHandlerSuite) TestPollForActivityTask_Success() {
config := s.newConfig(dc.NewInMemoryClient())
config.EnableTasklistIsolation = dc.GetBoolPropertyFnFilteredByDomain(true)
wh := s.getWorkflowHandler(config)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
isolationGroup := "dca1"
ctx = partition.ContextWithIsolationGroup(ctx, isolationGroup)

s.mockDomainCache.EXPECT().GetDomainID(s.testDomain).Return(s.testDomainID, nil)
s.mockResource.IsolationGroups.EXPECT().IsDrained(gomock.Any(), s.testDomain, isolationGroup).Return(false, nil).AnyTimes()
s.mockMatchingClient.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any()).Return(&types.MatchingPollForActivityTaskResponse{
TaskToken: []byte("token"),
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: "wid",
RunID: "rid",
},
ActivityID: "1",
Input: []byte(`{"key": "value"}`),
}, nil)
resp, err := wh.PollForActivityTask(ctx, &types.PollForActivityTaskRequest{
Domain: s.testDomain,
TaskList: &types.TaskList{
Name: "task-list",
},
})
s.NoError(err)
s.Equal(&types.PollForActivityTaskResponse{
TaskToken: []byte("token"),
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: "wid",
RunID: "rid",
},
ActivityID: "1",
Input: []byte(`{"key": "value"}`),
}, resp)
}

func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet() {
config := s.newConfig(dc.NewInMemoryClient())
config.UserRPS = dc.GetIntPropertyFn(10)
Expand Down
Loading

0 comments on commit d4884cc

Please sign in to comment.