Skip to content

Commit

Permalink
Rewrite Kafka client and support SASL/TLS
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Dec 9, 2020
1 parent d58cf9d commit df3c714
Show file tree
Hide file tree
Showing 29 changed files with 964 additions and 415 deletions.
5 changes: 2 additions & 3 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/zap"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
Expand All @@ -37,7 +36,7 @@ import (
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/messaging/kafka"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/config"
Expand Down Expand Up @@ -161,7 +160,7 @@ func (s *server) startService() common.Daemon {
)()
isAdvancedVisEnabled := advancedVisMode != common.AdvancedVisibilityWritingModeOff
if isAdvancedVisEnabled {
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, isAdvancedVisEnabled)
params.MessagingClient = kafka.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, params.Logger, params.MetricScope, isAdvancedVisEnabled)
} else {
params.MessagingClient = nil
}
Expand Down
31 changes: 31 additions & 0 deletions common/auth/sasl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package auth

type (
// SASL describe SASL configuration (for Kafka)
SASL struct {
Enabled bool `yaml:"enabled"` // false as default
User string `yaml:"user"`
Password string `yaml:"password"`
Algorithm string `yaml:"algorithm"` // plain, sha512 or sha256
}
)
58 changes: 58 additions & 0 deletions common/auth/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package auth

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

// NOTE: the code is copied from https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
151 changes: 151 additions & 0 deletions common/messaging/ackManager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package messaging

import (
"fmt"
"sync"

"go.uber.org/atomic"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
)

type ackManager struct {
sync.RWMutex
outstandingMessages map[int64]bool // key->itemID, value->(true for acked/completed, false->for non acked)
readLevel int64 // Maximum itemID inserted into outstandingMessages
ackLevel int64 // Maximum itemID below which all messages are acked
backlogCounter atomic.Int64
logIncontinuousErr bool // emit error for itemID being incontinuous when consuming for potential bugs
logger log.Logger
}

// NewAckManager returns a AckManager without monitoring the itemIDs continousness.
// For example, our internal matching task queue doesn't guarantee it.
func NewAckManager(logger log.Logger) AckManager {
return newAckManager(false, logger)
}

// NewContinuousAckManager returns a ContinuousAckManager
// it will emit error logs for itemIDs being incontinuous
// This is useful for some message queue system that guarantees continuousness
// that we want to monitor it's behaving correctly
func NewContinuousAckManager(logger log.Logger) AckManager {
return newAckManager(true, logger)
}

func newAckManager(logIncontinuousErr bool, logger log.Logger) AckManager {
return &ackManager{
logger: logger,
outstandingMessages: make(map[int64]bool),
readLevel: -1,
ackLevel: -1,
logIncontinuousErr: logIncontinuousErr,
}
}

// Registers message as in-flight and moves read level to it. Messages can be added in increasing order of messageID only.
// NOTE that ackManager assumes adding messages is in order
func (m *ackManager) ReadItem(itemID int64) error {
m.Lock()
defer m.Unlock()
m.backlogCounter.Inc()
if m.readLevel >= itemID {
return fmt.Errorf("next item ID is less than or equal to current read level. itemID %d, readLevel %d", itemID, m.readLevel)
}
if _, ok := m.outstandingMessages[itemID]; ok {
return fmt.Errorf("already present in outstanding items but hasn't added itemID:%d", itemID)
}
m.readLevel = itemID
if m.ackLevel == -1 {
// because of ordering, the first itemID is the minimum to ack
m.ackLevel = itemID - 1
m.logger.Info("this is the very first itemID being read in this ackManager",
tag.TaskID(itemID),
)
}
m.outstandingMessages[itemID] = false // true is for acked
return nil
}

func (m *ackManager) AckItem(itemID int64) (ackLevel int64) {
m.Lock()
defer m.Unlock()
if completed, ok := m.outstandingMessages[itemID]; ok && !completed {
m.outstandingMessages[itemID] = true
m.backlogCounter.Dec()
} else {
m.logger.Warn("Duplicated completion for item",
tag.TaskID(itemID))
}

// Update ackLevel
for current := m.ackLevel + 1; current <= m.readLevel; current++ {
if acked, ok := m.outstandingMessages[current]; ok {
if acked {
m.ackLevel = current
delete(m.outstandingMessages, current)
} else {
return m.ackLevel
}
} else {
if m.logIncontinuousErr {
m.logger.Error("potential bug, an item is probably skipped when adding", tag.TaskID(current))
}
}
}
return m.ackLevel
}

func (m *ackManager) GetReadLevel() int64 {
m.RLock()
defer m.RUnlock()
return m.readLevel
}

func (m *ackManager) SetReadLevel(readLevel int64) {
m.Lock()
defer m.Unlock()
m.readLevel = readLevel
}

func (m *ackManager) GetAckLevel() int64 {
m.RLock()
defer m.RUnlock()
return m.ackLevel
}

func (m *ackManager) SetAckLevel(ackLevel int64) {
m.Lock()
defer m.Unlock()
if ackLevel > m.ackLevel {
m.ackLevel = ackLevel
}
if ackLevel > m.readLevel {
m.readLevel = ackLevel
}
}

func (m *ackManager) GetBacklogCount() int64 {
return m.backlogCounter.Load()
}
86 changes: 86 additions & 0 deletions common/messaging/ackManager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package messaging

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/log/loggerimpl"
)

func TestAckManager(t *testing.T) {
logger, err := loggerimpl.NewDevelopment()
assert.Nil(t, err)
m := NewAckManager(logger)
m.SetAckLevel(100)
assert.EqualValues(t, 100, m.GetAckLevel())
assert.EqualValues(t, 100, m.GetReadLevel())
const t1 = 200
const t2 = 220
const t3 = 320
const t4 = 340
const t5 = 360

err = m.ReadItem(t1)
assert.Nil(t, err)
assert.EqualValues(t, 100, m.GetAckLevel())
assert.EqualValues(t, t1, m.GetReadLevel())

err = m.ReadItem(t2)
assert.Nil(t, err)
assert.EqualValues(t, 100, m.GetAckLevel())
assert.EqualValues(t, t2, m.GetReadLevel())

m.AckItem(t2)
assert.EqualValues(t, 100, m.GetAckLevel())
assert.EqualValues(t, t2, m.GetReadLevel())

m.AckItem(t1)
assert.EqualValues(t, t2, m.GetAckLevel())
assert.EqualValues(t, t2, m.GetReadLevel())

m.SetAckLevel(300)
assert.EqualValues(t, 300, m.GetAckLevel())
assert.EqualValues(t, 300, m.GetReadLevel())

err = m.ReadItem(t3)
assert.Nil(t, err)
assert.EqualValues(t, 300, m.GetAckLevel())
assert.EqualValues(t, t3, m.GetReadLevel())

err = m.ReadItem(t4)
assert.Nil(t, err)
assert.EqualValues(t, 300, m.GetAckLevel())
assert.EqualValues(t, t4, m.GetReadLevel())

m.AckItem(t3)
assert.EqualValues(t, t3, m.GetAckLevel())
assert.EqualValues(t, t4, m.GetReadLevel())

m.AckItem(t4)
assert.EqualValues(t, t4, m.GetAckLevel())
assert.EqualValues(t, t4, m.GetReadLevel())

m.SetReadLevel(t5)
assert.EqualValues(t, t5, m.GetReadLevel())
}
20 changes: 19 additions & 1 deletion common/messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
type (
// Client is the interface used to abstract out interaction with messaging system for replication
Client interface {
NewConsumer(appName, consumerName string, concurrency int) (Consumer, error)
NewConsumer(appName, consumerName string) (Consumer, error)
NewProducer(appName string) (Producer, error)
}

Expand Down Expand Up @@ -65,4 +65,22 @@ type (
Producer
Close() error
}

// AckManager convert out of order acks into ackLevel movement.
AckManager interface {
// Read an item into backlog for processing for ack
ReadItem(id int64) error
// Get current max ID from read items
GetReadLevel() int64
// Set current max ID from read items
SetReadLevel(readLevel int64)
// Mark an item as done processing, and remove from backlog
AckItem(id int64) (ackLevel int64)
// Get current max level that can safely ack
GetAckLevel() int64
// Set current max level that can safely ack
SetAckLevel(ackLevel int64)
// GetBacklogCount return the of items that are waiting for ack
GetBacklogCount() int64
}
)
Loading

0 comments on commit df3c714

Please sign in to comment.