Skip to content

Commit

Permalink
Add TLS support for kafka CLI (cadence-workflow#1956)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored and wxing1292 committed Jun 6, 2019
1 parent 8221471 commit 44d4bc4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 15 deletions.
4 changes: 2 additions & 2 deletions common/messaging/kafkaClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewKafkaClient(kc *KafkaConfig, metricsClient metrics.Client, zLogger *zap.

client := uberKafkaClient.New(uberKafka.NewStaticNameResolver(topicClusterAssignment, brokers), zLogger, metricScope)

tlsConfig, err := createTLSConfig(kc.TLS)
tlsConfig, err := CreateTLSConfig(kc.TLS)
if err != nil {
panic(fmt.Sprintf("Error creating Kafka TLS config %v", err))
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (c *kafkaClient) newProducerHelper(topic string) (Producer, error) {
return NewKafkaProducer(topic, producer, c.logger), nil
}

func createTLSConfig(tlsConfig TLS) (*tls.Config, error) {
func CreateTLSConfig(tlsConfig TLS) (*tls.Config, error) {
if !tlsConfig.Enabled {
return nil, nil
}
Expand Down
15 changes: 15 additions & 0 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ func newAdminKafkaCommands() []cli.Command {
cli.StringFlag{
Name: FlagHostFile,
Usage: "Kafka host config file in format of: " + `
tls:
enabled: false
certFile: ""
keyFile: ""
bundleFile: ""
clusters:
localKafka:
brokers:
Expand Down Expand Up @@ -356,6 +361,11 @@ clusters:
cli.StringFlag{
Name: FlagHostFile,
Usage: "Kafka host config file in format of: " + `
tls:
enabled: false
certFile: ""
keyFile: ""
bundleFile: ""
clusters:
localKafka:
brokers:
Expand Down Expand Up @@ -444,6 +454,11 @@ clusters:
cli.StringFlag{
Name: FlagHostFile,
Usage: "Kafka host config file in format of: " + `
tls:
enabled: false
certFile: ""
keyFile: ""
bundleFile: ""
clusters:
localKafka:
brokers:
Expand Down
41 changes: 28 additions & 13 deletions tools/cli/adminKafkaCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package cli
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -458,6 +459,7 @@ func decodeVisibility(message []byte, val *indexer.Message) error {
// ClustersConfig describes the kafka clusters
type ClustersConfig struct {
Clusters map[string]messaging.ClusterConfig
TLS messaging.TLS
}

func doRereplicate(shardID int, domainID, wid, rid string, minID, maxID int64, targets []string, producer messaging.Producer, session *gocql.Session) {
Expand Down Expand Up @@ -645,14 +647,18 @@ func newKafkaProducer(c *cli.Context) messaging.Producer {
destTopic := getRequiredOption(c, FlagTopic)

// initialize kafka producer
destBrokers, err := loadBrokers(hostFile, destCluster)
destBrokers, tlsConfig, err := loadBrokerConfig(hostFile, destCluster)
if err != nil {
ErrorAndExit("", err)
}

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
sproducer, err := sarama.NewSyncProducer(destBrokers, config)
if err != nil {
ErrorAndExit("", err)
Expand All @@ -669,9 +675,9 @@ func AdminPurgeTopic(c *cli.Context) {
topic := getRequiredOption(c, FlagTopic)
cluster := getRequiredOption(c, FlagCluster)
group := getRequiredOption(c, FlagGroup)
brokers, err := loadBrokers(hostFile, cluster)
brokers, tlsConfig, err := loadBrokerConfig(hostFile, cluster)

consumer := createConsumerAndWaitForReady(brokers, group, topic)
consumer := createConsumerAndWaitForReady(brokers, tlsConfig, group, topic)

highWaterMarks, ok := consumer.HighWaterMarks()[topic]
if !ok {
Expand All @@ -687,7 +693,7 @@ func AdminPurgeTopic(c *cli.Context) {
ErrorAndExit("fail to commit offset", err)
}

consumer = createConsumerAndWaitForReady(brokers, group, topic)
consumer = createConsumerAndWaitForReady(brokers, tlsConfig, group, topic)
msg, ok := <-consumer.Messages()
fmt.Printf("current offset sample: %v: %v \n", msg.Partition, msg.Offset)
}
Expand Down Expand Up @@ -727,12 +733,12 @@ func AdminMergeDLQ(c *cli.Context) {
startOffset := c.Int64(FlagStartOffset)
group := getRequiredOption(c, FlagGroup)

fromBrokers, err := loadBrokers(hostFile, fromCluster)
fromBrokers, tlsConfig, err := loadBrokerConfig(hostFile, fromCluster)
if err != nil {
ErrorAndExit("", err)
}

consumer := createConsumerAndWaitForReady(fromBrokers, group, fromTopic)
consumer := createConsumerAndWaitForReady(fromBrokers, tlsConfig, group, fromTopic)

highWaterMarks, ok := consumer.HighWaterMarks()[fromTopic]
if !ok {
Expand All @@ -748,7 +754,7 @@ func AdminMergeDLQ(c *cli.Context) {
ErrorAndExit("fail to commit offset", err)
}
// create consumer again to make sure MarkPartitionOffset works
consumer = createConsumerAndWaitForReady(fromBrokers, group, fromTopic)
consumer = createConsumerAndWaitForReady(fromBrokers, tlsConfig, group, fromTopic)

for {
select {
Expand Down Expand Up @@ -783,10 +789,15 @@ func AdminMergeDLQ(c *cli.Context) {
}
}

func createConsumerAndWaitForReady(brokers []string, group, fromTopic string) *cluster.Consumer {
func createConsumerAndWaitForReady(brokers []string, tlsConfig *tls.Config, group, fromTopic string) *cluster.Consumer {
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
if tlsConfig != nil {
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}

config.Group.Return.Notifications = true

client, err := cluster.NewClient(brokers, config)
Expand Down Expand Up @@ -841,14 +852,14 @@ func parseReplicationTask(in string) (tasks []*replicator.ReplicationTask, err e
return tasks, nil
}

func loadBrokers(hostFile string, cluster string) (brokers []string, err error) {
func loadBrokerConfig(hostFile string, cluster string) ([]string, *tls.Config, error) {
contents, err := ioutil.ReadFile(hostFile)
if err != nil {
return nil, fmt.Errorf("failed to load kafka cluster info from %v., error: %v", hostFile, err)
return nil, nil, fmt.Errorf("failed to load kafka cluster info from %v., error: %v", hostFile, err)
}
clustersConfig := ClustersConfig{}
if err := yaml.Unmarshal(contents, &clustersConfig); err != nil {
return nil, err
return nil, nil, err
}
if len(clustersConfig.Clusters) != 0 {
config, ok := clustersConfig.Clusters[cluster]
Expand All @@ -860,8 +871,12 @@ func loadBrokers(hostFile string, cluster string) (brokers []string, err error)
brs[i] = b
}
}
return brs, nil
tlsConfig, err := messaging.CreateTLSConfig(clustersConfig.TLS)
if err != nil {
return nil, nil, fmt.Errorf(fmt.Sprintf("Error creating Kafka TLS config %v", err))
}
return brs, tlsConfig, nil
}
}
return nil, fmt.Errorf("failed to load broker for cluster %v", cluster)
return nil, nil, fmt.Errorf("failed to load broker for cluster %v", cluster)
}

0 comments on commit 44d4bc4

Please sign in to comment.