From 6d858c3969503fe954a7286b277333038ec51a6f Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Tue, 20 Nov 2018 14:12:29 -0800 Subject: [PATCH] Add publish visibility for open workflow to Kafka (#1256) * Move kafka consumer out * Add producer for visibility to kafka * add publish batch back * remove test cycle import * fix tests * address comment * address comments * rebase * move config to development * fix travis kafka install --- .travis.yml | 2 +- cmd/server/server.go | 10 ++- common/codec/gob/gob.go | 72 +++++++++++++++++ common/codec/gob/gob_test.go | 74 +++++++++++++++++ common/messaging/interface.go | 12 ++- common/messaging/kafkaClient.go | 77 +++++------------- common/messaging/kafkaConfig.go | 25 +++--- common/messaging/kafkaConsumer.go | 81 +++++++++++++++++++ common/messaging/kafkaProducer.go | 76 ++++++++++------- common/messaging/message.go | 29 +++++++ common/mocks/KafkaProducer.go | 11 ++- common/mocks/MessagingClient.go | 7 +- common/service/config/config.go | 2 +- common/service/dynamicconfig/constants.go | 6 ++ common/service/serviceTestBase.go | 1 + config/development.yaml | 9 +++ host/onebox.go | 2 +- hostxdc/Integration_domain_failover_test.go | 4 +- service/frontend/service.go | 2 +- service/history/handler.go | 4 +- service/history/historyEngine.go | 19 ++++- service/history/historyEngine2_test.go | 2 +- .../history/historyEngine3_eventsv2_test.go | 2 +- service/history/historyEngine_test.go | 2 +- service/history/service.go | 2 + service/history/timerQueueProcessor2_test.go | 2 +- service/history/timerQueueProcessor_test.go | 2 +- .../history/transferQueueActiveProcessor.go | 11 ++- .../transferQueueActiveProcessor_test.go | 3 +- service/history/transferQueueProcessor.go | 12 ++- service/history/transferQueueProcessorBase.go | 20 ++++- .../history/transferQueueStandbyProcessor.go | 7 +- .../transferQueueStandbyProcessor_test.go | 6 +- 33 files changed, 458 insertions(+), 138 deletions(-) create mode 100644 common/codec/gob/gob.go create mode 100644 common/codec/gob/gob_test.go create mode 100644 common/messaging/kafkaConsumer.go create mode 100644 common/messaging/message.go diff --git a/.travis.yml b/.travis.yml index 6943712a8d5..0c468957bf3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,7 +27,7 @@ services: before_install: - pip install --user ccm - - wget http://www.us.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz -O kafka.tgz + - wget http://www.us.apache.org/dist/kafka/1.1.1/kafka_2.12-1.1.1.tgz -O kafka.tgz - mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1 - nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &" - nohup bash -c "cd kafka && bin/kafka-server-start.sh config/server.properties &" diff --git a/cmd/server/server.go b/cmd/server/server.go index 8006c4ad910..55ed8c4cc7a 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -123,12 +123,20 @@ func (s *server) startService() common.Daemon { ) // TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop if params.ClusterMetadata.IsGlobalDomainEnabled() { - params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, zap.NewNop(), params.Logger, params.MetricScope) + params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, zap.NewNop(), params.Logger, params.MetricScope, true) + } else { + params.MessagingClient = nil + } + + enableVisibilityToKafka := dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, dynamicconfig.DefaultEnableVisibilityToKafka) + if enableVisibilityToKafka() { + 
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, zap.NewNop(), params.Logger, params.MetricScope, false) } else { params.MessagingClient = nil } params.Logger.Info("Starting service " + s.name) + var daemon common.Daemon switch s.name { diff --git a/common/codec/gob/gob.go new file mode 100644 index 00000000000..5bf183e22bd --- /dev/null +++ b/common/codec/gob/gob.go @@ -0,0 +1,72 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package gob + +import ( + "bytes" + "encoding/gob" + "errors" + "fmt" + "reflect" +) + +var errEmptyArgument = errors.New("length of input argument is 0") + +// Encoder is a wrapper of the gob encoder/decoder +type Encoder struct{} + +// NewGobEncoder creates a new Encoder +func NewGobEncoder() *Encoder { + return &Encoder{} +} + +// Encode one or more objects to binary +func (gobEncoder *Encoder) Encode(value ...interface{}) ([]byte, error) { + if len(value) == 0 { + return nil, errEmptyArgument + } + + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + for i, obj := range value { + if err := enc.Encode(obj); err != nil { + return nil, fmt.Errorf( + "unable to encode argument: %d, %v, with gob error: %v", i, reflect.TypeOf(obj), err) + } + } + return buf.Bytes(), nil +} + +// Decode binary to one or more objects +func (gobEncoder *Encoder) Decode(input []byte, valuePtr ...interface{}) error { + if len(valuePtr) == 0 { + return errEmptyArgument + } + + dec := gob.NewDecoder(bytes.NewBuffer(input)) + for i, obj := range valuePtr { + if err := dec.Decode(obj); err != nil { + return fmt.Errorf( + "unable to decode argument: %d, %v, with gob error: %v", i, reflect.TypeOf(obj), err) + } + } + return nil +} diff --git a/common/codec/gob/gob_test.go new file mode 100644 index 00000000000..60ac67eb952 --- /dev/null +++ b/common/codec/gob/gob_test.go @@ -0,0 +1,74 @@ +// Copyright (c) 2017 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package gob + +import ( + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +type testStruct struct { + Domain string + WorkflowID string + RunID string + StartTime int64 +} + +func TestGobEncoder(t *testing.T) { + encoder := NewGobEncoder() + + domain := "test-domain" + wid := uuid.New() + rid := uuid.New() + startTime := time.Now().UnixNano() + + // test encode and decode 1 object + msg := &testStruct{ + Domain: domain, + WorkflowID: wid, + RunID: rid, + StartTime: startTime, + } + payload, err := encoder.Encode(msg) + require.NoError(t, err) + var decoded *testStruct + err = encoder.Decode(payload, &decoded) + require.NoError(t, err) + require.Equal(t, msg, decoded) + + // test encode and decode 2 objects + msg2 := "test-string" + payload, err = encoder.Encode(msg2, msg) + require.NoError(t, err) + var decoded2 string + err = encoder.Decode(payload, &decoded2, &decoded) + require.NoError(t, err) + require.Equal(t, msg, decoded) + require.Equal(t, msg2, decoded2) + + // test encode and decode 0 object + _, err = encoder.Encode() + require.Error(t, err) + err = encoder.Decode(payload) + require.Error(t, err) +} diff --git a/common/messaging/interface.go b/common/messaging/interface.go index e603639e782..3f011ca0986 100644 --- a/common/messaging/interface.go +++ b/common/messaging/interface.go @@ -20,15 +20,12 @@ package messaging -import ( - "github.com/uber/cadence/.gen/go/replicator" -) - type ( // Client is the interface used to abstract out interaction with messaging system for replication Client interface { NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error) - NewProducer(sourceCluster string) (Producer, error) + NewProducer(topic string) (Producer, error) + NewProducerWithClusterName(sourceCluster string) (Producer, error) } // Consumer is the unified interface for both internal and external kafka clients @@ -57,8 +54,9 @@ type ( // Producer is the interface used to send replication tasks to other clusters through replicator Producer interface { - Publish(msg *replicator.ReplicationTask) error - PublishBatch(msgs []*replicator.ReplicationTask) error + //PublishBatch(msgs []*replicator.ReplicationTask) error + PublishBatch(msgs []interface{}) error + Publish(msgs interface{}) error Close() error } ) diff --git a/common/messaging/kafkaClient.go b/common/messaging/kafkaClient.go index cb839e1d724..17617657b75 100644 --- 
a/common/messaging/kafkaClient.go +++ b/common/messaging/kafkaClient.go @@ -31,8 +31,6 @@ import ( "go.uber.org/zap" ) -const rcvBufferSize = 2 * 1024 - type ( // This is a default implementation of Client interface which makes use of uber-go/kafka-client as consumer kafkaClient struct { @@ -40,19 +38,13 @@ type ( client uberKafkaClient.Client logger bark.Logger } - - // a wrapper of uberKafka.Consumer to let the compiler happy - kafkaConsumer struct { - uConsumer uberKafka.Consumer - logger bark.Logger - msgC chan Message - doneC chan struct{} - } ) +var _ Client = (*kafkaClient)(nil) + // NewKafkaClient is used to create an instance of KafkaClient -func NewKafkaClient(kc *KafkaConfig, zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope) Client { - kc.Validate() +func NewKafkaClient(kc *KafkaConfig, zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope, checkCluster bool) Client { + kc.Validate(checkCluster) // mapping from cluster name to list of broker ip addresses brokers := map[string][]string{} @@ -80,50 +72,6 @@ func NewKafkaClient(kc *KafkaConfig, zLogger *zap.Logger, logger bark.Logger, me } } -var _ Client = (*kafkaClient)(nil) -var _ Consumer = (*kafkaConsumer)(nil) - -func newKafkaConsumer(uConsumer uberKafka.Consumer, logger bark.Logger) Consumer { - return &kafkaConsumer{ - uConsumer: uConsumer, - logger: logger, - msgC: make(chan Message, rcvBufferSize), - doneC: make(chan struct{}), - } -} - -func (c *kafkaConsumer) Start() error { - if err := c.uConsumer.Start(); err != nil { - return err - } - go func() { - for { - select { - case <-c.doneC: - c.logger.Info("Stop consuming messages from channel") - return - // our Message interface is just a subset of Message interface in kafka-client so we don't need a wrapper here - case uMsg := <-c.uConsumer.Messages(): - c.msgC <- uMsg - } - } - }() - return nil -} - -// Stop stops the consumer -func (c *kafkaConsumer) Stop() { - c.logger.Info("Stopping consumer") - close(c.doneC) - close(c.msgC) - c.uConsumer.Stop() -} - -// Messages return the message channel for this consumer -func (c *kafkaConsumer) Messages() <-chan Message { - return c.msgC -} - // NewConsumer is used to create a Kafka consumer func (c *kafkaClient) NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error) { currentTopics := c.config.getTopicsForCadenceCluster(currentCluster) @@ -156,8 +104,21 @@ func (c *kafkaClient) NewConsumer(currentCluster, sourceCluster, consumerName st return newKafkaConsumer(uConsumer, c.logger), nil } -// NewProducer is used to create a Kafka producer for shipping replication tasks -func (c *kafkaClient) NewProducer(sourceCluster string) (Producer, error) { +// NewProducer is used to create a Kafka producer +func (c *kafkaClient) NewProducer(topic string) (Producer, error) { + kafkaClusterName := c.config.getKafkaClusterForTopic(topic) + brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName) + + producer, err := sarama.NewSyncProducer(brokers, nil) + if err != nil { + return nil, err + } + + return NewKafkaProducer(topic, producer, c.logger), nil +} + +// NewProducerWithClusterName is used to create a Kafka producer for shipping replication tasks +func (c *kafkaClient) NewProducerWithClusterName(sourceCluster string) (Producer, error) { topics := c.config.getTopicsForCadenceCluster(sourceCluster) kafkaClusterName := c.config.getKafkaClusterForTopic(topics.Topic) brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName) diff --git 
a/common/messaging/kafkaConfig.go b/common/messaging/kafkaConfig.go index 4e10c2f3927..2182229ca46 100644 --- a/common/messaging/kafkaConfig.go +++ b/common/messaging/kafkaConfig.go @@ -50,17 +50,17 @@ type ( } ) -// Validate will validate config -func (k *KafkaConfig) Validate() { +// VisibilityTopicName for visibility data to kafka +const VisibilityTopicName = "visibility-topic" + +// Validate will validate config for kafka +func (k *KafkaConfig) Validate(checkCluster bool) { if len(k.Clusters) == 0 { panic("Empty Kafka Cluster Config") } if len(k.Topics) == 0 { panic("Empty Topics Config") } - if len(k.ClusterToTopic) == 0 { - panic("Empty Cluster To Topics Config") - } validateTopicsFn := func(topic string) { if topic == "" { @@ -74,10 +74,17 @@ func (k *KafkaConfig) Validate() { } } - for _, topics := range k.ClusterToTopic { - validateTopicsFn(topics.Topic) - validateTopicsFn(topics.RetryTopic) - validateTopicsFn(topics.DLQTopic) + if checkCluster { + if len(k.ClusterToTopic) == 0 { + panic("Empty Cluster To Topics Config") + } + for _, topics := range k.ClusterToTopic { + validateTopicsFn(topics.Topic) + validateTopicsFn(topics.RetryTopic) + validateTopicsFn(topics.DLQTopic) + } + } else { + validateTopicsFn(VisibilityTopicName) } } diff --git a/common/messaging/kafkaConsumer.go b/common/messaging/kafkaConsumer.go new file mode 100644 index 00000000000..1c5e5da3bfd --- /dev/null +++ b/common/messaging/kafkaConsumer.go @@ -0,0 +1,81 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package messaging + +import ( + "github.com/uber-common/bark" + uberKafka "github.com/uber-go/kafka-client/kafka" +) + +const rcvBufferSize = 2 * 1024 + +type ( + // a wrapper of uberKafka.Consumer to keep the compiler happy + kafkaConsumer struct { + uConsumer uberKafka.Consumer + logger bark.Logger + msgC chan Message + doneC chan struct{} + } +) + +var _ Consumer = (*kafkaConsumer)(nil) + +func newKafkaConsumer(uConsumer uberKafka.Consumer, logger bark.Logger) Consumer { + return &kafkaConsumer{ + uConsumer: uConsumer, + logger: logger, + msgC: make(chan Message, rcvBufferSize), + doneC: make(chan struct{}), + } +} + +func (c *kafkaConsumer) Start() error { + if err := c.uConsumer.Start(); err != nil { + return err + } + go func() { + for { + select { + case <-c.doneC: + c.logger.Info("Stop consuming messages from channel") + return + // our Message interface is just a subset of Message interface in kafka-client so we don't need a wrapper here + case uMsg := <-c.uConsumer.Messages(): + c.msgC <- uMsg + } + } + }() + return nil +} + +// Stop stops the consumer +func (c *kafkaConsumer) Stop() { + c.logger.Info("Stopping consumer") + close(c.doneC) + close(c.msgC) + c.uConsumer.Stop() +} + +// Messages returns the message channel for this consumer +func (c *kafkaConsumer) Messages() <-chan Message { + return c.msgC +} diff --git a/common/messaging/kafkaProducer.go b/common/messaging/kafkaProducer.go index c6a5aecb108..bbd9cb27214 100644 --- a/common/messaging/kafkaProducer.go +++ b/common/messaging/kafkaProducer.go @@ -21,13 +21,13 @@ package messaging import ( + "errors" + "github.com/Shopify/sarama" - "github.com/uber-common/bark" + "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/common/codec" + "github.com/uber/cadence/common/codec/gob" "github.com/uber/cadence/common/logging" - - "github.com/uber/cadence/.gen/go/replicator" ) type ( @@ -35,16 +35,20 @@ type ( topic string producer sarama.SyncProducer msgEncoder codec.BinaryEncoder + gobEncoder *gob.Encoder logger bark.Logger } ) +var _ Producer = (*kafkaProducer)(nil) + // NewKafkaProducer is used to create the Kafka based producer implementation func NewKafkaProducer(topic string, producer sarama.SyncProducer, logger bark.Logger) Producer { return &kafkaProducer{ topic: topic, producer: producer, msgEncoder: codec.NewThriftRWEncoder(), + gobEncoder: gob.NewGobEncoder(), logger: logger.WithFields(bark.Fields{ logging.TagTopicName: topic, }), @@ -52,29 +56,20 @@ func NewKafkaProducer(topic string, producer sarama.SyncProducer, logger bark.Lo } // Publish is used to send messages to other clusters through Kafka topic -func (p *kafkaProducer) Publish(task *replicator.ReplicationTask) error { - payload, err := p.serializeTask(task) +func (p *kafkaProducer) Publish(msg interface{}) error { + message, err := p.getProducerMessage(msg) if err != nil { return err } - partitionKey := p.getKey(task) - - msg := &sarama.ProducerMessage{ - Topic: p.topic, - Key: partitionKey, - Value: sarama.ByteEncoder(payload), - } - - partition, offset, err := p.producer.SendMessage(msg) + partition, offset, err := p.producer.SendMessage(message) if err != nil { p.logger.WithFields(bark.Fields{ logging.TagPartition: partition, - logging.TagPartitionKey: partitionKey, + logging.TagPartitionKey: message.Key, logging.TagOffset: offset, logging.TagErr: err, }).Warn("Failed to publish message to kafka") - return err } @@ -82,26 +77,21 @@ func (p *kafkaProducer) Publish(task *replicator.ReplicationTask) error { } // PublishBatch is used to send 
messages to other clusters through Kafka topic -func (p *kafkaProducer) PublishBatch(tasks []*replicator.ReplicationTask) error { - var msgs []*sarama.ProducerMessage - for _, task := range tasks { - payload, err := p.serializeTask(task) +func (p *kafkaProducer) PublishBatch(msgs []interface{}) error { + var producerMsgs []*sarama.ProducerMessage + for _, msg := range msgs { + message, err := p.getProducerMessage(msg) if err != nil { return err } - - msgs = append(msgs, &sarama.ProducerMessage{ - Topic: p.topic, - Value: sarama.ByteEncoder(payload), - }) + producerMsgs = append(producerMsgs, message) } - err := p.producer.SendMessages(msgs) + err := p.producer.SendMessages(producerMsgs) if err != nil { p.logger.WithFields(bark.Fields{ logging.TagErr: err, }).Warn("Failed to publish batch of messages to kafka") - return err } @@ -148,3 +138,35 @@ func (p *kafkaProducer) getKey(task *replicator.ReplicationTask) sarama.Encoder return nil } + +func (p *kafkaProducer) getProducerMessage(message interface{}) (*sarama.ProducerMessage, error) { + switch message.(type) { + case *replicator.ReplicationTask: + task := message.(*replicator.ReplicationTask) + payload, err := p.serializeTask(task) + if err != nil { + return nil, err + } + partitionKey := p.getKey(task) + msg := &sarama.ProducerMessage{ + Topic: p.topic, + Key: partitionKey, + Value: sarama.ByteEncoder(payload), + } + return msg, nil + case *OpenWorkflowMsg: + openRecord := message.(*OpenWorkflowMsg) + payload, err := p.gobEncoder.Encode(openRecord) + if err != nil { + return nil, err + } + msg := &sarama.ProducerMessage{ + Topic: VisibilityTopicName, + Key: sarama.StringEncoder(openRecord.WorkflowID), + Value: sarama.ByteEncoder(payload), + } + return msg, nil + default: + return nil, errors.New("unknown producer message type") + } +} diff --git a/common/messaging/message.go b/common/messaging/message.go new file mode 100644 index 00000000000..2bd803f85fe --- /dev/null +++ b/common/messaging/message.go @@ -0,0 +1,29 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package messaging + +// OpenWorkflowMsg is visibility data for open workflow +type OpenWorkflowMsg struct { + Domain string + WorkflowID string + RunID string + StartTime int64 +} diff --git a/common/mocks/KafkaProducer.go b/common/mocks/KafkaProducer.go index e485be2e264..f40e3d36997 100644 --- a/common/mocks/KafkaProducer.go +++ b/common/mocks/KafkaProducer.go @@ -21,8 +21,7 @@ package mocks import ( - mock "github.com/stretchr/testify/mock" - "github.com/uber/cadence/.gen/go/replicator" + "github.com/stretchr/testify/mock" "github.com/uber/cadence/common/messaging" ) @@ -46,11 +45,11 @@ func (_m *KafkaProducer) Close() error { } // Publish provides a mock function with given fields: msg -func (_m *KafkaProducer) Publish(msg *replicator.ReplicationTask) error { +func (_m *KafkaProducer) Publish(msg interface{}) error { ret := _m.Called(msg) var r0 error - if rf, ok := ret.Get(0).(func(*replicator.ReplicationTask) error); ok { + if rf, ok := ret.Get(0).(func(interface{}) error); ok { r0 = rf(msg) } else { r0 = ret.Error(0) @@ -60,11 +59,11 @@ func (_m *KafkaProducer) Publish(msg *replicator.ReplicationTask) error { } // PublishBatch provides a mock function with given fields: msgs -func (_m *KafkaProducer) PublishBatch(msgs []*replicator.ReplicationTask) error { +func (_m *KafkaProducer) PublishBatch(msgs []interface{}) error { ret := _m.Called(msgs) var r0 error - if rf, ok := ret.Get(0).(func([]*replicator.ReplicationTask) error); ok { + if rf, ok := ret.Get(0).(func([]interface{}) error); ok { r0 = rf(msgs) } else { r0 = ret.Error(0) diff --git a/common/mocks/MessagingClient.go b/common/mocks/MessagingClient.go index 000a600885d..d106b9da236 100644 --- a/common/mocks/MessagingClient.go +++ b/common/mocks/MessagingClient.go @@ -48,6 +48,11 @@ func (c *MessagingClient) NewConsumer(currentCluster, sourceCluster, consumerNam } // NewProducer generates a dummy implementation of kafka producer -func (c *MessagingClient) NewProducer(sourceCluster string) (messaging.Producer, error) { +func (c *MessagingClient) NewProducer(topic string) (messaging.Producer, error) { + return c.publisherMock, nil +} + +// NewProducerWithClusterName generates a dummy implementation of kafka producer +func (c *MessagingClient) NewProducerWithClusterName(sourceCluster string) (messaging.Producer, error) { return c.publisherMock, nil } diff --git a/common/service/config/config.go b/common/service/config/config.go index 2398f3ab627..32218864a1f 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -39,7 +39,7 @@ type ( Persistence Persistence `yaml:"persistence"` // Log is the logging config Log Logger `yaml:"log"` - // ClustersInfo is the config containing all valid clusters and active acluster + // ClustersInfo is the config containing all valid clusters and active cluster ClustersInfo ClustersInfo `yaml:"clustersInfo"` // Services is a map of service name to service config items Services map[string]Service `yaml:"services"` diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 23e9e96cdf6..a137522f7f1 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -51,6 +51,7 @@ var keys = map[Key]string{ EnableGlobalDomain: "system.enableGlobalDomain", EnableNewKafkaClient: "system.enableNewKafkaClient", EnableVisibilitySampling: "system.enableVisibilitySampling", + EnableVisibilityToKafka: "system.enableVisibilityToKafka", // frontend settings FrontendPersistenceMaxQPS: 
"frontend.persistenceMaxQPS", @@ -162,6 +163,8 @@ const ( EnableNewKafkaClient // EnableVisibilitySampling is key for enable visibility sampling EnableVisibilitySampling + // EnableVisibilityToKafka is key for enable kafka + EnableVisibilityToKafka // key for frontend @@ -395,3 +398,6 @@ func TaskTypeFilter(taskType int) FilterOption { filterMap[TaskType] = taskType } } + +// DefaultEnableVisibilityToKafka default value for config EnableVisibilityToKafka +const DefaultEnableVisibilityToKafka = false diff --git a/common/service/serviceTestBase.go b/common/service/serviceTestBase.go index 0a8880b94b4..a272c7ab3ee 100644 --- a/common/service/serviceTestBase.go +++ b/common/service/serviceTestBase.go @@ -38,6 +38,7 @@ type ( hostInfo *membership.HostInfo clusterMetadata cluster.Metadata messagingClient messaging.Client + kafkaClient messaging.Client clientFactory client.Factory membershipMonitor membership.Monitor diff --git a/config/development.yaml b/config/development.yaml index 33702366427..e9f622d7a88 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -73,3 +73,12 @@ clustersInfo: clusterInitialFailoverVersion: active: 0 standby: 1 + +kafka: + clusters: + test: + brokers: + - 127.0.0.1:9092 + topics: + visibility-topic: + cluster: test diff --git a/host/onebox.go b/host/onebox.go index 9c84a885527..9999d6c17c4 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -286,7 +286,7 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) { var kafkaProducer messaging.Producer var err error if c.enableWorker() { - kafkaProducer, err = c.messagingClient.NewProducer(topicName[c.clusterNo]) + kafkaProducer, err = c.messagingClient.NewProducerWithClusterName(topicName[c.clusterNo]) if err != nil { c.logger.WithField("error", err).Fatal("Failed to create kafka producer when start frontend") } diff --git a/hostxdc/Integration_domain_failover_test.go b/hostxdc/Integration_domain_failover_test.go index 3408d329b44..f459d183200 100644 --- a/hostxdc/Integration_domain_failover_test.go +++ b/hostxdc/Integration_domain_failover_test.go @@ -175,7 +175,7 @@ func (s *testCluster) createMessagingClient() messaging.Client { Topics: topics, ClusterToTopic: clusterToTopic, } - return messaging.NewKafkaClient(&kafkaConfig, zap.NewNop(), s.logger, tally.NoopScope) + return messaging.NewKafkaClient(&kafkaConfig, zap.NewNop(), s.logger, tally.NoopScope, true) } func getTopicList(topicName string) messaging.TopicList { @@ -254,7 +254,7 @@ func (s *integrationClustersTestSuite) TestDomainFailover() { resp, err := client1.DescribeDomain(createContext(), descReq) s.NoError(err) s.NotNil(resp) - // Wait for domain cache to pick the chenge + // Wait for domain cache to pick the change time.Sleep(cacheRefreshInterval) client2 := s.cluster2.host.GetFrontendClient() // standby diff --git a/service/frontend/service.go b/service/frontend/service.go index 20d3308df42..6d4cb65ea48 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -119,7 +119,7 @@ func (s *Service) Start() { // TODO when global domain is enabled, uncomment the line below and remove the line after var kafkaProducer messaging.Producer if base.GetClusterMetadata().IsGlobalDomainEnabled() { - kafkaProducer, err = base.GetMessagingClient().NewProducer(base.GetClusterMetadata().GetCurrentClusterName()) + kafkaProducer, err = base.GetMessagingClient().NewProducerWithClusterName(base.GetClusterMetadata().GetCurrentClusterName()) if err != nil { log.Fatalf("Creating kafka producer failed: %v", err) } 
diff --git a/service/history/handler.go b/service/history/handler.go index 3b6bb6e29fb..d6cd2e4dab1 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -137,7 +137,7 @@ func (h *Handler) Start() error { // TODO when global domain is enabled, uncomment the line below and remove the line after if h.GetClusterMetadata().IsGlobalDomainEnabled() { var err error - h.publisher, err = h.GetMessagingClient().NewProducer(h.GetClusterMetadata().GetCurrentClusterName()) + h.publisher, err = h.GetMessagingClient().NewProducerWithClusterName(h.GetClusterMetadata().GetCurrentClusterName()) if err != nil { h.GetLogger().Fatalf("Creating kafka producer failed: %v", err) } @@ -172,7 +172,7 @@ func (h *Handler) Stop() { // CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard func (h *Handler) CreateEngine(context ShardContext) Engine { - return NewEngineWithShardContext(context, h.visibilityMgr, h.matchingServiceClient, h.historyServiceClient, h.historyEventNotifier, h.publisher, h.config) + return NewEngineWithShardContext(context, h.visibilityMgr, h.matchingServiceClient, h.historyServiceClient, h.historyEventNotifier, h.publisher, h.GetMessagingClient(), h.config) } // Health is for health check diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 12c81f915ab..6fe90701d57 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -126,7 +126,7 @@ var ( // NewEngineWithShardContext creates an instance of history engine func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.VisibilityManager, - matching matching.Client, historyClient hc.Client, historyEventNotifier historyEventNotifier, publisher messaging.Producer, config *Config) Engine { + matching matching.Client, historyClient hc.Client, historyEventNotifier historyEventNotifier, publisher messaging.Producer, messagingClient messaging.Client, config *Config) Engine { currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() shardWrapper := &shardContextWrapper{ currentClusterName: currentClusterName, @@ -154,7 +154,11 @@ func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.Vis historyEventNotifier: historyEventNotifier, config: config, } - txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient, logger) + var visibilityProducer messaging.Producer + if config.EnableVisibilityToKafka() { + visibilityProducer = getVisibilityProducer(messagingClient) + } + txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, visibilityProducer, matching, historyClient, logger) historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, matching, logger) historyEngImpl.txProcessor = txProcessor shardWrapper.txProcessor = txProcessor @@ -3042,3 +3046,14 @@ func getWorkflowAlreadyStartedError(errMsg string, createRequestID string, workf RunId: common.StringPtr(fmt.Sprintf("%v", runID)), } } + +func getVisibilityProducer(messagingClient messaging.Client) messaging.Producer { + if messagingClient == nil { + return nil + } + visibilityProducer, err := messagingClient.NewProducer(messaging.VisibilityTopicName) + if err != nil { + panic(err) + } + return visibilityProducer +} diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 90f5b2ddae8..e9aff6f7b4a 100644 --- a/service/history/historyEngine2_test.go +++ 
b/service/history/historyEngine2_test.go @@ -142,7 +142,7 @@ func (s *engine2Suite) SetupTest() { tokenSerializer: common.NewJSONTaskTokenSerializer(), config: s.config, } - h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient, s.logger) + h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, s.mockHistoryClient, s.logger) h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockMatchingClient, s.logger) s.historyEngine = h } diff --git a/service/history/historyEngine3_eventsv2_test.go b/service/history/historyEngine3_eventsv2_test.go index 86f142a71b2..a7bd106df51 100644 --- a/service/history/historyEngine3_eventsv2_test.go +++ b/service/history/historyEngine3_eventsv2_test.go @@ -144,7 +144,7 @@ func (s *engine3Suite) SetupTest() { tokenSerializer: common.NewJSONTaskTokenSerializer(), config: s.config, } - h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient, s.logger) + h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, s.mockHistoryClient, s.logger) h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockMatchingClient, s.logger) s.historyEngine = h } diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 343110ca91e..112a984cedc 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -161,7 +161,7 @@ func (s *engineSuite) SetupTest() { historyEventNotifier: historyEventNotifier, config: NewDynamicConfigForTest(), } - h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient, s.logger) + h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, s.mockHistoryClient, s.logger) h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.mockMatchingClient, s.logger) h.historyEventNotifier.Start() shardContextWrapper.txProcessor = h.txProcessor diff --git a/service/history/service.go b/service/history/service.go index d4d1b80a425..38b75d48a8d 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -40,6 +40,7 @@ type Config struct { EnableVisibilitySampling dynamicconfig.BoolPropertyFn VisibilityOpenMaxQPS dynamicconfig.IntPropertyFnWithDomainFilter VisibilityClosedMaxQPS dynamicconfig.IntPropertyFnWithDomainFilter + EnableVisibilityToKafka dynamicconfig.BoolPropertyFn // HistoryCache settings // Change of these configs require shard restart @@ -129,6 +130,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { EnableVisibilitySampling: dc.GetBoolProperty(dynamicconfig.EnableVisibilitySampling, true), VisibilityOpenMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryVisibilityOpenMaxQPS, 300), VisibilityClosedMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryVisibilityClosedMaxQPS, 300), + EnableVisibilityToKafka: dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, dynamicconfig.DefaultEnableVisibilityToKafka), HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128), HistoryCacheMaxSize: dc.GetIntProperty(dynamicconfig.HistoryCacheMaxSize, 512), HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour), diff --git a/service/history/timerQueueProcessor2_test.go 
b/service/history/timerQueueProcessor2_test.go index 46a69ba9eec..6ecd0df1258 100644 --- a/service/history/timerQueueProcessor2_test.go +++ b/service/history/timerQueueProcessor2_test.go @@ -146,7 +146,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() { tokenSerializer: common.NewJSONTaskTokenSerializer(), metricsClient: s.mockShard.GetMetricsClient(), } - h.txProcessor = newTransferQueueProcessor(s.mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, &mocks.HistoryClient{}, s.logger) + h.txProcessor = newTransferQueueProcessor(s.mockShard, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, &mocks.HistoryClient{}, s.logger) h.timerProcessor = newTimerQueueProcessor(s.mockShard, h, s.mockMatchingClient, s.logger) s.mockHistoryEngine = h } diff --git a/service/history/timerQueueProcessor_test.go b/service/history/timerQueueProcessor_test.go index 83ba8e151ef..d1ce247c5d5 100644 --- a/service/history/timerQueueProcessor_test.go +++ b/service/history/timerQueueProcessor_test.go @@ -99,7 +99,7 @@ func (s *timerQueueProcessorSuite) SetupTest() { metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), } s.engineImpl.txProcessor = newTransferQueueProcessor( - s.ShardContext, s.engineImpl, s.mockVisibilityMgr, &mocks.MatchingClient{}, &mocks.HistoryClient{}, s.logger, + s.ShardContext, s.engineImpl, s.mockVisibilityMgr, &mocks.KafkaProducer{}, &mocks.MatchingClient{}, &mocks.HistoryClient{}, s.logger, ) } diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index 857e5665c3f..0338df83338 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -31,6 +31,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" ) @@ -55,7 +56,7 @@ type ( } ) -func newTransferQueueActiveProcessor(shard ShardContext, historyService *historyEngineImpl, visibilityMgr persistence.VisibilityManager, +func newTransferQueueActiveProcessor(shard ShardContext, historyService *historyEngineImpl, visibilityMgr persistence.VisibilityManager, visibilityProducer messaging.Producer, matchingClient matching.Client, historyClient history.Client, logger bark.Logger) *transferQueueActiveProcessorImpl { config := shard.GetConfig() options := &QueueProcessorOptions{ @@ -99,7 +100,7 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history cache: historyService.historyCache, transferTaskFilter: transferTaskFilter, transferQueueProcessorBase: newTransferQueueProcessorBase( - shard, options, visibilityMgr, matchingClient, maxReadAckLevel, updateTransferAckLevel, transferQueueShutdown, logger, + shard, options, visibilityMgr, visibilityProducer, matchingClient, maxReadAckLevel, updateTransferAckLevel, transferQueueShutdown, logger, ), } @@ -111,7 +112,8 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history return processor } -func newTransferQueueFailoverProcessor(shard ShardContext, historyService *historyEngineImpl, visibilityMgr persistence.VisibilityManager, +func newTransferQueueFailoverProcessor(shard ShardContext, historyService *historyEngineImpl, + visibilityMgr persistence.VisibilityManager, visibilityProducer messaging.Producer, matchingClient matching.Client, historyClient history.Client, domainID string, standbyClusterName 
string, minLevel int64, maxLevel int64, logger bark.Logger) (func(ackLevel int64) error, *transferQueueActiveProcessorImpl) { config := shard.GetConfig() @@ -167,7 +169,8 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo cache: historyService.historyCache, transferTaskFilter: transferTaskFilter, transferQueueProcessorBase: newTransferQueueProcessorBase( - shard, options, visibilityMgr, matchingClient, maxReadAckLevel, updateTransferAckLevel, transferQueueShutdown, logger, + shard, options, visibilityMgr, visibilityProducer, matchingClient, + maxReadAckLevel, updateTransferAckLevel, transferQueueShutdown, logger, ), } diff --git a/service/history/transferQueueActiveProcessor_test.go b/service/history/transferQueueActiveProcessor_test.go index 9e2e5c029f3..acf812d81e9 100644 --- a/service/history/transferQueueActiveProcessor_test.go +++ b/service/history/transferQueueActiveProcessor_test.go @@ -154,7 +154,7 @@ func (s *transferQueueActiveProcessorSuite) SetupTest() { timerProcessor: s.mockTimerQueueProcessor, } s.mockHistoryEngine = h - s.transferQueueActiveProcessor = newTransferQueueActiveProcessor(s.mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient, s.logger) + s.transferQueueActiveProcessor = newTransferQueueActiveProcessor(s.mockShard, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, s.mockHistoryClient, s.logger) s.mockQueueAckMgr = &MockQueueAckMgr{} s.transferQueueActiveProcessor.queueAckMgr = s.mockQueueAckMgr s.transferQueueActiveProcessor.queueProcessorBase.ackMgr = s.mockQueueAckMgr @@ -328,6 +328,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_FirstDecisio s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockMatchingClient.On("AddDecisionTask", nil, s.createAddDecisionTaskRequest(transferTask, msBuilder)).Once().Return(nil) s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", s.createRecordWorkflowExecutionStartedRequest(transferTask, msBuilder)).Once().Return(nil) + s.mockProducer.On("Publish", mock.Anything).Return(nil) _, err := s.transferQueueActiveProcessor.process(transferTask) s.Nil(err) diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index f7abcafd155..939caeb165b 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -30,6 +30,7 @@ import ( "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" ) @@ -49,6 +50,7 @@ type ( metricsClient metrics.Client historyService *historyEngineImpl visibilityMgr persistence.VisibilityManager + visibilityProducer messaging.Producer matchingClient matching.Client historyClient history.Client ackLevel int64 @@ -61,7 +63,8 @@ type ( } ) -func newTransferQueueProcessor(shard ShardContext, historyService *historyEngineImpl, visibilityMgr persistence.VisibilityManager, +func newTransferQueueProcessor(shard ShardContext, historyService *historyEngineImpl, + visibilityMgr persistence.VisibilityManager, visibilityProducer messaging.Producer, matchingClient matching.Client, historyClient history.Client, logger bark.Logger) *transferQueueProcessorImpl { logger = logger.WithFields(bark.Fields{ logging.TagWorkflowComponent: 
logging.TagValueTransferQueueComponent, @@ -71,7 +74,7 @@ func newTransferQueueProcessor(shard ShardContext, historyService *historyEngine for clusterName := range shard.GetService().GetClusterMetadata().GetAllClusterFailoverVersions() { if clusterName != currentClusterName { standbyTaskProcessors[clusterName] = newTransferQueueStandbyProcessor( - clusterName, shard, historyService, visibilityMgr, matchingClient, logger, + clusterName, shard, historyService, visibilityMgr, visibilityProducer, matchingClient, logger, ) } } @@ -84,12 +87,13 @@ func newTransferQueueProcessor(shard ShardContext, historyService *historyEngine metricsClient: historyService.metricsClient, historyService: historyService, visibilityMgr: visibilityMgr, + visibilityProducer: visibilityProducer, matchingClient: matchingClient, historyClient: historyClient, ackLevel: shard.GetTransferAckLevel(), logger: logger, shutdownChan: make(chan struct{}), - activeTaskProcessor: newTransferQueueActiveProcessor(shard, historyService, visibilityMgr, matchingClient, historyClient, logger), + activeTaskProcessor: newTransferQueueActiveProcessor(shard, historyService, visibilityMgr, visibilityProducer, matchingClient, historyClient, logger), standbyTaskProcessors: standbyTaskProcessors, } } @@ -157,7 +161,7 @@ func (t *transferQueueProcessorImpl) FailoverDomain(domainID string) { maxLevel := t.activeTaskProcessor.getQueueReadLevel() + 1 t.logger.Infof("Transfer Failover Triggered: %v, min level: %v, max level: %v.\n", domainID, minLevel, maxLevel) updateShardAckLevel, failoverTaskProcessor := newTransferQueueFailoverProcessor( - t.shard, t.historyService, t.visibilityMgr, t.matchingClient, t.historyClient, + t.shard, t.historyService, t.visibilityMgr, t.visibilityProducer, t.matchingClient, t.historyClient, domainID, standbyClusterName, minLevel, maxLevel, t.logger, ) diff --git a/service/history/transferQueueProcessorBase.go b/service/history/transferQueueProcessorBase.go index 12b3e296581..27a01f0f4c3 100644 --- a/service/history/transferQueueProcessorBase.go +++ b/service/history/transferQueueProcessorBase.go @@ -27,6 +27,7 @@ import ( "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/persistence" ) @@ -41,6 +42,7 @@ type ( options *QueueProcessorOptions executionManager persistence.ExecutionManager visibilityMgr persistence.VisibilityManager + visibilityProducer messaging.Producer matchingClient matching.Client maxReadAckLevel maxReadAckLevel updateTransferAckLevel updateTransferAckLevel @@ -52,7 +54,7 @@ type ( const defaultDomainName = "defaultDomainName" func newTransferQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, - visibilityMgr persistence.VisibilityManager, matchingClient matching.Client, + visibilityMgr persistence.VisibilityManager, visibilityProducer messaging.Producer, matchingClient matching.Client, maxReadAckLevel maxReadAckLevel, updateTransferAckLevel updateTransferAckLevel, transferQueueShutdown transferQueueShutdown, logger bark.Logger) *transferQueueProcessorBase { return &transferQueueProcessorBase{ @@ -60,6 +62,7 @@ func newTransferQueueProcessorBase(shard ShardContext, options *QueueProcessorOp options: options, executionManager: shard.GetExecutionManager(), visibilityMgr: visibilityMgr, + visibilityProducer: visibilityProducer, matchingClient: matchingClient, maxReadAckLevel: maxReadAckLevel, updateTransferAckLevel: updateTransferAckLevel, @@ 
-140,6 +143,7 @@ func (t *transferQueueProcessorBase) recordWorkflowStarted( domain := defaultDomainName isSampledEnabled := false wid := execution.GetWorkflowId() + rid := execution.GetRunId() domainEntry, err := t.shard.GetDomainCache().GetDomainByID(domainID) if err != nil { @@ -156,6 +160,20 @@ func (t *transferQueueProcessorBase) recordWorkflowStarted( return nil } + // publish to kafka + if t.visibilityProducer != nil { + msg := &messaging.OpenWorkflowMsg{ + Domain: domain, + WorkflowID: wid, + RunID: rid, + StartTime: startTimeUnixNano, + } + err := t.visibilityProducer.Publish(msg) + if err != nil { + return err + } + } + return t.visibilityMgr.RecordWorkflowExecutionStarted(&persistence.RecordWorkflowExecutionStartedRequest{ DomainUUID: domainID, Domain: domain, diff --git a/service/history/transferQueueStandbyProcessor.go b/service/history/transferQueueStandbyProcessor.go index 6b76b02edef..2a516c3f6c1 100644 --- a/service/history/transferQueueStandbyProcessor.go +++ b/service/history/transferQueueStandbyProcessor.go @@ -26,6 +26,7 @@ import ( "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" ) @@ -52,7 +53,8 @@ var ( ) func newTransferQueueStandbyProcessor(clusterName string, shard ShardContext, historyService *historyEngineImpl, - visibilityMgr persistence.VisibilityManager, matchingClient matching.Client, logger bark.Logger) *transferQueueStandbyProcessorImpl { + visibilityMgr persistence.VisibilityManager, visibilityProducer messaging.Producer, + matchingClient matching.Client, logger bark.Logger) *transferQueueStandbyProcessorImpl { config := shard.GetConfig() options := &QueueProcessorOptions{ StartDelay: config.TransferProcessorStartDelay, @@ -94,7 +96,8 @@ func newTransferQueueStandbyProcessor(clusterName string, shard ShardContext, hi logger: logger, metricsClient: historyService.metricsClient, transferQueueProcessorBase: newTransferQueueProcessorBase( - shard, options, visibilityMgr, matchingClient, maxReadAckLevel, updateClusterAckLevel, transferQueueShutdown, logger, + shard, options, visibilityMgr, visibilityProducer, matchingClient, + maxReadAckLevel, updateClusterAckLevel, transferQueueShutdown, logger, ), } diff --git a/service/history/transferQueueStandbyProcessor_test.go b/service/history/transferQueueStandbyProcessor_test.go index 3cc703d4df3..88cfd77cf3f 100644 --- a/service/history/transferQueueStandbyProcessor_test.go +++ b/service/history/transferQueueStandbyProcessor_test.go @@ -146,7 +146,7 @@ func (s *transferQueueStandbyProcessorSuite) SetupTest() { s.mockHistoryEngine = h s.clusterName = cluster.TestAlternativeClusterName s.transferQueueStandbyProcessor = newTransferQueueStandbyProcessor( - s.clusterName, s.mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.logger, + s.clusterName, s.mockShard, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, s.logger, ) s.mockQueueAckMgr = &MockQueueAckMgr{} s.transferQueueStandbyProcessor.queueAckMgr = s.mockQueueAckMgr @@ -367,7 +367,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessDecisionTask_Pending() { persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Return(nil) 
- + s.mockProducer.On("Publish", mock.Anything).Return(nil) _, err := s.transferQueueStandbyProcessor.process(transferTask) s.Equal(ErrTaskRetry, err) } @@ -416,6 +416,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessDecisionTask_Pending_Pus s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Return(nil) s.mockMatchingClient.On("AddDecisionTask", mock.Anything, mock.Anything).Return(nil).Once() + s.mockProducer.On("Publish", mock.Anything).Return(nil) _, err := s.transferQueueStandbyProcessor.process(transferTask) s.Nil(nil, err) @@ -475,6 +476,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessDecisionTask_Success_Fir StartTimestamp: executionInfo.StartTimestamp.UnixNano(), WorkflowTimeout: int64(executionInfo.WorkflowTimeout), }).Return(nil).Once() + s.mockProducer.On("Publish", mock.Anything).Return(nil) _, err := s.transferQueueStandbyProcessor.process(transferTask) s.Nil(err)
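
A possible follow-up cleanup for common/messaging/kafkaProducer.go: the new getProducerMessage switches on message.(type) and then re-asserts the concrete type inside each case. Binding the switch variable gives identical behavior with a single assertion; this is a suggestion sketch against the code above, not part of the patch:

func (p *kafkaProducer) getProducerMessage(message interface{}) (*sarama.ProducerMessage, error) {
	switch msg := message.(type) {
	case *replicator.ReplicationTask:
		// Replication tasks keep the thrift-rw encoding and the
		// task-derived partition key.
		payload, err := p.serializeTask(msg)
		if err != nil {
			return nil, err
		}
		return &sarama.ProducerMessage{
			Topic: p.topic,
			Key:   p.getKey(msg),
			Value: sarama.ByteEncoder(payload),
		}, nil
	case *OpenWorkflowMsg:
		// Visibility records are gob-encoded and keyed by workflow ID,
		// so all records for one workflow land on the same partition.
		payload, err := p.gobEncoder.Encode(msg)
		if err != nil {
			return nil, err
		}
		return &sarama.ProducerMessage{
			Topic: VisibilityTopicName,
			Key:   sarama.StringEncoder(msg.WorkflowID),
			Value: sarama.ByteEncoder(payload),
		}, nil
	default:
		return nil, errors.New("unknown producer message type")
	}
}
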
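
Because Producer.PublishBatch now takes []interface{} instead of []*replicator.ReplicationTask, callers holding a typed task slice must copy into an interface slice first (Go performs no implicit slice-element conversion). A hypothetical caller-side helper, for illustration only:

package replication // hypothetical package, not part of this patch

import (
	"github.com/uber/cadence/.gen/go/replicator"
	"github.com/uber/cadence/common/messaging"
)

// publishReplicationTasks wraps typed tasks into the []interface{} form
// that the widened PublishBatch signature expects.
func publishReplicationTasks(producer messaging.Producer, tasks []*replicator.ReplicationTask) error {
	msgs := make([]interface{}, 0, len(tasks))
	for _, task := range tasks {
		msgs = append(msgs, task)
	}
	return producer.PublishBatch(msgs)
}
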
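
Finally, the patch only wires the producing side; whatever indexes the visibility topic has to reverse the gob encoding done in getProducerMessage. A minimal consumer sketch, assuming a plain sarama partition consumer against the development.yaml broker and a single local partition (the print handler is illustrative):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"

	"github.com/uber/cadence/common/codec/gob"
	"github.com/uber/cadence/common/messaging"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition(messaging.VisibilityTopicName, 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}
	defer pc.Close()

	decoder := gob.NewGobEncoder()
	for msg := range pc.Messages() {
		// Decode into a pointer-to-pointer, matching the pattern used in
		// gob_test.go above.
		var record *messaging.OpenWorkflowMsg
		if err := decoder.Decode(msg.Value, &record); err != nil {
			fmt.Println("skipping undecodable message:", err)
			continue
		}
		fmt.Printf("open workflow: domain=%s wid=%s rid=%s startTime=%d\n",
			record.Domain, record.WorkflowID, record.RunID, record.StartTime)
	}
}
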