Skip to content

Commit

Permalink
Add onebox host and use it to run integration test
Browse files Browse the repository at this point in the history
Summary:
This revision adds a host that starts all cadence services in a single process.
It also wires the integration test to communicate with the one-box hosted cadence
frontend.
The integration test is now controlled by a flag. By default it won't run unless
that flag is set to true, because it takes too much time.

Test Plan: Integration test is passing

Reviewers: samar, sivakk

Reviewed By: sivakk

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D700756
  • Loading branch information
Tamer Eldeeb committed Jan 13, 2017
1 parent 07738de commit ef7d0ba
Show file tree
Hide file tree
Showing 15 changed files with 368 additions and 70 deletions.
92 changes: 92 additions & 0 deletions client/frontend/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package frontend

import (
"time"

"golang.org/x/net/context"

m "code.uber.internal/devexp/minions/.gen/go/minions"
workflow "code.uber.internal/devexp/minions/.gen/go/shared"
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
)

const frontendServiceName = "cadence-frontend"

var _ Client = (*clientImpl)(nil)

type clientImpl struct {
connection *tchannel.Channel
client m.TChanWorkflowService
}

// NewClient creates a new frontend TChannel client
func NewClient(ch *tchannel.Channel, hostPort string) (Client, error) {
var opts *thrift.ClientOptions
if hostPort != "" {
opts = &thrift.ClientOptions{
HostPort: hostPort,
}
}
tClient := thrift.NewClient(ch, frontendServiceName, opts)

client := &clientImpl{
connection: ch,
client: m.NewTChanWorkflowServiceClient(tClient),
}
return client, nil
}

func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) {
// TODO: make timeout configurable
return thrift.NewContext(time.Minute * 3)
}

func (c *clientImpl) StartWorkflowExecution(request *workflow.StartWorkflowExecutionRequest) (*workflow.StartWorkflowExecutionResponse, error) {
ctx, cancel := c.createContext()
defer cancel()
return c.client.StartWorkflowExecution(ctx, request)
}

func (c *clientImpl) GetWorkflowExecutionHistory(
request *workflow.GetWorkflowExecutionHistoryRequest) (*workflow.GetWorkflowExecutionHistoryResponse, error) {
ctx, cancel := c.createContext()
defer cancel()
return c.client.GetWorkflowExecutionHistory(ctx, request)
}

func (c *clientImpl) PollForActivityTask(pollRequest *workflow.PollForActivityTaskRequest) (*workflow.PollForActivityTaskResponse, error) {
ctx, cancel := c.createContext()
defer cancel()
return c.client.PollForActivityTask(ctx, pollRequest)
}

func (c *clientImpl) PollForDecisionTask(pollRequest *workflow.PollForDecisionTaskRequest) (*workflow.PollForDecisionTaskResponse, error) {
ctx, cancel := c.createContext()
defer cancel()
return c.client.PollForDecisionTask(ctx, pollRequest)
}

func (c *clientImpl) RecordActivityTaskHeartbeat(heartbeatRequest *workflow.RecordActivityTaskHeartbeatRequest) (*workflow.RecordActivityTaskHeartbeatResponse, error) {
ctx, cancel := c.createContext()
defer cancel()
return c.client.RecordActivityTaskHeartbeat(ctx, heartbeatRequest)
}

func (c *clientImpl) RespondDecisionTaskCompleted(request *workflow.RespondDecisionTaskCompletedRequest) error {
ctx, cancel := c.createContext()
defer cancel()
return c.client.RespondDecisionTaskCompleted(ctx, request)
}

func (c *clientImpl) RespondActivityTaskCompleted(request *workflow.RespondActivityTaskCompletedRequest) error {
ctx, cancel := c.createContext()
defer cancel()
return c.client.RespondActivityTaskCompleted(ctx, request)
}

func (c *clientImpl) RespondActivityTaskFailed(request *workflow.RespondActivityTaskFailedRequest) error {
ctx, cancel := c.createContext()
defer cancel()
return c.client.RespondActivityTaskFailed(ctx, request)
}
17 changes: 17 additions & 0 deletions client/frontend/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package frontend

import (
"code.uber.internal/devexp/minions/.gen/go/shared"
)

// Client is the interface exposed by frontend service client
type Client interface {
GetWorkflowExecutionHistory(getRequest *shared.GetWorkflowExecutionHistoryRequest) (*shared.GetWorkflowExecutionHistoryResponse, error)
PollForActivityTask(pollRequest *shared.PollForActivityTaskRequest) (*shared.PollForActivityTaskResponse, error)
PollForDecisionTask(pollRequest *shared.PollForDecisionTaskRequest) (*shared.PollForDecisionTaskResponse, error)
RecordActivityTaskHeartbeat(heartbeatRequest *shared.RecordActivityTaskHeartbeatRequest) (*shared.RecordActivityTaskHeartbeatResponse, error)
RespondActivityTaskCompleted(completeRequest *shared.RespondActivityTaskCompletedRequest) error
RespondActivityTaskFailed(failRequest *shared.RespondActivityTaskFailedRequest) error
RespondDecisionTaskCompleted(completeRequest *shared.RespondDecisionTaskCompletedRequest) error
StartWorkflowExecution(startRequest *shared.StartWorkflowExecutionRequest) (*shared.StartWorkflowExecutionResponse, error)
}
2 changes: 1 addition & 1 deletion client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *clientImpl) getHostForRequest(key string) (h.TChanHistoryService, error

func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) {
// TODO: make timeout configurable
return thrift.NewContext(time.Second * 10)
return thrift.NewContext(time.Second * 30)
}

func (c *clientImpl) StartWorkflowExecution(request *workflow.StartWorkflowExecutionRequest) (*workflow.StartWorkflowExecutionResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *clientImpl) getHostForRequest(key string) (m.TChanMatchingService, erro

func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) {
// TODO: make timeout configurable
return thrift.NewContext(time.Second * 10)
return thrift.NewContext(time.Minute * 3)
}

func (c *clientImpl) AddActivityTask(addRequest *m.AddActivityTaskRequest) error {
Expand Down
64 changes: 35 additions & 29 deletions workflow/integration_test.go → host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,40 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package workflow
package host

import (
"flag"
"os"
"testing"

log "github.com/Sirupsen/logrus"
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
tchannel "github.com/uber/tchannel-go"

"bytes"
"encoding/binary"
"strconv"

workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/client/frontend"
"code.uber.internal/devexp/minions/common"
wf "code.uber.internal/devexp/minions/workflow"
)

var (
integration = flag.Bool("integration", false, "run integration tests")
)

type (
integrationSuite struct {
engine Engine
host Cadence
ch *tchannel.Channel
engine frontend.Client
logger bark.Logger
suite.Suite
TestBase
wf.TestBase
}

decisionTaskHandler func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
Expand All @@ -50,7 +60,7 @@ type (
activityID string, startedEventID int64, input []byte) ([]byte, error)

taskPoller struct {
engine Engine
engine frontend.Client
taskList *workflow.TaskList
identity string
decisionHandler decisionTaskHandler
Expand All @@ -60,8 +70,13 @@ type (
)

func TestIntegrationSuite(t *testing.T) {
s := new(integrationSuite)
suite.Run(t, s)
flag.Parse()
if *integration {
s := new(integrationSuite)
suite.Run(t, s)
} else {
t.Skip()
}
}

func (s *integrationSuite) SetupSuite() {
Expand All @@ -73,6 +88,8 @@ func (s *integrationSuite) SetupSuite() {
logger := log.New()
logger.Level = log.DebugLevel
s.logger = bark.NewLoggerFromLogrus(logger)

s.ch, _ = tchannel.NewChannel("cadence-integration-test", nil)
}

func (s *integrationSuite) TearDownSuite() {
Expand All @@ -81,13 +98,14 @@ func (s *integrationSuite) TearDownSuite() {

func (s *integrationSuite) SetupTest() {
s.ClearTransferQueue()
s.engine = NewWorkflowEngineWithShard(s.ShardContext, s.WorkflowMgr, s.TaskMgr, s.logger)
s.engine.Start()
s.host = NewCadence(s.WorkflowMgr, s.TaskMgr, s.logger)
s.host.Start()
s.engine, _ = frontend.NewClient(s.ch, s.host.FrontendAddress())
}

func (s *integrationSuite) TearDownTest() {
s.engine.Stop()
s.engine = nil
s.host.Stop()
s.host = nil
}

func (s *integrationSuite) TestStartWorkflowExecution() {
Expand Down Expand Up @@ -232,20 +250,20 @@ retry:
Identity: common.StringPtr(p.identity),
})

if err1 == errDuplicate {
if err1 == wf.ErrDuplicate {
continue retry
}

if err1 != nil {
return err1
}

if response == nil || response == emptyPollForDecisionTaskResponse {
if response == nil || response == wf.EmptyPollForDecisionTaskResponse {
continue retry
}

if dumpHistory {
printHistory(response.GetHistory(), p.logger)
wf.PrintHistory(response.GetHistory(), p.logger)
}

context, decisions := p.decisionHandler(response.GetWorkflowExecution(), response.GetWorkflowType(),
Expand All @@ -259,7 +277,7 @@ retry:
})
}

return errNoTasks
return wf.ErrNoTasks
}

func (p *taskPoller) pollAndProcessActivityTask() error {
Expand All @@ -270,15 +288,15 @@ retry:
Identity: common.StringPtr(p.identity),
})

if err1 == errDuplicate {
if err1 == wf.ErrDuplicate {
continue retry
}

if err1 != nil {
return err1
}

if response == nil || response == emptyPollForActivityTaskResponse {
if response == nil || response == wf.EmptyPollForActivityTaskResponse {
continue retry
}

Expand All @@ -299,17 +317,5 @@ retry:
})
}

return errNoTasks
}

func printHistory(history *workflow.History, logger bark.Logger) {
serializer := newJSONHistorySerializer()
data, err := serializer.Serialize(history.GetEvents())
if err != nil {
logger.Errorf("Error serializing history: %v\n", err)
}

logger.Info("******************************************")
logger.Infof("History: %v", string(data))
logger.Info("******************************************")
return wf.ErrNoTasks
}
Loading

0 comments on commit ef7d0ba

Please sign in to comment.