Skip to content

Commit

Permalink
Add cassandra TLS support (cadence-workflow#2769)
Browse files Browse the repository at this point in the history
* Consolidate duplicate Cassandra setup and config into a new common/cassandra package
* Add TLS support for Cassandra
* Add TLS flags to cadence-cassandra-tool
* Combine TLS config structs into new common auth.TLS
* Add more TLS options:
   - CaFile
   - EnableHostVerification
   - Remove TLS bundleFile option in favor of caFile option
  • Loading branch information
aoby authored and wxing1292 committed Nov 7, 2019
1 parent 925d24d commit e0ef19e
Show file tree
Hide file tree
Showing 26 changed files with 368 additions and 105 deletions.
40 changes: 40 additions & 0 deletions common/auth/tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package auth

type (
// TLS describe TLS configuration (for Kafka, Cassandra)
TLS struct {
Enabled bool `yaml:"enabled"`

// CertPath and KeyPath are optional depending on server
// config, but both fields must be omitted to avoid using a
// client certificate
CertFile string `yaml:"certFile"`
KeyFile string `yaml:"keyFile"`

CaFile string `yaml:"caFile"` //optional depending on server config
// If you want to verify the hostname and server cert (like a wildcard for cass cluster) then you should turn this on
// This option is basically the inverse of InSecureSkipVerify
// See InSecureSkipVerify in http://golang.org/pkg/crypto/tls/ for more info
EnableHostVerification bool `yaml:"enableHostVerification"`
}
)
79 changes: 79 additions & 0 deletions common/cassandra/cassandraCluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cassandra

import (
"strings"

"github.com/gocql/gocql"

"github.com/uber/cadence/common/service/config"
)

// NewCassandraCluster creates a cassandra cluster from a given configuration
func NewCassandraCluster(cfg config.Cassandra) *gocql.ClusterConfig {
hosts := parseHosts(cfg.Hosts)
cluster := gocql.NewCluster(hosts...)
cluster.ProtoVersion = 4
if cfg.Port > 0 {
cluster.Port = cfg.Port
}
if cfg.User != "" && cfg.Password != "" {
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: cfg.User,
Password: cfg.Password,
}
}
if cfg.Keyspace != "" {
cluster.Keyspace = cfg.Keyspace
}
if cfg.Consistency != "" {
cluster.Consistency = gocql.ParseConsistency(cfg.Consistency)
}
if cfg.Datacenter != "" {
cluster.HostFilter = gocql.DataCentreHostFilter(cfg.Datacenter)
}
if cfg.TLS != nil && cfg.TLS.Enabled {
cluster.SslOpts = &gocql.SslOptions{
CertPath: cfg.TLS.CertFile,
KeyPath: cfg.TLS.KeyFile,
CaPath: cfg.TLS.CaFile,
EnableHostVerification: cfg.TLS.EnableHostVerification,
}
}
if cfg.MaxConns > 0 {
cluster.NumConns = cfg.MaxConns
}

cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())

return cluster
}

func parseHosts(input string) []string {
var hosts = make([]string, 0)
for _, h := range strings.Split(input, ",") {
if host := strings.TrimSpace(h); len(host) > 0 {
hosts = append(hosts, host)
}
}
return hosts
}
6 changes: 4 additions & 2 deletions common/messaging/kafkaClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"io/ioutil"
"strings"

"github.com/uber/cadence/common/auth"

"github.com/Shopify/sarama"
uberKafkaClient "github.com/uber-go/kafka-client"
uberKafka "github.com/uber-go/kafka-client/kafka"
Expand Down Expand Up @@ -173,7 +175,7 @@ func (c *kafkaClient) newProducerHelper(topic string) (Producer, error) {
}

// CreateTLSConfig return tls config
func CreateTLSConfig(tlsConfig TLS) (*tls.Config, error) {
func CreateTLSConfig(tlsConfig auth.TLS) (*tls.Config, error) {
if !tlsConfig.Enabled {
return nil, nil
}
Expand All @@ -183,7 +185,7 @@ func CreateTLSConfig(tlsConfig TLS) (*tls.Config, error) {
return nil, err
}
caCertPool := x509.NewCertPool()
pemData, err := ioutil.ReadFile(tlsConfig.BundleFile)
pemData, err := ioutil.ReadFile(tlsConfig.CaFile)
if err != nil {
return nil, err
}
Expand Down
12 changes: 3 additions & 9 deletions common/messaging/kafkaConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,20 @@ package messaging

import (
"fmt"

"github.com/uber/cadence/common/auth"
)

type (
// KafkaConfig describes the configuration needed to connect to all kafka clusters
KafkaConfig struct {
TLS TLS `yaml:"tls"`
TLS auth.TLS `yaml:"tls"`
Clusters map[string]ClusterConfig `yaml:"clusters"`
Topics map[string]TopicConfig `yaml:"topics"`
ClusterToTopic map[string]TopicList `yaml:"cadence-cluster-topics"`
Applications map[string]TopicList `yaml:"applications"`
}

// TLS describe the Kafka TLS configuration
TLS struct {
Enabled bool `yaml:"enabled"`
CertFile string `yaml:"certFile"`
KeyFile string `yaml:"keyFile"`
BundleFile string `yaml:"bundleFile"`
}

// ClusterConfig describes the configuration for a single Kafka cluster
ClusterConfig struct {
Brokers []string `yaml:"brokers"`
Expand Down
42 changes: 12 additions & 30 deletions common/persistence/cassandra/cassandraHelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"os"
"strings"

"github.com/uber/cadence/common/auth"

"github.com/gocql/gocql"
log "github.com/sirupsen/logrus"

Expand All @@ -35,33 +37,6 @@ import (

const cassandraPersistenceName = "cassandra"

// NewCassandraCluster creates a cassandra cluster given comma separated list of clusterHosts
func NewCassandraCluster(clusterHosts string, port int, user, password, dc string) *gocql.ClusterConfig {
var hosts []string
for _, h := range strings.Split(clusterHosts, ",") {
if host := strings.TrimSpace(h); len(host) > 0 {
hosts = append(hosts, host)
}
}

cluster := gocql.NewCluster(hosts...)
cluster.ProtoVersion = 4
if port > 0 {
cluster.Port = port
}
if user != "" && password != "" {
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: user,
Password: password,
}
}
if dc != "" {
cluster.HostFilter = gocql.DataCentreHostFilter(dc)
}
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
return cluster
}

// CreateCassandraKeyspace creates the keyspace using this session for given replica count
func CreateCassandraKeyspace(s *gocql.Session, keyspace string, replicas int, overwrite bool) (err error) {
// if overwrite flag is set, drop the keyspace and create a new one
Expand Down Expand Up @@ -90,9 +65,15 @@ func DropCassandraKeyspace(s *gocql.Session, keyspace string) (err error) {
return
}

// LoadCassandraSchema loads the schema from the given .cql files on this keyspace
func LoadCassandraSchema(
dir string, fileNames []string, hosts []string, port int, keyspace string, override bool,
// loadCassandraSchema loads the schema from the given .cql files on this keyspace
func loadCassandraSchema(
dir string,
fileNames []string,
hosts []string,
port int,
keyspace string,
override bool,
tls *auth.TLS,
) (err error) {

tmpFile, err := ioutil.TempFile("", "_cadence_")
Expand Down Expand Up @@ -120,6 +101,7 @@ func LoadCassandraSchema(
Hosts: strings.Join(hosts, ","),
Port: port,
Keyspace: keyspace,
TLS: tls,
},
SetupConfig: schema.SetupConfig{
SchemaFilePath: tmpFile.Name(),
Expand Down
8 changes: 3 additions & 5 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sort"
"time"

"github.com/uber/cadence/common/cassandra"

"github.com/gocql/gocql"

workflow "github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -78,15 +80,11 @@ func newHistoryV2Persistence(
logger log.Logger,
) (p.HistoryStore, error) {

cluster := NewCassandraCluster(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, cfg.Datacenter)
cluster.Keyspace = cfg.Keyspace
cluster := cassandra.NewCassandraCluster(cfg)
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
cluster.Timeout = defaultSessionTimeout
if cfg.MaxConns > 0 {
cluster.NumConns = cfg.MaxConns
}
session, err := cluster.CreateSession()
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package cassandra
import (
"fmt"

"github.com/uber/cadence/common/cassandra"

"github.com/uber/cadence/common"

"github.com/gocql/gocql"
Expand Down Expand Up @@ -147,8 +149,7 @@ type (

// newMetadataPersistenceV2 is used to create an instance of HistoryManager implementation
func newMetadataPersistenceV2(cfg config.Cassandra, currentClusterName string, logger log.Logger) (p.MetadataStore, error) {
cluster := NewCassandraCluster(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, cfg.Datacenter)
cluster.Keyspace = cfg.Keyspace
cluster := cassandra.NewCassandraCluster(cfg)
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"github.com/uber/cadence/common/cassandra"

"github.com/gocql/gocql"

workflow "github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -863,8 +865,7 @@ var _ p.ExecutionStore = (*cassandraPersistence)(nil)

// newShardPersistence is used to create an instance of ShardManager implementation
func newShardPersistence(cfg config.Cassandra, clusterName string, logger log.Logger) (p.ShardStore, error) {
cluster := NewCassandraCluster(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, cfg.Datacenter)
cluster.Keyspace = cfg.Keyspace
cluster := cassandra.NewCassandraCluster(cfg)
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
Expand Down Expand Up @@ -893,8 +894,7 @@ func NewWorkflowExecutionPersistence(

// newTaskPersistence is used to create an instance of TaskManager implementation
func newTaskPersistence(cfg config.Cassandra, logger log.Logger) (p.TaskStore, error) {
cluster := NewCassandraCluster(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, cfg.Datacenter)
cluster.Keyspace = cfg.Keyspace
cluster := cassandra.NewCassandraCluster(cfg)
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
Expand Down
13 changes: 10 additions & 3 deletions common/persistence/cassandra/cassandraPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"github.com/uber/cadence/common/cassandra"

"github.com/gocql/gocql"
log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -115,7 +117,12 @@ func (s *TestCluster) TearDownTestDatabase() {

// CreateSession from PersistenceTestCluster interface
func (s *TestCluster) CreateSession() {
s.cluster = NewCassandraCluster(s.cfg.Hosts, s.cfg.Port, testUser, testPassword, "")
s.cluster = cassandra.NewCassandraCluster(config.Cassandra{
Hosts: s.cfg.Hosts,
Port: s.cfg.Port,
User: testUser,
Password: testPassword,
})
s.cluster.Consistency = gocql.Consistency(1)
s.cluster.Keyspace = "system"
s.cluster.Timeout = 40 * time.Second
Expand Down Expand Up @@ -147,7 +154,7 @@ func (s *TestCluster) DropDatabase() {
// LoadSchema from PersistenceTestCluster interface
func (s *TestCluster) LoadSchema(fileNames []string, schemaDir string) {
workflowSchemaDir := schemaDir + "/cadence"
err := LoadCassandraSchema(workflowSchemaDir, fileNames, s.cluster.Hosts, s.cluster.Port, s.DatabaseName(), true)
err := loadCassandraSchema(workflowSchemaDir, fileNames, s.cluster.Hosts, s.cluster.Port, s.DatabaseName(), true, nil)
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
log.Fatal(err)
}
Expand All @@ -156,7 +163,7 @@ func (s *TestCluster) LoadSchema(fileNames []string, schemaDir string) {
// LoadVisibilitySchema from PersistenceTestCluster interface
func (s *TestCluster) LoadVisibilitySchema(fileNames []string, schemaDir string) {
workflowSchemaDir := schemaDir + "visibility"
err := LoadCassandraSchema(workflowSchemaDir, fileNames, s.cluster.Hosts, s.cluster.Port, s.DatabaseName(), false)
err := loadCassandraSchema(workflowSchemaDir, fileNames, s.cluster.Hosts, s.cluster.Port, s.DatabaseName(), false, nil)
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/cassandra/cassandraQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cassandra"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/config"
Expand Down Expand Up @@ -71,8 +72,7 @@ func newQueue(
logger log.Logger,
queueType common.QueueType,
) (persistence.Queue, error) {
cluster := NewCassandraCluster(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, cfg.Datacenter)
cluster.Keyspace = cfg.Keyspace
cluster := cassandra.NewCassandraCluster(cfg)
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"fmt"
"time"

"github.com/uber/cadence/common/cassandra"

"github.com/gocql/gocql"

workflow "github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -144,8 +146,7 @@ type (

// newVisibilityPersistence is used to create an instance of VisibilityManager implementation
func newVisibilityPersistence(cfg config.Cassandra, logger log.Logger) (p.VisibilityStore, error) {
cluster := NewCassandraCluster(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, cfg.Datacenter)
cluster.Keyspace = cfg.Keyspace
cluster := cassandra.NewCassandraCluster(cfg)
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
Expand Down
Loading

0 comments on commit e0ef19e

Please sign in to comment.