Skip to content

Commit

Permalink
Scaffold asyncqueue provider component (cadence-workflow#5627)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Jan 26, 2024
1 parent ac3d45f commit 58f9fb5
Show file tree
Hide file tree
Showing 25 changed files with 1,070 additions and 35 deletions.
7 changes: 7 additions & 0 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/asyncworkflow/queue"
asyncworkflowprovider "github.com/uber/cadence/common/asyncworkflow/queue/provider"
"github.com/uber/cadence/common/blobstore/filestore"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
Expand Down Expand Up @@ -284,6 +286,11 @@ func (s *server) startService() common.Daemon {
params.BlobstoreClient = nil
}

params.AsyncWorkflowQueueProvider, err = queue.NewAsyncQueueProvider(s.cfg.AsyncWorkflowQueues, &asyncworkflowprovider.Params{Logger: params.Logger, MetricsClient: params.MetricsClient})
if err != nil {
log.Fatalf("error creating async queue provider: %v", err)
}

params.Logger.Info("Starting service " + s.name)

var daemon common.Daemon
Expand Down
1 change: 1 addition & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/uber/cadence/common/metrics"

_ "github.com/uber/cadence/common/archiver/gcloud" // needed to load the optional gcloud archiver plugin
_ "github.com/uber/cadence/common/asyncworkflow/queue/kafka" // needed to load kafka asyncworkflow queue
_ "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra" // needed to load cassandra plugin
_ "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql/public" // needed to load the default gocql client
_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin
Expand Down
2 changes: 1 addition & 1 deletion common/archiver/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"sync"

"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider/syncmap"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/syncmap"
)

var (
Expand Down
35 changes: 35 additions & 0 deletions common/asyncworkflow/queue/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -self_package github.com/uber/cadence/common/asyncworkflow/queue

package queue

import "github.com/uber/cadence/common/messaging"

type (
// Provider is used to get a queue for a given domain
Provider interface {
GetAsyncQueueProducer(domain string) (messaging.Producer, error)
GetAsyncQueueConsumer(domain string) (messaging.Consumer, error)
}
)
88 changes: 88 additions & 0 deletions common/asyncworkflow/queue/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 84 additions & 0 deletions common/asyncworkflow/queue/kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package kafka

import (
"fmt"

"github.com/Shopify/sarama"

"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/config"
)

type (
QueueConfig struct {
Connection ConnectionConfig `yaml:"connection"`
Topic string `yaml:"topic"`
}

ConnectionConfig struct {
Brokers []string `yaml:"brokers"`
TLS config.TLS `yaml:"tls"`
SASL config.SASL `yaml:"sasl"`
}
)

func newSaramaConfigWithAuth(tls *config.TLS, sasl *config.SASL) (*sarama.Config, error) {
saramaConfig := sarama.NewConfig()

// TLS support
tlsConfig, err := tls.ToTLSConfig()
if err != nil {
return nil, fmt.Errorf("Error creating Kafka TLS config %w", err)
}
if tlsConfig != nil {
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = tlsConfig
}

// SASL support
if sasl.Enabled {
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.Handshake = true
saramaConfig.Net.SASL.User = sasl.User
saramaConfig.Net.SASL.Password = sasl.Password
switch sasl.Algorithm {
case "sha512":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &authorization.XDGSCRAMClient{HashGeneratorFcn: authorization.SHA512}
}
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case "sha256":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &authorization.XDGSCRAMClient{HashGeneratorFcn: authorization.SHA256}
}
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
case "plain":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
default:
return nil, fmt.Errorf("unknown SASL algorithm %v", sasl.Algorithm)
}
}
return saramaConfig, nil
}
70 changes: 70 additions & 0 deletions common/asyncworkflow/queue/kafka/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package kafka

import (
"fmt"
"time"

"github.com/Shopify/sarama"

"github.com/uber/cadence/common/asyncworkflow/queue/provider"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/messaging/kafka"
"github.com/uber/cadence/common/metrics"
)

// ConsumerConstructor is a function that constructs a queue consumer
func ConsumerConstructor(cfg *config.YamlNode, params *provider.Params) (messaging.Consumer, error) {
return newConsumerConstructor(params.MetricsClient, params.Logger)(cfg)

}
func newConsumerConstructor(metricsClient metrics.Client, logger log.Logger) func(cfg *config.YamlNode) (messaging.Consumer, error) {
return func(cfg *config.YamlNode) (messaging.Consumer, error) {
var out *QueueConfig
if err := cfg.Decode(&out); err != nil {
return nil, fmt.Errorf("bad config: %w", err)
}
consumerGroup := fmt.Sprintf("%s-consumer", out.Topic)
dlqTopic := fmt.Sprintf("%s-dlq", out.Topic)
saramaConfig, err := newSaramaConfigWithAuth(&out.Connection.TLS, &out.Connection.SASL)
if err != nil {
return nil, err
}
saramaConfig.Consumer.Fetch.Default = 30 * 1024 * 1024 // 30MB.
saramaConfig.Consumer.Return.Errors = true
saramaConfig.Consumer.Offsets.CommitInterval = time.Second
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
saramaConfig.Consumer.MaxProcessingTime = 250 * time.Millisecond

dlqConfig, err := newSaramaConfigWithAuth(&out.Connection.TLS, &out.Connection.SASL)
if err != nil {
return nil, err
}
dlqConfig.Producer.Return.Successes = true
dlqProducer, err := newProducer(dlqTopic, out.Connection.Brokers, saramaConfig, metricsClient, logger)
return kafka.NewKafkaConsumer(dlqProducer, out.Connection.Brokers, out.Topic, consumerGroup, saramaConfig, metricsClient, logger)
}
}
Loading

0 comments on commit 58f9fb5

Please sign in to comment.