Skip to content

Commit

Permalink
Fix cron calculation bug with @every expression (cadence-workflow#2253)
Browse files Browse the repository at this point in the history
* Fix cron calculation bug with @every expression
  • Loading branch information
yux0 authored Jul 23, 2019
1 parent 8766b37 commit a5f84bc
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 67 deletions.
21 changes: 13 additions & 8 deletions common/backoff/cron.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -43,8 +43,8 @@ func ValidateSchedule(cronSchedule string) error {
}

// GetBackoffForNextSchedule calculates the backoff time for the next run given
// a cronSchedule and current time
func GetBackoffForNextSchedule(cronSchedule string, nowTime time.Time) time.Duration {
// a cronSchedule, workflow start time and workflow close time
func GetBackoffForNextSchedule(cronSchedule string, startTime time.Time, closeTime time.Time) time.Duration {
if len(cronSchedule) == 0 {
return NoBackoff
}
Expand All @@ -53,17 +53,22 @@ func GetBackoffForNextSchedule(cronSchedule string, nowTime time.Time) time.Dura
if err != nil {
return NoBackoff
}

nowTime = nowTime.In(time.UTC)
backoffInterval := schedule.Next(nowTime).Sub(nowTime)
startUTCTime := startTime.In(time.UTC)
closeUTCTime := closeTime.In(time.UTC)
nextScheduleTime := schedule.Next(startUTCTime)
// Calculate the next schedule start time which is nearest to the close time
for nextScheduleTime.Before(closeUTCTime) {
nextScheduleTime = schedule.Next(nextScheduleTime)
}
backoffInterval := nextScheduleTime.Sub(closeUTCTime)
roundedInterval := time.Second * time.Duration(math.Ceil(backoffInterval.Seconds()))
return roundedInterval
}

// GetBackoffForNextScheduleInSeconds calculates the backoff time in seconds for the
// next run given a cronSchedule and current time
func GetBackoffForNextScheduleInSeconds(cronSchedule string, nowTime time.Time) int32 {
backoffDuration := GetBackoffForNextSchedule(cronSchedule, nowTime)
func GetBackoffForNextScheduleInSeconds(cronSchedule string, startTime time.Time, closeTime time.Time) int32 {
backoffDuration := GetBackoffForNextSchedule(cronSchedule, startTime, closeTime)
if backoffDuration == NoBackoff {
return 0
}
Expand Down
64 changes: 33 additions & 31 deletions common/backoff/cron_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand All @@ -21,41 +21,43 @@
package backoff

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_NextCronSchedule(t *testing.T) {
a := assert.New(t)

// every day cron
now, _ := time.Parse(time.RFC3339, "2018-12-17T08:00:00-08:00") // UTC: 2018-12-17 16:00:00 +0000 UTC
cronSpec := "0 10 * * *"
backoff := GetBackoffForNextSchedule(cronSpec, now)
a.Equal(time.Hour*18, backoff)
backoff = GetBackoffForNextSchedule(cronSpec, now.Add(backoff))
a.Equal(time.Hour*24, backoff)

// every hour cron
now, _ = time.Parse(time.RFC3339, "2018-12-17T08:08:00+00:00")
cronSpec = "0 * * * *"
backoff = GetBackoffForNextSchedule(cronSpec, now)
a.Equal(time.Minute*52, backoff)
backoff = GetBackoffForNextSchedule(cronSpec, now.Add(backoff))
a.Equal(time.Hour, backoff)

// every minute cron
now, _ = time.Parse(time.RFC3339, "2018-12-17T08:08:18+00:00")
cronSpec = "* * * * *"
backoff = GetBackoffForNextSchedule(cronSpec, now)
a.Equal(time.Second*42, backoff)
backoff = GetBackoffForNextSchedule(cronSpec, now.Add(backoff))
a.Equal(time.Minute, backoff)
var crontests = []struct {
cron string
startTime string
endTime string
result time.Duration
}{
{"0 10 * * *", "2018-12-17T08:00:00-08:00", "", time.Hour * 18},
{"0 10 * * *", "2018-12-18T02:00:00-08:00", "", time.Hour * 24},
{"0 * * * *", "2018-12-17T08:08:00+00:00", "", time.Minute * 52},
{"0 * * * *", "2018-12-17T09:00:00+00:00", "", time.Hour},
{"* * * * *", "2018-12-17T08:08:18+00:00", "", time.Second * 42},
{"0 * * * *", "2018-12-17T09:00:00+00:00", "", time.Minute * 60},
{"0 10 * * *", "2018-12-17T08:00:00+00:00", "2018-12-20T00:00:00+00:00", time.Hour * 10},
{"0 10 * * *", "2018-12-17T08:00:00+00:00", "2018-12-17T09:00:00+00:00", time.Hour},
{"*/10 * * * *", "2018-12-17T00:04:00+00:00", "2018-12-17T01:02:00+00:00", time.Minute * 8},
{"invalid-cron-spec", "2018-12-17T00:04:00+00:00", "2018-12-17T01:02:00+00:00", NoBackoff},
{"@every 5h", "2018-12-17T08:00:00+00:00", "2018-12-17T09:00:00+00:00", time.Hour * 4},
{"@every 5h", "2018-12-17T08:00:00+00:00", "2018-12-18T00:00:00+00:00", time.Hour * 4},
}

// invalid cron spec
cronSpec = "invalid-cron-spec"
backoff = GetBackoffForNextSchedule(cronSpec, now)
a.Equal(NoBackoff, backoff)
func TestCron(t *testing.T) {
for idx, tt := range crontests {
t.Run(strconv.Itoa(idx), func(t *testing.T) {
start, _ := time.Parse(time.RFC3339, tt.startTime)
end := start
if tt.endTime != "" {
end, _ = time.Parse(time.RFC3339, tt.endTime)
}
backoff := GetBackoffForNextSchedule(tt.cron, start, end)
assert.Equal(t, tt.result, backoff, "The cron spec is %s and the expected result is %s", tt.cron, tt.result)
})
}
}
2 changes: 1 addition & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.S
deadline := time.Now().Add(time.Second * time.Duration(expirationInSeconds))
histRequest.ExpirationTimestamp = Int64Ptr(deadline.Round(time.Millisecond).UnixNano())
}
histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(backoff.GetBackoffForNextScheduleInSeconds(startRequest.GetCronSchedule(), time.Now()))
histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(backoff.GetBackoffForNextScheduleInSeconds(startRequest.GetCronSchedule(), time.Now(), time.Now()))
return histRequest
}

Expand Down
4 changes: 4 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,10 @@ func (s *integrationSuite) TestWorkflowRetry() {
backoff := time.Duration(0)
if i > 0 {
backoff = time.Duration(float64(initialIntervalInSeconds)*math.Pow(backoffCoefficient, float64(i-1))) * time.Second
// retry backoff cannot larger than MaximumIntervalInSeconds
if backoff > time.Second {
backoff = time.Second
}
}
expectedExecutionTime := dweResponse.WorkflowExecutionInfo.GetStartTime() + backoff.Nanoseconds()
s.Equal(expectedExecutionTime, dweResponse.WorkflowExecutionInfo.GetExecutionTime())
Expand Down
9 changes: 3 additions & 6 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"time"

"github.com/pborman/uuid"
Expand All @@ -35,7 +34,6 @@ import (
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
Expand Down Expand Up @@ -748,10 +746,9 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
// For now execution time will be calculated based on start time and cron schedule/retry policy
// each time DescribeWorkflowExecution is called.
backoffDuration := time.Duration(0)
if executionInfo.HasRetryPolicy && (executionInfo.Attempt > 0) {
backoffDuration = time.Duration(float64(executionInfo.InitialInterval)*math.Pow(executionInfo.BackoffCoefficient, float64(executionInfo.Attempt-1))) * time.Second
} else if len(executionInfo.CronSchedule) != 0 {
backoffDuration = backoff.GetBackoffForNextSchedule(executionInfo.CronSchedule, executionInfo.StartTimestamp)
startEvent, ok := msBuilder.GetStartEvent()
if ok {
backoffDuration = time.Duration(startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstDecisionTaskBackoffSeconds()) * time.Second
}
result.WorkflowExecutionInfo.ExecutionTime = common.Int64Ptr(result.WorkflowExecutionInfo.GetStartTime() + backoffDuration.Nanoseconds())

Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ func (e *mutableStateBuilder) GetCronBackoffDuration() time.Duration {
if len(info.CronSchedule) == 0 {
return backoff.NoBackoff
}
return backoff.GetBackoffForNextSchedule(info.CronSchedule, e.timeSource.Now())
return backoff.GetBackoffForNextSchedule(info.CronSchedule, e.executionInfo.StartTimestamp, e.timeSource.Now())
}

// GetSignalInfo get details about a signal request that is currently in progress.
Expand Down
19 changes: 13 additions & 6 deletions service/history/stateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,14 +656,21 @@ func (b *stateBuilderImpl) scheduleWorkflowTimerTask(event *shared.HistoryEvent,
timerTasks := []persistence.Task{}
now := time.Unix(0, event.GetTimestamp())
timeout := now.Add(time.Duration(msBuilder.GetExecutionInfo().WorkflowTimeout) * time.Second)
backoffDuration := backoff.NoBackoff
startWorkflowAttribute := event.GetWorkflowExecutionStartedEventAttributes()
if startWorkflowAttribute.GetFirstDecisionTaskBackoffSeconds() > 0 {
backoffDuration = time.Duration(event.GetWorkflowExecutionStartedEventAttributes().GetFirstDecisionTaskBackoffSeconds()) * time.Second
}

cronSchedule := b.msBuilder.GetExecutionInfo().CronSchedule
cronBackoffDuration := backoff.GetBackoffForNextSchedule(cronSchedule, now)
if cronBackoffDuration != backoff.NoBackoff {
timeout = timeout.Add(cronBackoffDuration)
if backoffDuration != backoff.NoBackoff {
timeout = timeout.Add(backoffDuration)
timeoutType := persistence.WorkflowBackoffTimeoutTypeRetry
if startWorkflowAttribute.GetInitiator().Equals(shared.ContinueAsNewInitiatorCronSchedule) {
timeoutType = persistence.WorkflowBackoffTimeoutTypeCron
}
timerTasks = append(timerTasks, &persistence.WorkflowBackoffTimerTask{
VisibilityTimestamp: now.Add(cronBackoffDuration),
TimeoutType: persistence.WorkflowBackoffTimeoutTypeCron,
VisibilityTimestamp: now.Add(backoffDuration),
TimeoutType: timeoutType,
})
}

Expand Down
26 changes: 16 additions & 10 deletions service/history/stateBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,23 @@ func (s *stateBuilderSuite) applyWorkflowExecutionStartedEventTest(cronSchedule

now := time.Now()
evenType := shared.EventTypeWorkflowExecutionStarted
startWorkflowAttribute := &shared.WorkflowExecutionStartedEventAttributes{
ParentWorkflowDomain: common.StringPtr(parentName),
}

if len(cronSchedule) > 0 {
startWorkflowAttribute.Initiator = shared.ContinueAsNewInitiatorCronSchedule.Ptr()
startWorkflowAttribute.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(int32(backoff.GetBackoffForNextSchedule(cronSchedule, now, now).Seconds()))
}
event := &shared.HistoryEvent{
Version: common.Int64Ptr(version),
EventId: common.Int64Ptr(1),
Timestamp: common.Int64Ptr(now.UnixNano()),
EventType: &evenType,
WorkflowExecutionStartedEventAttributes: &shared.WorkflowExecutionStartedEventAttributes{
ParentWorkflowDomain: common.StringPtr(parentName),
},
Version: common.Int64Ptr(version),
EventId: common.Int64Ptr(1),
Timestamp: common.Int64Ptr(now.UnixNano()),
EventType: &evenType,
WorkflowExecutionStartedEventAttributes: startWorkflowAttribute,
}

s.mockMutableState.On("GetStartEvent").Return(event, true)
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(
&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -221,7 +228,7 @@ func (s *stateBuilderSuite) applyWorkflowExecutionStartedEventTest(cronSchedule

expectedTimerTasksLength := 1
timeout := now.Add(time.Duration(executionInfo.WorkflowTimeout) * time.Second)
backoffDuration := backoff.GetBackoffForNextSchedule(cronSchedule, now)
backoffDuration := backoff.GetBackoffForNextSchedule(cronSchedule, now, now)
if backoffDuration != backoff.NoBackoff {
expectedTimerTasksLength = 2
timeout = timeout.Add(backoffDuration)
Expand Down Expand Up @@ -511,7 +518,6 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA
Attempt: common.Int64Ptr(newRunDecisionAttempt),
},
}

s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(
&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: domainID, Name: domainName},
Expand Down Expand Up @@ -895,7 +901,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA

newRunHistory := &shared.History{Events: []*shared.HistoryEvent{newRunStartedEvent, newRunSignalEvent, newRunDecisionEvent}}
s.mockMutableState.On("ClearStickyness").Once()
s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Times(2)
s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Twice()
_, _, newRunStateBuilder, err := s.stateBuilder.applyEvents(domainID, requestID, execution, s.toHistory(continueAsNewEvent), newRunHistory.Events,
0, persistence.EventStoreVersionV2)
s.Nil(err)
Expand Down
12 changes: 10 additions & 2 deletions service/history/timerQueueProcessor2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,15 @@ func (s *timerQueueProcessor2Suite) SetupTest() {

domainCache := cache.NewDomainCache(s.mockMetadataMgr, s.mockClusterMetadata, metricsClient, s.logger)
s.mockShard = &shardContextImpl{
service: s.mockService,
shardInfo: &persistence.ShardInfo{ShardID: shardID, RangeID: 1, TransferAckLevel: 0},
service: s.mockService,
shardInfo: &persistence.ShardInfo{
ShardID: shardID,
RangeID: 1,
TransferAckLevel: 0,
ClusterTransferAckLevel: make(map[string]int64),
ClusterTimerAckLevel: make(map[string]time.Time),
TransferFailoverLevels: make(map[string]persistence.TransferFailoverLevel),
TimerFailoverLevels: make(map[string]persistence.TimerFailoverLevel)},
transferSequenceNumber: 1,
executionManager: s.mockExecutionMgr,
shardManager: s.mockShardManager,
Expand Down Expand Up @@ -361,6 +368,7 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout_Cron() {
schedule := "@every 30s"

builder := newMutableStateBuilderWithEventV2(s.mockShard, s.mockEventsCache, s.logger, we.GetRunId())
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Maybe()
s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything,
mock.Anything).Return().Once()
startRequest := &workflow.StartWorkflowExecutionRequest{
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ func (t *transferQueueActiveProcessorImpl) processStartChildExecution(
},
InitiatedId: common.Int64Ptr(initiatedEventID),
},
FirstDecisionTaskBackoffSeconds: common.Int32Ptr(backoff.GetBackoffForNextScheduleInSeconds(attributes.GetCronSchedule(), t.timeSource.Now())),
FirstDecisionTaskBackoffSeconds: common.Int32Ptr(backoff.GetBackoffForNextScheduleInSeconds(attributes.GetCronSchedule(), t.timeSource.Now(), t.timeSource.Now())),
}

var startResponse *workflow.StartWorkflowExecutionResponse
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueActiveProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1764,7 +1764,7 @@ func (s *transferQueueActiveProcessorSuite) createChildWorkflowExecutionRequest(
Execution: &execution,
InitiatedId: common.Int64Ptr(task.ScheduleID),
},
FirstDecisionTaskBackoffSeconds: common.Int32Ptr(backoff.GetBackoffForNextScheduleInSeconds(attributes.GetCronSchedule(), time.Now())),
FirstDecisionTaskBackoffSeconds: common.Int32Ptr(backoff.GetBackoffForNextScheduleInSeconds(attributes.GetCronSchedule(), time.Now(), time.Now())),
}
}

Expand Down

0 comments on commit a5f84bc

Please sign in to comment.