Skip to content

Commit

Permalink
Support read from closed execution v2 (cadence-workflow#1470)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Feb 16, 2019
1 parent ae5bb31 commit 5bda240
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 57 deletions.
297 changes: 297 additions & 0 deletions common/persistence/cassandra/cassandraVisibilityPersistenceV2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cassandra

import (
"fmt"
"github.com/gocql/gocql"
"github.com/uber-common/bark"
workflow "github.com/uber/cadence/.gen/go/shared"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/config"
)

const (
templateGetClosedWorkflowExecutionsV2 = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions_v2 ` +
`WHERE domain_id = ? ` +
`AND domain_partition IN (?) ` +
`AND close_time >= ? ` +
`AND close_time <= ? `

templateGetClosedWorkflowExecutionsByTypeV2 = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions_v2 ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND close_time >= ? ` +
`AND close_time <= ? ` +
`AND workflow_type_name = ? `

templateGetClosedWorkflowExecutionsByIDV2 = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions_v2 ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND close_time >= ? ` +
`AND close_time <= ? ` +
`AND workflow_id = ? `

templateGetClosedWorkflowExecutionsByStatusV2 = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
`FROM closed_executions_v2 ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND close_time >= ? ` +
`AND close_time <= ? ` +
`AND status = ? `
)

type (
cassandraVisibilityPersistenceV2 struct {
cassandraStore
lowConslevel gocql.Consistency
persistence p.VisibilityManager
}
)

// NewVisibilityPersistenceV2 create a wrapper of cassandra visibilityPersistence, with all list closed executions using v2 table
func NewVisibilityPersistenceV2(persistence p.VisibilityManager, cfg *config.Cassandra, logger bark.Logger) (p.VisibilityManager, error) {
cluster := NewCassandraCluster(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, cfg.Datacenter)
cluster.Keyspace = cfg.Keyspace
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
cluster.Timeout = defaultSessionTimeout

session, err := cluster.CreateSession()
if err != nil {
return nil, err
}

return &cassandraVisibilityPersistenceV2{
cassandraStore: cassandraStore{session: session, logger: logger},
lowConslevel: gocql.One,
persistence: persistence,
}, nil
}

// Close releases the resources held by this object
func (v *cassandraVisibilityPersistenceV2) Close() {
if v.session != nil {
v.session.Close()
}
v.persistence.Close()
}

func (v *cassandraVisibilityPersistenceV2) GetName() string {
return v.persistence.GetName()
}

func (v *cassandraVisibilityPersistenceV2) RecordWorkflowExecutionStarted(
request *p.RecordWorkflowExecutionStartedRequest) error {
return v.persistence.RecordWorkflowExecutionStarted(request)
}

func (v *cassandraVisibilityPersistenceV2) RecordWorkflowExecutionClosed(
request *p.RecordWorkflowExecutionClosedRequest) error {
return v.persistence.RecordWorkflowExecutionClosed(request)
}

func (v *cassandraVisibilityPersistenceV2) ListOpenWorkflowExecutions(
request *p.ListWorkflowExecutionsRequest) (*p.ListWorkflowExecutionsResponse, error) {
return v.persistence.ListOpenWorkflowExecutions(request)
}

func (v *cassandraVisibilityPersistenceV2) ListOpenWorkflowExecutionsByType(
request *p.ListWorkflowExecutionsByTypeRequest) (*p.ListWorkflowExecutionsResponse, error) {
return v.persistence.ListOpenWorkflowExecutionsByType(request)
}

func (v *cassandraVisibilityPersistenceV2) ListOpenWorkflowExecutionsByWorkflowID(
request *p.ListWorkflowExecutionsByWorkflowIDRequest) (*p.ListWorkflowExecutionsResponse, error) {
return v.persistence.ListOpenWorkflowExecutionsByWorkflowID(request)
}

func (v *cassandraVisibilityPersistenceV2) GetClosedWorkflowExecution(
request *p.GetClosedWorkflowExecutionRequest) (*p.GetClosedWorkflowExecutionResponse, error) {
return v.persistence.GetClosedWorkflowExecution(request)
}

func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutions(
request *p.ListWorkflowExecutionsRequest) (*p.ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsV2,
request.DomainUUID,
domainPartition,
p.UnixNanoToDBTimestamp(request.EarliestStartTime),
p.UnixNanoToDBTimestamp(request.LatestStartTime)).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListClosedWorkflowExecutions operation failed. Not able to create query iterator.",
}
}

response := &p.ListWorkflowExecutionsResponse{}
response.Executions = make([]*workflow.WorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListClosedWorkflowExecutions operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutions operation failed. Error: %v", err),
}
}

return response, nil
}

func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByType(
request *p.ListWorkflowExecutionsByTypeRequest) (*p.ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByTypeV2,
request.DomainUUID,
domainPartition,
p.UnixNanoToDBTimestamp(request.EarliestStartTime),
p.UnixNanoToDBTimestamp(request.LatestStartTime),
request.WorkflowTypeName).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListClosedWorkflowExecutionsByType operation failed. Not able to create query iterator.",
}
}

response := &p.ListWorkflowExecutionsResponse{}
response.Executions = make([]*workflow.WorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType operation failed. Error: %v", err),
}
}

return response, nil
}

func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByWorkflowID(
request *p.ListWorkflowExecutionsByWorkflowIDRequest) (*p.ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByIDV2,
request.DomainUUID,
domainPartition,
p.UnixNanoToDBTimestamp(request.EarliestStartTime),
p.UnixNanoToDBTimestamp(request.LatestStartTime),
request.WorkflowID).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListClosedWorkflowExecutionsByWorkflowID operation failed. Not able to create query iterator.",
}
}

response := &p.ListWorkflowExecutionsResponse{}
response.Executions = make([]*workflow.WorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
}
}

return response, nil
}

func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByStatus(
request *p.ListClosedWorkflowExecutionsByStatusRequest) (*p.ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByStatusV2,
request.DomainUUID,
domainPartition,
p.UnixNanoToDBTimestamp(request.EarliestStartTime),
p.UnixNanoToDBTimestamp(request.LatestStartTime),
request.Status).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListClosedWorkflowExecutionsByStatus operation failed. Not able to create query iterator.",
}
}

response := &p.ListWorkflowExecutionsResponse{}
response.Executions = make([]*workflow.WorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByStatus operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByStatus operation failed. Error: %v", err),
}
}

return response, nil
}
22 changes: 18 additions & 4 deletions common/persistence/persistence-factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type (
// NewExecutionManager returns a new execution manager for a given shardID
NewExecutionManager(shardID int) (p.ExecutionManager, error)
// NewVisibilityManager returns a new visibility manager
NewVisibilityManager(enableSampling bool) (p.VisibilityManager, error)
NewVisibilityManager() (p.VisibilityManager, error)
}
// DataStoreFactory is a low level interface to be implemented by a datastore
// Examples of datastores are cassandra, mysql etc
Expand Down Expand Up @@ -263,17 +263,21 @@ func (f *factoryImpl) NewExecutionManager(shardID int) (p.ExecutionManager, erro
}

// NewVisibilityManager returns a new visibility manager
func (f *factoryImpl) NewVisibilityManager(enableSampling bool) (p.VisibilityManager, error) {
func (f *factoryImpl) NewVisibilityManager() (p.VisibilityManager, error) {
ds := f.datastores[storeTypeVisibility]
result, err := ds.factory.NewVisibilityStore()
if err != nil {
return nil, err
}
visConfig := f.config.VisibilityConfig
if visConfig != nil && visConfig.EnableReadFromClosedExecutionV2() && f.isCassandra() {
result, err = cassandra.NewVisibilityPersistenceV2(result, f.getCassandraConfig(), f.logger)
}
if ds.ratelimit != nil {
result = p.NewVisibilityPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
if enableSampling {
result = p.NewVisibilitySamplingClient(result, &f.config.SamplingConfig, f.metricsClient, f.logger)
if visConfig != nil && visConfig.EnableSampling() {
result = p.NewVisibilitySamplingClient(result, visConfig, f.metricsClient, f.logger)
}
if f.metricsClient != nil {
result = p.NewVisibilityPersistenceMetricsClient(result, f.metricsClient, f.logger)
Expand All @@ -287,6 +291,16 @@ func (f *factoryImpl) Close() {
ds.factory.Close()
}

func (f *factoryImpl) isCassandra() bool {
cfg := f.config
return cfg.DataStores[cfg.VisibilityStore].SQL == nil
}

func (f *factoryImpl) getCassandraConfig() *config.Cassandra {
cfg := f.config
return cfg.DataStores[cfg.VisibilityStore].Cassandra
}

func newStore(cfg config.DataStore, tb common.TokenBucket, clusterName string, maxConnsOverride int, logger bark.Logger) Datastore {
var ds Datastore
ds.ratelimit = tb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *TestBase) Setup() {
visibilityFactory = pfactory.New(&vCfg, clusterName, nil, log)
}
// SQL currently doesn't have support for visibility manager
s.VisibilityMgr, err = visibilityFactory.NewVisibilityManager(false)
s.VisibilityMgr, err = visibilityFactory.NewVisibilityManager()
if err != nil {
s.fatalOnError("NewVisibilityManager", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *VisibilitySamplingSuite) SetupTest() {
s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil

s.persistence = &mocks.VisibilityManager{}
config := &c.SamplingConfig{
config := &c.VisibilityConfig{
VisibilityOpenMaxQPS: dynamicconfig.GetIntPropertyFilteredByDomain(1),
VisibilityClosedMaxQPS: dynamicconfig.GetIntPropertyFilteredByDomain(10),
VisibilityListMaxQPS: dynamicconfig.GetIntPropertyFilteredByDomain(1),
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/visibilitySamplingClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ type visibilitySamplingClient struct {
rateLimitersForClosed *domainToBucketMap
rateLimitersForList *domainToBucketMap
persistence VisibilityManager
config *config.SamplingConfig
config *config.VisibilityConfig
metricClient metrics.Client
logger bark.Logger
}

var _ VisibilityManager = (*visibilitySamplingClient)(nil)

// NewVisibilitySamplingClient creates a client to manage visibility with sampling
func NewVisibilitySamplingClient(persistence VisibilityManager, config *config.SamplingConfig, metricClient metrics.Client, logger bark.Logger) VisibilityManager {
func NewVisibilitySamplingClient(persistence VisibilityManager, config *config.VisibilityConfig, metricClient metrics.Client, logger bark.Logger) VisibilityManager {
return &visibilitySamplingClient{
persistence: persistence,
rateLimitersForOpen: newDomainToBucketMap(),
Expand Down
Loading

0 comments on commit 5bda240

Please sign in to comment.