Skip to content

Commit

Permalink
Enable GetWorkflowExecutionHistory after mutable state is deleted (ca…
Browse files Browse the repository at this point in the history
…dence-workflow#236)

In this change we keep the length of the history in the visibility record.
This is useful in itself, but also allows reading the execution history even after deleting its mutable state.
Issue cadence-workflow#206
  • Loading branch information
Tamer Eldeeb authored Jun 9, 2017
1 parent b8d3a81 commit 89bae56
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 16 deletions.
40 changes: 40 additions & 0 deletions .gen/go/shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,7 @@ func (p *WorkflowExecution) String() string {
// - StartTime
// - CloseTime
// - CloseStatus
// - HistoryLength
type WorkflowExecutionInfo struct {
// unused fields # 1 to 9
Execution *WorkflowExecution `thrift:"execution,10" db:"execution" json:"execution,omitempty"`
Expand All @@ -1779,6 +1780,8 @@ type WorkflowExecutionInfo struct {
CloseTime *int64 `thrift:"closeTime,40" db:"closeTime" json:"closeTime,omitempty"`
// unused fields # 41 to 49
CloseStatus *WorkflowExecutionCloseStatus `thrift:"closeStatus,50" db:"closeStatus" json:"closeStatus,omitempty"`
// unused fields # 51 to 59
HistoryLength *int64 `thrift:"historyLength,60" db:"historyLength" json:"historyLength,omitempty"`
}

func NewWorkflowExecutionInfo() *WorkflowExecutionInfo {
Expand Down Expand Up @@ -1820,6 +1823,13 @@ func (p *WorkflowExecutionInfo) GetCloseStatus() WorkflowExecutionCloseStatus {
}
return *p.CloseStatus
}
var WorkflowExecutionInfo_HistoryLength_DEFAULT int64
func (p *WorkflowExecutionInfo) GetHistoryLength() int64 {
if !p.IsSetHistoryLength() {
return WorkflowExecutionInfo_HistoryLength_DEFAULT
}
return *p.HistoryLength
}
func (p *WorkflowExecutionInfo) IsSetExecution() bool {
return p.Execution != nil
}
Expand All @@ -1840,6 +1850,10 @@ func (p *WorkflowExecutionInfo) IsSetCloseStatus() bool {
return p.CloseStatus != nil
}

func (p *WorkflowExecutionInfo) IsSetHistoryLength() bool {
return p.HistoryLength != nil
}

func (p *WorkflowExecutionInfo) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
Expand Down Expand Up @@ -1873,6 +1887,10 @@ func (p *WorkflowExecutionInfo) Read(iprot thrift.TProtocol) error {
if err := p.ReadField50(iprot); err != nil {
return err
}
case 60:
if err := p.ReadField60(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
Expand Down Expand Up @@ -1932,6 +1950,15 @@ func (p *WorkflowExecutionInfo) ReadField50(iprot thrift.TProtocol) error {
return nil
}

func (p *WorkflowExecutionInfo) ReadField60(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 60: ", err)
} else {
p.HistoryLength = &v
}
return nil
}

func (p *WorkflowExecutionInfo) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("WorkflowExecutionInfo"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
Expand All @@ -1941,6 +1968,7 @@ func (p *WorkflowExecutionInfo) Write(oprot thrift.TProtocol) error {
if err := p.writeField30(oprot); err != nil { return err }
if err := p.writeField40(oprot); err != nil { return err }
if err := p.writeField50(oprot); err != nil { return err }
if err := p.writeField60(oprot); err != nil { return err }
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err) }
Expand Down Expand Up @@ -2011,6 +2039,18 @@ func (p *WorkflowExecutionInfo) writeField50(oprot thrift.TProtocol) (err error)
return err
}

func (p *WorkflowExecutionInfo) writeField60(oprot thrift.TProtocol) (err error) {
if p.IsSetHistoryLength() {
if err := oprot.WriteFieldBegin("historyLength", thrift.I64, 60); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 60:historyLength: ", p), err) }
if err := oprot.WriteI64(int64(*p.HistoryLength)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.historyLength (60) field write error: ", p), err) }
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 60:historyLength: ", p), err) }
}
return err
}

func (p *WorkflowExecutionInfo) String() string {
if p == nil {
return "<nil>"
Expand Down
23 changes: 23 additions & 0 deletions common/mocks/VisibilityManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,29 @@ type VisibilityManager struct {
mock.Mock
}

// GetClosedWorkflowExecution provides a mock function with given fields: request
func (_m *VisibilityManager) GetClosedWorkflowExecution(request *persistence.GetClosedWorkflowExecutionRequest) (*persistence.GetClosedWorkflowExecutionResponse, error) {
ret := _m.Called(request)

var r0 *persistence.GetClosedWorkflowExecutionResponse
if rf, ok := ret.Get(0).(func(*persistence.GetClosedWorkflowExecutionRequest) *persistence.GetClosedWorkflowExecutionResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetClosedWorkflowExecutionResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.GetClosedWorkflowExecutionRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// ListClosedWorkflowExecutions provides a mock function with given fields: request
func (_m *VisibilityManager) ListClosedWorkflowExecutions(request *persistence.ListWorkflowExecutionsRequest) (*persistence.ListWorkflowExecutionsResponse, error) {
ret := _m.Called(request)
Expand Down
59 changes: 52 additions & 7 deletions common/persistence/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
`AND run_id = ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, workflow_type_name ` +
`FROM open_executions ` +
Expand All @@ -59,7 +59,7 @@ const (
`AND start_time >= ? ` +
`AND start_time <= ? `

templateGetClosedWorkflowExecutions = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status ` +
templateGetClosedWorkflowExecutions = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition IN (?) ` +
Expand All @@ -74,7 +74,7 @@ const (
`AND start_time <= ? ` +
`AND workflow_type_name = ? `

templateGetClosedWorkflowExecutionsByType = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status ` +
templateGetClosedWorkflowExecutionsByType = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
Expand All @@ -90,21 +90,28 @@ const (
`AND start_time <= ? ` +
`AND workflow_id = ? `

templateGetClosedWorkflowExecutionsByID = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status ` +
templateGetClosedWorkflowExecutionsByID = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND start_time >= ? ` +
`AND start_time <= ? ` +
`AND workflow_id = ? `

templateGetClosedWorkflowExecutionsByStatus = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status ` +
templateGetClosedWorkflowExecutionsByStatus = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND start_time >= ? ` +
`AND start_time <= ? ` +
`AND status = ? `

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND workflow_id = ? ` +
`AND run_id = ? ALLOW FILTERING `
)

type (
Expand Down Expand Up @@ -183,6 +190,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
common.UnixNanoToCQLTimestamp(request.CloseTimestamp),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
retention,
)

Expand Down Expand Up @@ -446,6 +454,41 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(
return response, nil
}

func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error) {
execution := request.Execution
query := v.session.Query(templateGetClosedWorkflowExecution,
request.DomainUUID,
domainPartition,
execution.GetWorkflowId(),
execution.GetRunId())

iter := query.Iter()
if iter == nil {
return nil, &workflow.InternalServiceError{
Message: "GetClosedWorkflowExecution operation failed. Not able to create query iterator.",
}
}

wfexecution, has := readClosedWorkflowExecutionRecord(iter)
if !has {
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v, RunId: %v",
execution.GetWorkflowId(), execution.GetRunId()),
}
}

if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetClosedWorkflowExecution operation failed. Error: %v", err),
}
}

return &GetClosedWorkflowExecutionResponse{
Execution: wfexecution,
}, nil
}

func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*workflow.WorkflowExecutionInfo, bool) {
var workflowID string
var runID gocql.UUID
Expand Down Expand Up @@ -475,7 +518,8 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*workflow.WorkflowExec
var startTime time.Time
var closeTime time.Time
var status workflow.WorkflowExecutionCloseStatus
if iter.Scan(&workflowID, &runID, &startTime, &closeTime, &typeName, &status) {
var historyLength int64
if iter.Scan(&workflowID, &runID, &startTime, &closeTime, &typeName, &status, &historyLength) {
execution := workflow.NewWorkflowExecution()
execution.WorkflowId = common.StringPtr(workflowID)
execution.RunId = common.StringPtr(runID.String())
Expand All @@ -489,6 +533,7 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*workflow.WorkflowExec
record.CloseTime = common.Int64Ptr(closeTime.UnixNano())
record.Type = wfType
record.CloseStatus = workflow.WorkflowExecutionCloseStatusPtr(status)
record.HistoryLength = common.Int64Ptr(historyLength)
return record, true
}
return nil, false
Expand Down
42 changes: 42 additions & 0 deletions common/persistence/cassandraVisibilityPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,45 @@ func (s *visibilityPersistenceSuite) TestFilteringByCloseStatus() {
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution2.GetWorkflowId(), resp.Executions[0].Execution.GetWorkflowId())
}

func (s *visibilityPersistenceSuite) TestGetClosedExecution() {
testDomainUUID := uuid.New()

workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("visibility-workflow-test"),
RunId: common.StringPtr("a3dbc7bf-deb1-4946-b57c-cf0615ea553f"),
}

startTime := time.Now().Add(time.Second * -5).UnixNano()
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
})
s.Nil(err0)

_, err1 := s.VisibilityMgr.GetClosedWorkflowExecution(&GetClosedWorkflowExecutionRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
})
s.NotNil(err1)

err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
CloseTimestamp: time.Now().UnixNano(),
HistoryLength: 3,
})
s.Nil(err2)

resp, err3 := s.VisibilityMgr.GetClosedWorkflowExecution(&GetClosedWorkflowExecutionRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
})
s.Nil(err3)
s.Equal(workflowExecution.GetWorkflowId(), resp.Execution.GetExecution().GetWorkflowId())
s.Equal(int64(3), resp.Execution.GetHistoryLength())
}
13 changes: 13 additions & 0 deletions common/persistence/visibilityInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type (
StartTimestamp int64
CloseTimestamp int64
Status s.WorkflowExecutionCloseStatus
HistoryLength int64
RetentionSeconds int64
}

Expand Down Expand Up @@ -91,6 +92,17 @@ type (
Status s.WorkflowExecutionCloseStatus
}

// GetClosedWorkflowExecutionRequest is used retrieve the record for a specific execution
GetClosedWorkflowExecutionRequest struct {
DomainUUID string
Execution s.WorkflowExecution
}

// GetClosedWorkflowExecutionResponse is the response to GetClosedWorkflowExecutionRequest
GetClosedWorkflowExecutionResponse struct {
Execution *s.WorkflowExecutionInfo
}

// VisibilityManager is used to manage the visibility store
VisibilityManager interface {
RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error
Expand All @@ -102,5 +114,6 @@ type (
ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error)
GetClosedWorkflowExecution(request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error)
}
)
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ struct WorkflowExecutionInfo {
30: optional i64 (js.type = "Long") startTime
40: optional i64 (js.type = "Long") closeTime
50: optional WorkflowExecutionCloseStatus closeStatus
60: optional i64 (js.type = "Long") historyLength
}

struct ScheduleActivityTaskDecisionAttributes {
Expand Down
1 change: 1 addition & 0 deletions schema/visibility/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ CREATE TABLE closed_executions (
close_time timestamp,
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
workflow_type_name text,
history_length bigint,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
Expand Down
1 change: 1 addition & 0 deletions schema/visibility/versioned/v0.1/base.cql
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ CREATE TABLE closed_executions (
close_time timestamp,
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
workflow_type_name text,
history_length bigint,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
Expand Down
28 changes: 19 additions & 9 deletions service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
return nil, errWorkflowIDNotSet
}

if !getRequest.GetExecution().IsSetRunId() {
return nil, errRunIDNotSet
}

if uuid.Parse(getRequest.GetExecution().GetRunId()) == nil {
if getRequest.GetExecution().IsSetRunId() && uuid.Parse(getRequest.GetExecution().GetRunId()) == nil {
return nil, errInvalidRunID
}

Expand All @@ -587,11 +583,25 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
DomainUUID: common.StringPtr(info.ID),
Execution: getRequest.GetExecution(),
})
if err != nil {
return nil, wrapError(err)
if err == nil {
token.nextEventID = response.GetEventId()
token.runID = response.GetRunId()
} else {
if _, ok := err.(*gen.EntityNotExistsError); !ok || !getRequest.GetExecution().IsSetRunId() {
return nil, wrapError(err)
}
// It is possible that we still have the events in the table even though the mutable state is gone
// Get the nextEventID from visibility store if we still have it.
visibilityResp, err := wh.visibitiltyMgr.GetClosedWorkflowExecution(&persistence.GetClosedWorkflowExecutionRequest{
DomainUUID: info.ID,
Execution: *getRequest.GetExecution(),
})
if err != nil {
return nil, wrapError(err)
}
token.nextEventID = visibilityResp.Execution.GetHistoryLength()
token.runID = visibilityResp.Execution.GetExecution().GetRunId()
}
token.nextEventID = response.GetEventId()
token.runID = response.GetRunId()
}

we := gen.WorkflowExecution{
Expand Down
Loading

0 comments on commit 89bae56

Please sign in to comment.