diff --git a/cmd/server/server.go b/cmd/server/server.go index 8647e197441..3ce0282f0d2 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -115,7 +115,7 @@ func (s *server) startService() common.Daemon { s.cfg.ClustersInfo.ClusterNames, ) // TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop - params.MessagingClient = s.cfg.Kafka.NewKafkaClient(zap.NewNop(), params.MetricScope) + params.MessagingClient = s.cfg.Kafka.NewKafkaClient(zap.NewNop(), params.Logger, params.MetricScope) var daemon common.Daemon diff --git a/common/logging/tags.go b/common/logging/tags.go index 0b74e714cda..6c5da6f366a 100644 --- a/common/logging/tags.go +++ b/common/logging/tags.go @@ -44,6 +44,8 @@ const ( TagTaskType = "task-type" TagTopicName = "topic-name" TagConsumerName = "consumer-name" + TagPartition = "partition" + TagOffset = "offset" // workflow logging tag values // TagWorkflowComponent Values diff --git a/common/messaging/interface.go b/common/messaging/interface.go index 88e8a4737de..c9fa9980fe0 100644 --- a/common/messaging/interface.go +++ b/common/messaging/interface.go @@ -22,11 +22,21 @@ package messaging import ( "github.com/uber-go/kafka-client/kafka" + + "github.com/uber/cadence/.gen/go/replicator" ) type ( // Client is the interface used to abstract out interaction with messaging system for replication Client interface { NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error) + NewProducer(topicName string) (Producer, error) + } + + // Producer is the interface used to send replication tasks to other clusters through replicator + Producer interface { + Publish(msg *replicator.ReplicationTask) error + PublishBatch(msgs []*replicator.ReplicationTask) error + Close() error } ) diff --git a/common/messaging/kafkaClient.go b/common/messaging/kafkaClient.go index fd895cd3303..e8c40756040 100644 --- a/common/messaging/kafkaClient.go +++ b/common/messaging/kafkaClient.go @@ -21,6 +21,8 @@ package messaging import ( + "github.com/Shopify/sarama" + "github.com/uber-common/bark" "github.com/uber-go/kafka-client" "github.com/uber-go/kafka-client/kafka" "strings" @@ -30,6 +32,7 @@ type ( kafkaClient struct { config *KafkaConfig client *kafkaclient.Client + logger bark.Logger } ) @@ -65,3 +68,16 @@ func (c *kafkaClient) NewConsumer(topicName, consumerName string, concurrency in consumer, err := c.client.NewConsumer(consumerConfig) return consumer, err } + +// NewProducer is used to create a Kafka producer for shipping replication tasks +func (c *kafkaClient) NewProducer(topicName string) (Producer, error) { + clusterName := c.config.getClusterForTopic(topicName) + brokers := c.config.getBrokersForCluster(clusterName) + + producer, err := sarama.NewSyncProducer(brokers, nil) + if err != nil { + return nil, err + } + + return NewKafkaProducer(topicName, producer, c.logger), nil +} diff --git a/common/messaging/kafkaConfig.go b/common/messaging/kafkaConfig.go index 70948849c74..a96c357c359 100644 --- a/common/messaging/kafkaConfig.go +++ b/common/messaging/kafkaConfig.go @@ -21,10 +21,10 @@ package messaging import ( + "github.com/uber-common/bark" "github.com/uber-go/kafka-client" "github.com/uber-go/kafka-client/kafka" "github.com/uber-go/tally" - "go.uber.org/zap" ) @@ -47,7 +47,7 @@ type ( ) // NewKafkaClient is used to create an instance of KafkaClient -func (k *KafkaConfig) NewKafkaClient(logger *zap.Logger, metricScope tally.Scope) Client { +func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope) Client { // mapping from cluster name to list of broker ip addresses brokers := map[string][]string{} for cluster, cfg := range k.Clusters { @@ -60,11 +60,12 @@ func (k *KafkaConfig) NewKafkaClient(logger *zap.Logger, metricScope tally.Scope topicClusterAssignment[topic] = []string{cfg.Cluster} } - client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zap.NewNop(), tally.NoopScope) + client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zLogger, metricScope) return &kafkaClient{ config: k, client: client, + logger: logger, } } diff --git a/common/messaging/kafkaProducer.go b/common/messaging/kafkaProducer.go new file mode 100644 index 00000000000..1cf2ebd2da2 --- /dev/null +++ b/common/messaging/kafkaProducer.go @@ -0,0 +1,121 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package messaging + +import ( + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/uber-common/bark" + "github.com/uber/cadence/common/logging" + + "github.com/uber/cadence/.gen/go/replicator" +) + +type ( + kafkaProducer struct { + topic string + producer sarama.SyncProducer + logger bark.Logger + } +) + +// NewKafkaProducer is used to create the Kafka based producer implementation +func NewKafkaProducer(topic string, producer sarama.SyncProducer, logger bark.Logger) Producer { + return &kafkaProducer{ + topic: topic, + producer: producer, + logger: logger.WithFields(bark.Fields{ + logging.TagTopicName: topic, + }), + } +} + +// Publish is used to send messages to other clusters through Kafka topic +func (p *kafkaProducer) Publish(task *replicator.ReplicationTask) error { + payload, err := p.serializeTask(task) + if err != nil { + return err + } + + msg := &sarama.ProducerMessage{ + Topic: p.topic, + Value: sarama.ByteEncoder(payload), + } + + partition, offset, err := p.producer.SendMessage(msg) + if err != nil { + p.logger.WithFields(bark.Fields{ + logging.TagPartition: partition, + logging.TagOffset: offset, + logging.TagErr: err, + }).Warn("Failed to publish message to kafka") + + return err + } + + return nil +} + +// PublishBatch is used to send messages to other clusters through Kafka topic +func (p *kafkaProducer) PublishBatch(tasks []*replicator.ReplicationTask) error { + var msgs []*sarama.ProducerMessage + for _, task := range tasks { + payload, err := p.serializeTask(task) + if err != nil { + return err + } + + msgs = append(msgs, &sarama.ProducerMessage{ + Topic: p.topic, + Value: sarama.ByteEncoder(payload), + }) + } + + err := p.producer.SendMessages(msgs) + if err != nil { + p.logger.WithFields(bark.Fields{ + logging.TagErr: err, + }).Warn("Failed to publish batch of messages to kafka") + + return err + } + + return nil +} + +// Close is used to close Kafka publisher +func (p *kafkaProducer) Close() error { + return p.producer.Close() +} + +func (p *kafkaProducer) serializeTask(task *replicator.ReplicationTask) ([]byte, error) { + payload, err := json.Marshal(task) + if err != nil { + p.logger.WithFields(bark.Fields{ + logging.TagErr: err, + }).Error("Failed to serialize replication task") + + return nil, err + } + + return payload, nil +}