From 5494354771d0e92bd64c66f7afe0b37648e48acb Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Fri, 26 Apr 2024 16:03:08 +0200 Subject: [PATCH] Integration test for workflow ID based rate limiting task processing (#5933) * Integration test for workflow ID based rate limiting task processing * Update workflowsidinternalratelimit_test.go * Update workflowsidinternalratelimit_test.go * Update workflowsidinternalratelimit_test.go * Update workflowsidinternalratelimit_test.go * Update workflowsidinternalratelimit_test.go * Update workflowsidinternalratelimit_test.go * Update workflowsidinternalratelimit_test.go * Update workflowsidinternalratelimit_test.go --- host/test_suites.go | 5 + host/workflowsidinternalratelimit_test.go | 232 ++++++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 host/workflowsidinternalratelimit_test.go diff --git a/host/test_suites.go b/host/test_suites.go index 6f22796c9ca..055a85b4f01 100644 --- a/host/test_suites.go +++ b/host/test_suites.go @@ -65,4 +65,9 @@ type ( *require.Assertions *IntegrationBase } + + WorkflowIDInternalRateLimitIntegrationSuite struct { + *require.Assertions + *IntegrationBase + } ) diff --git a/host/workflowsidinternalratelimit_test.go b/host/workflowsidinternalratelimit_test.go new file mode 100644 index 00000000000..921b88a5062 --- /dev/null +++ b/host/workflowsidinternalratelimit_test.go @@ -0,0 +1,232 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package host + +import ( + "bytes" + "encoding/binary" + "flag" + "strconv" + "testing" + "time" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/persistence" + pt "github.com/uber/cadence/common/persistence/persistence-tests" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/matching" +) + +func TestWorkflowIDInternalRateLimitIntegrationSuite(t *testing.T) { + // Loads the flags for persistence etc., if none are given they are set in ./flag.go + flag.Parse() + + clusterConfig, err := GetTestClusterConfig("testdata/integration_wfidratelimit_cluster.yaml") + require.NoError(t, err) + + clusterConfig.TimeSource = clock.NewMockedTimeSource() + clusterConfig.HistoryDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{ + dynamicconfig.WorkflowIDCacheInternalEnabled: true, + dynamicconfig.WorkflowIDInternalRPS: 2, + dynamicconfig.WorkflowIDInternalRateLimitEnabled: true, + } + + testCluster := NewPersistenceTestCluster(t, clusterConfig) + + s := new(WorkflowIDInternalRateLimitIntegrationSuite) + params := IntegrationBaseParams{ + DefaultTestCluster: testCluster, + VisibilityTestCluster: testCluster, + TestClusterConfig: clusterConfig, + } + s.IntegrationBase = NewIntegrationBase(params) + suite.Run(t, s) +} + +func (s *WorkflowIDInternalRateLimitIntegrationSuite) SetupSuite() { + s.setupLogger() + + s.Logger.Info("Running integration test against test cluster") + clusterMetadata := NewClusterMetadata(s.T(), s.testClusterConfig) + dc := persistence.DynamicConfiguration{ + EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true), + PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100), + EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true), + } + params := pt.TestBaseParams{ + DefaultTestCluster: s.defaultTestCluster, + VisibilityTestCluster: s.visibilityTestCluster, + ClusterMetadata: clusterMetadata, + DynamicConfiguration: dc, + } + cluster, err := NewCluster(s.T(), s.testClusterConfig, s.Logger, params) + s.Require().NoError(err) + s.testCluster = cluster + s.engine = s.testCluster.GetFrontendClient() + s.adminClient = s.testCluster.GetAdminClient() + + s.domainName = s.randomizeStr("integration-test-domain") + s.Require().NoError(s.registerDomain(s.domainName, 1, types.ArchivalStatusDisabled, "", types.ArchivalStatusDisabled, "")) + + s.domainCacheRefresh() +} + +func (s *WorkflowIDInternalRateLimitIntegrationSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *WorkflowIDInternalRateLimitIntegrationSuite) TearDownSuite() { + s.tearDownSuite() +} + +func (s *WorkflowIDInternalRateLimitIntegrationSuite) TestWorkflowIDSpecificInternalRateLimits() { + const ( + testWorkflowID = "integration-workflow-specific-internal-rate-limit-test" + testWorkflowType = "integration-workflow-specific-internal-rate-limit-test-type" + testTaskListName = "integration-workflow-specific-internal-rate-limit-test-taskList" + testIdentity = "worker1" + ) + + activityName := "test-activity" + + request := &types.StartWorkflowExecutionRequest{ + RequestID: uuid.New(), + Domain: s.domainName, + WorkflowID: testWorkflowID, + WorkflowType: &types.WorkflowType{Name: testWorkflowType}, + TaskList: &types.TaskList{Name: testTaskListName}, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Identity: testIdentity, + WorkflowIDReusePolicy: types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(), + } + + ctx, cancel := createContext() + defer cancel() + + we, err := s.engine.StartWorkflowExecution(ctx, request) + s.NoError(err) + + s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunID)) + + workflowComplete := false + activityCount := int32(5) + activityCounter := int32(0) + + dtHandler := func(execution *types.WorkflowExecution, wt *types.WorkflowType, + previousStartedEventID, startedEventID int64, history *types.History) ([]byte, []*types.Decision, error) { + if activityCounter < activityCount { + activityCounter++ + buf := new(bytes.Buffer) + s.Nil(binary.Write(buf, binary.LittleEndian, activityCounter)) + + return []byte(strconv.Itoa(int(activityCounter))), []*types.Decision{{ + DecisionType: types.DecisionTypeScheduleActivityTask.Ptr(), + ScheduleActivityTaskDecisionAttributes: &types.ScheduleActivityTaskDecisionAttributes{ + ActivityID: strconv.Itoa(int(activityCounter)), + ActivityType: &types.ActivityType{Name: activityName}, + TaskList: &types.TaskList{Name: testTaskListName}, + Input: buf.Bytes(), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), + StartToCloseTimeoutSeconds: common.Int32Ptr(1), + HeartbeatTimeoutSeconds: common.Int32Ptr(1), + }, + }}, nil + } + + s.Logger.Info("Completing Workflow.") + + workflowComplete = true + return []byte(strconv.Itoa(int(activityCounter))), []*types.Decision{{ + DecisionType: types.DecisionTypeCompleteWorkflowExecution.Ptr(), + CompleteWorkflowExecutionDecisionAttributes: &types.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Done."), + }, + }}, nil + } + + activityExecutedCount := int32(0) + atHandler := func(execution *types.WorkflowExecution, activityType *types.ActivityType, + activityID string, input []byte, taskToken []byte) ([]byte, bool, error) { + s.Equal(testWorkflowID, execution.WorkflowID) + s.Equal(activityName, activityType.Name) + + activityExecutedCount++ + return []byte("Activity Result."), false, nil + } + + poller := &TaskPoller{ + Engine: s.engine, + Domain: s.domainName, + TaskList: &types.TaskList{Name: testTaskListName}, + Identity: testIdentity, + DecisionHandler: dtHandler, + ActivityHandler: atHandler, + Logger: s.Logger, + T: s.T(), + } + + for i := int32(0); i < activityCount; i++ { + _, err = poller.PollAndProcessDecisionTask(false, false) + s.True(err == nil || err == matching.ErrNoTasks) + + err = poller.PollAndProcessActivityTask(false) + s.True(err == nil || err == matching.ErrNoTasks) + } + + s.Logger.Info("Waiting for workflow to complete", tag.WorkflowRunID(we.RunID)) + + s.False(workflowComplete) + _, err = poller.PollAndProcessDecisionTask(false, false) + s.True(err == nil || err == matching.ErrNoTasks) + s.True(workflowComplete) + + historyResponse, err := s.engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{ + Domain: s.domainName, + Execution: &types.WorkflowExecution{ + WorkflowID: testWorkflowID, + }, + }) + s.NoError(err) + history := historyResponse.History + firstEvent := history.Events[0] + lastEvent := history.Events[len(history.Events)-1] + // First 7 event ids --> 0 (Workflow start), 1-3 ( Decision scheduled,started,completed) , 4-6 (Activity scheduled,started,completed) post which the tasks will be rate limited since RPS is 2 + eventBeforeRatelimited := history.Events[7] + + timeElapsedBeforeRatelimiting := time.Unix(0, common.Int64Default(eventBeforeRatelimited.Timestamp)).Sub(time.Unix(0, common.Int64Default(firstEvent.Timestamp))) + s.True(timeElapsedBeforeRatelimiting < 1*time.Second) + + totalTime := time.Unix(0, common.Int64Default(lastEvent.Timestamp)).Sub(time.Unix(0, common.Int64Default(firstEvent.Timestamp))) + s.True(totalTime > 3*time.Second) + +}