Skip to content

Commit

Permalink
add a layer of indiraction between topic name and cadence cluster name (
Browse files Browse the repository at this point in the history
cadence-workflow#721)

* add a layer of indiraction between topic name and cadence cluster name
  • Loading branch information
wxing1292 authored May 10, 2018
1 parent c16ac3a commit c89d2fd
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 26 deletions.
4 changes: 2 additions & 2 deletions common/messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
type (
// Client is the interface used to abstract out interaction with messaging system for replication
Client interface {
NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error)
NewProducer(topicName string) (Producer, error)
NewConsumer(cadenceCluster, consumerName string, concurrency int) (kafka.Consumer, error)
NewProducer(cadenceCluster string) (Producer, error)
}

// Producer is the interface used to send replication tasks to other clusters through replicator
Expand Down
23 changes: 13 additions & 10 deletions common/messaging/kafkaClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
package messaging

import (
"strings"

"github.com/Shopify/sarama"
"github.com/uber-common/bark"
"github.com/uber-go/kafka-client"
"github.com/uber-go/kafka-client/kafka"
"strings"
)

type (
Expand All @@ -37,27 +38,28 @@ type (
)

// NewConsumer is used to create a Kafka consumer
func (c *kafkaClient) NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error) {
clusterName := c.config.getClusterForTopic(topicName)
brokers := c.config.getBrokersForCluster(clusterName)
func (c *kafkaClient) NewConsumer(cadenceCluster, consumerName string, concurrency int) (kafka.Consumer, error) {
topicName := c.config.getTopicForCadenceCluster(cadenceCluster)
kafkaClusterName := c.config.getKafkaClusterForTopic(topicName)
brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName)

consumerConfig := &kafka.ConsumerConfig{
GroupName: consumerName,
TopicList: kafka.ConsumerTopicList{
kafka.ConsumerTopic{
Topic: kafka.Topic{
Name: topicName,
Cluster: clusterName,
Cluster: kafkaClusterName,
BrokerList: brokers,
},
RetryQ: kafka.Topic{
Name: strings.Join([]string{topicName, "retry"}, "-"),
Cluster: clusterName,
Cluster: kafkaClusterName,
BrokerList: brokers,
},
DLQ: kafka.Topic{
Name: strings.Join([]string{topicName, "dlq"}, "-"),
Cluster: clusterName,
Cluster: kafkaClusterName,
BrokerList: brokers,
},
},
Expand All @@ -70,9 +72,10 @@ func (c *kafkaClient) NewConsumer(topicName, consumerName string, concurrency in
}

// NewProducer is used to create a Kafka producer for shipping replication tasks
func (c *kafkaClient) NewProducer(topicName string) (Producer, error) {
clusterName := c.config.getClusterForTopic(topicName)
brokers := c.config.getBrokersForCluster(clusterName)
func (c *kafkaClient) NewProducer(cadenceCluster string) (Producer, error) {
topicName := c.config.getTopicForCadenceCluster(cadenceCluster)
kafkaClusterName := c.config.getKafkaClusterForTopic(topicName)
brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName)

producer, err := sarama.NewSyncProducer(brokers, nil)
if err != nil {
Expand Down
20 changes: 16 additions & 4 deletions common/messaging/kafkaConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package messaging

import (
"strings"

"github.com/uber-common/bark"
"github.com/uber-go/kafka-client"
"github.com/uber-go/kafka-client/kafka"
Expand All @@ -31,8 +33,9 @@ import (
type (
// KafkaConfig describes the configuration needed to connect to all kafka clusters
KafkaConfig struct {
Clusters map[string]ClusterConfig `yaml:"clusters"`
Topics map[string]TopicConfig `yaml:"topics"`
Clusters map[string]ClusterConfig `yaml:"clusters"`
Topics map[string]TopicConfig `yaml:"topics"`
ClusterToTopic map[string]string `yaml:"cadence-cluster-to-topic"`
}

// ClusterConfig describes the configuration for a single Kafka cluster
Expand All @@ -52,6 +55,11 @@ func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, me
brokers := map[string][]string{}
for cluster, cfg := range k.Clusters {
brokers[cluster] = cfg.Brokers
for i := range brokers[cluster] {
if !strings.Contains(cfg.Brokers[i], ":") {
cfg.Brokers[i] += ":9092"
}
}
}

// mapping from topic name to cluster that has that topic
Expand All @@ -69,10 +77,14 @@ func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, me
}
}

func (k *KafkaConfig) getClusterForTopic(topic string) string {
func (k *KafkaConfig) getTopicForCadenceCluster(cluster string) string {
return k.ClusterToTopic[cluster]
}

func (k *KafkaConfig) getKafkaClusterForTopic(topic string) string {
return k.Topics[topic].Cluster
}

func (k *KafkaConfig) getBrokersForCluster(cluster string) []string {
func (k *KafkaConfig) getBrokersForKafkaCluster(cluster string) []string {
return k.Clusters[cluster].Brokers
}
3 changes: 3 additions & 0 deletions config/development_active.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,6 @@ kafka:
cluster: test
standby:
cluster: test
cadence-cluster-to-topic:
active: active
standby: standby
3 changes: 3 additions & 0 deletions config/development_standby.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,6 @@ kafka:
cluster: test
standby:
cluster: test
cadence-cluster-to-topic:
active: active
standby: standby
6 changes: 2 additions & 4 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,18 @@ var (
ErrUnknownReplicationTask = errors.New("unknown replication task")
)

func newReplicationTaskProcessor(sourceCluster, topic, consumer string, client messaging.Client, config *Config,
func newReplicationTaskProcessor(sourceCluster, consumer string, client messaging.Client, config *Config,
logger bark.Logger, metricsClient metrics.Client, domainReplicator DomainReplicator,
historyClient history.Client) *replicationTaskProcessor {
return &replicationTaskProcessor{
sourceCluster: sourceCluster,
topicName: topic,
consumerName: consumer,
client: client,
shutdownCh: make(chan struct{}),
config: config,
logger: logger.WithFields(bark.Fields{
logging.TagWorkflowComponent: logging.TagValueReplicationTaskProcessorComponent,
logging.TagSourceCluster: sourceCluster,
logging.TagTopicName: topic,
logging.TagConsumerName: consumer,
}),
metricsClient: metricsClient,
Expand All @@ -102,7 +100,7 @@ func (p *replicationTaskProcessor) Start() error {
}

logging.LogReplicationTaskProcessorStartingEvent(p.logger)
consumer, err := p.client.NewConsumer(p.topicName, p.consumerName, p.config.ReplicatorConcurrency)
consumer, err := p.client.NewConsumer(p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency)
if err != nil {
logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err)
return err
Expand Down
7 changes: 1 addition & 6 deletions service/worker/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ func (r *Replicator) Start() error {
currentClusterName := r.clusterMetadata.GetCurrentClusterName()
for cluster := range r.clusterMetadata.GetAllClusterFailoverVersions() {
if cluster != currentClusterName {
topicName := getTopicName(cluster)
consumerName := getConsumerName(currentClusterName, cluster)
r.processors = append(r.processors, newReplicationTaskProcessor(cluster, topicName, consumerName, r.client,
r.processors = append(r.processors, newReplicationTaskProcessor(cluster, consumerName, r.client,
r.config, r.logger, r.metricsClient, r.domainReplicator, r.historyClient))
}
}
Expand All @@ -95,7 +94,3 @@ func (r *Replicator) Stop() {
func getConsumerName(currentCluster, remoteCluster string) string {
return fmt.Sprintf("%v_consumer_for_%v", currentCluster, remoteCluster)
}

func getTopicName(sourceCluster string) string {
return sourceCluster
}

0 comments on commit c89d2fd

Please sign in to comment.