Skip to content

Commit

Permalink
acl describe and create + other cleanup and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
rmb938 committed Mar 29, 2022
1 parent 8662523 commit a345ca3
Show file tree
Hide file tree
Showing 38 changed files with 2,426 additions and 92 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.18.1 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
Expand All @@ -36,5 +37,6 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down Expand Up @@ -211,6 +213,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
Expand Down
173 changes: 173 additions & 0 deletions pkg/kafka/logical_broker/authorizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package logical_broker

import (
"context"
"sync"

"github.com/go-logr/logr"
"github.com/puzpuzpuz/xsync"
"github.com/rmb938/krouter/pkg/kafka/logical_broker/internal_topics_pb"
"github.com/rmb938/krouter/pkg/kafka/logical_broker/models"
"github.com/twmb/franz-go/pkg/kgo"
"google.golang.org/protobuf/proto"
)

type Authorizer struct {
log logr.Logger

kafkaClient *kgo.Client

aclsSyncOnce sync.Once
syncedChan chan struct{}

acls *xsync.MapOf[*models.ACL]
}

func NewAuthorizer(log logr.Logger, kafkaClient *kgo.Client) (*Authorizer, error) {
authorizer := &Authorizer{
log: log.WithName("authorizer"),
kafkaClient: kafkaClient,
syncedChan: make(chan struct{}),
acls: xsync.NewMapOf[*models.ACL](),
}

return authorizer, nil
}

func (c *Authorizer) CreateTestACLs() error {
acls := []struct {
Key *internal_topics_pb.ACLMessageKey
Value *internal_topics_pb.ACLMessageValue
}{
{
Key: &internal_topics_pb.ACLMessageKey{
Operation: internal_topics_pb.ACLMessageKey_OPERATION_WRITE,
ResourceType: internal_topics_pb.ACLMessageKey_RESOURCE_TYPE_TOPIC,
PatternType: internal_topics_pb.ACLMessageKey_PATTERN_TYPE_LITERAL,
ResourceName: "topic-1",
Principal: "User:user1",
},
Value: &internal_topics_pb.ACLMessageValue{
Permission: internal_topics_pb.ACLMessageValue_PERMISSION_ALLOW,
},
},
{
Key: &internal_topics_pb.ACLMessageKey{
Operation: internal_topics_pb.ACLMessageKey_OPERATION_READ,
ResourceType: internal_topics_pb.ACLMessageKey_RESOURCE_TYPE_TOPIC,
PatternType: internal_topics_pb.ACLMessageKey_PATTERN_TYPE_LITERAL,
ResourceName: "topic-1",
Principal: "User:user2",
},
Value: &internal_topics_pb.ACLMessageValue{
Permission: internal_topics_pb.ACLMessageValue_PERMISSION_DENY,
},
},
{
Key: &internal_topics_pb.ACLMessageKey{
Operation: internal_topics_pb.ACLMessageKey_OPERATION_READ,
ResourceType: internal_topics_pb.ACLMessageKey_RESOURCE_TYPE_GROUP,
PatternType: internal_topics_pb.ACLMessageKey_PATTERN_TYPE_PREFIXED,
ResourceName: "my-group1",
Principal: "User:user2",
},
Value: &internal_topics_pb.ACLMessageValue{
Permission: internal_topics_pb.ACLMessageValue_PERMISSION_DENY,
},
},
}

for _, acl := range acls {
aclMessageKeyBytes, err := proto.Marshal(acl.Key)
if err != nil {
return err
}

aclMessageValueBytes, err := proto.Marshal(acl.Value)
if err != nil {
return err
}

record := kgo.KeySliceRecord(aclMessageKeyBytes, aclMessageValueBytes)
record.Topic = InternalTopicAcls
produceResp := c.kafkaClient.ProduceSync(context.TODO(), record)
if produceResp.FirstErr() != nil {
return produceResp.FirstErr()
}
}

return nil
}

func (c *Authorizer) WaitSynced() {
<-c.syncedChan
}

func (c *Authorizer) GetAcls(resourceType models.ACLResourceType, resourceName *string, patternType models.ACLPatternType, principal *string, operation models.ACLOperation, permission models.ACLPermission) []*models.ACL {
var acls []*models.ACL

c.acls.Range(func(_ string, acl *models.ACL) bool {
if resourceType != models.ACLResourceTypeAny && resourceType != acl.ResourceType {
return true
}

if resourceName != nil && *resourceName != acl.ResourceName {
return true
}

if patternType != models.ACLPatternTypeAny && patternType != acl.PatternType {
return true
}

if principal != nil && *principal != acl.Principal {
return true
}

if operation != models.ACLOperationAny && operation != acl.Operation {
return true
}

if permission != models.ACLPermissionAny && permission != acl.Permission {
return true
}

acls = append(acls, acl)
return true
})

return acls
}

func (c *Authorizer) CreateAcl(operation models.ACLOperation, resourceType models.ACLResourceType, patternType models.ACLPatternType, resourceName string, principal string, permission models.ACLPermission) error {

aclKey := &internal_topics_pb.ACLMessageKey{
Operation: internal_topics_pb.ACLMessageKey_Operation(operation),
ResourceType: internal_topics_pb.ACLMessageKey_ResourceType(resourceType),
PatternType: internal_topics_pb.ACLMessageKey_PatternType(patternType),
ResourceName: resourceName,
Principal: principal,
}

aclValue := &internal_topics_pb.ACLMessageValue{
Permission: internal_topics_pb.ACLMessageValue_Permission(permission),
}

aclMessageKeyBytes, err := proto.Marshal(aclKey)
if err != nil {
return err
}

aclMessageValueBytes, err := proto.Marshal(aclValue)
if err != nil {
return err
}

record := kgo.KeySliceRecord(aclMessageKeyBytes, aclMessageValueBytes)
record.Topic = InternalTopicAcls
produceResp := c.kafkaClient.ProduceSync(context.TODO(), record)
if produceResp.FirstErr() != nil {
return produceResp.FirstErr()
}

return nil
}
128 changes: 128 additions & 0 deletions pkg/kafka/logical_broker/authorizer_consumer_acl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package logical_broker

import (
"context"
"os"
"strconv"
"sync"

"github.com/mitchellh/hashstructure/v2"
"github.com/rmb938/krouter/pkg/kafka/logical_broker/internal_topics_pb"
"github.com/rmb938/krouter/pkg/kafka/logical_broker/models"
"github.com/twmb/franz-go/pkg/kgo"
"google.golang.org/protobuf/proto"
)

const (
InternalTopicAcls = "__krouter_acls"
)

func (c *Authorizer) ConsumeAcls() {
kafkaClient := c.kafkaClient

highWaterMarks := make(map[int32]int64)
lowWaterMarks := make(map[int32]int64)

for {
fetches := kafkaClient.PollFetches(context.TODO())
if fetches.IsClientClosed() {
c.log.Info("Topic ACL Kafka Client Closed")
os.Exit(1)
return
}

fetchErrors := fetches.Errors()
for _, fetchError := range fetches.Errors() {
c.log.Error(fetchError.Err, "Error polling fetches for acl consumer", "partition", fetchError.Partition)
}
if len(fetchErrors) > 0 {
os.Exit(1)
}

fetches.EachTopic(func(topic kgo.FetchTopic) {
topic.EachPartition(func(partition kgo.FetchPartition) {
if _, ok := lowWaterMarks[partition.Partition]; !ok {
lowWaterMarks[partition.Partition] = 0
}
highWaterMarks[partition.Partition] = partition.HighWatermark
})
})

for iter := fetches.RecordIter(); !iter.Done(); {
record := iter.Next()

lowWaterMarks[record.Partition] = record.Offset + 1

key := record.Key
value := record.Value

if string(key) != InternalControlKey {
aclKey := &internal_topics_pb.ACLMessageKey{}
aclValue := &internal_topics_pb.ACLMessageValue{}
err := proto.Unmarshal(key, aclKey)
if err != nil {
// We don't exit and return here because it'll crash all instances
// instead we just ignore the message
c.log.Error(err, "error parsing acl key", "key", key)
aclKey = nil
}

// value may be nil so don't parse if it is
if value != nil {
err = proto.Unmarshal(value, aclValue)
if err != nil {
// We don't exit and return here because it'll crash all instances
// instead we just ignore the message
c.log.Error(err, "error parsing acl value", "value", value)
aclValue = nil
}
}

// if the key and value was parsed correctly do stuff
if aclKey != nil && aclValue != nil {
aclModel := &models.ACL{
Operation: models.ACLOperation(aclKey.Operation),
ResourceType: models.ACLResourceType(aclKey.ResourceType),
PatternType: models.ACLPatternType(aclKey.PatternType),
ResourceName: aclKey.ResourceName,
Principal: aclKey.Principal,
}

hashInt, err := hashstructure.Hash(aclModel, hashstructure.FormatV2, nil)
if err != nil {
c.log.Error(err, "hashing acl for map key")
}
hashKey := strconv.FormatUint(hashInt, 10)

aclModel.Permission = models.ACLPermission(aclValue.Permission)

if value == nil {
// if no value delete it
c.acls.Delete(hashKey)
} else {
c.acls.Store(hashKey, aclModel)
}
}
}

c.shouldBeSynced(&c.aclsSyncOnce, highWaterMarks, lowWaterMarks)
}
}
}

func (c *Authorizer) shouldBeSynced(once *sync.Once, highWaterMarks, lowWaterMarks map[int32]int64) {
synced := true
for partitionIndex, highMark := range highWaterMarks {
if lowMark, ok := lowWaterMarks[partitionIndex]; ok {
if highMark != lowMark {
synced = false
}
}
}

if synced {
once.Do(func() {
c.syncedChan <- struct{}{}
})
}
}
19 changes: 8 additions & 11 deletions pkg/kafka/logical_broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/go-logr/logr"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/rmb938/krouter/pkg/kafka/logical_broker/topics"
"github.com/rmb938/krouter/pkg/kafka/logical_broker/models"
"github.com/rmb938/krouter/pkg/redisw"
)

Expand Down Expand Up @@ -83,14 +83,11 @@ func (b *Broker) InitClusters() error {
return err
}

err = b.controller.CreateInternalTopics()
err = b.controller.Start()
if err != nil {
return err
}

go b.controller.ConsumeTopicPointers()
b.controller.WaitSynced()
b.log.Info("Controller Synced")
b.log.Info("Controller Started")

// TODO: remove this it's temporary
pointer, err := b.controller.APIGetTopicPointer("test1")
Expand Down Expand Up @@ -197,17 +194,17 @@ func (b *Broker) GetClusters() map[string]*Cluster {
return b.clusters
}

func (b *Broker) GetTopics() ([]*topics.Topic, error) {
var allTopics []*topics.Topic
func (b *Broker) GetTopics() ([]*models.Topic, error) {
var allTopics []*models.Topic

for _, cluster := range b.clusters {
cluster.topics.Range(func(topicName string, topic *topics.Topic) bool {
cluster.topics.Range(func(topicName string, topic *models.Topic) bool {
if clusterName, ok := b.controller.topicPointers.Load(topicName); ok {
if clusterName == cluster.Name {
allTopics = append(allTopics, topic)
}
}
return false
return true
})
}

Expand All @@ -224,7 +221,7 @@ func (b *Broker) GetClusterByTopic(topicName string) *Cluster {
return nil
}

func (b *Broker) GetTopic(topicName string) (*Cluster, *topics.Topic) {
func (b *Broker) GetTopic(topicName string) (*Cluster, *models.Topic) {
if cluster := b.GetClusterByTopic(topicName); cluster != nil {
if topic, ok := cluster.topics.Load(topicName); ok {
return cluster, topic
Expand Down
Loading

0 comments on commit a345ca3

Please sign in to comment.