Skip to content

Commit

Permalink
sql: make visibility APIs do upsert instead of insert/update (cadence…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Apr 8, 2019
1 parent ea292b5 commit ad28117
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 65 deletions.
171 changes: 147 additions & 24 deletions common/persistence/persistence-tests/visibilityPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibility() {
}

startTime := time.Now().Add(time.Second * -5).UnixNano()
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{
startReq := &p.RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
})
}
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(startReq)
s.Nil(err0)

resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
Expand All @@ -87,15 +88,17 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibility() {
})
s.Nil(err1)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution.WorkflowId, resp.Executions[0].Execution.WorkflowId)
s.assertOpenExecutionEquals(startReq, resp.Executions[0])

err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{
closeReq := &p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
CloseTimestamp: time.Now().UnixNano(),
})
HistoryLength: 5,
}
err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(closeReq)
s.Nil(err2)

resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
Expand All @@ -115,6 +118,7 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibility() {
})
s.Nil(err4)
s.Equal(1, len(resp.Executions))
s.assertClosedExecutionEquals(closeReq, resp.Executions[0])
}

// TestBasicVisibilityTimeSkew test
Expand Down Expand Up @@ -183,25 +187,30 @@ func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
WorkflowId: common.StringPtr("visibility-pagination-test1"),
RunId: common.StringPtr("fb15e4b5-356f-466d-8c6d-a29223e5c536"),
}
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{

startReq1 := &p.RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution1,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime1.UnixNano(),
})
}

err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(startReq1)
s.Nil(err0)

startTime2 := startTime1.Add(time.Second)
workflowExecution2 := gen.WorkflowExecution{
WorkflowId: common.StringPtr("visibility-pagination-test2"),
RunId: common.StringPtr("843f6fc7-102a-4c63-a2d4-7c653b01bf52"),
}
err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{

startReq2 := &p.RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution2,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime2.UnixNano(),
})
}
err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(startReq2)
s.Nil(err1)

// Get the first one
Expand All @@ -213,7 +222,7 @@ func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
})
s.Nil(err2)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution2.GetWorkflowId(), resp.Executions[0].GetExecution().GetWorkflowId())
s.assertOpenExecutionEquals(startReq2, resp.Executions[0])

// Use token to get the second one
resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
Expand All @@ -225,7 +234,7 @@ func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
})
s.Nil(err3)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution1.GetWorkflowId(), resp.Executions[0].GetExecution().GetWorkflowId())
s.assertOpenExecutionEquals(startReq1, resp.Executions[0])

// TODO: See if it is possible in Cassandra to not return non empty token which is going to return empty result
if s.ExecutionManager.GetName() == "cassandra" {
Expand Down Expand Up @@ -298,13 +307,15 @@ func (s *VisibilityPersistenceSuite) TestFilteringByType() {
})
s.Nil(err3)

err4 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{
closeReq := &p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution2,
WorkflowTypeName: "visibility-workflow-2",
StartTimestamp: startTime,
CloseTimestamp: time.Now().UnixNano(),
})
HistoryLength: 3,
}
err4 := s.VisibilityMgr.RecordWorkflowExecutionClosed(closeReq)
s.Nil(err4)

// List closed with filtering
Expand All @@ -319,7 +330,7 @@ func (s *VisibilityPersistenceSuite) TestFilteringByType() {
})
s.Nil(err5)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution2.WorkflowId, resp.Executions[0].Execution.WorkflowId)
s.assertClosedExecutionEquals(closeReq, resp.Executions[0])
}

// TestFilteringByWorkflowID test
Expand Down Expand Up @@ -376,13 +387,15 @@ func (s *VisibilityPersistenceSuite) TestFilteringByWorkflowID() {
})
s.Nil(err3)

err4 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{
closeReq := &p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution2,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
CloseTimestamp: time.Now().UnixNano(),
})
HistoryLength: 3,
}
err4 := s.VisibilityMgr.RecordWorkflowExecutionClosed(closeReq)
s.Nil(err4)

// List closed with filtering
Expand All @@ -397,7 +410,7 @@ func (s *VisibilityPersistenceSuite) TestFilteringByWorkflowID() {
})
s.Nil(err5)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution2.WorkflowId, resp.Executions[0].Execution.WorkflowId)
s.assertClosedExecutionEquals(closeReq, resp.Executions[0])
}

// TestFilteringByCloseStatus test
Expand Down Expand Up @@ -441,14 +454,16 @@ func (s *VisibilityPersistenceSuite) TestFilteringByCloseStatus() {
})
s.Nil(err2)

err3 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{
closeReq := &p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution2,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
CloseTimestamp: time.Now().UnixNano(),
Status: gen.WorkflowExecutionCloseStatusFailed,
})
CloseTimestamp: time.Now().UnixNano(),
HistoryLength: 3,
}
err3 := s.VisibilityMgr.RecordWorkflowExecutionClosed(closeReq)
s.Nil(err3)

// List closed with filtering
Expand All @@ -463,7 +478,7 @@ func (s *VisibilityPersistenceSuite) TestFilteringByCloseStatus() {
})
s.Nil(err4)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution2.WorkflowId, resp.Executions[0].Execution.WorkflowId)
s.assertClosedExecutionEquals(closeReq, resp.Executions[0])
}

// TestGetClosedExecution test
Expand Down Expand Up @@ -493,21 +508,129 @@ func (s *VisibilityPersistenceSuite) TestGetClosedExecution() {
s.True(ok, "EntityNotExistsError")
s.Nil(closedResp)

err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{
closeReq := &p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
Status: gen.WorkflowExecutionCloseStatusFailed,
CloseTimestamp: time.Now().UnixNano(),
HistoryLength: 3,
}
err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(closeReq)
s.Nil(err2)

resp, err3 := s.VisibilityMgr.GetClosedWorkflowExecution(&p.GetClosedWorkflowExecutionRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
})
s.Nil(err3)
s.assertClosedExecutionEquals(closeReq, resp.Execution)
}

// TestClosedWithoutStarted
func (s *VisibilityPersistenceSuite) TestClosedWithoutStarted() {
testDomainUUID := uuid.New()
workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("visibility-workflow-test"),
RunId: common.StringPtr("1bdb0122-e8c9-4b35-b6f8-d692ab259b09"),
}

closedResp, err0 := s.VisibilityMgr.GetClosedWorkflowExecution(&p.GetClosedWorkflowExecutionRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
})
s.Error(err0)
_, ok := err0.(*gen.EntityNotExistsError)
s.True(ok, "EntityNotExistsError")
s.Nil(closedResp)

closeReq := &p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: time.Now().Add(time.Second * -5).UnixNano(),
Status: gen.WorkflowExecutionCloseStatusFailed,
CloseTimestamp: time.Now().UnixNano(),
HistoryLength: 3,
}
err1 := s.VisibilityMgr.RecordWorkflowExecutionClosed(closeReq)
s.Nil(err1)

resp, err2 := s.VisibilityMgr.GetClosedWorkflowExecution(&p.GetClosedWorkflowExecutionRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
})
s.Nil(err2)
s.assertClosedExecutionEquals(closeReq, resp.Execution)
}

// TestMultipleUpserts
func (s *VisibilityPersistenceSuite) TestMultipleUpserts() {
testDomainUUID := uuid.New()

workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("visibility-workflow-test"),
RunId: common.StringPtr("a3dbc7bf-deb1-4946-b57c-cf0615ea553f"),
}

startTime := time.Now().Add(time.Second * -5).UnixNano()
closeReq := &p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
Status: gen.WorkflowExecutionCloseStatusFailed,
CloseTimestamp: time.Now().UnixNano(),
HistoryLength: 3,
}

count := 3
for i := 0; i < count; i++ {
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
})
s.Nil(err0)
if i < count-1 {
err1 := s.VisibilityMgr.RecordWorkflowExecutionClosed(closeReq)
s.Nil(err1)
}
}

resp, err3 := s.VisibilityMgr.GetClosedWorkflowExecution(&p.GetClosedWorkflowExecutionRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
})
s.Nil(err3)
s.Equal(workflowExecution.WorkflowId, resp.Execution.Execution.WorkflowId)
s.Equal(int64(3), *resp.Execution.HistoryLength)
s.assertClosedExecutionEquals(closeReq, resp.Execution)

}

func (s *VisibilityPersistenceSuite) assertClosedExecutionEquals(
req *p.RecordWorkflowExecutionClosedRequest, resp *gen.WorkflowExecutionInfo) {
s.Equal(req.Execution.RunId, resp.Execution.RunId)
s.Equal(req.Execution.WorkflowId, resp.Execution.WorkflowId)
s.Equal(req.WorkflowTypeName, resp.GetType().GetName())
s.Equal(s.nanosToMillis(req.StartTimestamp), s.nanosToMillis(resp.GetStartTime()))
s.Equal(s.nanosToMillis(req.CloseTimestamp), s.nanosToMillis(resp.GetCloseTime()))
s.Equal(req.Status, resp.GetCloseStatus())
s.Equal(req.HistoryLength, *resp.HistoryLength)
}

func (s *VisibilityPersistenceSuite) assertOpenExecutionEquals(
req *p.RecordWorkflowExecutionStartedRequest, resp *gen.WorkflowExecutionInfo) {
s.Equal(req.Execution.RunId, resp.Execution.RunId)
s.Equal(req.Execution.WorkflowId, resp.Execution.WorkflowId)
s.Equal(req.WorkflowTypeName, resp.GetType().GetName())
s.Equal(s.nanosToMillis(req.StartTimestamp), s.nanosToMillis(resp.GetStartTime()))
s.Nil(resp.CloseTime)
s.Nil(resp.CloseStatus)
s.Nil(resp.HistoryLength)
}

func (s *VisibilityPersistenceSuite) nanosToMillis(nanos int64) int64 {
return nanos / int64(time.Millisecond)
}
38 changes: 14 additions & 24 deletions common/persistence/sql/sqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,47 +61,37 @@ func NewSQLVisibilityStore(cfg config.SQL, logger bark.Logger) (p.VisibilityMana
}

func (s *sqlVisibilityStore) RecordWorkflowExecutionStarted(request *p.RecordWorkflowExecutionStartedRequest) error {
result, err := s.db.InsertIntoVisibility(&sqldb.VisibilityRow{
_, err := s.db.InsertIntoVisibility(&sqldb.VisibilityRow{
DomainID: request.DomainUUID,
WorkflowID: *request.Execution.WorkflowId,
RunID: *request.Execution.RunId,
StartTime: time.Unix(0, request.StartTimestamp),
WorkflowTypeName: request.WorkflowTypeName,
})
if err != nil {
return err
}
noRowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("RecordWorkflowExecutionStarted rowsAffected error: %v", err)
}
if noRowsAffected != 1 {
return fmt.Errorf("RecordWorkflowExecutionStarted %v rows updated instead of one", noRowsAffected)
}
return nil
return err
}

func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed(request *p.RecordWorkflowExecutionClosedRequest) error {
closeTime := time.Unix(0, request.CloseTimestamp)
result, err := s.db.UpdateVisibility(&sqldb.VisibilityRow{
CloseTime: &closeTime,
CloseStatus: common.Int32Ptr(int32(request.Status)),
HistoryLength: &request.HistoryLength,
DomainID: request.DomainUUID,
RunID: *request.Execution.RunId,
result, err := s.db.ReplaceIntoVisibility(&sqldb.VisibilityRow{
DomainID: request.DomainUUID,
WorkflowID: *request.Execution.WorkflowId,
RunID: *request.Execution.RunId,
StartTime: time.Unix(0, request.StartTimestamp),
WorkflowTypeName: request.WorkflowTypeName,
CloseTime: &closeTime,
CloseStatus: common.Int32Ptr(int32(request.Status)),
HistoryLength: &request.HistoryLength,
})
if err != nil {
return err
}
noRowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("RecordWorkflowExecutionStarted rowsAffected error: %v", err)
return fmt.Errorf("RecordWorkflowExecutionClosed rowsAffected error: %v", err)
}
if noRowsAffected != 1 {
// TODO:this should never happen since we always create the record before setting it to closed
// However, its ideal to provide same semantics as cassandra API here i.e. always upsert old
// record and return success
return nil
if noRowsAffected > 2 { // either adds a new row or deletes old row and adds new row
return fmt.Errorf("RecordWorkflowExecutionClosed unexpected numRows (%v) updated", noRowsAffected)
}
return nil
}
Expand Down
Loading

0 comments on commit ad28117

Please sign in to comment.