Skip to content

Commit

Permalink
implement filter which allow caller choose all events or only close e…
Browse files Browse the repository at this point in the history
…vent of when dumping history events (cadence-workflow#489)

when long poll on end event, use history service for end event
  • Loading branch information
wxing1292 authored Jan 3, 2018
1 parent 6fd468a commit 5e9c49d
Show file tree
Hide file tree
Showing 18 changed files with 472 additions and 65 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

196 changes: 189 additions & 7 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
FirstEventID int64 = 1
// EmptyEventID is the id of the empty event
EmptyEventID int64 = -23
// EndEventID is the id of the end event, here we use the int64 max
EndEventID int64 = 1<<63 - 1
)

const (
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ const (
`execution_context: ?, ` +
`state: ?, ` +
`close_status: ?, ` +
`last_first_event_id: ?, ` +
`next_event_id: ?, ` +
`last_processed_event: ?, ` +
`start_time: ?, ` +
Expand Down Expand Up @@ -903,6 +904,7 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat
request.ExecutionContext,
WorkflowStateCreated,
WorkflowCloseStatusNone,
common.FirstEventID,
request.NextEventID,
request.LastProcessedEvent,
cqlNowTimestamp,
Expand Down Expand Up @@ -1024,6 +1026,7 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
executionInfo.ExecutionContext,
executionInfo.State,
executionInfo.CloseStatus,
executionInfo.LastFirstEventID,
executionInfo.NextEventID,
executionInfo.LastProcessedEvent,
executionInfo.StartTimestamp,
Expand Down Expand Up @@ -2013,6 +2016,8 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *WorkflowExecuti
info.State = v.(int)
case "close_status":
info.CloseStatus = v.(int)
case "last_first_event_id":
info.LastFirstEventID = v.(int64)
case "next_event_id":
info.NextEventID = v.(int64)
case "last_processed_event":
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (s *cassandraPersistenceSuite) TestUpdateWorkflow() {
s.Equal(int32(13), info0.DecisionTimeoutValue)
s.Equal([]byte(nil), info0.ExecutionContext)
s.Equal(WorkflowStateCreated, info0.State)
s.Equal(int64(1), info0.LastFirstEventID)
s.Equal(int64(3), info0.NextEventID)
s.Equal(int64(0), info0.LastProcessedEvent)
s.Equal(true, validateTimeRange(info0.LastUpdatedTimestamp, time.Hour))
Expand All @@ -190,6 +191,7 @@ func (s *cassandraPersistenceSuite) TestUpdateWorkflow() {
log.Infof("Workflow execution last updated: %v", info0.LastUpdatedTimestamp)

updatedInfo := copyWorkflowExecutionInfo(info0)
updatedInfo.LastFirstEventID = int64(3)
updatedInfo.NextEventID = int64(5)
updatedInfo.LastProcessedEvent = int64(2)
updatedInfo.DecisionAttempt = int64(123)
Expand All @@ -215,6 +217,7 @@ func (s *cassandraPersistenceSuite) TestUpdateWorkflow() {
s.Equal(int32(13), info1.DecisionTimeoutValue)
s.Equal([]byte(nil), info1.ExecutionContext)
s.Equal(WorkflowStateCreated, info1.State)
s.Equal(int64(3), info1.LastFirstEventID)
s.Equal(int64(5), info1.NextEventID)
s.Equal(int64(2), info1.LastProcessedEvent)
s.Equal(true, validateTimeRange(info1.LastUpdatedTimestamp, time.Hour))
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type (
ExecutionContext []byte
State int
CloseStatus int
LastFirstEventID int64
NextEventID int64
LastProcessedEvent int64
StartTimestamp time.Time
Expand Down
Loading

0 comments on commit 5e9c49d

Please sign in to comment.