Skip to content

Commit

Permalink
Add Pinot as new advanced visibility store option (cadence-workflow#5201
Browse files Browse the repository at this point in the history
)

* Updated server start up to choose visibility manager based on the advanced visibility option
* Added new triple visibility manager to provide options to write to both ES and Pinot for future migration
* Added pinotVisibilityStore and pinotClient to support storing visibility messages in Pinot
* Added util methods to flatten the search attributes into columns
* Added util methods to validate pinot response and query
* Added new kafka indexer message type for Pinot since Pinot doesn't need indexer to process
* Added integration test to set up Pinot test cluster and test Pinot functionality

---------

Co-authored-by: Bowen Xiao <[email protected]>
Co-authored-by: David Porter <[email protected]>
Co-authored-by: Shijie Sheng <[email protected]>
  • Loading branch information
4 people authored Oct 19, 2023
1 parent 7b6cf61 commit eb8eea9
Show file tree
Hide file tree
Showing 46 changed files with 6,854 additions and 34 deletions.
26 changes: 23 additions & 3 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"log"
"time"

"github.com/uber/cadence/common/persistence"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/compatibility"

apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/worker"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
Expand All @@ -52,7 +52,10 @@ import (
"github.com/uber/cadence/service/frontend"
"github.com/uber/cadence/service/history"
"github.com/uber/cadence/service/matching"
"github.com/uber/cadence/service/worker"

"github.com/startreedata/pinot-client-go/pinot"

pnt "github.com/uber/cadence/common/pinot"
)

type (
Expand Down Expand Up @@ -221,6 +224,23 @@ func (s *server) startService() common.Daemon {
}

params.ESConfig = advancedVisStore.ElasticSearch
if params.PersistenceConfig.AdvancedVisibilityStore == common.PinotVisibilityStoreName {
// components like ESAnalyzer is still using ElasticSearch
// The plan is to clean those after we switch to operate on Pinot
esVisibilityStore, ok := s.cfg.Persistence.DataStores[common.ESVisibilityStoreName]
if !ok {
log.Fatalf("Missing Elasticsearch config")
}
params.ESConfig = esVisibilityStore.ElasticSearch
params.PinotConfig = advancedVisStore.Pinot
pinotBroker := params.PinotConfig.Broker
pinotRawClient, err := pinot.NewFromBrokerList([]string{pinotBroker})
if err != nil || pinotRawClient == nil {
log.Fatalf("Creating Pinot visibility client failed: %v", err)
}
pinotClient := pnt.NewPinotClient(pinotRawClient, params.Logger, params.PinotConfig)
params.PinotClient = pinotClient
}
params.ESConfig.SetUsernamePassword()
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ type (
ShardedNoSQL *ShardedNoSQL `yaml:"shardedNosql"`
// ElasticSearch contains the config for a ElasticSearch datastore
ElasticSearch *ElasticSearchConfig `yaml:"elasticsearch"`
// Pinot contains the config for a Pinot datastore
Pinot *PinotVisibilityConfig `yaml:"pinot"`
}

// Cassandra contains configuration to connect to Cassandra cluster
Expand Down
31 changes: 31 additions & 0 deletions common/config/pinot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package config

// PinotVisibilityConfig for connecting to Pinot
type (
PinotVisibilityConfig struct {
Cluster string `yaml:"cluster"` //nolint:govet
Broker string `yaml:"broker"` //nolint:govet
Table string `yaml:"table"` //nolint:govet
ServiceName string `yaml:"serviceName"` //nolint:govet
}
)
12 changes: 11 additions & 1 deletion common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,15 @@ const (

const (
// VisibilityAppName is used to find kafka topics and ES indexName for visibility
VisibilityAppName = "visibility"
VisibilityAppName = "visibility"
PinotVisibilityAppName = "pinot-visibility"
)

const (
// ESVisibilityStoreName is used to find es advanced visibility store
ESVisibilityStoreName = "es-visibility"
// PinotVisibilityStoreName is used to find pinot advanced visibility store
PinotVisibilityStoreName = "pinot-visibility"
)

// This was flagged by salus as potentially hardcoded credentials. This is a false positive by the scanner and should be
Expand Down Expand Up @@ -149,6 +157,8 @@ const (
AdvancedVisibilityWritingModeOn = "on"
// AdvancedVisibilityWritingModeDual means write to both normal visibility and advanced visibility store
AdvancedVisibilityWritingModeDual = "dual"
// AdvacnedVisibilityWritingModeTriple means write to both normal visibility and advanced visibility store, includes ES and Pinot
AdvacnedVisibilityWritingModeTriple = "triple"
)

const (
Expand Down
11 changes: 11 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,12 @@ const (
// Default value: true
// Allowed filters: DomainName
EnableReadVisibilityFromES
// EnableReadVisibilityFromPinot is key for enable read from pinot or db visibility, usually using with AdvancedVisibilityWritingMode for seamless migration from db visibility to advanced visibility
// KeyName: system.enableReadVisibilityFromPinot
// Value type: Bool
// Default value: true
// Allowed filters: DomainName
EnableReadVisibilityFromPinot
// EmitShardDiffLog is whether emit the shard diff log
// KeyName: history.emitShardDiffLog
// Value type: Bool
Expand Down Expand Up @@ -3675,6 +3681,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableReadVisibilityFromES is key for enable read from elastic search or db visibility, usually using with AdvancedVisibilityWritingMode for seamless migration from db visibility to advanced visibility",
DefaultValue: true,
},
EnableReadVisibilityFromPinot: DynamicBool{
KeyName: "system.enableReadVisibilityFromPinot",
Description: "EnableReadVisibilityFromPinot is key for enable read from pinot or db visibility, usually using with AdvancedVisibilityWritingMode for seamless migration from db visibility to advanced visibility",
DefaultValue: true,
},
EmitShardDiffLog: DynamicBool{
KeyName: "history.emitShardDiffLog",
Description: "EmitShardDiffLog is whether emit the shard diff log",
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ var (
ComponentCrossClusterTaskFetcher = component("cross-cluster-task-fetcher")
ComponentShardScanner = component("shardscanner-scanner")
ComponentShardFixer = component("shardscanner-fixer")
ComponentPinotVisibilityManager = component("pinot-visibility-manager")
)

// Pre-defined values for TagSysLifecycle
Expand Down
7 changes: 7 additions & 0 deletions common/messaging/kafka/producerImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (p *producerImpl) getProducerMessage(message interface{}) (*sarama.Producer
Value: sarama.ByteEncoder(message.Value),
}
return msg, nil
case *indexer.PinotMessage:
msg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(message.GetWorkflowID()),
Value: sarama.ByteEncoder(message.GetPayload()),
}
return msg, nil
default:
return nil, errors.New("unknown producer message type")
}
Expand Down
73 changes: 73 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,41 @@ const (
// ElasticsearchDeleteUninitializedWorkflowExecutionsScope tracks DeleteUninitializedWorkflowExecution calls made by service to persistence layer
ElasticsearchDeleteUninitializedWorkflowExecutionsScope

// PinotRecordWorkflowExecutionStartedScope tracks RecordWorkflowExecutionStarted calls made by service to persistence layer
PinotRecordWorkflowExecutionStartedScope
// PinotRecordWorkflowExecutionClosedScope tracks RecordWorkflowExecutionClosed calls made by service to persistence layer
PinotRecordWorkflowExecutionClosedScope
// PinotRecordWorkflowExecutionUninitializedScope tracks RecordWorkflowExecutionUninitialized calls made by service to persistence layer
PinotRecordWorkflowExecutionUninitializedScope
// PinotUpsertWorkflowExecutionScope tracks UpsertWorkflowExecution calls made by service to persistence layer
PinotUpsertWorkflowExecutionScope
// PinotListOpenWorkflowExecutionsScope tracks ListOpenWorkflowExecutions calls made by service to persistence layer
PinotListOpenWorkflowExecutionsScope
// PinotListClosedWorkflowExecutionsScope tracks ListClosedWorkflowExecutions calls made by service to persistence layer
PinotListClosedWorkflowExecutionsScope
// PinotListOpenWorkflowExecutionsByTypeScope tracks ListOpenWorkflowExecutionsByType calls made by service to persistence layer
PinotListOpenWorkflowExecutionsByTypeScope
// PinotListClosedWorkflowExecutionsByTypeScope tracks ListClosedWorkflowExecutionsByType calls made by service to persistence layer
PinotListClosedWorkflowExecutionsByTypeScope
// PinotListOpenWorkflowExecutionsByWorkflowIDScope tracks ListOpenWorkflowExecutionsByWorkflowID calls made by service to persistence layer
PinotListOpenWorkflowExecutionsByWorkflowIDScope
// PinotListClosedWorkflowExecutionsByWorkflowIDScope tracks ListClosedWorkflowExecutionsByWorkflowID calls made by service to persistence layer
PinotListClosedWorkflowExecutionsByWorkflowIDScope
// PinotListClosedWorkflowExecutionsByStatusScope tracks ListClosedWorkflowExecutionsByStatus calls made by service to persistence layer
PinotListClosedWorkflowExecutionsByStatusScope
// PinotGetClosedWorkflowExecutionScope tracks GetClosedWorkflowExecution calls made by service to persistence layer
PinotGetClosedWorkflowExecutionScope
// PinotListWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer
PinotListWorkflowExecutionsScope
// PinotScanWorkflowExecutionsScope tracks ScanWorkflowExecutions calls made by service to persistence layer
PinotScanWorkflowExecutionsScope
// PinotCountWorkflowExecutionsScope tracks CountWorkflowExecutions calls made by service to persistence layer
PinotCountWorkflowExecutionsScope
// PinotDeleteWorkflowExecutionsScope tracks DeleteWorkflowExecution calls made by service to persistence layer
PinotDeleteWorkflowExecutionsScope
// PinotDeleteUninitializedWorkflowExecutionsScope tracks DeleteUninitializedWorkflowExecution calls made by service to persistence layer
PinotDeleteUninitializedWorkflowExecutionsScope

// SequentialTaskProcessingScope is used by sequential task processing logic
SequentialTaskProcessingScope
// ParallelTaskProcessingScope is used by parallel task processing logic
Expand Down Expand Up @@ -1551,6 +1586,23 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
ElasticsearchCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"},
ElasticsearchDeleteWorkflowExecutionsScope: {operation: "DeleteWorkflowExecution"},
ElasticsearchDeleteUninitializedWorkflowExecutionsScope: {operation: "DeleteUninitializedWorkflowExecution"},
PinotRecordWorkflowExecutionStartedScope: {operation: "RecordWorkflowExecutionStarted"},
PinotRecordWorkflowExecutionClosedScope: {operation: "RecordWorkflowExecutionClosed"},
PinotRecordWorkflowExecutionUninitializedScope: {operation: "RecordWorkflowExecutionUninitialized"},
PinotUpsertWorkflowExecutionScope: {operation: "UpsertWorkflowExecution"},
PinotListOpenWorkflowExecutionsScope: {operation: "ListOpenWorkflowExecutions"},
PinotListClosedWorkflowExecutionsScope: {operation: "ListClosedWorkflowExecutions"},
PinotListOpenWorkflowExecutionsByTypeScope: {operation: "ListOpenWorkflowExecutionsByType"},
PinotListClosedWorkflowExecutionsByTypeScope: {operation: "ListClosedWorkflowExecutionsByType"},
PinotListOpenWorkflowExecutionsByWorkflowIDScope: {operation: "ListOpenWorkflowExecutionsByWorkflowID"},
PinotListClosedWorkflowExecutionsByWorkflowIDScope: {operation: "ListClosedWorkflowExecutionsByWorkflowID"},
PinotListClosedWorkflowExecutionsByStatusScope: {operation: "ListClosedWorkflowExecutionsByStatus"},
PinotGetClosedWorkflowExecutionScope: {operation: "GetClosedWorkflowExecution"},
PinotListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"},
PinotScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"},
PinotCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"},
PinotDeleteWorkflowExecutionsScope: {operation: "DeleteWorkflowExecution"},
PinotDeleteUninitializedWorkflowExecutionsScope: {operation: "DeleteUninitializedWorkflowExecution"},
SequentialTaskProcessingScope: {operation: "SequentialTaskProcessing"},
ParallelTaskProcessingScope: {operation: "ParallelTaskProcessing"},
TaskSchedulerScope: {operation: "TaskScheduler"},
Expand Down Expand Up @@ -1935,6 +1987,17 @@ const (
ElasticsearchErrBadRequestCounterPerDomain
ElasticsearchErrBusyCounterPerDomain

PinotRequests
PinotFailures
PinotLatency
PinotErrBadRequestCounter
PinotErrBusyCounter
PinotRequestsPerDomain
PinotFailuresPerDomain
PinotLatencyPerDomain
PinotErrBadRequestCounterPerDomain
PinotErrBusyCounterPerDomain

SequentialTaskSubmitRequest
SequentialTaskSubmitRequestTaskQueueExist
SequentialTaskSubmitRequestTaskQueueMissing
Expand Down Expand Up @@ -2522,6 +2585,16 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ElasticsearchLatencyPerDomain: {metricName: "elasticsearch_latency_per_domain", metricRollupName: "elasticsearch_latency", metricType: Timer},
ElasticsearchErrBadRequestCounterPerDomain: {metricName: "elasticsearch_errors_bad_request_per_domain", metricRollupName: "elasticsearch_errors_bad_request", metricType: Counter},
ElasticsearchErrBusyCounterPerDomain: {metricName: "elasticsearch_errors_busy_per_domain", metricRollupName: "elasticsearch_errors_busy", metricType: Counter},
PinotRequests: {metricName: "pinot_requests", metricType: Counter},
PinotFailures: {metricName: "pinot_errors", metricType: Counter},
PinotLatency: {metricName: "pinot_latency", metricType: Timer},
PinotErrBadRequestCounter: {metricName: "pinot_errors_bad_request", metricType: Counter},
PinotErrBusyCounter: {metricName: "pinot_errors_busy", metricType: Counter},
PinotRequestsPerDomain: {metricName: "pinot_requests_per_domain", metricRollupName: "pinot_requests", metricType: Counter},
PinotFailuresPerDomain: {metricName: "pinot_errors_per_domain", metricRollupName: "pinot_errors", metricType: Counter},
PinotLatencyPerDomain: {metricName: "pinot_latency_per_domain", metricRollupName: "pinot_latency", metricType: Timer},
PinotErrBadRequestCounterPerDomain: {metricName: "pinot_errors_bad_request_per_domain", metricRollupName: "pinot_errors_bad_request", metricType: Counter},
PinotErrBusyCounterPerDomain: {metricName: "pinot_errors_busy_per_domain", metricRollupName: "pinot_errors_busy", metricType: Counter},
SequentialTaskSubmitRequest: {metricName: "sequentialtask_submit_request", metricType: Counter},
SequentialTaskSubmitRequestTaskQueueExist: {metricName: "sequentialtask_submit_request_taskqueue_exist", metricType: Counter},
SequentialTaskSubmitRequestTaskQueueMissing: {metricName: "sequentialtask_submit_request_taskqueue_missing", metricType: Counter},
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/client/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package client
import (
"sync"

"github.com/uber/cadence/common/pinot"

"github.com/uber/cadence/common/config"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/messaging"
Expand Down Expand Up @@ -85,6 +87,8 @@ type (
MessagingClient messaging.Client
ESClient es.GenericClient
ESConfig *config.ElasticSearchConfig
PinotConfig *config.PinotVisibilityConfig
PinotClient pinot.GenericClient
}
)

Expand Down
59 changes: 57 additions & 2 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ package client
import (
"sync"

pnt "github.com/uber/cadence/common/pinot"

pinotVisibility "github.com/uber/cadence/common/persistence/pinot"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
es "github.com/uber/cadence/common/elasticsearch"
Expand Down Expand Up @@ -265,15 +269,39 @@ func (f *factoryImpl) NewVisibilityManager(
// No need to create visibility manager as no read/write needed
return nil, nil
}
var visibilityFromDB, visibilityFromES p.VisibilityManager
var visibilityFromDB, visibilityFromES, visibilityFromPinot p.VisibilityManager
var err error
if params.PersistenceConfig.VisibilityStore != "" {
visibilityFromDB, err = f.newDBVisibilityManager(resourceConfig)
if err != nil {
return nil, err
}
}
if params.PersistenceConfig.AdvancedVisibilityStore != "" {
if params.PersistenceConfig.AdvancedVisibilityStore == common.PinotVisibilityStoreName {
visibilityProducer, err := params.MessagingClient.NewProducer(common.PinotVisibilityAppName)
if err != nil {
f.logger.Fatal("Creating visibility producer failed", tag.Error(err))
}

visibilityFromPinot = newPinotVisibilityManager(
params.PinotClient, resourceConfig, visibilityProducer, params.MetricsClient, f.logger)

esVisibilityProducer, err := params.MessagingClient.NewProducer(common.VisibilityAppName)
visibilityIndexName := params.ESConfig.Indices[common.VisibilityAppName]
visibilityFromES = newESVisibilityManager(
visibilityIndexName, params.ESClient, resourceConfig, esVisibilityProducer, params.MetricsClient, f.logger,
)

return p.NewPinotVisibilityTripleManager(
visibilityFromDB,
visibilityFromPinot,
visibilityFromES,
resourceConfig.EnableReadVisibilityFromPinot,
resourceConfig.EnableReadVisibilityFromES,
resourceConfig.AdvancedVisibilityWritingMode,
f.logger,
), nil
} else if params.PersistenceConfig.AdvancedVisibilityStore != "" {
visibilityIndexName := params.ESConfig.Indices[common.VisibilityAppName]
visibilityProducer, err := params.MessagingClient.NewProducer(common.VisibilityAppName)
if err != nil {
Expand All @@ -292,6 +320,33 @@ func (f *factoryImpl) NewVisibilityManager(
), nil
}

// NewESVisibilityManager create a visibility manager for ElasticSearch
// In history, it only needs kafka producer for writing data;
// In frontend, it only needs ES client and related config for reading data
func newPinotVisibilityManager(
pinotClient pnt.GenericClient,
visibilityConfig *service.Config,
producer messaging.Producer,
metricsClient metrics.Client,
log log.Logger,
) p.VisibilityManager {
visibilityFromPinotStore := pinotVisibility.NewPinotVisibilityStore(pinotClient, visibilityConfig, producer, log)
visibilityFromPinot := p.NewVisibilityManagerImpl(visibilityFromPinotStore, log)

// wrap with rate limiter
if visibilityConfig.PersistenceMaxQPS != nil && visibilityConfig.PersistenceMaxQPS() != 0 {
pinotRateLimiter := quotas.NewDynamicRateLimiter(visibilityConfig.PersistenceMaxQPS.AsFloat64())
visibilityFromPinot = p.NewVisibilityPersistenceRateLimitedClient(visibilityFromPinot, pinotRateLimiter, log)
}

if metricsClient != nil {
// wrap with metrics
visibilityFromPinot = pinotVisibility.NewPinotVisibilityMetricsClient(visibilityFromPinot, metricsClient, log)
}

return visibilityFromPinot
}

// NewESVisibilityManager create a visibility manager for ElasticSearch
// In history, it only needs kafka producer for writing data;
// In frontend, it only needs ES client and related config for reading data
Expand Down
Loading

0 comments on commit eb8eea9

Please sign in to comment.