Skip to content

Commit

Permalink
Map internal errors to thrift in thrift handlers (cadence-workflow#3694)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Oct 29, 2020
1 parent 7e03347 commit d2289ce
Show file tree
Hide file tree
Showing 8 changed files with 445 additions and 322 deletions.
55 changes: 37 additions & 18 deletions service/frontend/adminThriftHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber/cadence/.gen/go/admin/adminserviceserver"
"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/types/mapper/thrift"
)

// AdminThriftHandler wrap underlying handler and handles Thrift related type conversions
Expand All @@ -47,90 +48,108 @@ func (t AdminThriftHandler) register(dispatcher *yarpc.Dispatcher) {

// AddSearchAttribute forwards request to the underlying handler
func (t AdminThriftHandler) AddSearchAttribute(ctx context.Context, request *admin.AddSearchAttributeRequest) (err error) {
return t.h.AddSearchAttribute(ctx, request)
err = t.h.AddSearchAttribute(ctx, request)
return thrift.FromError(err)
}

// CloseShard forwards request to the underlying handler
func (t AdminThriftHandler) CloseShard(ctx context.Context, request *shared.CloseShardRequest) (err error) {
return t.h.CloseShard(ctx, request)
err = t.h.CloseShard(ctx, request)
return thrift.FromError(err)
}

// DescribeCluster forwards request to the underlying handler
func (t AdminThriftHandler) DescribeCluster(ctx context.Context) (response *admin.DescribeClusterResponse, err error) {
return t.h.DescribeCluster(ctx)
response, err = t.h.DescribeCluster(ctx)
return response, thrift.FromError(err)
}

// DescribeHistoryHost forwards request to the underlying handler
func (t AdminThriftHandler) DescribeHistoryHost(ctx context.Context, request *shared.DescribeHistoryHostRequest) (response *shared.DescribeHistoryHostResponse, err error) {
return t.h.DescribeHistoryHost(ctx, request)
response, err = t.h.DescribeHistoryHost(ctx, request)
return response, thrift.FromError(err)
}

// DescribeQueue forwards request to the underlying handler
func (t AdminThriftHandler) DescribeQueue(ctx context.Context, request *shared.DescribeQueueRequest) (response *shared.DescribeQueueResponse, err error) {
return t.h.DescribeQueue(ctx, request)
response, err = t.h.DescribeQueue(ctx, request)
return response, thrift.FromError(err)
}

// DescribeWorkflowExecution forwards request to the underlying handler
func (t AdminThriftHandler) DescribeWorkflowExecution(ctx context.Context, request *admin.DescribeWorkflowExecutionRequest) (response *admin.DescribeWorkflowExecutionResponse, err error) {
return t.h.DescribeWorkflowExecution(ctx, request)
response, err = t.h.DescribeWorkflowExecution(ctx, request)
return response, thrift.FromError(err)
}

// GetDLQReplicationMessages forwards request to the underlying handler
func (t AdminThriftHandler) GetDLQReplicationMessages(ctx context.Context, request *replicator.GetDLQReplicationMessagesRequest) (response *replicator.GetDLQReplicationMessagesResponse, err error) {
return t.h.GetDLQReplicationMessages(ctx, request)
response, err = t.h.GetDLQReplicationMessages(ctx, request)
return response, thrift.FromError(err)
}

// GetDomainReplicationMessages forwards request to the underlying handler
func (t AdminThriftHandler) GetDomainReplicationMessages(ctx context.Context, request *replicator.GetDomainReplicationMessagesRequest) (response *replicator.GetDomainReplicationMessagesResponse, err error) {
return t.h.GetDomainReplicationMessages(ctx, request)
response, err = t.h.GetDomainReplicationMessages(ctx, request)
return response, thrift.FromError(err)
}

// GetReplicationMessages forwards request to the underlying handler
func (t AdminThriftHandler) GetReplicationMessages(ctx context.Context, request *replicator.GetReplicationMessagesRequest) (response *replicator.GetReplicationMessagesResponse, err error) {
return t.h.GetReplicationMessages(ctx, request)
response, err = t.h.GetReplicationMessages(ctx, request)
return response, thrift.FromError(err)
}

// GetWorkflowExecutionRawHistoryV2 forwards request to the underlying handler
func (t AdminThriftHandler) GetWorkflowExecutionRawHistoryV2(ctx context.Context, request *admin.GetWorkflowExecutionRawHistoryV2Request) (response *admin.GetWorkflowExecutionRawHistoryV2Response, err error) {
return t.h.GetWorkflowExecutionRawHistoryV2(ctx, request)
response, err = t.h.GetWorkflowExecutionRawHistoryV2(ctx, request)
return response, thrift.FromError(err)
}

// MergeDLQMessages forwards request to the underlying handler
func (t AdminThriftHandler) MergeDLQMessages(ctx context.Context, request *replicator.MergeDLQMessagesRequest) (response *replicator.MergeDLQMessagesResponse, err error) {
return t.h.MergeDLQMessages(ctx, request)
response, err = t.h.MergeDLQMessages(ctx, request)
return response, thrift.FromError(err)
}

// PurgeDLQMessages forwards request to the underlying handler
func (t AdminThriftHandler) PurgeDLQMessages(ctx context.Context, request *replicator.PurgeDLQMessagesRequest) (err error) {
return t.h.PurgeDLQMessages(ctx, request)
err = t.h.PurgeDLQMessages(ctx, request)
return thrift.FromError(err)
}

// ReadDLQMessages forwards request to the underlying handler
func (t AdminThriftHandler) ReadDLQMessages(ctx context.Context, request *replicator.ReadDLQMessagesRequest) (response *replicator.ReadDLQMessagesResponse, err error) {
return t.h.ReadDLQMessages(ctx, request)
response, err = t.h.ReadDLQMessages(ctx, request)
return response, thrift.FromError(err)
}

// ReapplyEvents forwards request to the underlying handler
func (t AdminThriftHandler) ReapplyEvents(ctx context.Context, request *shared.ReapplyEventsRequest) (err error) {
return t.h.ReapplyEvents(ctx, request)
err = t.h.ReapplyEvents(ctx, request)
return thrift.FromError(err)
}

// RefreshWorkflowTasks forwards request to the underlying handler
func (t AdminThriftHandler) RefreshWorkflowTasks(ctx context.Context, request *shared.RefreshWorkflowTasksRequest) (err error) {
return t.h.RefreshWorkflowTasks(ctx, request)
err = t.h.RefreshWorkflowTasks(ctx, request)
return thrift.FromError(err)
}

// RemoveTask forwards request to the underlying handler
func (t AdminThriftHandler) RemoveTask(ctx context.Context, request *shared.RemoveTaskRequest) (err error) {
return t.h.RemoveTask(ctx, request)
err = t.h.RemoveTask(ctx, request)
return thrift.FromError(err)
}

// ResendReplicationTasks forwards request to the underlying handler
func (t AdminThriftHandler) ResendReplicationTasks(ctx context.Context, request *admin.ResendReplicationTasksRequest) (err error) {
return t.h.ResendReplicationTasks(ctx, request)
err = t.h.ResendReplicationTasks(ctx, request)
return thrift.FromError(err)
}

// ResetQueue forwards request to the underlying handler
func (t AdminThriftHandler) ResetQueue(ctx context.Context, request *shared.ResetQueueRequest) (err error) {
return t.h.ResetQueue(ctx, request)
err = t.h.ResetQueue(ctx, request)
return thrift.FromError(err)
}
75 changes: 39 additions & 36 deletions service/frontend/adminThriftHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/uber/cadence/.gen/go/admin"
"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/types"
)

func TestAdminThriftHandler(t *testing.T) {
Expand All @@ -39,105 +40,107 @@ func TestAdminThriftHandler(t *testing.T) {
h := NewMockAdminHandler(ctrl)
th := NewAdminThriftHandler(h)
ctx := context.Background()
internalErr := &types.InternalServiceError{Message: "test"}
expectedErr := &shared.InternalServiceError{Message: "test"}

t.Run("AddSearchAttribute", func(t *testing.T) {
h.EXPECT().AddSearchAttribute(ctx, &admin.AddSearchAttributeRequest{}).Return(assert.AnError).Times(1)
h.EXPECT().AddSearchAttribute(ctx, &admin.AddSearchAttributeRequest{}).Return(internalErr).Times(1)
err := th.AddSearchAttribute(ctx, &admin.AddSearchAttributeRequest{})
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("CloseShard", func(t *testing.T) {
h.EXPECT().CloseShard(ctx, &shared.CloseShardRequest{}).Return(assert.AnError).Times(1)
h.EXPECT().CloseShard(ctx, &shared.CloseShardRequest{}).Return(internalErr).Times(1)
err := th.CloseShard(ctx, &shared.CloseShardRequest{})
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("DescribeCluster", func(t *testing.T) {
h.EXPECT().DescribeCluster(ctx).Return(&admin.DescribeClusterResponse{}, assert.AnError).Times(1)
h.EXPECT().DescribeCluster(ctx).Return(&admin.DescribeClusterResponse{}, internalErr).Times(1)
resp, err := th.DescribeCluster(ctx)
assert.Equal(t, admin.DescribeClusterResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("DescribeHistoryHost", func(t *testing.T) {
h.EXPECT().DescribeHistoryHost(ctx, &shared.DescribeHistoryHostRequest{}).Return(&shared.DescribeHistoryHostResponse{}, assert.AnError).Times(1)
h.EXPECT().DescribeHistoryHost(ctx, &shared.DescribeHistoryHostRequest{}).Return(&shared.DescribeHistoryHostResponse{}, internalErr).Times(1)
resp, err := th.DescribeHistoryHost(ctx, &shared.DescribeHistoryHostRequest{})
assert.Equal(t, shared.DescribeHistoryHostResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("DescribeQueue", func(t *testing.T) {
h.EXPECT().DescribeQueue(ctx, &shared.DescribeQueueRequest{}).Return(&shared.DescribeQueueResponse{}, assert.AnError).Times(1)
h.EXPECT().DescribeQueue(ctx, &shared.DescribeQueueRequest{}).Return(&shared.DescribeQueueResponse{}, internalErr).Times(1)
resp, err := th.DescribeQueue(ctx, &shared.DescribeQueueRequest{})
assert.Equal(t, shared.DescribeQueueResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("DescribeWorkflowExecution", func(t *testing.T) {
h.EXPECT().DescribeWorkflowExecution(ctx, &admin.DescribeWorkflowExecutionRequest{}).Return(&admin.DescribeWorkflowExecutionResponse{}, assert.AnError).Times(1)
h.EXPECT().DescribeWorkflowExecution(ctx, &admin.DescribeWorkflowExecutionRequest{}).Return(&admin.DescribeWorkflowExecutionResponse{}, internalErr).Times(1)
resp, err := th.DescribeWorkflowExecution(ctx, &admin.DescribeWorkflowExecutionRequest{})
assert.Equal(t, admin.DescribeWorkflowExecutionResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("GetDLQReplicationMessages", func(t *testing.T) {
h.EXPECT().GetDLQReplicationMessages(ctx, &replicator.GetDLQReplicationMessagesRequest{}).Return(&replicator.GetDLQReplicationMessagesResponse{}, assert.AnError).Times(1)
h.EXPECT().GetDLQReplicationMessages(ctx, &replicator.GetDLQReplicationMessagesRequest{}).Return(&replicator.GetDLQReplicationMessagesResponse{}, internalErr).Times(1)
resp, err := th.GetDLQReplicationMessages(ctx, &replicator.GetDLQReplicationMessagesRequest{})
assert.Equal(t, replicator.GetDLQReplicationMessagesResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("GetDomainReplicationMessages", func(t *testing.T) {
h.EXPECT().GetDomainReplicationMessages(ctx, &replicator.GetDomainReplicationMessagesRequest{}).Return(&replicator.GetDomainReplicationMessagesResponse{}, assert.AnError).Times(1)
h.EXPECT().GetDomainReplicationMessages(ctx, &replicator.GetDomainReplicationMessagesRequest{}).Return(&replicator.GetDomainReplicationMessagesResponse{}, internalErr).Times(1)
resp, err := th.GetDomainReplicationMessages(ctx, &replicator.GetDomainReplicationMessagesRequest{})
assert.Equal(t, replicator.GetDomainReplicationMessagesResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("GetReplicationMessages", func(t *testing.T) {
h.EXPECT().GetReplicationMessages(ctx, &replicator.GetReplicationMessagesRequest{}).Return(&replicator.GetReplicationMessagesResponse{}, assert.AnError).Times(1)
h.EXPECT().GetReplicationMessages(ctx, &replicator.GetReplicationMessagesRequest{}).Return(&replicator.GetReplicationMessagesResponse{}, internalErr).Times(1)
resp, err := th.GetReplicationMessages(ctx, &replicator.GetReplicationMessagesRequest{})
assert.Equal(t, replicator.GetReplicationMessagesResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("GetWorkflowExecutionRawHistoryV2", func(t *testing.T) {
h.EXPECT().GetWorkflowExecutionRawHistoryV2(ctx, &admin.GetWorkflowExecutionRawHistoryV2Request{}).Return(&admin.GetWorkflowExecutionRawHistoryV2Response{}, assert.AnError).Times(1)
h.EXPECT().GetWorkflowExecutionRawHistoryV2(ctx, &admin.GetWorkflowExecutionRawHistoryV2Request{}).Return(&admin.GetWorkflowExecutionRawHistoryV2Response{}, internalErr).Times(1)
resp, err := th.GetWorkflowExecutionRawHistoryV2(ctx, &admin.GetWorkflowExecutionRawHistoryV2Request{})
assert.Equal(t, admin.GetWorkflowExecutionRawHistoryV2Response{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("MergeDLQMessages", func(t *testing.T) {
h.EXPECT().MergeDLQMessages(ctx, &replicator.MergeDLQMessagesRequest{}).Return(&replicator.MergeDLQMessagesResponse{}, assert.AnError).Times(1)
h.EXPECT().MergeDLQMessages(ctx, &replicator.MergeDLQMessagesRequest{}).Return(&replicator.MergeDLQMessagesResponse{}, internalErr).Times(1)
resp, err := th.MergeDLQMessages(ctx, &replicator.MergeDLQMessagesRequest{})
assert.Equal(t, replicator.MergeDLQMessagesResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("PurgeDLQMessages", func(t *testing.T) {
h.EXPECT().PurgeDLQMessages(ctx, &replicator.PurgeDLQMessagesRequest{}).Return(assert.AnError).Times(1)
h.EXPECT().PurgeDLQMessages(ctx, &replicator.PurgeDLQMessagesRequest{}).Return(internalErr).Times(1)
err := th.PurgeDLQMessages(ctx, &replicator.PurgeDLQMessagesRequest{})
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("ReadDLQMessages", func(t *testing.T) {
h.EXPECT().ReadDLQMessages(ctx, &replicator.ReadDLQMessagesRequest{}).Return(&replicator.ReadDLQMessagesResponse{}, assert.AnError).Times(1)
h.EXPECT().ReadDLQMessages(ctx, &replicator.ReadDLQMessagesRequest{}).Return(&replicator.ReadDLQMessagesResponse{}, internalErr).Times(1)
resp, err := th.ReadDLQMessages(ctx, &replicator.ReadDLQMessagesRequest{})
assert.Equal(t, replicator.ReadDLQMessagesResponse{}, *resp)
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("ReapplyEvents", func(t *testing.T) {
h.EXPECT().ReapplyEvents(ctx, &shared.ReapplyEventsRequest{}).Return(assert.AnError).Times(1)
h.EXPECT().ReapplyEvents(ctx, &shared.ReapplyEventsRequest{}).Return(internalErr).Times(1)
err := th.ReapplyEvents(ctx, &shared.ReapplyEventsRequest{})
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("RefreshWorkflowTasks", func(t *testing.T) {
h.EXPECT().RefreshWorkflowTasks(ctx, &shared.RefreshWorkflowTasksRequest{}).Return(assert.AnError).Times(1)
h.EXPECT().RefreshWorkflowTasks(ctx, &shared.RefreshWorkflowTasksRequest{}).Return(internalErr).Times(1)
err := th.RefreshWorkflowTasks(ctx, &shared.RefreshWorkflowTasksRequest{})
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("RemoveTask", func(t *testing.T) {
h.EXPECT().RemoveTask(ctx, &shared.RemoveTaskRequest{}).Return(assert.AnError).Times(1)
h.EXPECT().RemoveTask(ctx, &shared.RemoveTaskRequest{}).Return(internalErr).Times(1)
err := th.RemoveTask(ctx, &shared.RemoveTaskRequest{})
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("ResendReplicationTasks", func(t *testing.T) {
h.EXPECT().ResendReplicationTasks(ctx, &admin.ResendReplicationTasksRequest{}).Return(assert.AnError).Times(1)
h.EXPECT().ResendReplicationTasks(ctx, &admin.ResendReplicationTasksRequest{}).Return(internalErr).Times(1)
err := th.ResendReplicationTasks(ctx, &admin.ResendReplicationTasksRequest{})
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
t.Run("ResetQueue", func(t *testing.T) {
h.EXPECT().ResetQueue(ctx, &shared.ResetQueueRequest{}).Return(assert.AnError).Times(1)
h.EXPECT().ResetQueue(ctx, &shared.ResetQueueRequest{}).Return(internalErr).Times(1)
err := th.ResetQueue(ctx, &shared.ResetQueueRequest{})
assert.Equal(t, assert.AnError, err)
assert.Equal(t, expectedErr, err)
})
}
Loading

0 comments on commit d2289ce

Please sign in to comment.