Skip to content

Commit

Permalink
History Client redirect on ShardOwnershipLostError (cadence-workflow#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tamer Eldeeb authored Mar 15, 2017
1 parent 011b4a5 commit 470631a
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 30 deletions.
145 changes: 118 additions & 27 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,19 @@ func (c *clientImpl) StartWorkflowExecution(context thrift.Context,
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.StartWorkflowExecution(ctx, request)
var response *workflow.StartWorkflowExecutionResponse
op := func(context thrift.Context, client h.TChanHistoryService) error {
var err error
ctx, cancel := c.createContext(context)
defer cancel()
response, err = client.StartWorkflowExecution(ctx, request)
return err
}
err = c.executeWithRedirect(context, client, op)
if err != nil {
return nil, err
}
return response, nil
}

func (c *clientImpl) GetWorkflowExecutionHistory(context thrift.Context,
Expand All @@ -60,9 +70,19 @@ func (c *clientImpl) GetWorkflowExecutionHistory(context thrift.Context,
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.GetWorkflowExecutionHistory(ctx, request)
var response *workflow.GetWorkflowExecutionHistoryResponse
op := func(context thrift.Context, client h.TChanHistoryService) error {
var err error
ctx, cancel := c.createContext(context)
defer cancel()
response, err = client.GetWorkflowExecutionHistory(ctx, request)
return err
}
err = c.executeWithRedirect(context, client, op)
if err != nil {
return nil, err
}
return response, nil
}

func (c *clientImpl) RecordDecisionTaskStarted(context thrift.Context,
Expand All @@ -71,9 +91,19 @@ func (c *clientImpl) RecordDecisionTaskStarted(context thrift.Context,
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.RecordDecisionTaskStarted(ctx, request)
var response *h.RecordDecisionTaskStartedResponse
op := func(context thrift.Context, client h.TChanHistoryService) error {
var err error
ctx, cancel := c.createContext(context)
defer cancel()
response, err = client.RecordDecisionTaskStarted(ctx, request)
return err
}
err = c.executeWithRedirect(context, client, op)
if err != nil {
return nil, err
}
return response, nil
}

func (c *clientImpl) RecordActivityTaskStarted(context thrift.Context,
Expand All @@ -82,9 +112,19 @@ func (c *clientImpl) RecordActivityTaskStarted(context thrift.Context,
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.RecordActivityTaskStarted(ctx, request)
var response *h.RecordActivityTaskStartedResponse
op := func(context thrift.Context, client h.TChanHistoryService) error {
var err error
ctx, cancel := c.createContext(context)
defer cancel()
response, err = client.RecordActivityTaskStarted(ctx, request)
return err
}
err = c.executeWithRedirect(context, client, op)
if err != nil {
return nil, err
}
return response, nil
}

func (c *clientImpl) RespondDecisionTaskCompleted(context thrift.Context,
Expand All @@ -97,9 +137,13 @@ func (c *clientImpl) RespondDecisionTaskCompleted(context thrift.Context,
if err != nil {
return err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.RespondDecisionTaskCompleted(ctx, request)
op := func(context thrift.Context, client h.TChanHistoryService) error {
ctx, cancel := c.createContext(context)
defer cancel()
return client.RespondDecisionTaskCompleted(ctx, request)
}
err = c.executeWithRedirect(context, client, op)
return err
}

func (c *clientImpl) RespondActivityTaskCompleted(context thrift.Context,
Expand All @@ -112,9 +156,13 @@ func (c *clientImpl) RespondActivityTaskCompleted(context thrift.Context,
if err != nil {
return err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.RespondActivityTaskCompleted(ctx, request)
op := func(context thrift.Context, client h.TChanHistoryService) error {
ctx, cancel := c.createContext(context)
defer cancel()
return client.RespondActivityTaskCompleted(ctx, request)
}
err = c.executeWithRedirect(context, client, op)
return err
}

func (c *clientImpl) RespondActivityTaskFailed(context thrift.Context,
Expand All @@ -127,9 +175,13 @@ func (c *clientImpl) RespondActivityTaskFailed(context thrift.Context,
if err != nil {
return err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.RespondActivityTaskFailed(ctx, request)
op := func(context thrift.Context, client h.TChanHistoryService) error {
ctx, cancel := c.createContext(context)
defer cancel()
return client.RespondActivityTaskFailed(ctx, request)
}
err = c.executeWithRedirect(context, client, op)
return err
}

func (c *clientImpl) RespondActivityTaskCanceled(context thrift.Context,
Expand All @@ -142,9 +194,13 @@ func (c *clientImpl) RespondActivityTaskCanceled(context thrift.Context,
if err != nil {
return err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.RespondActivityTaskCanceled(ctx, request)
op := func(context thrift.Context, client h.TChanHistoryService) error {
ctx, cancel := c.createContext(context)
defer cancel()
return client.RespondActivityTaskCanceled(ctx, request)
}
err = c.executeWithRedirect(context, client, op)
return err
}

func (c *clientImpl) RecordActivityTaskHeartbeat(context thrift.Context,
Expand All @@ -157,9 +213,19 @@ func (c *clientImpl) RecordActivityTaskHeartbeat(context thrift.Context,
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(context)
defer cancel()
return client.RecordActivityTaskHeartbeat(ctx, request)
var response *workflow.RecordActivityTaskHeartbeatResponse
op := func(context thrift.Context, client h.TChanHistoryService) error {
var err error
ctx, cancel := c.createContext(context)
defer cancel()
response, err = client.RecordActivityTaskHeartbeat(ctx, request)
return err
}
err = c.executeWithRedirect(context, client, op)
if err != nil {
return nil, err
}
return response, nil
}

func (c *clientImpl) getHostForRequest(workflowID string) (h.TChanHistoryService, error) {
Expand Down Expand Up @@ -207,3 +273,28 @@ func (c *clientImpl) getThriftClient(hostPort string) h.TChanHistoryService {
}
return client
}

func (c *clientImpl) executeWithRedirect(ctx thrift.Context, client h.TChanHistoryService,
op func(context thrift.Context, client h.TChanHistoryService) error) error {
var err error
if ctx == nil {
ctx = common.BackgroundThriftContext()
}
redirectLoop:
for {
err = common.IsValidContext(ctx)
if err != nil {
break redirectLoop
}
err = op(ctx, client)
if err != nil {
if s, ok := err.(*h.ShardOwnershipLostError); ok {
// TODO: consider emitting a metric for number of redirects
client = c.getThriftClient(s.GetOwner())
continue redirectLoop
}
}
break redirectLoop
}
return err
}
7 changes: 7 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
"time"

"golang.org/x/net/context"

farm "github.com/dgryski/go-farm"
"github.com/uber-common/bark"

Expand Down Expand Up @@ -126,3 +128,8 @@ func IsValidContext(ctx thrift.Context) error {
}
return nil
}

// BackgroundThriftContext returns a wrapper around context.Background()
func BackgroundThriftContext() thrift.Context {
return thrift.Wrap(context.Background())
}
4 changes: 1 addition & 3 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"testing"
"time"

"golang.org/x/net/context"

log "github.com/Sirupsen/logrus"
"github.com/davecgh/go-spew/spew"
"github.com/emirpasic/gods/maps/treemap"
Expand Down Expand Up @@ -58,7 +56,7 @@ func (s *matchingEngineSuite) SetupSuite() {
//go func() {
// log.Println(http.ListenAndServe("localhost:6060", nil))
//}()
s.callContext = thrift.Wrap(context.Background())
s.callContext = common.BackgroundThriftContext()
}

// Renders content of taskManager and matchingEngine when called at http://localhost:6060/test/tasks
Expand Down

0 comments on commit 470631a

Please sign in to comment.