Skip to content

Commit

Permalink
Added a defensive check in the getTaskListManager function (cadence-w…
Browse files Browse the repository at this point in the history
…orkflow#6199)

What changed?
Added a check whether or not we should own a shard before we steal it.

Why?
This makes sure we do not steal tasklists we do not own according to the
hashring.

How did you test it?
Unit tests, local runs, and deployment in dev environments.
I didn't see any ill effects from the change.

Potential risks
This code path is quite busy, so introducing a check might have performance impacts.

There is a risk that correct requests are rejected because the matching host has a wrong view of the hash ring. I expect this to be transient, and everything should be retried.

We introduce a new error, I have checked the callers and they should all handle unknown errors.

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Aug 2, 2024
1 parent 06e5a6d commit 879bbbe
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 5 deletions.
46 changes: 46 additions & 0 deletions common/errors/taskListNotOwnedByHostError.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package errors

import "fmt"

var _ error = &TaskListNotOwnnedByHostError{}

type TaskListNotOwnnedByHostError struct {
OwnedByIdentity string
MyIdentity string
TasklistName string
}

func (m *TaskListNotOwnnedByHostError) Error() string {
return fmt.Sprintf("task list is not owned by this host: OwnedBy: %s, Me: %s, Tasklist: %s",
m.OwnedByIdentity, m.MyIdentity, m.TasklistName)
}

func NewTaskListNotOwnnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnnedByHostError {
return &TaskListNotOwnnedByHostError{
OwnedByIdentity: ownedByIdentity,
MyIdentity: myIdentity,
TasklistName: tasklistName,
}
}
23 changes: 23 additions & 0 deletions service/matching/handler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
cadence_errors "github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
Expand Down Expand Up @@ -199,6 +200,28 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
}
e.taskListsLock.RUnlock()

// Defensive check to make sure we actually own the task list
// If we try to create a task list manager for a task list that is not owned by us, return an error
// The new task list manager will steal the task list from the current owner, which should only happen if
// the task list is owned by the current host.
taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
if err != nil {
return nil, fmt.Errorf("failed to lookup task list owner: %w", err)
}

self, err := e.membershipResolver.WhoAmI()
if err != nil {
return nil, fmt.Errorf("failed to lookup self im membership: %w", err)
}

if taskListOwner.Identity() != self.Identity() {
return nil, cadence_errors.NewTaskListNotOwnnedByHostError(
taskListOwner.Identity(),
self.Identity(),
taskList.GetName(),
)
}

// If it gets here, write lock and check again in case a task list is created between the two locks
e.taskListsLock.Lock()
if result, ok := e.taskLists[*taskList]; ok {
Expand Down
69 changes: 64 additions & 5 deletions service/matching/handler/engine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
Expand All @@ -44,10 +45,12 @@ import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/dynamicconfig"
cadence_errors "github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/partition"
Expand All @@ -60,10 +63,11 @@ import (
type (
matchingEngineSuite struct {
suite.Suite
controller *gomock.Controller
mockHistoryClient *history.MockClient
mockDomainCache *cache.MockDomainCache
mockIsolationStore *dynamicconfig.MockClient
controller *gomock.Controller
mockHistoryClient *history.MockClient
mockDomainCache *cache.MockDomainCache
mockMembershipResolver *membership.MockResolver
mockIsolationStore *dynamicconfig.MockClient

matchingEngine *matchingEngineImpl
taskManager *tasklist.TestTaskManager
Expand Down Expand Up @@ -124,6 +128,9 @@ func (s *matchingEngineSuite) SetupTest() {
s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(matchingTestDomainName, nil).AnyTimes()
s.mockMembershipResolver = membership.NewMockResolver(s.controller)
s.mockMembershipResolver.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(membership.HostInfo{}, nil).AnyTimes()
s.mockMembershipResolver.EXPECT().WhoAmI().Return(membership.HostInfo{}, nil).AnyTimes()
s.mockIsolationStore = dynamicconfig.NewMockClient(s.controller)
dcClient := dynamicconfig.NewInMemoryClient()
dcClient.UpdateValue(dynamicconfig.EnableTasklistIsolation, true)
Expand Down Expand Up @@ -162,7 +169,7 @@ func (s *matchingEngineSuite) newMatchingEngine(
s.logger,
metrics.NewClient(tally.NoopScope, metrics.Matching),
s.mockDomainCache,
nil,
s.mockMembershipResolver,
s.partitioner,
s.mockTimeSource,
).(*matchingEngineImpl)
Expand Down Expand Up @@ -1296,6 +1303,58 @@ func (s *matchingEngineSuite) TestConfigDefaultHostName() {
s.EqualValues(configEmpty.HostName, "")
}

func (s *matchingEngineSuite) TestGetTaskListManager_OwnerShip() {
testCases := []struct {
name string
lookUpResult string
lookUpErr error
whoAmIResult string
whoAmIErr error

expectedError error
}{
{
name: "Not owned by current host",
lookUpResult: "A",
whoAmIResult: "B",
expectedError: new(cadence_errors.TaskListNotOwnnedByHostError),
},
{
name: "LookupError",
lookUpErr: assert.AnError,
expectedError: assert.AnError,
},
{
name: "WhoAmIError",
whoAmIErr: assert.AnError,
expectedError: assert.AnError,
},
}

for _, tc := range testCases {
s.T().Run(tc.name, func(t *testing.T) {
resolverMock := membership.NewMockResolver(s.controller)
s.matchingEngine.membershipResolver = resolverMock

resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(
membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr,
).AnyTimes()
resolverMock.EXPECT().WhoAmI().Return(
membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr,
).AnyTimes()

taskListKind := types.TaskListKindNormal

_, err := s.matchingEngine.getTaskListManager(
tasklist.NewTestTaskListID(s.T(), "domain", "tasklist", persistence.TaskListTypeActivity),
&taskListKind,
)

assert.ErrorAs(s.T(), err, &tc.expectedError)
})
}
}

func newActivityTaskScheduledEvent(eventID int64, decisionTaskCompletedEventID int64,
scheduleAttributes *types.ScheduleActivityTaskDecisionAttributes) *types.HistoryEvent {
historyEvent := newHistoryEvent(eventID, types.EventTypeActivityTaskScheduled)
Expand Down

0 comments on commit 879bbbe

Please sign in to comment.