Skip to content

Commit

Permalink
Visibility Store schema and persistence layer APIs (cadence-workflow#111
Browse files Browse the repository at this point in the history
  • Loading branch information
Tamer Eldeeb authored Apr 3, 2017
1 parent 511cd17 commit 49da962
Show file tree
Hide file tree
Showing 6 changed files with 463 additions and 2 deletions.
205 changes: 205 additions & 0 deletions common/persistence/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package persistence

import (
"fmt"
"time"

"github.com/gocql/gocql"
"github.com/uber-common/bark"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
)

// Fixed domain values for now
const (
domainPartition = 0
)

const (
templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` +
`VALUES (?, ?, ?, ?, ?, ?)`

templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND workflow_id = ? ` +
`AND run_id = ? `

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, workflow_type_name ` +
`FROM open_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition IN (?) `

templateGetClosedWorkflowExecutions = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition IN (?) `
)

type (
cassandraVisibilityPersistence struct {
session *gocql.Session
lowConslevel gocql.Consistency
logger bark.Logger
}
)

// NewCassandraVisibilityPersistence is used to create an instance of VisibilityManager implementation
func NewCassandraVisibilityPersistence(
hosts string, dc string, keyspace string, logger bark.Logger) (VisibilityManager, error) {
cluster := common.NewCassandraCluster(hosts, dc)
cluster.Keyspace = keyspace
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
cluster.Timeout = defaultSessionTimeout

session, err := cluster.CreateSession()
if err != nil {
return nil, err
}

return &cassandraVisibilityPersistence{session: session, lowConslevel: gocql.One, logger: logger}, nil
}

func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
request *RecordWorkflowExecutionStartedRequest) error {
query := v.session.Query(templateCreateWorkflowExecutionStarted,
request.DomainUUID,
domainPartition,
request.Execution.GetWorkflowId(),
request.Execution.GetRunId(),
request.StartTime,
request.WorkflowTypeName,
)
err := query.Exec()
if err != nil {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("RecordWorkflowExecutionStarted operation failed. Error: %v", err),
}
}

return nil
}

func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request *RecordWorkflowExecutionClosedRequest) error {
batch := v.session.NewBatch(gocql.LoggedBatch)

// First, remove execution from the open table
batch.Query(templateDeleteWorkflowExecutionStarted,
request.DomainUUID,
domainPartition,
request.Execution.GetWorkflowId(),
request.Execution.GetRunId(),
)

// Next, add a row in the closed table. This row is kepy for defaultDeleteTTLSeconds
batch.Query(templateCreateWorkflowExecutionClosed,
request.DomainUUID,
domainPartition,
request.Execution.GetWorkflowId(),
request.Execution.GetRunId(),
request.StartTime,
request.CloseTime,
request.WorkflowTypeName,
defaultDeleteTTLSeconds,
)
err := v.session.ExecuteBatch(batch)
if err != nil {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("RecordWorkflowExecutionClosed operation failed. Error: %v", err),
}
}
return nil
}

func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(
request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetOpenWorkflowExecutions, request.DomainUUID, domainPartition)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListOpenWorkflowExecutions operation failed. Not able to create query iterator.",
}
}

response := &ListWorkflowExecutionsResponse{}
response.Executions = make([]*WorkflowExecutionRecord, 0)
rec := make(map[string]interface{})
for iter.MapScan(rec) {
wfexecution := createWorkflowExecutionRecord(rec)
response.Executions = append(response.Executions, wfexecution)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err),
}
}

return response, nil
}

func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutions, request.DomainUUID, domainPartition)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListOpenWorkflowExecutions operation failed. Not able to create query iterator.",
}
}

response := &ListWorkflowExecutionsResponse{}
response.Executions = make([]*WorkflowExecutionRecord, 0)
rec := make(map[string]interface{})
for iter.MapScan(rec) {
wfexecution := createWorkflowExecutionRecord(rec)
response.Executions = append(response.Executions, wfexecution)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err),
}
}

return response, nil
}

func createWorkflowExecutionRecord(result map[string]interface{}) *WorkflowExecutionRecord {
record := &WorkflowExecutionRecord{}
for k, v := range result {
switch k {
case "workflow_id":
record.Execution.WorkflowId = common.StringPtr(v.(string))
case "run_id":
record.Execution.RunId = common.StringPtr(v.(gocql.UUID).String())
case "workflow_type_name":
record.WorkflowTypeName = v.(string)
case "start_time":
record.StartTime = v.(time.Time)
case "close_time":
record.CloseTime = v.(time.Time)
default:
// Unknown field, could happen due to schema update
}
}

return record
}
152 changes: 152 additions & 0 deletions common/persistence/cassandraVisibilityPersistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package persistence

import (
"os"
"testing"
"time"

log "github.com/Sirupsen/logrus"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
)

type (
visibilityPersistenceSuite struct {
suite.Suite
TestBase
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
}
)

func TestVisibilityPersistenceSuite(t *testing.T) {
s := new(visibilityPersistenceSuite)
suite.Run(t, s)
}

func (s *visibilityPersistenceSuite) SetupSuite() {
if testing.Verbose() {
log.SetOutput(os.Stdout)
}

s.SetupWorkflowStore()
}

func (s *visibilityPersistenceSuite) SetupTest() {
// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
s.Assertions = require.New(s.T())
}

func (s *visibilityPersistenceSuite) TearDownSuite() {
s.TearDownWorkflowStore()
}

func (s *visibilityPersistenceSuite) TestBasicVisibility() {
testDomainUUID := uuid.New()

workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("visibility-workflow-test"),
RunId: common.StringPtr("fb15e4b5-356f-466d-8c6d-a29223e5c536"),
}

startTime := time.Now().Add(time.Second * -5)
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTime: startTime,
})
s.Nil(err0)

resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
})
s.Nil(err1)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution.GetWorkflowId(), resp.Executions[0].Execution.GetWorkflowId())

err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTime: startTime,
CloseTime: time.Now(),
})
s.Nil(err2)

resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
})
s.Nil(err3)
s.Equal(0, len(resp.Executions))

resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(&ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
})
s.Nil(err4)
s.Equal(1, len(resp.Executions))
}

func (s *visibilityPersistenceSuite) TestVisibilityPagination() {
testDomainUUID := uuid.New()
// Create 2 executions
workflowExecution1 := gen.WorkflowExecution{
WorkflowId: common.StringPtr("visibility-pagination-test1"),
RunId: common.StringPtr("fb15e4b5-356f-466d-8c6d-a29223e5c536"),
}
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution1,
WorkflowTypeName: "visibility-workflow",
StartTime: time.Now(),
})
s.Nil(err0)

workflowExecution2 := gen.WorkflowExecution{
WorkflowId: common.StringPtr("visibility-pagination-test2"),
RunId: common.StringPtr("fb15e4b5-356f-466d-8c6d-a29223e5c536"),
}
err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&RecordWorkflowExecutionStartedRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution2,
WorkflowTypeName: "visibility-workflow",
StartTime: time.Now(),
})
s.Nil(err1)

// Get the first one
resp, err2 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
})
s.Nil(err2)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution1.GetWorkflowId(), resp.Executions[0].Execution.GetWorkflowId())

// Use token to get the second one
resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
NextPageToken: resp.NextPageToken,
})
s.Nil(err3)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution2.GetWorkflowId(), resp.Executions[0].Execution.GetWorkflowId())

// Now should get empty result by using token
resp, err4 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
NextPageToken: resp.NextPageToken,
})
s.Nil(err4)
s.Equal(0, len(resp.Executions))
}
5 changes: 3 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
)

// Domain status
const (
DomainStatusRegistered = iota
DomainStatusDeprecated
Expand All @@ -14,7 +15,7 @@ const (

// Workflow execution states
const (
WorkflowStateCreated = iota
WorkflowStateCreated = iota
WorkflowStateRunning
WorkflowStateCompleted
)
Expand All @@ -27,7 +28,7 @@ const (

// Transfer task types
const (
TransferTaskTypeDecisionTask = iota
TransferTaskTypeDecisionTask = iota
TransferTaskTypeActivityTask
TransferTaskTypeDeleteExecution
)
Expand Down
Loading

0 comments on commit 49da962

Please sign in to comment.