Skip to content

Commit

Permalink
Revert "Create TTL in the UpdateWorkflowExecution cycles. (cadence-wo…
Browse files Browse the repository at this point in the history
  • Loading branch information
agautam478 authored Aug 17, 2023
1 parent 75bc3f8 commit d8518bb
Show file tree
Hide file tree
Showing 17 changed files with 21 additions and 191 deletions.
13 changes: 0 additions & 13 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1867,13 +1867,6 @@ const (
EnableShardIDMetrics
// LastBoolKey must be the last one in this const group
LastBoolKey

// EnableExecutionTTL is which domains are allowed to have workflow executions with a TTL
// KeyName: system.enableExecutionTTL
// Value type: Bool
// Default value: false
// Allowed filters: DomainID
EnableExecutionTTL
)

const (
Expand Down Expand Up @@ -4030,12 +4023,6 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Enable shardId metrics in persistence client",
DefaultValue: true,
},
EnableExecutionTTL: DynamicBool{
KeyName: "system.enableExecutionTTL",
Filters: []Filter{DomainID},
Description: "EnableExecutionTTL is which domains are allowed to have workflow executions with a TTL",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
2 changes: 0 additions & 2 deletions common/persistence/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type (
EnableCassandraAllConsistencyLevelDelete dynamicconfig.BoolPropertyFn
PersistenceSampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
EnableExecutionTTL dynamicconfig.BoolPropertyFnWithDomainIDFilter
}
)

Expand All @@ -42,6 +41,5 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration
EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicconfig.EnableCassandraAllConsistencyLevelDelete),
PersistenceSampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
EnableExecutionTTL: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableExecutionTTL),
}
}
5 changes: 2 additions & 3 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,9 +1024,8 @@ type (
ReplicationTasks []Task
TimerTasks []Task

Condition int64
TTLInSeconds int64
Checksum checksum.Checksum
Condition int64
Checksum checksum.Checksum
}

// WorkflowSnapshot is used as generic workflow execution state snapshot
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ type (
CrossClusterTasks []Task
TimerTasks []Task
ReplicationTasks []Task
TTLInSeconds int64
Condition int64

Condition int64

Checksum checksum.Checksum
}
Expand Down
7 changes: 4 additions & 3 deletions common/persistence/executionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
return nil, err
}
}

newRequest := &InternalUpdateWorkflowExecutionRequest{
RangeID: request.RangeID,

Expand Down Expand Up @@ -660,9 +661,9 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
CrossClusterTasks: input.CrossClusterTasks,
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,
TTLInSeconds: input.TTLInSeconds,
Condition: input.Condition,
Checksum: input.Checksum,

Condition: input.Condition,
Checksum: input.Checksum,
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions common/persistence/nosql/nosqlExecutionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
ShardID: d.shardID,
RangeID: request.RangeID,
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
mutateExecution, insertExecution, resetExecution,
Expand Down
8 changes: 2 additions & 6 deletions common/persistence/nosql/nosqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@ func (d *nosqlExecutionStore) prepareResetWorkflowExecutionRequestWithMapsAndEve
versionHistories := resetWorkflow.VersionHistories
nowTimestamp := time.Now()

//TTLInSeconds is invalid in this case so passing a default value of 0.
executionRequest, err := d.prepareUpdateWorkflowExecutionTxn(
executionInfo, versionHistories, checkSum,
nowTimestamp, lastWriteVersion, 0,
nowTimestamp, lastWriteVersion,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -128,14 +127,13 @@ func (d *nosqlExecutionStore) prepareUpdateWorkflowExecutionRequestWithMapsAndEv
) (*nosqlplugin.WorkflowExecutionRequest, error) {
executionInfo := workflowMutation.ExecutionInfo
lastWriteVersion := workflowMutation.LastWriteVersion
ttlInSeconds := workflowMutation.TTLInSeconds
checkSum := workflowMutation.Checksum
versionHistories := workflowMutation.VersionHistories
nowTimestamp := time.Now()

executionRequest, err := d.prepareUpdateWorkflowExecutionTxn(
executionInfo, versionHistories, checkSum,
nowTimestamp, lastWriteVersion, ttlInSeconds,
nowTimestamp, lastWriteVersion,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -609,7 +607,6 @@ func (d *nosqlExecutionStore) prepareUpdateWorkflowExecutionTxn(
checksum checksum.Checksum,
nowTimestamp time.Time,
lastWriteVersion int64,
ttlInSeconds int64,
) (*nosqlplugin.WorkflowExecutionRequest, error) {
// validate workflow state & close status
if err := p.ValidateUpdateWorkflowStateCloseStatus(
Expand Down Expand Up @@ -639,7 +636,6 @@ func (d *nosqlExecutionStore) prepareUpdateWorkflowExecutionTxn(
VersionHistories: versionHistories,
Checksums: &checksum,
LastWriteVersion: lastWriteVersion,
TTLInSeconds: ttlInSeconds,
}, nil
}

Expand Down
14 changes: 2 additions & 12 deletions common/persistence/nosql/nosqlplugin/cassandra/workflowCql.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,18 +315,8 @@ workflow_state = ? ` +
`WHERE shard_id = ? ` +
`and type = ?`

templateUpdateWorkflowExecutionWithVersionHistoriesQueryPart1 = `INSERT INTO executions (` +
`domain_id, ` +
`run_id, ` +
`shard_id, ` +
`task_id, ` +
`type, ` +
`visibility_ts, ` +
`workflow_id ` +
`) VALUES (?, ?, ?, ?, ?, ?, ?) USING TTL ?`

templateUpdateWorkflowExecutionWithVersionHistoriesQueryPart2 = `UPDATE executions ` +
`USING TTL ? SET execution = ` + templateWorkflowExecutionType +
templateUpdateWorkflowExecutionWithVersionHistoriesQuery = `UPDATE executions ` +
`SET execution = ` + templateWorkflowExecutionType +
`, next_event_id = ? ` +
`, version_histories = ? ` +
`, version_histories_encoding = ? ` +
Expand Down
24 changes: 4 additions & 20 deletions common/persistence/nosql/nosqlplugin/cassandra/workflowUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,7 @@ func (db *cdb) resetWorkflowExecutionAndMapsAndEventBuffer(
if execution.MapsWriteMode != nosqlplugin.WorkflowExecutionMapsWriteModeReset {
return fmt.Errorf("should only support WorkflowExecutionMapsWriteModeReset")
}

err = db.resetActivityInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.ActivityInfos)
if err != nil {
return err
Expand Down Expand Up @@ -1167,6 +1168,7 @@ func (db *cdb) updateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(
return err
}
}

if execution.MapsWriteMode != nosqlplugin.WorkflowExecutionMapsWriteModeUpdate {
return fmt.Errorf("should only support WorkflowExecutionMapsWriteModeUpdate")
}
Expand Down Expand Up @@ -1203,26 +1205,8 @@ func (db *cdb) updateWorkflowExecution(
) error {
execution.StartTimestamp = db.convertToCassandraTimestamp(execution.StartTimestamp)
execution.LastUpdatedTimestamp = db.convertToCassandraTimestamp(execution.LastUpdatedTimestamp)
//default TTL Value. 0 TTL means no ttl is set, hence your records will persist forever unless explicitly deleted.
ttlInSeconds := 0
//Only fires when the workflow is closing.
if execution.State == persistence.WorkflowStateCompleted || execution.State == persistence.WorkflowStateCorrupted {
if db.dc.EnableExecutionTTL(domainID) {
ttlInSeconds = int(execution.TTLInSeconds)
}
batch.Query(templateUpdateWorkflowExecutionWithVersionHistoriesQueryPart1,
domainID,
execution.RunID,
shardID,
rowTypeExecutionTaskID,
rowTypeExecution,
defaultVisibilityTimestamp,
workflowID,
ttlInSeconds,
)
}
batch.Query(templateUpdateWorkflowExecutionWithVersionHistoriesQueryPart2,
ttlInSeconds,

batch.Query(templateUpdateWorkflowExecutionWithVersionHistoriesQuery,
domainID,
workflowID,
execution.RunID,
Expand Down
1 change: 0 additions & 1 deletion common/persistence/nosql/nosqlplugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type (
VersionHistories *persistence.DataBlob
Checksums *checksum.Checksum
LastWriteVersion int64
TTLInSeconds int64
// condition checking for updating execution info
PreviousNextEventIDCondition *int64

Expand Down
86 changes: 0 additions & 86 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,92 +609,6 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionStateCloseStatus() {
}
}

// TestUpdateWorkflowExecutionTTL test
func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionTTL() {
//TODO: Update the test when the TTLbuffer becomes configurable.
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
if s.ExecutionManager.GetName() != "cassandra" {
// TTL API is only supported in cassandra"
return
}

domainID := uuid.New()
workflowID := "update-workflow-test-with-ttl"
workflowExecution := types.WorkflowExecution{
WorkflowID: workflowID,
RunID: uuid.New(),
}
tasklist := "some random tasklist"
workflowType := "some random workflow type"
workflowTimeout := int32(10)
decisionTimeout := int32(14)
lastProcessedEventID := int64(0)
nextEventID := int64(3)
csum := s.newRandomChecksum()
versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
{
EventID: nextEventID,
Version: common.EmptyVersion,
},
})
versionHistories := p.NewVersionHistories(versionHistory)

// create and update a workflow to make it completed
req := &p.CreateWorkflowExecutionRequest{
NewWorkflowSnapshot: p.WorkflowSnapshot{
ExecutionInfo: &p.WorkflowExecutionInfo{
CreateRequestID: uuid.New(),
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
DecisionStartToCloseTimeout: decisionTimeout,
NextEventID: nextEventID,
LastProcessedEvent: lastProcessedEventID,
State: p.WorkflowStateRunning,
CloseStatus: p.WorkflowCloseStatusNone,
},
ExecutionStats: &p.ExecutionStats{},
Checksum: csum,
VersionHistories: versionHistories,
},
RangeID: s.ShardInfo.RangeID,
Mode: p.CreateWorkflowModeBrandNew,
}
_, err := s.ExecutionManager.CreateWorkflowExecution(ctx, req)
s.Nil(err)
currentRunID, err := s.GetCurrentWorkflowRunID(ctx, domainID, workflowID)
s.Nil(err)
s.Equal(workflowExecution.GetRunID(), currentRunID)

info, err := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
s.Nil(err)

updatedInfo := copyWorkflowExecutionInfo(info.ExecutionInfo)
updateStats := copyExecutionStats(info.ExecutionStats)
updatedInfo.State = p.WorkflowStateCompleted
updatedInfo.CloseStatus = p.WorkflowCloseStatusCompleted
_, err = s.ExecutionManager.UpdateWorkflowExecution(ctx, &p.UpdateWorkflowExecutionRequest{
UpdateWorkflowMutation: p.WorkflowMutation{
ExecutionInfo: updatedInfo,
ExecutionStats: updateStats,
Condition: nextEventID,
TTLInSeconds: 1,
VersionHistories: versionHistories,
},
RangeID: s.ShardInfo.RangeID,
Mode: p.UpdateWorkflowModeUpdateCurrent,
})
s.NoError(err)
time.Sleep(2 * time.Second)
info, err = s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
s.IsType(&types.EntityNotExistsError{}, err)
}

// TestUpdateWorkflowExecutionWithZombieState test
func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionWithZombieState() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func NewTestBaseWithNoSQL(options *TestBaseOptions) TestBase {
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
EnableExecutionTTL: dynamicconfig.GetBoolPropertyFnFilteredByDomainID(true),
}
params := TestBaseParams{
DefaultTestCluster: testCluster,
Expand Down
3 changes: 0 additions & 3 deletions config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ system.minRetentionDays:
history.EnableConsistentQueryByDomain:
- value: true
constraints: {}
system.enableExecutionTTL:
- value: true
constraints: {}
history.enableCrossClusterOperations:
- value: true
constraints: {}
Expand Down
3 changes: 1 addition & 2 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ import (

const (
defaultRemoteCallTimeout = 30 * time.Second
ttlBufferDays = 15
dayToSecondMultiplier = 86400
)

type conflictError struct {
Expand Down Expand Up @@ -1216,6 +1214,7 @@ func (c *contextImpl) updateWorkflowExecutionWithRetry(
resp, err = c.shard.UpdateWorkflowExecution(ctx, request)
return err
}

isRetryable := func(err error) bool {
if _, ok := err.(*persistence.TimeoutError); ok {
// timeout error is not retryable for update workflow execution
Expand Down
34 changes: 2 additions & 32 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4066,11 +4066,6 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation(
// impact the checksum calculation
checksum := e.generateChecksum()

TTLInSeconds, err := e.calculateTTL()
if err != nil {
e.logError("TTL calculation failed")
}

workflowMutation := &persistence.WorkflowMutation{
ExecutionInfo: e.executionInfo,
VersionHistories: e.versionHistories,
Expand All @@ -4095,9 +4090,8 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation(
ReplicationTasks: e.insertReplicationTasks,
TimerTasks: e.insertTimerTasks,

Condition: e.nextEventIDInDB,
Checksum: checksum,
TTLInSeconds: int64(TTLInSeconds),
Condition: e.nextEventIDInDB,
Checksum: checksum,
}

e.checksum = checksum
Expand Down Expand Up @@ -4813,27 +4807,3 @@ func (e *mutableStateBuilder) logDataInconsistency() {
tag.WorkflowRunID(runID),
)
}
func (e *mutableStateBuilder) calculateTTL() (int, error) {
domainID := e.executionInfo.DomainID
//Calculating the TTL for workflow Execution.

domainObj, err := e.shard.GetDomainCache().GetDomainByID(domainID)
if err != nil {
return 0, err
}
config := domainObj.GetConfig()
retention := time.Duration(config.Retention)
daysInSeconds := int((retention + ttlBufferDays) * dayToSecondMultiplier)
//Default state of TTL, means there is no TTL attached.
TTLInSeconds := 0
startTime := e.executionInfo.StartTimestamp
//Handles Cron and Delaystart. For Cron workflows the StartTimestamp does not show up until the wf has started.
//default value os TTL ie. 0 will be passed down in this case. The TTL is calculated only if the startTime is non zero.
if !time.Time.IsZero(startTime) {
CalculateTTLInSeconds := int(e.executionInfo.WorkflowTimeout) - int(time.Now().Sub(startTime).Seconds()) + daysInSeconds
if CalculateTTLInSeconds >= 0 {
return CalculateTTLInSeconds, nil
}
}
return TTLInSeconds, nil
}
1 change: 0 additions & 1 deletion service/history/execution/mutable_state_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func (s *mutableStateSuite) SetupTest() {
s.logger = s.mockShard.GetLogger()

s.mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes()
s.mockShard.Resource.DomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestLocalDomainEntry, nil).AnyTimes()

s.msBuilder = newMutableStateBuilder(s.mockShard, s.logger, constants.TestLocalDomainEntry)
}
Expand Down
Loading

0 comments on commit d8518bb

Please sign in to comment.