Skip to content

Commit

Permalink
Support is_cron field for SQL/Cassandra/Postgres (cadence-workflow#4191)
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender authored May 12, 2021
1 parent d15fbe5 commit 53edb8d
Show file tree
Hide file tree
Showing 22 changed files with 151 additions and 31 deletions.
30 changes: 20 additions & 10 deletions common/persistence/cassandra/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ const (

const (
///////////////// Open Executions /////////////////
openExecutionsColumnsForSelect = " workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_list "
openExecutionsColumnsForSelect = " workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_list, is_cron "

openExecutionsColumnsForInsert = "(domain_id, domain_partition, " + openExecutionsColumnsForSelect + ")"

templateCreateWorkflowExecutionStartedWithTTL = `INSERT INTO open_executions ` +
openExecutionsColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions` +
openExecutionsColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` +
`WHERE domain_id = ? ` +
Expand Down Expand Up @@ -89,25 +89,25 @@ const (
`AND workflow_id = ? `

///////////////// Closed Executions /////////////////
closedExecutionColumnsForSelect = " workflow_id, run_id, start_time, execution_time, close_time, workflow_type_name, status, history_length, memo, encoding, task_list "
closedExecutionColumnsForSelect = " workflow_id, run_id, start_time, execution_time, close_time, workflow_type_name, status, history_length, memo, encoding, task_list, is_cron "

closedExecutionColumnsForInsert = "(domain_id, domain_partition, " + closedExecutionColumnsForSelect + ")"

templateCreateWorkflowExecutionClosedWithTTL = `INSERT INTO closed_executions ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateCreateWorkflowExecutionClosedWithTTLV2 = `INSERT INTO closed_executions_v2 ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosedV2 = `INSERT INTO closed_executions_v2 ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateGetClosedWorkflowExecutions = `SELECT ` + closedExecutionColumnsForSelect +
`FROM closed_executions ` +
Expand Down Expand Up @@ -234,6 +234,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
request.IsCron,
).WithContext(ctx)
} else {
query = v.session.Query(templateCreateWorkflowExecutionStartedWithTTL,
Expand All @@ -247,6 +248,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
request.IsCron,
ttl,
).WithContext(ctx)
}
Expand Down Expand Up @@ -295,6 +297,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
request.IsCron,
)
// duplicate write to v2 to order by close time
batch.Query(templateCreateWorkflowExecutionClosedV2,
Expand All @@ -311,6 +314,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
request.IsCron,
)
} else {
batch.Query(templateCreateWorkflowExecutionClosedWithTTL,
Expand All @@ -327,6 +331,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
request.IsCron,
int64(retention.Seconds()),
)
// duplicate write to v2 to order by close time
Expand All @@ -344,6 +349,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
request.IsCron,
int64(retention.Seconds()),
)
}
Expand Down Expand Up @@ -867,7 +873,8 @@ func readOpenWorkflowExecutionRecord(
var memo []byte
var encoding string
var taskList string
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList) {
var isCron bool
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList, &isCron) {
record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID,
Expand All @@ -876,6 +883,7 @@ func readOpenWorkflowExecutionRecord(
ExecutionTime: executionTime,
Memo: p.NewDataBlob(memo, common.EncodingType(encoding)),
TaskList: taskList,
IsCron: isCron,
}
return record, true
}
Expand All @@ -896,7 +904,8 @@ func readClosedWorkflowExecutionRecord(
var memo []byte
var encoding string
var taskList string
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList) {
var isCron bool
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList, &isCron) {
record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID,
Expand All @@ -908,6 +917,7 @@ func readClosedWorkflowExecutionRecord(
HistoryLength: historyLength,
Memo: p.NewDataBlob(memo, common.EncodingType(encoding)),
TaskList: taskList,
IsCron: isCron,
}
return record, true
}
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ type (
BranchToken []byte
// Cron
CronSchedule string
IsCron bool
ExpirationSeconds int32 // TODO: is this field useful?
}

Expand Down
1 change: 1 addition & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
InitiatedID: info.InitiatedID,
CompletionEventBatchID: info.CompletionEventBatchID,
TaskList: info.TaskList,
IsCron: len(info.CronSchedule) > 0,
WorkflowTypeName: info.WorkflowTypeName,
WorkflowTimeout: int32(info.WorkflowTimeout.Seconds()),
DecisionStartToCloseTimeout: int32(info.DecisionStartToCloseTimeout.Seconds()),
Expand Down
56 changes: 56 additions & 0 deletions common/persistence/persistence-tests/visibilityPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,62 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibility() {
s.assertClosedExecutionEquals(closeReq, resp.Executions[0])
}

// TestCronVisibility test
func (s *VisibilityPersistenceSuite) TestCronVisibility() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

testDomainUUID := uuid.New()

workflowExecution := types.WorkflowExecution{
WorkflowID: "visibility-cron-workflow-test",
RunID: "fb15e4b5-356f-466d-8c6d-a29223e5c537",
}

startTime := time.Now().Add(time.Second * -5).UnixNano()
startReq := &p.RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-cron-workflow",
StartTimestamp: startTime,
IsCron: true,
}
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, startReq)
s.Nil(err0)

resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
EarliestTime: startTime,
LatestTime: startTime,
})
s.Nil(err1)
s.Equal(1, len(resp.Executions))
s.True(resp.Executions[0].IsCron)

closeReq := &p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
CloseTimestamp: time.Now().UnixNano(),
HistoryLength: 5,
IsCron: true,
}
err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)
s.Nil(err2)

resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
EarliestTime: startTime,
LatestTime: startTime,
})
s.Nil(err4)
s.Equal(1, len(resp.Executions))
s.True(resp.Executions[0].IsCron)
}

// TestBasicVisibilityTimeSkew test
func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/serialization/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,14 @@ func (w *WorkflowExecutionInfo) GetTaskList() (o string) {
return
}

// GetIsCron internal sql blob getter
func (w *WorkflowExecutionInfo) GetIsCron() (o bool) {
if w != nil && w.IsCron != nil {
return *w.IsCron
}
return
}

// GetWorkflowTypeName internal sql blob getter
func (w *WorkflowExecutionInfo) GetWorkflowTypeName() (o string) {
if w != nil && w.WorkflowTypeName != nil {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/serialization/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type (
CompletionEvent []byte
CompletionEventEncoding *string
TaskList *string
IsCron *bool
WorkflowTypeName *string
WorkflowTimeout *time.Duration
DecisionTaskTimeout *time.Duration
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/sql/sqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionStarted(
WorkflowTypeName: request.WorkflowTypeName,
Memo: request.Memo.Data,
Encoding: string(request.Memo.GetEncoding()),
IsCron: request.IsCron,
})

if err != nil {
Expand All @@ -100,6 +101,7 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed(
HistoryLength: &request.HistoryLength,
Memo: request.Memo.Data,
Encoding: string(request.Memo.GetEncoding()),
IsCron: request.IsCron,
})
if err != nil {
return convertCommonErrors(s.db, "RecordWorkflowExecutionClosed", "", err)
Expand Down Expand Up @@ -319,6 +321,7 @@ func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.Internal
TypeName: row.WorkflowTypeName,
StartTime: row.StartTime,
ExecutionTime: row.ExecutionTime,
IsCron: row.IsCron,
Memo: p.NewDataBlob(row.Memo, common.EncodingType(row.Encoding)),
}
if row.CloseStatus != nil {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ type (
HistoryLength *int64
Memo []byte
Encoding string
IsCron bool
}

// VisibilityFilter contains the column names within executions_visibility table that
Expand Down
18 changes: 10 additions & 8 deletions common/persistence/sql/sqlplugin/mysql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (

const (
templateCreateWorkflowExecutionStarted = `INSERT IGNORE INTO executions_visibility (` +
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateCreateWorkflowExecutionClosed = `REPLACE INTO executions_visibility (` +
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding, is_cron) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

// RunID condition is needed for correct pagination
templateConditions = ` AND domain_id = ?
Expand All @@ -46,7 +46,7 @@ const (
ORDER BY start_time DESC, run_id
LIMIT ?`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding`
templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE close_status IS NULL `

templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, close_status, history_length
Expand All @@ -66,7 +66,7 @@ const (

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND close_status = ?` + templateConditions

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length
templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length, is_cron
FROM executions_visibility
WHERE domain_id = ? AND close_status IS NOT NULL
AND run_id = ?`
Expand All @@ -89,7 +89,8 @@ func (mdb *db) InsertIntoVisibility(ctx context.Context, row *sqlplugin.Visibili
row.ExecutionTime,
row.WorkflowTypeName,
row.Memo,
row.Encoding)
row.Encoding,
row.IsCron)
}

// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
Expand All @@ -110,7 +111,8 @@ func (mdb *db) ReplaceIntoVisibility(ctx context.Context, row *sqlplugin.Visibil
*row.CloseStatus,
*row.HistoryLength,
row.Memo,
row.Encoding)
row.Encoding,
row.IsCron)
default:
return nil, errCloseParams
}
Expand Down
23 changes: 13 additions & 10 deletions common/persistence/sql/sqlplugin/postgres/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (

const (
templateCreateWorkflowExecutionStarted = `INSERT INTO executions_visibility (` +
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (domain_id, run_id) DO NOTHING`

templateCreateWorkflowExecutionClosed = `INSERT INTO executions_visibility (` +
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (domain_id, run_id) DO UPDATE
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding, is_cron) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (domain_id, run_id) DO UPDATE
SET workflow_id = excluded.workflow_id,
start_time = excluded.start_time,
execution_time = excluded.execution_time,
Expand All @@ -48,7 +48,8 @@ const (
close_status = excluded.close_status,
history_length = excluded.history_length,
memo = excluded.memo,
encoding = excluded.encoding`
encoding = excluded.encoding,
is_cron = excluded.is_cron`

// RunID condition is needed for correct pagination
templateConditions1 = ` AND domain_id = $1
Expand All @@ -65,7 +66,7 @@ const (
ORDER BY start_time DESC, run_id
LIMIT $7`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding`
templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE close_status IS NULL `

templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, close_status, history_length
Expand All @@ -85,7 +86,7 @@ const (

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND close_status = $1` + templateConditions2

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length
templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length, is_cron
FROM executions_visibility
WHERE domain_id = $1 AND close_status IS NOT NULL
AND run_id = $2`
Expand All @@ -107,7 +108,8 @@ func (pdb *db) InsertIntoVisibility(ctx context.Context, row *sqlplugin.Visibili
row.ExecutionTime,
row.WorkflowTypeName,
row.Memo,
row.Encoding)
row.Encoding,
row.IsCron)
}

// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
Expand All @@ -127,7 +129,8 @@ func (pdb *db) ReplaceIntoVisibility(ctx context.Context, row *sqlplugin.Visibil
*row.CloseStatus,
*row.HistoryLength,
row.Memo,
row.Encoding)
row.Encoding,
row.IsCron)
default:
return nil, errCloseParams
}
Expand Down
2 changes: 1 addition & 1 deletion schema/cassandra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ package cassandra
const Version = "0.30"

// VisibilityVersion is the Cassandra visibility database release version
const VisibilityVersion = "0.5"
const VisibilityVersion = "0.6"
Loading

0 comments on commit 53edb8d

Please sign in to comment.