Skip to content

Commit

Permalink
Add publish visibility for open workflow to Kafka (cadence-workflow#1256)
Browse files Browse the repository at this point in the history

* Move kafka consumer out

* Add producer for visibility to kafka

* add publish batch back

* remove test cycle import

* fix tests

* address comment

* address comments

* rebase

* move config to development

* fix travis kafka install
  • Loading branch information
vancexu authored Nov 20, 2018
1 parent 860d64a commit 6d858c3
Show file tree
Hide file tree
Showing 33 changed files with 458 additions and 138 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:

before_install:
- pip install --user ccm
- wget http://www.us.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz -O kafka.tgz
- wget http://www.us.apache.org/dist/kafka/1.1.1/kafka_2.12-1.1.1.tgz -O kafka.tgz
- mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1
- nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &"
- nohup bash -c "cd kafka && bin/kafka-server-start.sh config/server.properties &"
Expand Down
10 changes: 9 additions & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,20 @@ func (s *server) startService() common.Daemon {
)
// TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop
if params.ClusterMetadata.IsGlobalDomainEnabled() {
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, zap.NewNop(), params.Logger, params.MetricScope)
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, zap.NewNop(), params.Logger, params.MetricScope, true)
} else {
params.MessagingClient = nil
}

enableVisibilityToKafka := dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, dynamicconfig.DefaultEnableVisibilityToKafka)
if enableVisibilityToKafka() {
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, zap.NewNop(), params.Logger, params.MetricScope, false)
} else {
params.MessagingClient = nil
}

params.Logger.Info("Starting service " + s.name)

var daemon common.Daemon

switch s.name {
Expand Down
72 changes: 72 additions & 0 deletions common/codec/gob/gob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package gob

import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"reflect"
)

var errEmptyArgument = errors.New("length of input argument is 0")

// Encoder wraps the standard library gob encoder/decoder behind a
// simple Encode/Decode pair operating on byte slices.
type Encoder struct{}

// NewGobEncoder returns a new gob-backed Encoder.
func NewGobEncoder() *Encoder {
	return new(Encoder)
}

// Encode serializes one or more objects into a single binary payload.
// Objects must later be decoded in the same order they were encoded.
func (e *Encoder) Encode(value ...interface{}) ([]byte, error) {
	if len(value) == 0 {
		return nil, errEmptyArgument
	}

	buf := &bytes.Buffer{}
	enc := gob.NewEncoder(buf)
	for idx, obj := range value {
		err := enc.Encode(obj)
		if err != nil {
			return nil, fmt.Errorf(
				"unable to encode argument: %d, %v, with gob error: %v", idx, reflect.TypeOf(obj), err)
		}
	}
	return buf.Bytes(), nil
}

// Decode deserializes a binary payload into the supplied pointers,
// in the same order the objects were originally encoded.
func (e *Encoder) Decode(input []byte, valuePtr ...interface{}) error {
	if len(valuePtr) == 0 {
		return errEmptyArgument
	}

	dec := gob.NewDecoder(bytes.NewReader(input))
	for idx, obj := range valuePtr {
		err := dec.Decode(obj)
		if err != nil {
			return fmt.Errorf(
				"unable to decode argument: %d, %v, with gob error: %v", idx, reflect.TypeOf(obj), err)
		}
	}
	return nil
}
74 changes: 74 additions & 0 deletions common/codec/gob/gob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package gob

import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"testing"
"time"
)

// testStruct is a sample payload used to exercise gob round-trips.
type testStruct struct {
	Domain     string
	WorkflowID string
	RunID      string
	StartTime  int64
}

// TestGobEncoder verifies encode/decode round-trips for a single object,
// for multiple objects, and the zero-argument error paths.
func TestGobEncoder(t *testing.T) {
	encoder := NewGobEncoder()

	original := &testStruct{
		Domain:     "test-domain",
		WorkflowID: uuid.New(),
		RunID:      uuid.New(),
		StartTime:  time.Now().UnixNano(),
	}

	// round-trip a single object
	payload, err := encoder.Encode(original)
	require.NoError(t, err)
	var decoded *testStruct
	require.NoError(t, encoder.Decode(payload, &decoded))
	require.Equal(t, original, decoded)

	// round-trip two objects; decode order must match encode order
	extra := "test-string"
	payload, err = encoder.Encode(extra, original)
	require.NoError(t, err)
	var decodedExtra string
	require.NoError(t, encoder.Decode(payload, &decodedExtra, &decoded))
	require.Equal(t, original, decoded)
	require.Equal(t, extra, decodedExtra)

	// zero arguments are rejected on both paths
	_, err = encoder.Encode()
	require.Error(t, err)
	require.Error(t, encoder.Decode(payload))
}
12 changes: 5 additions & 7 deletions common/messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@

package messaging

import (
"github.com/uber/cadence/.gen/go/replicator"
)

type (
// Client is the interface used to abstract out interaction with messaging system for replication
Client interface {
NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error)
NewProducer(sourceCluster string) (Producer, error)
NewProducer(topic string) (Producer, error)
NewProducerWithClusterName(sourceCluster string) (Producer, error)
}

// Consumer is the unified interface for both internal and external kafka clients
Expand Down Expand Up @@ -57,8 +54,9 @@ type (

// Producer is the interface used to send replication tasks to other clusters through replicator
Producer interface {
Publish(msg *replicator.ReplicationTask) error
PublishBatch(msgs []*replicator.ReplicationTask) error
//PublishBatch(msgs []*replicator.ReplicationTask) error
PublishBatch(msgs []interface{}) error
Publish(msgs interface{}) error
Close() error
}
)
77 changes: 19 additions & 58 deletions common/messaging/kafkaClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,20 @@ import (
"go.uber.org/zap"
)

const rcvBufferSize = 2 * 1024

type (
// This is a default implementation of Client interface which makes use of uber-go/kafka-client as consumer
kafkaClient struct {
config *KafkaConfig
client uberKafkaClient.Client
logger bark.Logger
}

// a wrapper of uberKafka.Consumer to let the compiler happy
kafkaConsumer struct {
uConsumer uberKafka.Consumer
logger bark.Logger
msgC chan Message
doneC chan struct{}
}
)

var _ Client = (*kafkaClient)(nil)

// NewKafkaClient is used to create an instance of KafkaClient
func NewKafkaClient(kc *KafkaConfig, zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope) Client {
kc.Validate()
func NewKafkaClient(kc *KafkaConfig, zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope, checkCluster bool) Client {
kc.Validate(checkCluster)

// mapping from cluster name to list of broker ip addresses
brokers := map[string][]string{}
Expand Down Expand Up @@ -80,50 +72,6 @@ func NewKafkaClient(kc *KafkaConfig, zLogger *zap.Logger, logger bark.Logger, me
}
}

var _ Client = (*kafkaClient)(nil)
var _ Consumer = (*kafkaConsumer)(nil)

func newKafkaConsumer(uConsumer uberKafka.Consumer, logger bark.Logger) Consumer {
return &kafkaConsumer{
uConsumer: uConsumer,
logger: logger,
msgC: make(chan Message, rcvBufferSize),
doneC: make(chan struct{}),
}
}

func (c *kafkaConsumer) Start() error {
if err := c.uConsumer.Start(); err != nil {
return err
}
go func() {
for {
select {
case <-c.doneC:
c.logger.Info("Stop consuming messages from channel")
return
// our Message interface is just a subset of Message interface in kafka-client so we don't need a wrapper here
case uMsg := <-c.uConsumer.Messages():
c.msgC <- uMsg
}
}
}()
return nil
}

// Stop stops the consumer
func (c *kafkaConsumer) Stop() {
c.logger.Info("Stopping consumer")
close(c.doneC)
close(c.msgC)
c.uConsumer.Stop()
}

// Messages return the message channel for this consumer
func (c *kafkaConsumer) Messages() <-chan Message {
return c.msgC
}

// NewConsumer is used to create a Kafka consumer
func (c *kafkaClient) NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error) {
currentTopics := c.config.getTopicsForCadenceCluster(currentCluster)
Expand Down Expand Up @@ -156,8 +104,21 @@ func (c *kafkaClient) NewConsumer(currentCluster, sourceCluster, consumerName st
return newKafkaConsumer(uConsumer, c.logger), nil
}

// NewProducer is used to create a Kafka producer for shipping replication tasks
func (c *kafkaClient) NewProducer(sourceCluster string) (Producer, error) {
// NewProducer creates a Kafka sync producer publishing to the given topic,
// using the brokers of the Kafka cluster configured for that topic.
func (c *kafkaClient) NewProducer(topic string) (Producer, error) {
	cluster := c.config.getKafkaClusterForTopic(topic)
	brokerList := c.config.getBrokersForKafkaCluster(cluster)

	syncProducer, err := sarama.NewSyncProducer(brokerList, nil)
	if err != nil {
		return nil, err
	}
	return NewKafkaProducer(topic, syncProducer, c.logger), nil
}

// NewProducerWithClusterName is used to create a Kafka producer for shipping replication tasks
func (c *kafkaClient) NewProducerWithClusterName(sourceCluster string) (Producer, error) {
topics := c.config.getTopicsForCadenceCluster(sourceCluster)
kafkaClusterName := c.config.getKafkaClusterForTopic(topics.Topic)
brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName)
Expand Down
25 changes: 16 additions & 9 deletions common/messaging/kafkaConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,17 @@ type (
}
)

// Validate will validate config
func (k *KafkaConfig) Validate() {
// VisibilityTopicName is the Kafka topic name used for visibility data.
const VisibilityTopicName = "visibility-topic"

// Validate will validate config for kafka
func (k *KafkaConfig) Validate(checkCluster bool) {
if len(k.Clusters) == 0 {
panic("Empty Kafka Cluster Config")
}
if len(k.Topics) == 0 {
panic("Empty Topics Config")
}
if len(k.ClusterToTopic) == 0 {
panic("Empty Cluster To Topics Config")
}

validateTopicsFn := func(topic string) {
if topic == "" {
Expand All @@ -74,10 +74,17 @@ func (k *KafkaConfig) Validate() {
}
}

for _, topics := range k.ClusterToTopic {
validateTopicsFn(topics.Topic)
validateTopicsFn(topics.RetryTopic)
validateTopicsFn(topics.DLQTopic)
if checkCluster {
if len(k.ClusterToTopic) == 0 {
panic("Empty Cluster To Topics Config")
}
for _, topics := range k.ClusterToTopic {
validateTopicsFn(topics.Topic)
validateTopicsFn(topics.RetryTopic)
validateTopicsFn(topics.DLQTopic)
}
} else {
validateTopicsFn(VisibilityTopicName)
}
}

Expand Down
Loading

0 comments on commit 6d858c3

Please sign in to comment.