Skip to content

Commit

Permalink
Add test case verifying that workflow ID reuse policy is honored. (c…
Browse files Browse the repository at this point in the history
…adence-workflow#1343)

Fix sending message to closed channel issue on kafka consumer when stopped
  • Loading branch information
wxing1292 authored Dec 18, 2018
1 parent c0c38f7 commit 75f771c
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 2 deletions.
2 changes: 1 addition & 1 deletion common/messaging/kafkaConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (c *kafkaConsumer) Start() error {
for {
select {
case <-c.doneC:
close(c.msgC)
c.logger.Info("Stop consuming messages from channel")
return
// our Message interface is just a subset of Message interface in kafka-client so we don't need a wrapper here
Expand All @@ -71,7 +72,6 @@ func (c *kafkaConsumer) Start() error {
// Stop signals the consumer goroutine started in Start to shut down, then
// stops the underlying uber-go consumer.
//
// NOTE: c.msgC is deliberately NOT closed here. The goroutine in Start is the
// sender on msgC; closing the channel from Stop races with an in-flight send
// and can panic with "send on closed channel". Only the sender may close a
// channel, so Start's doneC branch performs close(c.msgC) after it has
// stopped sending.
func (c *kafkaConsumer) Stop() {
	c.logger.Info("Stopping consumer")
	// Closing doneC tells the consume loop in Start to drain and exit.
	close(c.doneC)
	c.uConsumer.Stop()
}

Expand Down
138 changes: 137 additions & 1 deletion hostxdc/Integration_domain_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/persistence/persistence-tests"
persistencetests "github.com/uber/cadence/common/persistence/persistence-tests"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/host"
Expand Down Expand Up @@ -520,6 +520,142 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() {
s.Nil(err)
}

// TestStartWorkflowExecution_Failover_WorkflowIDReusePolicy verifies that the
// workflow ID reuse policy is honored after a domain failover: once the domain
// has failed over from cluster 1 to cluster 2 and the original run has
// completed successfully, starting a workflow with the same workflow ID in
// cluster 2 must be rejected under AllowDuplicateFailedOnly and
// RejectDuplicate, and allowed under AllowDuplicate.
func (s *integrationClustersTestSuite) TestStartWorkflowExecution_Failover_WorkflowIDReusePolicy() {
	// Random suffix keeps the domain name unique across test runs.
	domainName := "test-start-workflow-failover-ID-reuse-policy" + common.GenerateRandomString(5)
	client1 := s.cluster1.host.GetFrontendClient() // active
	regReq := &workflow.RegisterDomainRequest{
		Name:                                   common.StringPtr(domainName),
		Clusters:                               clusterReplicationConfig,
		ActiveClusterName:                      common.StringPtr(clusterName[0]),
		WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
	}
	err := client1.RegisterDomain(createContext(), regReq)
	s.NoError(err)

	descReq := &workflow.DescribeDomainRequest{
		Name: common.StringPtr(domainName),
	}
	resp, err := client1.DescribeDomain(createContext(), descReq)
	s.NoError(err)
	s.NotNil(resp)
	// Wait for the domain cache to pick up the change.
	time.Sleep(cache.DomainCacheRefreshInterval)

	// The standby cluster must see the same domain metadata before we proceed.
	client2 := s.cluster2.host.GetFrontendClient() // standby
	resp2, err := client2.DescribeDomain(createContext(), descReq)
	s.NoError(err)
	s.NotNil(resp2)
	s.Equal(resp, resp2)

	// Start a workflow in the active cluster with the AllowDuplicate policy.
	id := "integration-start-workflow-failover-ID-reuse-policy-test"
	wt := "integration-start-workflow-failover-ID-reuse-policy-test-type"
	tl := "integration-start-workflow-failover-ID-reuse-policy-test-tasklist"
	identity := "worker1"
	workflowType := &workflow.WorkflowType{Name: common.StringPtr(wt)}
	taskList := &workflow.TaskList{Name: common.StringPtr(tl)}
	startReq := &workflow.StartWorkflowExecutionRequest{
		RequestId:                           common.StringPtr(uuid.New()),
		Domain:                              common.StringPtr(domainName),
		WorkflowId:                          common.StringPtr(id),
		WorkflowType:                        workflowType,
		TaskList:                            taskList,
		Input:                               nil,
		ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
		TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(1),
		Identity:                            common.StringPtr(identity),
		WorkflowIdReusePolicy:               workflow.WorkflowIdReusePolicyAllowDuplicate.Ptr(),
	}
	we, err := client1.StartWorkflowExecution(createContext(), startReq)
	s.Nil(err)
	s.NotNil(we.GetRunId())
	s.logger.Infof("StartWorkflowExecution in cluster 1: response: %v \n", we.GetRunId())

	// Shared decision handler: completes the workflow on the first decision
	// task and counts how many executions it has completed.
	workflowCompleteTimes := 0
	dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
		previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {

		workflowCompleteTimes++
		return nil, []*workflow.Decision{{
			DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
			CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
				Result: []byte("Done."),
			},
		}}, nil
	}

	// Poller against cluster 1 (active before the failover).
	poller := host.TaskPoller{
		Engine:          client1,
		Domain:          domainName,
		TaskList:        taskList,
		Identity:        identity,
		DecisionHandler: dtHandler,
		ActivityHandler: nil,
		Logger:          s.logger,
		T:               s.T(),
	}

	// Poller against cluster 2 (active after the failover).
	poller2 := host.TaskPoller{
		Engine:          client2,
		Domain:          domainName,
		TaskList:        taskList,
		Identity:        identity,
		DecisionHandler: dtHandler,
		ActivityHandler: nil,
		Logger:          s.logger,
		T:               s.T(),
	}

	// Complete the first run in cluster 1 so the workflow ID has a
	// successfully-closed execution on record.
	_, err = poller.PollAndProcessDecisionTask(false, false)
	s.logger.Infof("PollAndProcessDecisionTask: %v", err)
	s.Nil(err)
	s.Equal(1, workflowCompleteTimes)

	// Update the domain to fail over to cluster 2.
	updateReq := &workflow.UpdateDomainRequest{
		Name: common.StringPtr(domainName),
		ReplicationConfiguration: &workflow.DomainReplicationConfiguration{
			ActiveClusterName: common.StringPtr(clusterName[1]),
		},
	}
	updateResp, err := client1.UpdateDomain(createContext(), updateReq)
	s.NoError(err)
	s.NotNil(updateResp)
	s.Equal(clusterName[1], updateResp.ReplicationConfiguration.GetActiveClusterName())
	// The first failover bumps the failover version to 1.
	s.Equal(int64(1), updateResp.GetFailoverVersion())

	// Wait until the failover has completed (caches refreshed everywhere).
	time.Sleep(cacheRefreshInterval)

	// Reusing the ID with AllowDuplicateFailedOnly must be rejected, because
	// the previous run completed successfully rather than failing.
	startReq.RequestId = common.StringPtr(uuid.New())
	startReq.WorkflowIdReusePolicy = workflow.WorkflowIdReusePolicyAllowDuplicateFailedOnly.Ptr()
	we, err = client2.StartWorkflowExecution(createContext(), startReq)
	s.IsType(&workflow.WorkflowExecutionAlreadyStartedError{}, err)
	s.Nil(we)

	// Reusing the ID with RejectDuplicate must always be rejected.
	startReq.RequestId = common.StringPtr(uuid.New())
	startReq.WorkflowIdReusePolicy = workflow.WorkflowIdReusePolicyRejectDuplicate.Ptr()
	we, err = client2.StartWorkflowExecution(createContext(), startReq)
	s.IsType(&workflow.WorkflowExecutionAlreadyStartedError{}, err)
	s.Nil(we)

	// Reusing the ID with AllowDuplicate must succeed in cluster 2.
	startReq.RequestId = common.StringPtr(uuid.New())
	startReq.WorkflowIdReusePolicy = workflow.WorkflowIdReusePolicyAllowDuplicate.Ptr()
	we, err = client2.StartWorkflowExecution(createContext(), startReq)
	s.Nil(err)
	s.NotNil(we.GetRunId())
	s.logger.Infof("StartWorkflowExecution in cluster 2: response: %v \n", we.GetRunId())

	// Complete the second run in the new active cluster.
	_, err = poller2.PollAndProcessDecisionTask(false, false)
	s.logger.Infof("PollAndProcessDecisionTask 2: %v", err)
	s.Nil(err)
	s.Equal(2, workflowCompleteTimes)
}

func (s *integrationClustersTestSuite) TestTerminateFailover() {
domainName := "test-terminate-workflow-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.host.GetFrontendClient() // active
Expand Down

0 comments on commit 75f771c

Please sign in to comment.