diff --git a/common/messaging/interface.go b/common/messaging/interface.go
index 40cbb9bb306..5f4e06ea272 100644
--- a/common/messaging/interface.go
+++ b/common/messaging/interface.go
@@ -29,8 +29,8 @@ import (
 type (
 	// Client is the interface used to abstract out interaction with messaging system for replication
 	Client interface {
-		NewConsumer(cadenceCluster, consumerName string, concurrency int) (kafka.Consumer, error)
-		NewProducer(cadenceCluster string) (Producer, error)
+		NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (kafka.Consumer, error)
+		NewProducer(sourceCluster string) (Producer, error)
 	}
 
 	// Producer is the interface used to send replication tasks to other clusters through replicator
diff --git a/common/messaging/kafkaClient.go b/common/messaging/kafkaClient.go
index 612fa46fa03..8e47a021b9a 100644
--- a/common/messaging/kafkaClient.go
+++ b/common/messaging/kafkaClient.go
@@ -21,8 +21,6 @@
 package messaging
 
 import (
-	"strings"
-
 	"github.com/Shopify/sarama"
 	"github.com/uber-common/bark"
 	"github.com/uber-go/kafka-client"
@@ -32,49 +30,51 @@ import (
 type (
 	kafkaClient struct {
 		config *KafkaConfig
-		client *kafkaclient.Client
+		client kafkaclient.Client
 		logger bark.Logger
 	}
 )
 
 // NewConsumer is used to create a Kafka consumer
-func (c *kafkaClient) NewConsumer(cadenceCluster, consumerName string, concurrency int) (kafka.Consumer, error) {
-	topicName := c.config.getTopicForCadenceCluster(cadenceCluster)
-	kafkaClusterName := c.config.getKafkaClusterForTopic(topicName)
-	brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName)
+func (c *kafkaClient) NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (kafka.Consumer, error) {
+	currentTopics := c.config.getTopicsForCadenceCluster(currentCluster)
+	sourceTopics := c.config.getTopicsForCadenceCluster(sourceCluster)
 
-	consumerConfig := &kafka.ConsumerConfig{
-		GroupName: consumerName,
-		TopicList: kafka.ConsumerTopicList{
-			kafka.ConsumerTopic{
-				Topic: kafka.Topic{
-					Name: topicName,
-					Cluster: kafkaClusterName,
-					BrokerList: brokers,
-				},
-				RetryQ: kafka.Topic{
-					Name: strings.Join([]string{topicName, "retry"}, "-"),
-					Cluster: kafkaClusterName,
-					BrokerList: brokers,
-				},
-				DLQ: kafka.Topic{
-					Name: strings.Join([]string{topicName, "dlq"}, "-"),
-					Cluster: kafkaClusterName,
-					BrokerList: brokers,
-				},
+	topicKafkaCluster := c.config.getKafkaClusterForTopic(sourceTopics.Topic)
+	retryTopicKafkaCluster := c.config.getKafkaClusterForTopic(currentTopics.RetryTopic)
+	dqlTopicKafkaCluster := c.config.getKafkaClusterForTopic(currentTopics.DLQTopic)
+	topicList := kafka.ConsumerTopicList{
+		kafka.ConsumerTopic{
+			Topic: kafka.Topic{
+				Name: sourceTopics.Topic,
+				Cluster: topicKafkaCluster,
+				BrokerList: c.config.getBrokersForKafkaCluster(topicKafkaCluster),
+			},
+			RetryQ: kafka.Topic{
+				Name: currentTopics.RetryTopic,
+				Cluster: retryTopicKafkaCluster,
+				BrokerList: c.config.getBrokersForKafkaCluster(retryTopicKafkaCluster),
+			},
+			DLQ: kafka.Topic{
+				Name: currentTopics.DLQTopic,
+				Cluster: dqlTopicKafkaCluster,
+				BrokerList: c.config.getBrokersForKafkaCluster(dqlTopicKafkaCluster),
 			},
 		},
-		Concurrency: concurrency,
 	}
+	consumerConfig := kafka.NewConsumerConfig(consumerName, topicList)
+	consumerConfig.Concurrency = concurrency
+	consumerConfig.Offsets.Initial.Offset = kafka.OffsetNewest
+
 	consumer, err := c.client.NewConsumer(consumerConfig)
 	return consumer, err
 }
 
 // NewProducer is used to create a Kafka producer for shipping replication tasks
-func (c *kafkaClient) NewProducer(cadenceCluster string) (Producer, error) {
-	topicName := c.config.getTopicForCadenceCluster(cadenceCluster)
-	kafkaClusterName := c.config.getKafkaClusterForTopic(topicName)
+func (c *kafkaClient) NewProducer(sourceCluster string) (Producer, error) {
+	topics := c.config.getTopicsForCadenceCluster(sourceCluster)
+	kafkaClusterName := c.config.getKafkaClusterForTopic(topics.Topic)
 	brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName)
 
 	producer, err := sarama.NewSyncProducer(brokers, nil)
@@ -82,5 +82,5 @@ func (c *kafkaClient) NewProducer(cadenceCluster string) (Producer, error) {
 	if err != nil {
 		return nil, err
 	}
-	return NewKafkaProducer(topicName, producer, c.logger), nil
+	return NewKafkaProducer(topics.Topic, producer, c.logger), nil
 }
diff --git a/common/messaging/kafkaConfig.go b/common/messaging/kafkaConfig.go
index 40dcd4451ff..ea594901966 100644
--- a/common/messaging/kafkaConfig.go
+++ b/common/messaging/kafkaConfig.go
@@ -21,6 +21,7 @@
 package messaging
 
 import (
+	"fmt"
 	"strings"
 
 	"github.com/uber-common/bark"
@@ -35,7 +36,7 @@ type (
 	KafkaConfig struct {
 		Clusters map[string]ClusterConfig `yaml:"clusters"`
 		Topics map[string]TopicConfig `yaml:"topics"`
-		ClusterToTopic map[string]string `yaml:"cadence-cluster-to-topic"`
+		ClusterToTopic map[string]TopicList `yaml:"cadence-cluster-topics"`
 	}
 
 	// ClusterConfig describes the configuration for a single Kafka cluster
@@ -47,10 +48,19 @@ type (
 	TopicConfig struct {
 		Cluster string `yaml:"cluster"`
 	}
+
+	// TopicList describes the topic names for each cluster
+	TopicList struct {
+		Topic string `yaml:"topic"`
+		RetryTopic string `yaml:"retry-topic"`
+		DLQTopic string `yaml:"dlq-topic"`
+	}
 )
 
 // NewKafkaClient is used to create an instance of KafkaClient
 func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope) Client {
+	k.validate()
+
 	// mapping from cluster name to list of broker ip addresses
 	brokers := map[string][]string{}
 	for cluster, cfg := range k.Clusters {
@@ -77,14 +87,44 @@ func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, me
 	}
 }
 
-func (k *KafkaConfig) getTopicForCadenceCluster(cluster string) string {
-	return k.ClusterToTopic[cluster]
+func (k *KafkaConfig) validate() {
+	if len(k.Clusters) == 0 {
+		panic("Empty Kafka Cluster Config")
+	}
+	if len(k.Topics) == 0 {
+		panic("Empty Topics Config")
+	}
+	if len(k.ClusterToTopic) == 0 {
+		panic("Empty Cluster To Topics Config")
+	}
+
+	validateTopicsFn := func(topic string) {
+		if topic == "" {
+			panic("Empty Topic Name")
+		} else if topicConfig, ok := k.Topics[topic]; !ok {
+			panic(fmt.Sprintf("Missing Topic Config for Topic %v", topic))
+		} else if clusterConfig, ok := k.Clusters[topicConfig.Cluster]; !ok {
+			panic(fmt.Sprintf("Missing Kafka Cluster Config for Cluster %v", topicConfig.Cluster))
+		} else if len(clusterConfig.Brokers) == 0 {
+			panic(fmt.Sprintf("Missing Kafka Brokers Config for Cluster %v", topicConfig.Cluster))
+		}
+	}
+
+	for _, topics := range k.ClusterToTopic {
+		validateTopicsFn(topics.Topic)
+		validateTopicsFn(topics.RetryTopic)
+		validateTopicsFn(topics.DLQTopic)
+	}
+}
+
+func (k *KafkaConfig) getTopicsForCadenceCluster(cadenceCluster string) TopicList {
+	return k.ClusterToTopic[cadenceCluster]
 }
 
 func (k *KafkaConfig) getKafkaClusterForTopic(topic string) string {
 	return k.Topics[topic].Cluster
 }
 
-func (k *KafkaConfig) getBrokersForKafkaCluster(cluster string) []string {
-	return k.Clusters[cluster].Brokers
+func (k *KafkaConfig) getBrokersForKafkaCluster(kafkaCluster string) []string {
+	return k.Clusters[kafkaCluster].Brokers
 }
diff --git a/common/mocks/ExecutionManager.go b/common/mocks/ExecutionManager.go
index 5ed229a09eb..76d18d80030 100644
--- a/common/mocks/ExecutionManager.go
+++ b/common/mocks/ExecutionManager.go
@@ -241,7 +241,7 @@ func (_m *ExecutionManager) UpdateWorkflowExecution(request *persistence.UpdateW
 	return r0
 }
 
-// UpdateWorkflowExecution provides a mock function with given fields: request
+// ResetMutableState provides a mock function with given fields: request
 func (_m *ExecutionManager) ResetMutableState(request *persistence.ResetMutableStateRequest) error {
 	ret := _m.Called(request)
 
diff --git a/common/mocks/MessagingClient.go b/common/mocks/MessagingClient.go
index b998194c907..b87f203e462 100644
--- a/common/mocks/MessagingClient.go
+++ b/common/mocks/MessagingClient.go
@@ -44,11 +44,11 @@ func NewMockMessagingClient(publisher messaging.Producer, consumer kafka.Consume
 }
 
 // NewConsumer generates a dummy implementation of kafka consumer
-func (c *MessagingClient) NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error) {
+func (c *MessagingClient) NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (kafka.Consumer, error) {
 	return c.consumerMock, nil
 }
 
 // NewProducer generates a dummy implementation of kafka producer
-func (c *MessagingClient) NewProducer(topicName string) (messaging.Producer, error) {
+func (c *MessagingClient) NewProducer(sourceCluster string) (messaging.Producer, error) {
 	return c.publisherMock, nil
 }
diff --git a/config/development_active.yaml b/config/development_active.yaml
index ddecc4607ae..cbc512a46ec 100644
--- a/config/development_active.yaml
+++ b/config/development_active.yaml
@@ -73,8 +73,22 @@ kafka:
   topics:
     active:
       cluster: test
+    active-retry:
+      cluster: test
+    active-dlq:
+      cluster: test
     standby:
       cluster: test
-  cadence-cluster-to-topic:
-    active: active
-    standby: standby
+    standby-retry:
+      cluster: test
+    standby-dlq:
+      cluster: test
+  cadence-cluster-topics:
+    active:
+      topic: active
+      retry-topic: active-retry
+      dlq-topic: active-dlq
+    standby:
+      topic: standby
+      retry-topic: standby-retry
+      dlq-topic: standby-dlq
diff --git a/config/development_standby.yaml b/config/development_standby.yaml
index a7ca9c42c28..0f0aabb131e 100644
--- a/config/development_standby.yaml
+++ b/config/development_standby.yaml
@@ -73,8 +73,22 @@ kafka:
   topics:
     active:
       cluster: test
+    active-retry:
+      cluster: test
+    active-dlq:
+      cluster: test
     standby:
       cluster: test
-  cadence-cluster-to-topic:
-    active: active
-    standby: standby
+    standby-retry:
+      cluster: test
+    standby-dlq:
+      cluster: test
+  cadence-cluster-topics:
+    active:
+      topic: active
+      retry-topic: active-retry
+      dlq-topic: active-dlq
+    standby:
+      topic: standby
+      retry-topic: standby-retry
+      dlq-topic: standby-dlq
diff --git a/glide.lock b/glide.lock
index de5daf73a12..cab3da01c94 100644
--- a/glide.lock
+++ b/glide.lock
@@ -132,7 +132,7 @@ imports:
   subpackages:
   - utils
 - name: github.com/uber-go/kafka-client
-  version: 01a7eeb4f9ea6e4c16a6ec5d159076c88244c947
+  version: d3982364cc55cd78569fc58a587ff5ec0e06ab20
   subpackages:
   - internal/consumer
   - internal/list
diff --git a/glide.yaml b/glide.yaml
index 1a8c28eaa42..247d80897c9 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -46,6 +46,7 @@ import:
   - transport/http
   - transport/tchannel
 - package: github.com/uber-go/kafka-client
+  version: ^0.1.7
 
 # Added excludeDirs to prevent build from failing on the yarpc generated code.
 excludeDirs:
diff --git a/service/worker/processor.go b/service/worker/processor.go
index 016312ebd95..fcc6869ed33 100644
--- a/service/worker/processor.go
+++ b/service/worker/processor.go
@@ -50,6 +50,7 @@ type (
 	}
 
 	replicationTaskProcessor struct {
+		currentCluster string
 		sourceCluster string
 		topicName string
 		consumerName string
@@ -74,15 +75,16 @@ var (
 	ErrUnknownReplicationTask = errors.New("unknown replication task")
 )
 
-func newReplicationTaskProcessor(sourceCluster, consumer string, client messaging.Client, config *Config,
+func newReplicationTaskProcessor(currentCluster, sourceCluster, consumer string, client messaging.Client, config *Config,
 	logger bark.Logger, metricsClient metrics.Client, domainReplicator DomainReplicator,
 	historyClient history.Client) *replicationTaskProcessor {
 	return &replicationTaskProcessor{
-		sourceCluster: sourceCluster,
-		consumerName: consumer,
-		client: client,
-		shutdownCh: make(chan struct{}),
-		config: config,
+		currentCluster: currentCluster,
+		sourceCluster: sourceCluster,
+		consumerName: consumer,
+		client: client,
+		shutdownCh: make(chan struct{}),
+		config: config,
 		logger: logger.WithFields(bark.Fields{
 			logging.TagWorkflowComponent: logging.TagValueReplicationTaskProcessorComponent,
 			logging.TagSourceCluster: sourceCluster,
@@ -100,7 +102,7 @@ func (p *replicationTaskProcessor) Start() error {
 	}
 
 	logging.LogReplicationTaskProcessorStartingEvent(p.logger)
-	consumer, err := p.client.NewConsumer(p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency)
+	consumer, err := p.client.NewConsumer(p.currentCluster, p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency)
 	if err != nil {
 		logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err)
 		return err
diff --git a/service/worker/replicator.go b/service/worker/replicator.go
index 03fc548cabd..23adb5a6ece 100644
--- a/service/worker/replicator.go
+++ b/service/worker/replicator.go
@@ -70,7 +70,7 @@ func (r *Replicator) Start() error {
 	for cluster := range r.clusterMetadata.GetAllClusterFailoverVersions() {
 		if cluster != currentClusterName {
 			consumerName := getConsumerName(currentClusterName, cluster)
-			r.processors = append(r.processors, newReplicationTaskProcessor(cluster, consumerName, r.client,
+			r.processors = append(r.processors, newReplicationTaskProcessor(currentClusterName, cluster, consumerName, r.client,
 				r.config, r.logger, r.metricsClient, r.domainReplicator, r.historyClient))
 		}
 	}
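
Note: the sketch below is not part of the patch. It is a minimal, self-contained Go illustration of the topic resolution implied by the new NewConsumer(currentCluster, sourceCluster, ...) signature, using the cluster and topic names from the development YAML in this change: the consumer drains the source cluster's replication topic, while retries and dead-letters go to the current cluster's retry/DLQ topics.

package main

import "fmt"

// TopicList mirrors the new `cadence-cluster-topics` config entry.
type TopicList struct {
	Topic      string
	RetryTopic string
	DLQTopic   string
}

func main() {
	// Per-cluster topics as defined in config/development_active.yaml above.
	clusterTopics := map[string]TopicList{
		"active":  {Topic: "active", RetryTopic: "active-retry", DLQTopic: "active-dlq"},
		"standby": {Topic: "standby", RetryTopic: "standby-retry", DLQTopic: "standby-dlq"},
	}

	// The "active" cluster consumes replication tasks published by "standby".
	currentCluster, sourceCluster := "active", "standby"
	current, source := clusterTopics[currentCluster], clusterTopics[sourceCluster]

	// Main topic comes from the source cluster; retry/DLQ topics from the current cluster.
	fmt.Println("consume from:", source.Topic)      // standby
	fmt.Println("retry topic:", current.RetryTopic) // active-retry
	fmt.Println("dlq topic:", current.DLQTopic)     // active-dlq
}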