Skip to content

Commit

Permalink
Adding support for long retention to sampled workflows (cadence-workf…
Browse files Browse the repository at this point in the history
…low#1141)

* Adding SampleRetention to domain config

* comments
  • Loading branch information
yiminc authored Sep 25, 2018
1 parent 5955efa commit a0cd3f5
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 18 deletions.
37 changes: 37 additions & 0 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
package cache

import (
"hash/fnv"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -606,3 +608,38 @@ func (t DomainCacheEntries) Less(i, j int) bool {
func CreateDomainCacheEntry(domainName string) *DomainCacheEntry {
return &DomainCacheEntry{info: &persistence.DomainInfo{Name: domainName}}
}

// SampleRetentionKey is key to specify sample retention
var SampleRetentionKey = "sample_retention_days"

// SampleRateKey is key to specify sample rate
var SampleRateKey = "sample_retention_rate"

// GetRetentionDays returns retention in days for given workflow
func (entry *DomainCacheEntry) GetRetentionDays(workflowID string) int32 {
if sampledRetentionValue, ok := entry.info.Data[SampleRetentionKey]; ok {
sampledRetentionDays, err := strconv.Atoi(sampledRetentionValue)
if err != nil || sampledRetentionDays < int(entry.config.Retention) {
return entry.config.Retention
}

if sampledRateValue, ok := entry.info.Data[SampleRateKey]; ok {
sampledRate, err := strconv.ParseFloat(sampledRateValue, 64)
if err != nil {
return entry.config.Retention
}

h := fnv.New32a()
h.Write([]byte(workflowID))
hash := h.Sum32()

r := float64(hash%1000) / float64(1000) // use 1000 so we support one decimal rate like 1.5%.
if r < sampledRate {
// sampled
return int32(sampledRetentionDays)
}
}
}

return entry.config.Retention
}
31 changes: 31 additions & 0 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber-go/tally"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
"github.com/uber/cadence/common/cluster"
Expand Down Expand Up @@ -571,3 +572,33 @@ func (s *domainCacheSuite) buildEntryFromRecord(record *persistence.GetDomainRes
newEntry.notificationVersion = record.NotificationVersion
return newEntry
}

func Test_GetRetentionDays(t *testing.T) {
d := &DomainCacheEntry{
info: &persistence.DomainInfo{
Data: make(map[string]string),
},
config: &persistence.DomainConfig{
Retention: 7,
},
}
d.info.Data[SampleRetentionKey] = "30"
d.info.Data[SampleRateKey] = "0"

wid := uuid.New()
rd := d.GetRetentionDays(wid)
require.Equal(t, int32(7), rd)

d.info.Data[SampleRateKey] = "1"
rd = d.GetRetentionDays(wid)
require.Equal(t, int32(30), rd)

d.info.Data[SampleRetentionKey] = "invalid-value"
rd = d.GetRetentionDays(wid)
require.Equal(t, int32(7), rd) // fallback to normal retention

d.info.Data[SampleRetentionKey] = "30"
d.info.Data[SampleRateKey] = "invalid-value"
rd = d.GetRetentionDays(wid)
require.Equal(t, int32(7), rd) // fallback to normal retention
}
12 changes: 6 additions & 6 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ Update_History_Loop:
}

if isComplete {
tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, tBuilder)
tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, workflowExecution.GetWorkflowId(), tBuilder)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2352,7 +2352,7 @@ Update_History_Loop:

transferTasks, timerTasks := postActions.transferTasks, postActions.timerTasks
if postActions.deleteWorkflow {
tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, tBuilder)
tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, execution.GetWorkflowId(), tBuilder)
if err != nil {
return err
}
Expand Down Expand Up @@ -2417,14 +2417,14 @@ func (e *historyEngineImpl) updateWorkflowExecution(ctx context.Context, domainI
}

func (e *historyEngineImpl) getDeleteWorkflowTasks(
domainID string,
domainID, workflowID string,
tBuilder *timerBuilder,
) (persistence.Task, persistence.Task, error) {
return getDeleteWorkflowTasksFromShard(e.shard, domainID, tBuilder)
return getDeleteWorkflowTasksFromShard(e.shard, domainID, workflowID, tBuilder)
}

func getDeleteWorkflowTasksFromShard(shard ShardContext,
domainID string,
domainID, workflowID string,
tBuilder *timerBuilder,
) (persistence.Task, persistence.Task, error) {
// Create a transfer task to close workflow execution
Expand All @@ -2438,7 +2438,7 @@ func getDeleteWorkflowTasksFromShard(shard ShardContext,
return nil, nil, err
}
} else {
retentionInDays = domainEntry.GetConfig().Retention
retentionInDays = domainEntry.GetRetentionDays(workflowID)
}
cleanupTask := tBuilder.createDeleteHistoryEventTimerTask(time.Duration(retentionInDays) * time.Hour * 24)

Expand Down
16 changes: 8 additions & 8 deletions service/history/stateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
case shared.EventTypeWorkflowExecutionCompleted:
b.msBuilder.ReplicateWorkflowExecutionCompletedEvent(event)
b.transferTasks = append(b.transferTasks, b.scheduleDeleteHistoryTransferTask())
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID)
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID, execution.GetWorkflowId())
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -324,7 +324,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
case shared.EventTypeWorkflowExecutionFailed:
b.msBuilder.ReplicateWorkflowExecutionFailedEvent(event)
b.transferTasks = append(b.transferTasks, b.scheduleDeleteHistoryTransferTask())
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID)
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID, execution.GetWorkflowId())
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -333,7 +333,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
case shared.EventTypeWorkflowExecutionTimedOut:
b.msBuilder.ReplicateWorkflowExecutionTimedoutEvent(event)
b.transferTasks = append(b.transferTasks, b.scheduleDeleteHistoryTransferTask())
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID)
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID, execution.GetWorkflowId())
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -342,7 +342,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
case shared.EventTypeWorkflowExecutionCanceled:
b.msBuilder.ReplicateWorkflowExecutionCanceledEvent(event)
b.transferTasks = append(b.transferTasks, b.scheduleDeleteHistoryTransferTask())
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID)
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID, execution.GetWorkflowId())
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -351,7 +351,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
case shared.EventTypeWorkflowExecutionTerminated:
b.msBuilder.ReplicateWorkflowExecutionTerminatedEvent(event)
b.transferTasks = append(b.transferTasks, b.scheduleDeleteHistoryTransferTask())
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID)
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID, execution.GetWorkflowId())
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -423,7 +423,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
// BTW, the newRunTransferTasks and newRunTimerTasks are not used

b.transferTasks = append(b.transferTasks, b.scheduleDeleteHistoryTransferTask())
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID)
timerTask, err := b.scheduleDeleteHistoryTimerTask(event, domainID, execution.GetWorkflowId())
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -517,15 +517,15 @@ func (b *stateBuilderImpl) scheduleWorkflowTimerTask(event *shared.HistoryEvent,
return &persistence.WorkflowTimeoutTask{VisibilityTimestamp: timeout}
}

func (b *stateBuilderImpl) scheduleDeleteHistoryTimerTask(event *shared.HistoryEvent, domainID string) (persistence.Task, error) {
func (b *stateBuilderImpl) scheduleDeleteHistoryTimerTask(event *shared.HistoryEvent, domainID, workflowID string) (persistence.Task, error) {
var retentionInDays int32
domainEntry, err := b.shard.GetDomainCache().GetDomainByID(domainID)
if err != nil {
if _, ok := err.(*shared.EntityNotExistsError); !ok {
return nil, err
}
} else {
retentionInDays = domainEntry.GetConfig().Retention
retentionInDays = domainEntry.GetRetentionDays(workflowID)
}
return b.getTimerBuilder(event).createDeleteHistoryEventTimerTask(time.Duration(retentionInDays) * time.Hour * 24), nil
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ Update_History_Loop:

tBuilder := t.historyService.getTimerBuilder(&context.workflowExecution)
var transferTasks, timerTasks []persistence.Task
tranT, timerT, err := getDeleteWorkflowTasksFromShard(t.shard, domainID, tBuilder)
tranT, timerT, err := getDeleteWorkflowTasksFromShard(t.shard, domainID, workflowExecution.GetWorkflowId(), tBuilder)
if err != nil {
return err
}
Expand Down Expand Up @@ -808,7 +808,7 @@ func (t *timerQueueActiveProcessorImpl) updateWorkflowExecution(

if createDeletionTask {
tBuilder := t.historyService.getTimerBuilder(&context.workflowExecution)
tranT, timerT, err := t.historyService.getDeleteWorkflowTasks(executionInfo.DomainID, tBuilder)
tranT, timerT, err := t.historyService.getDeleteWorkflowTasks(executionInfo.DomainID, executionInfo.WorkflowID, tBuilder)
if err != nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (t *transferQueueProcessorBase) recordWorkflowClosed(
// it is possible that the domain got deleted. Use default retention.
} else {
// retention in domain config is in days, convert to seconds
retentionSeconds = int64(domainEntry.GetConfig().Retention) * 24 * 60 * 60
retentionSeconds = int64(domainEntry.GetRetentionDays(execution.GetWorkflowId())) * 24 * 60 * 60
domain = domainEntry.GetInfo().Name
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (c *workflowExecutionContext) updateHelper(transferTasks []persistence.Task
return err
}
// NOTE: domain retention is in days, so we need to do a conversion
finishExecutionTTL = domainEntry.GetConfig().Retention * secondsInDay
finishExecutionTTL = domainEntry.GetRetentionDays(executionInfo.WorkflowID) * secondsInDay
}

var replicationTasks []persistence.Task
Expand Down

0 comments on commit a0cd3f5

Please sign in to comment.