Skip to content

Commit

Permalink
Changes to Kafka Config (cadence-workflow#736)
Browse files Browse the repository at this point in the history
* update kafka client

* kafka config should be able to config dlq and retry topic name

* fix retry / dlq bug
  • Loading branch information
wxing1292 authored May 16, 2018
1 parent 69c7daf commit f6c32ff
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 56 deletions.
4 changes: 2 additions & 2 deletions common/messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
type (
// Client is the interface used to abstract out interaction with messaging system for replication
Client interface {
NewConsumer(cadenceCluster, consumerName string, concurrency int) (kafka.Consumer, error)
NewProducer(cadenceCluster string) (Producer, error)
NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (kafka.Consumer, error)
NewProducer(sourceCluster string) (Producer, error)
}

// Producer is the interface used to send replication tasks to other clusters through replicator
Expand Down
62 changes: 31 additions & 31 deletions common/messaging/kafkaClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
package messaging

import (
"strings"

"github.com/Shopify/sarama"
"github.com/uber-common/bark"
"github.com/uber-go/kafka-client"
Expand All @@ -32,55 +30,57 @@ import (
type (
kafkaClient struct {
config *KafkaConfig
client *kafkaclient.Client
client kafkaclient.Client
logger bark.Logger
}
)

// NewConsumer is used to create a Kafka consumer
func (c *kafkaClient) NewConsumer(cadenceCluster, consumerName string, concurrency int) (kafka.Consumer, error) {
topicName := c.config.getTopicForCadenceCluster(cadenceCluster)
kafkaClusterName := c.config.getKafkaClusterForTopic(topicName)
brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName)
func (c *kafkaClient) NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (kafka.Consumer, error) {
currentTopics := c.config.getTopicsForCadenceCluster(currentCluster)
sourceTopics := c.config.getTopicsForCadenceCluster(sourceCluster)

consumerConfig := &kafka.ConsumerConfig{
GroupName: consumerName,
TopicList: kafka.ConsumerTopicList{
kafka.ConsumerTopic{
Topic: kafka.Topic{
Name: topicName,
Cluster: kafkaClusterName,
BrokerList: brokers,
},
RetryQ: kafka.Topic{
Name: strings.Join([]string{topicName, "retry"}, "-"),
Cluster: kafkaClusterName,
BrokerList: brokers,
},
DLQ: kafka.Topic{
Name: strings.Join([]string{topicName, "dlq"}, "-"),
Cluster: kafkaClusterName,
BrokerList: brokers,
},
topicKafkaCluster := c.config.getKafkaClusterForTopic(sourceTopics.Topic)
retryTopicKafkaCluster := c.config.getKafkaClusterForTopic(currentTopics.RetryTopic)
dqlTopicKafkaCluster := c.config.getKafkaClusterForTopic(currentTopics.DLQTopic)
topicList := kafka.ConsumerTopicList{
kafka.ConsumerTopic{
Topic: kafka.Topic{
Name: sourceTopics.Topic,
Cluster: topicKafkaCluster,
BrokerList: c.config.getBrokersForKafkaCluster(topicKafkaCluster),
},
RetryQ: kafka.Topic{
Name: currentTopics.RetryTopic,
Cluster: retryTopicKafkaCluster,
BrokerList: c.config.getBrokersForKafkaCluster(retryTopicKafkaCluster),
},
DLQ: kafka.Topic{
Name: currentTopics.DLQTopic,
Cluster: dqlTopicKafkaCluster,
BrokerList: c.config.getBrokersForKafkaCluster(dqlTopicKafkaCluster),
},
},
Concurrency: concurrency,
}

consumerConfig := kafka.NewConsumerConfig(consumerName, topicList)
consumerConfig.Concurrency = concurrency
consumerConfig.Offsets.Initial.Offset = kafka.OffsetNewest

consumer, err := c.client.NewConsumer(consumerConfig)
return consumer, err
}

// NewProducer is used to create a Kafka producer for shipping replication tasks
func (c *kafkaClient) NewProducer(cadenceCluster string) (Producer, error) {
topicName := c.config.getTopicForCadenceCluster(cadenceCluster)
kafkaClusterName := c.config.getKafkaClusterForTopic(topicName)
func (c *kafkaClient) NewProducer(sourceCluster string) (Producer, error) {
topics := c.config.getTopicsForCadenceCluster(sourceCluster)
kafkaClusterName := c.config.getKafkaClusterForTopic(topics.Topic)
brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName)

producer, err := sarama.NewSyncProducer(brokers, nil)
if err != nil {
return nil, err
}

return NewKafkaProducer(topicName, producer, c.logger), nil
return NewKafkaProducer(topics.Topic, producer, c.logger), nil
}
50 changes: 45 additions & 5 deletions common/messaging/kafkaConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package messaging

import (
"fmt"
"strings"

"github.com/uber-common/bark"
Expand All @@ -35,7 +36,7 @@ type (
KafkaConfig struct {
Clusters map[string]ClusterConfig `yaml:"clusters"`
Topics map[string]TopicConfig `yaml:"topics"`
ClusterToTopic map[string]string `yaml:"cadence-cluster-to-topic"`
ClusterToTopic map[string]TopicList `yaml:"cadence-cluster-topics"`
}

// ClusterConfig describes the configuration for a single Kafka cluster
Expand All @@ -47,10 +48,19 @@ type (
TopicConfig struct {
Cluster string `yaml:"cluster"`
}

// TopicList describes the topic names for each cluster
TopicList struct {
Topic string `yaml:"topic"`
RetryTopic string `yaml:"retry-topic"`
DLQTopic string `yaml:"dlq-topic"`
}
)

// NewKafkaClient is used to create an instance of KafkaClient
func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope) Client {
k.validate()

// mapping from cluster name to list of broker ip addresses
brokers := map[string][]string{}
for cluster, cfg := range k.Clusters {
Expand All @@ -77,14 +87,44 @@ func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, me
}
}

func (k *KafkaConfig) getTopicForCadenceCluster(cluster string) string {
return k.ClusterToTopic[cluster]
func (k *KafkaConfig) validate() {
if len(k.Clusters) == 0 {
panic("Empty Kafka Cluster Config")
}
if len(k.Topics) == 0 {
panic("Empty Topics Config")
}
if len(k.ClusterToTopic) == 0 {
panic("Empty Cluster To Topics Config")
}

validateTopicsFn := func(topic string) {
if topic == "" {
panic("Empty Topic Name")
} else if topicConfig, ok := k.Topics[topic]; !ok {
panic(fmt.Sprintf("Missing Topic Config for Topic %v", topic))
} else if clusterConfig, ok := k.Clusters[topicConfig.Cluster]; !ok {
panic(fmt.Sprintf("Missing Kafka Cluster Config for Cluster %v", topicConfig.Cluster))
} else if len(clusterConfig.Brokers) == 0 {
panic(fmt.Sprintf("Missing Kafka Brokers Config for Cluster %v", topicConfig.Cluster))
}
}

for _, topics := range k.ClusterToTopic {
validateTopicsFn(topics.Topic)
validateTopicsFn(topics.RetryTopic)
validateTopicsFn(topics.DLQTopic)
}
}

func (k *KafkaConfig) getTopicsForCadenceCluster(cadenceCluster string) TopicList {
return k.ClusterToTopic[cadenceCluster]
}

func (k *KafkaConfig) getKafkaClusterForTopic(topic string) string {
return k.Topics[topic].Cluster
}

func (k *KafkaConfig) getBrokersForKafkaCluster(cluster string) []string {
return k.Clusters[cluster].Brokers
func (k *KafkaConfig) getBrokersForKafkaCluster(kafkaCluster string) []string {
return k.Clusters[kafkaCluster].Brokers
}
2 changes: 1 addition & 1 deletion common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (_m *ExecutionManager) UpdateWorkflowExecution(request *persistence.UpdateW
return r0
}

// UpdateWorkflowExecution provides a mock function with given fields: request
// ResetMutableState provides a mock function with given fields: request
func (_m *ExecutionManager) ResetMutableState(request *persistence.ResetMutableStateRequest) error {
ret := _m.Called(request)

Expand Down
4 changes: 2 additions & 2 deletions common/mocks/MessagingClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func NewMockMessagingClient(publisher messaging.Producer, consumer kafka.Consume
}

// NewConsumer generates a dummy implementation of kafka consumer
func (c *MessagingClient) NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error) {
func (c *MessagingClient) NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (kafka.Consumer, error) {
return c.consumerMock, nil
}

// NewProducer generates a dummy implementation of kafka producer
func (c *MessagingClient) NewProducer(topicName string) (messaging.Producer, error) {
func (c *MessagingClient) NewProducer(sourceCluster string) (messaging.Producer, error) {
return c.publisherMock, nil
}
20 changes: 17 additions & 3 deletions config/development_active.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,22 @@ kafka:
topics:
active:
cluster: test
active-retry:
cluster: test
active-dlq:
cluster: test
standby:
cluster: test
cadence-cluster-to-topic:
active: active
standby: standby
standby-retry:
cluster: test
standby-dlq:
cluster: test
cadence-cluster-topics:
active:
topic: active
retry-topic: active-retry
dlq-topic: active-dlq
standby:
topic: standby
retry-topic: standby-retry
dlq-topic: standby-dlq
20 changes: 17 additions & 3 deletions config/development_standby.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,22 @@ kafka:
topics:
active:
cluster: test
active-retry:
cluster: test
active-dlq:
cluster: test
standby:
cluster: test
cadence-cluster-to-topic:
active: active
standby: standby
standby-retry:
cluster: test
standby-dlq:
cluster: test
cadence-cluster-topics:
active:
topic: active
retry-topic: active-retry
dlq-topic: active-dlq
standby:
topic: standby
retry-topic: standby-retry
dlq-topic: standby-dlq
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import:
- transport/http
- transport/tchannel
- package: github.com/uber-go/kafka-client
version: ^0.1.7

# Added excludeDirs to prevent build from failing on the yarpc generated code.
excludeDirs:
Expand Down
16 changes: 9 additions & 7 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type (
}

replicationTaskProcessor struct {
currentCluster string
sourceCluster string
topicName string
consumerName string
Expand All @@ -74,15 +75,16 @@ var (
ErrUnknownReplicationTask = errors.New("unknown replication task")
)

func newReplicationTaskProcessor(sourceCluster, consumer string, client messaging.Client, config *Config,
func newReplicationTaskProcessor(currentCluster, sourceCluster, consumer string, client messaging.Client, config *Config,
logger bark.Logger, metricsClient metrics.Client, domainReplicator DomainReplicator,
historyClient history.Client) *replicationTaskProcessor {
return &replicationTaskProcessor{
sourceCluster: sourceCluster,
consumerName: consumer,
client: client,
shutdownCh: make(chan struct{}),
config: config,
currentCluster: currentCluster,
sourceCluster: sourceCluster,
consumerName: consumer,
client: client,
shutdownCh: make(chan struct{}),
config: config,
logger: logger.WithFields(bark.Fields{
logging.TagWorkflowComponent: logging.TagValueReplicationTaskProcessorComponent,
logging.TagSourceCluster: sourceCluster,
Expand All @@ -100,7 +102,7 @@ func (p *replicationTaskProcessor) Start() error {
}

logging.LogReplicationTaskProcessorStartingEvent(p.logger)
consumer, err := p.client.NewConsumer(p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency)
consumer, err := p.client.NewConsumer(p.currentCluster, p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency)
if err != nil {
logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion service/worker/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *Replicator) Start() error {
for cluster := range r.clusterMetadata.GetAllClusterFailoverVersions() {
if cluster != currentClusterName {
consumerName := getConsumerName(currentClusterName, cluster)
r.processors = append(r.processors, newReplicationTaskProcessor(cluster, consumerName, r.client,
r.processors = append(r.processors, newReplicationTaskProcessor(currentClusterName, cluster, consumerName, r.client,
r.config, r.logger, r.metricsClient, r.domainReplicator, r.historyClient))
}
}
Expand Down

0 comments on commit f6c32ff

Please sign in to comment.