From c89d2fd6cbab82565ea0252e76e6b0d7f87a210e Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Thu, 10 May 2018 14:48:11 -0700 Subject: [PATCH] add a layer of indiraction between topic name and cadence cluster name (#721) * add a layer of indiraction between topic name and cadence cluster name --- common/messaging/interface.go | 4 ++-- common/messaging/kafkaClient.go | 23 +++++++++++++---------- common/messaging/kafkaConfig.go | 20 ++++++++++++++++---- config/development_active.yaml | 3 +++ config/development_standby.yaml | 3 +++ service/worker/processor.go | 6 ++---- service/worker/replicator.go | 7 +------ 7 files changed, 40 insertions(+), 26 deletions(-) diff --git a/common/messaging/interface.go b/common/messaging/interface.go index c9fa9980fe0..40cbb9bb306 100644 --- a/common/messaging/interface.go +++ b/common/messaging/interface.go @@ -29,8 +29,8 @@ import ( type ( // Client is the interface used to abstract out interaction with messaging system for replication Client interface { - NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error) - NewProducer(topicName string) (Producer, error) + NewConsumer(cadenceCluster, consumerName string, concurrency int) (kafka.Consumer, error) + NewProducer(cadenceCluster string) (Producer, error) } // Producer is the interface used to send replication tasks to other clusters through replicator diff --git a/common/messaging/kafkaClient.go b/common/messaging/kafkaClient.go index e8c40756040..612fa46fa03 100644 --- a/common/messaging/kafkaClient.go +++ b/common/messaging/kafkaClient.go @@ -21,11 +21,12 @@ package messaging import ( + "strings" + "github.com/Shopify/sarama" "github.com/uber-common/bark" "github.com/uber-go/kafka-client" "github.com/uber-go/kafka-client/kafka" - "strings" ) type ( @@ -37,9 +38,10 @@ type ( ) // NewConsumer is used to create a Kafka consumer -func (c *kafkaClient) NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error) { - clusterName := c.config.getClusterForTopic(topicName) - brokers := c.config.getBrokersForCluster(clusterName) +func (c *kafkaClient) NewConsumer(cadenceCluster, consumerName string, concurrency int) (kafka.Consumer, error) { + topicName := c.config.getTopicForCadenceCluster(cadenceCluster) + kafkaClusterName := c.config.getKafkaClusterForTopic(topicName) + brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName) consumerConfig := &kafka.ConsumerConfig{ GroupName: consumerName, @@ -47,17 +49,17 @@ func (c *kafkaClient) NewConsumer(topicName, consumerName string, concurrency in kafka.ConsumerTopic{ Topic: kafka.Topic{ Name: topicName, - Cluster: clusterName, + Cluster: kafkaClusterName, BrokerList: brokers, }, RetryQ: kafka.Topic{ Name: strings.Join([]string{topicName, "retry"}, "-"), - Cluster: clusterName, + Cluster: kafkaClusterName, BrokerList: brokers, }, DLQ: kafka.Topic{ Name: strings.Join([]string{topicName, "dlq"}, "-"), - Cluster: clusterName, + Cluster: kafkaClusterName, BrokerList: brokers, }, }, @@ -70,9 +72,10 @@ func (c *kafkaClient) NewConsumer(topicName, consumerName string, concurrency in } // NewProducer is used to create a Kafka producer for shipping replication tasks -func (c *kafkaClient) NewProducer(topicName string) (Producer, error) { - clusterName := c.config.getClusterForTopic(topicName) - brokers := c.config.getBrokersForCluster(clusterName) +func (c *kafkaClient) NewProducer(cadenceCluster string) (Producer, error) { + topicName := c.config.getTopicForCadenceCluster(cadenceCluster) + kafkaClusterName := c.config.getKafkaClusterForTopic(topicName) + brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName) producer, err := sarama.NewSyncProducer(brokers, nil) if err != nil { diff --git a/common/messaging/kafkaConfig.go b/common/messaging/kafkaConfig.go index a96c357c359..40dcd4451ff 100644 --- a/common/messaging/kafkaConfig.go +++ b/common/messaging/kafkaConfig.go @@ -21,6 +21,8 @@ package messaging import ( + "strings" + "github.com/uber-common/bark" "github.com/uber-go/kafka-client" "github.com/uber-go/kafka-client/kafka" @@ -31,8 +33,9 @@ import ( type ( // KafkaConfig describes the configuration needed to connect to all kafka clusters KafkaConfig struct { - Clusters map[string]ClusterConfig `yaml:"clusters"` - Topics map[string]TopicConfig `yaml:"topics"` + Clusters map[string]ClusterConfig `yaml:"clusters"` + Topics map[string]TopicConfig `yaml:"topics"` + ClusterToTopic map[string]string `yaml:"cadence-cluster-to-topic"` } // ClusterConfig describes the configuration for a single Kafka cluster @@ -52,6 +55,11 @@ func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, me brokers := map[string][]string{} for cluster, cfg := range k.Clusters { brokers[cluster] = cfg.Brokers + for i := range brokers[cluster] { + if !strings.Contains(cfg.Brokers[i], ":") { + cfg.Brokers[i] += ":9092" + } + } } // mapping from topic name to cluster that has that topic @@ -69,10 +77,14 @@ func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, me } } -func (k *KafkaConfig) getClusterForTopic(topic string) string { +func (k *KafkaConfig) getTopicForCadenceCluster(cluster string) string { + return k.ClusterToTopic[cluster] +} + +func (k *KafkaConfig) getKafkaClusterForTopic(topic string) string { return k.Topics[topic].Cluster } -func (k *KafkaConfig) getBrokersForCluster(cluster string) []string { +func (k *KafkaConfig) getBrokersForKafkaCluster(cluster string) []string { return k.Clusters[cluster].Brokers } diff --git a/config/development_active.yaml b/config/development_active.yaml index 03f73a5d86b..ddecc4607ae 100644 --- a/config/development_active.yaml +++ b/config/development_active.yaml @@ -75,3 +75,6 @@ kafka: cluster: test standby: cluster: test + cadence-cluster-to-topic: + active: active + standby: standby diff --git a/config/development_standby.yaml b/config/development_standby.yaml index 528dfa3666e..a7ca9c42c28 100644 --- a/config/development_standby.yaml +++ b/config/development_standby.yaml @@ -75,3 +75,6 @@ kafka: cluster: test standby: cluster: test + cadence-cluster-to-topic: + active: active + standby: standby diff --git a/service/worker/processor.go b/service/worker/processor.go index 4c2bbd6aae9..016312ebd95 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -74,12 +74,11 @@ var ( ErrUnknownReplicationTask = errors.New("unknown replication task") ) -func newReplicationTaskProcessor(sourceCluster, topic, consumer string, client messaging.Client, config *Config, +func newReplicationTaskProcessor(sourceCluster, consumer string, client messaging.Client, config *Config, logger bark.Logger, metricsClient metrics.Client, domainReplicator DomainReplicator, historyClient history.Client) *replicationTaskProcessor { return &replicationTaskProcessor{ sourceCluster: sourceCluster, - topicName: topic, consumerName: consumer, client: client, shutdownCh: make(chan struct{}), @@ -87,7 +86,6 @@ func newReplicationTaskProcessor(sourceCluster, topic, consumer string, client m logger: logger.WithFields(bark.Fields{ logging.TagWorkflowComponent: logging.TagValueReplicationTaskProcessorComponent, logging.TagSourceCluster: sourceCluster, - logging.TagTopicName: topic, logging.TagConsumerName: consumer, }), metricsClient: metricsClient, @@ -102,7 +100,7 @@ func (p *replicationTaskProcessor) Start() error { } logging.LogReplicationTaskProcessorStartingEvent(p.logger) - consumer, err := p.client.NewConsumer(p.topicName, p.consumerName, p.config.ReplicatorConcurrency) + consumer, err := p.client.NewConsumer(p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency) if err != nil { logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err) return err diff --git a/service/worker/replicator.go b/service/worker/replicator.go index 398304382b9..03fc548cabd 100644 --- a/service/worker/replicator.go +++ b/service/worker/replicator.go @@ -69,9 +69,8 @@ func (r *Replicator) Start() error { currentClusterName := r.clusterMetadata.GetCurrentClusterName() for cluster := range r.clusterMetadata.GetAllClusterFailoverVersions() { if cluster != currentClusterName { - topicName := getTopicName(cluster) consumerName := getConsumerName(currentClusterName, cluster) - r.processors = append(r.processors, newReplicationTaskProcessor(cluster, topicName, consumerName, r.client, + r.processors = append(r.processors, newReplicationTaskProcessor(cluster, consumerName, r.client, r.config, r.logger, r.metricsClient, r.domainReplicator, r.historyClient)) } } @@ -95,7 +94,3 @@ func (r *Replicator) Stop() { func getConsumerName(currentCluster, remoteCluster string) string { return fmt.Sprintf("%v_consumer_for_%v", currentCluster, remoteCluster) } - -func getTopicName(sourceCluster string) string { - return sourceCluster -}