Skip to content

Commit

Permalink
Kafka TLS: allow CA(RootCAs) or cert/key(certificate chains) only
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Feb 17, 2021
1 parent d39d428 commit 59a98cf
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions common/messaging/kafka/clientImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,32 +169,34 @@ func (c *clientImpl) initAuth(saramaConfig *sarama.Config) error {
return nil
}

// convertTLSConfig convert tls config
func convertTLSConfig(tlsConfig auth.TLS) (*tls.Config, error) {
if !tlsConfig.Enabled {
// convertTLSConfig converts tls config
func convertTLSConfig(authConfig auth.TLS) (*tls.Config, error) {
if !authConfig.Enabled {
return nil, nil
}

if tlsConfig.CertFile != "" && tlsConfig.CaFile != "" && tlsConfig.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(tlsConfig.CertFile, tlsConfig.KeyFile)
tlsConfig := &tls.Config{
InsecureSkipVerify: !authConfig.EnableHostVerification,
}

if authConfig.CaFile != "" {
caCertPool := x509.NewCertPool()
pemData, err := ioutil.ReadFile(authConfig.CaFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
pemData, err := ioutil.ReadFile(tlsConfig.CaFile)
caCertPool.AppendCertsFromPEM(pemData)

tlsConfig.RootCAs = caCertPool
}

if authConfig.CertFile != "" && authConfig.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile)
if err != nil {
return nil, err
}
caCertPool.AppendCertsFromPEM(pemData)

return &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: !tlsConfig.EnableHostVerification,
}, nil
} else {
return &tls.Config{
InsecureSkipVerify: !tlsConfig.EnableHostVerification,
}, nil
tlsConfig.Certificates = []tls.Certificate{cert}
}
return tlsConfig, nil
}

0 comments on commit 59a98cf

Please sign in to comment.