Skip to content

Commit

Permalink
Kafka based publisher for replication tasks (cadence-workflow#585)
Browse files Browse the repository at this point in the history
  • Loading branch information
samarabbas authored Feb 28, 2018
1 parent d914e07 commit f265fd8
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *server) startService() common.Daemon {
s.cfg.ClustersInfo.ClusterNames,
)
// TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop
params.MessagingClient = s.cfg.Kafka.NewKafkaClient(zap.NewNop(), params.MetricScope)
params.MessagingClient = s.cfg.Kafka.NewKafkaClient(zap.NewNop(), params.Logger, params.MetricScope)

var daemon common.Daemon

Expand Down
2 changes: 2 additions & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
TagTaskType = "task-type"
TagTopicName = "topic-name"
TagConsumerName = "consumer-name"
TagPartition = "partition"
TagOffset = "offset"

// workflow logging tag values
// TagWorkflowComponent Values
Expand Down
10 changes: 10 additions & 0 deletions common/messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@ package messaging

import (
"github.com/uber-go/kafka-client/kafka"

"github.com/uber/cadence/.gen/go/replicator"
)

type (
// Client is the interface used to abstract out interaction with messaging system for replication
Client interface {
NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error)
NewProducer(topicName string) (Producer, error)
}

// Producer is the interface used to send replication tasks to other clusters through replicator
Producer interface {
Publish(msg *replicator.ReplicationTask) error
PublishBatch(msgs []*replicator.ReplicationTask) error
Close() error
}
)
16 changes: 16 additions & 0 deletions common/messaging/kafkaClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package messaging

import (
"github.com/Shopify/sarama"
"github.com/uber-common/bark"
"github.com/uber-go/kafka-client"
"github.com/uber-go/kafka-client/kafka"
"strings"
Expand All @@ -30,6 +32,7 @@ type (
kafkaClient struct {
config *KafkaConfig
client *kafkaclient.Client
logger bark.Logger
}
)

Expand Down Expand Up @@ -65,3 +68,16 @@ func (c *kafkaClient) NewConsumer(topicName, consumerName string, concurrency in
consumer, err := c.client.NewConsumer(consumerConfig)
return consumer, err
}

// NewProducer is used to create a Kafka producer for shipping replication tasks
func (c *kafkaClient) NewProducer(topicName string) (Producer, error) {
clusterName := c.config.getClusterForTopic(topicName)
brokers := c.config.getBrokersForCluster(clusterName)

producer, err := sarama.NewSyncProducer(brokers, nil)
if err != nil {
return nil, err
}

return NewKafkaProducer(topicName, producer, c.logger), nil
}
7 changes: 4 additions & 3 deletions common/messaging/kafkaConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
package messaging

import (
"github.com/uber-common/bark"
"github.com/uber-go/kafka-client"
"github.com/uber-go/kafka-client/kafka"
"github.com/uber-go/tally"

"go.uber.org/zap"
)

Expand All @@ -47,7 +47,7 @@ type (
)

// NewKafkaClient is used to create an instance of KafkaClient
func (k *KafkaConfig) NewKafkaClient(logger *zap.Logger, metricScope tally.Scope) Client {
func (k *KafkaConfig) NewKafkaClient(zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope) Client {
// mapping from cluster name to list of broker ip addresses
brokers := map[string][]string{}
for cluster, cfg := range k.Clusters {
Expand All @@ -60,11 +60,12 @@ func (k *KafkaConfig) NewKafkaClient(logger *zap.Logger, metricScope tally.Scope
topicClusterAssignment[topic] = []string{cfg.Cluster}
}

client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zap.NewNop(), tally.NoopScope)
client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zLogger, metricScope)

return &kafkaClient{
config: k,
client: client,
logger: logger,
}
}

Expand Down
121 changes: 121 additions & 0 deletions common/messaging/kafkaProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package messaging

import (
"encoding/json"

"github.com/Shopify/sarama"
"github.com/uber-common/bark"
"github.com/uber/cadence/common/logging"

"github.com/uber/cadence/.gen/go/replicator"
)

type (
kafkaProducer struct {
topic string
producer sarama.SyncProducer
logger bark.Logger
}
)

// NewKafkaProducer is used to create the Kafka based producer implementation
func NewKafkaProducer(topic string, producer sarama.SyncProducer, logger bark.Logger) Producer {
return &kafkaProducer{
topic: topic,
producer: producer,
logger: logger.WithFields(bark.Fields{
logging.TagTopicName: topic,
}),
}
}

// Publish is used to send messages to other clusters through Kafka topic
func (p *kafkaProducer) Publish(task *replicator.ReplicationTask) error {
payload, err := p.serializeTask(task)
if err != nil {
return err
}

msg := &sarama.ProducerMessage{
Topic: p.topic,
Value: sarama.ByteEncoder(payload),
}

partition, offset, err := p.producer.SendMessage(msg)
if err != nil {
p.logger.WithFields(bark.Fields{
logging.TagPartition: partition,
logging.TagOffset: offset,
logging.TagErr: err,
}).Warn("Failed to publish message to kafka")

return err
}

return nil
}

// PublishBatch is used to send messages to other clusters through Kafka topic
func (p *kafkaProducer) PublishBatch(tasks []*replicator.ReplicationTask) error {
var msgs []*sarama.ProducerMessage
for _, task := range tasks {
payload, err := p.serializeTask(task)
if err != nil {
return err
}

msgs = append(msgs, &sarama.ProducerMessage{
Topic: p.topic,
Value: sarama.ByteEncoder(payload),
})
}

err := p.producer.SendMessages(msgs)
if err != nil {
p.logger.WithFields(bark.Fields{
logging.TagErr: err,
}).Warn("Failed to publish batch of messages to kafka")

return err
}

return nil
}

// Close is used to close Kafka publisher
func (p *kafkaProducer) Close() error {
return p.producer.Close()
}

func (p *kafkaProducer) serializeTask(task *replicator.ReplicationTask) ([]byte, error) {
payload, err := json.Marshal(task)
if err != nil {
p.logger.WithFields(bark.Fields{
logging.TagErr: err,
}).Error("Failed to serialize replication task")

return nil, err
}

return payload, nil
}

0 comments on commit f265fd8

Please sign in to comment.