From c10e519cd56a9c20162c7bdb70b8c7969a73d155 Mon Sep 17 00:00:00 2001
From: ethfoo
Date: Mon, 26 Jun 2023 15:13:13 +0800
Subject: [PATCH] Feat: add franzKafka source (#573)

---
 pkg/include/include.go     |   1 +
 pkg/sink/franz/helper.go   |   2 +-
 pkg/sink/franz/sink.go     |   2 +-
 pkg/source/franz/config.go |  92 ++++++++++++
 pkg/source/franz/kafka.go  | 300 +++++++++++++++++++++++++++++++++++++
 5 files changed, 395 insertions(+), 2 deletions(-)
 create mode 100644 pkg/source/franz/config.go
 create mode 100644 pkg/source/franz/kafka.go

diff --git a/pkg/include/include.go b/pkg/include/include.go
index 5ace733c3..57e4a3442 100644
--- a/pkg/include/include.go
+++ b/pkg/include/include.go
@@ -64,6 +64,7 @@ import (
 	_ "github.com/loggie-io/loggie/pkg/source/elasticsearch"
 	_ "github.com/loggie-io/loggie/pkg/source/file"
 	_ "github.com/loggie-io/loggie/pkg/source/file/process"
+	_ "github.com/loggie-io/loggie/pkg/source/franz"
 	_ "github.com/loggie-io/loggie/pkg/source/grpc"
 	_ "github.com/loggie-io/loggie/pkg/source/kafka"
 	_ "github.com/loggie-io/loggie/pkg/source/kubernetes_event"
diff --git a/pkg/sink/franz/helper.go b/pkg/sink/franz/helper.go
index 9e6aff8d8..d728b2d6c 100644
--- a/pkg/sink/franz/helper.go
+++ b/pkg/sink/franz/helper.go
@@ -64,7 +64,7 @@ func getCompression(name string) kgo.CompressionCodec {
 	}
 }
 
-func getMechanism(sasl SASL) sasl.Mechanism {
+// GetMechanism is exported so the franz source can reuse the sink's SASL setup.
+func GetMechanism(sasl SASL) sasl.Mechanism {
 	switch sasl.Mechanism {
 	case "PLAIN":
 		auth := plain.Auth{
diff --git a/pkg/sink/franz/sink.go b/pkg/sink/franz/sink.go
index dd483c31c..948e3a173 100644
--- a/pkg/sink/franz/sink.go
+++ b/pkg/sink/franz/sink.go
@@ -114,7 +114,7 @@ func (s *Sink) Start() error {
 	}
 
 	if c.SASL.Enabled == true {
-		mch := getMechanism(c.SASL)
+		mch := GetMechanism(c.SASL)
 		if mch != nil {
 			opts = append(opts, kgo.SASL(mch))
 		}
diff --git a/pkg/source/franz/config.go b/pkg/source/franz/config.go
new file mode 100644
index 000000000..98303465b
--- /dev/null
+++ b/pkg/source/franz/config.go
@@ -0,0 +1,92 @@
+/*
+Copyright 2023 Loggie Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package franz
+
+import (
+	"regexp"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/twmb/franz-go/pkg/kgo"
+
+	"github.com/loggie-io/loggie/pkg/sink/franz"
+)
+
+const (
+	earliestOffsetReset = "earliest"
+	latestOffsetReset   = "latest"
+)
+
+type Config struct {
+	Brokers  []string `yaml:"brokers,omitempty" validate:"required"`
+	Topic    string   `yaml:"topic,omitempty"`
+	Topics   []string `yaml:"topics,omitempty"`
+	GroupId  string   `yaml:"groupId,omitempty" default:"loggie"`
+	ClientId string   `yaml:"clientId,omitempty"`
+	Worker   int      `yaml:"worker,omitempty" default:"1"`
+
+	FetchMaxWait           time.Duration `yaml:"fetchMaxWait,omitempty"`
+	FetchMaxBytes          int32         `yaml:"fetchMaxBytes,omitempty"`
+	FetchMinBytes          int32         `yaml:"fetchMinBytes,omitempty"`
+	FetchMaxPartitionBytes int32         `yaml:"fetchMaxPartitionBytes,omitempty"`
+
+	EnableAutoCommit   bool          `yaml:"enableAutoCommit,omitempty"`
+	AutoCommitInterval time.Duration `yaml:"autoCommitInterval,omitempty" default:"1s"`
+	AutoOffsetReset    string        `yaml:"autoOffsetReset,omitempty" default:"latest"`
+
+	SASL franz.SASL `yaml:"sasl,omitempty"`
+
+	AddonMeta *bool `yaml:"addonMeta,omitempty" default:"true"`
+}
+
+func getAutoOffset(autoOffsetReset string) kgo.Offset {
+	switch autoOffsetReset {
+	case earliestOffsetReset:
+		return kgo.NewOffset().AtStart()
+	case latestOffsetReset:
+		return kgo.NewOffset().AtEnd()
+	default:
+		return kgo.NewOffset().AtEnd()
+	}
+}
+
+func (c *Config) Validate() error {
+	if c.Topic == "" && len(c.Topics) == 0 {
+		return errors.New("topic or topics is required")
+	}
+
+	// topic and topics are consumed as regular expressions, so they must compile
+	if c.Topic != "" {
+		if _, err := regexp.Compile(c.Topic); err != nil {
+			return errors.WithMessagef(err, "compile kafka topic regex %s error", c.Topic)
+		}
+	}
+	for _, t := range c.Topics {
+		if _, err := regexp.Compile(t); err != nil {
+			return errors.WithMessagef(err, "compile kafka topic regex %s error", t)
+		}
+	}
+
+	if c.AutoOffsetReset != "" &&
+		c.AutoOffsetReset != earliestOffsetReset && c.AutoOffsetReset != latestOffsetReset {
+		return errors.Errorf("autoOffsetReset must be one of %s or %s", earliestOffsetReset, latestOffsetReset)
+	}
+
+	return nil
+}
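
The fields above map one-to-one onto the source's yaml config: topic/topics
are matched as regular expressions (hence the regexp validation), and offsets
default to the latest end of each partition. A minimal pipeline sketch,
assuming Loggie's usual pipelines layout (the broker address, topic pattern,
and dev sink are placeholders):

    pipelines:
      - name: kafkaDemo
        sources:
          - type: franzKafka
            name: consume
            brokers: ["127.0.0.1:9092"]
            topic: "log-.*"
            groupId: loggie
            autoOffsetReset: earliest
        sink:
          type: dev
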
diff --git a/pkg/source/franz/kafka.go b/pkg/source/franz/kafka.go
new file mode 100644
index 000000000..e7b042851
--- /dev/null
+++ b/pkg/source/franz/kafka.go
@@ -0,0 +1,300 @@
+/*
+Copyright 2023 Loggie Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package franz
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/twmb/franz-go/pkg/kgo"
+
+	"github.com/loggie-io/loggie/pkg/core/api"
+	"github.com/loggie-io/loggie/pkg/core/event"
+	"github.com/loggie-io/loggie/pkg/core/log"
+	"github.com/loggie-io/loggie/pkg/pipeline"
+	"github.com/loggie-io/loggie/pkg/sink/franz"
+)
+
+const (
+	Type = "franzKafka"
+
+	fKafka       = "kafka"
+	fOffset      = "offset"
+	fPartition   = "partition"
+	fTimestamp   = "timestamp"
+	fTopic       = "topic"
+	fLeaderEpoch = "leaderEpoch"
+)
+
+func init() {
+	pipeline.Register(api.SOURCE, Type, makeSource)
+}
+
+func makeSource(info pipeline.Info) api.Component {
+	return &Source{
+		done:      make(chan struct{}),
+		config:    &Config{},
+		eventPool: info.EventPool,
+	}
+}
+
+type Source struct {
+	name      string
+	done      chan struct{}
+	closeOnce sync.Once
+	config    *Config
+	client    *kgo.Client
+	eventPool *event.Pool
+}
+
+func (k *Source) Config() interface{} {
+	return k.config
+}
+
+func (k *Source) Category() api.Category {
+	return api.SOURCE
+}
+
+func (k *Source) Type() api.Type {
+	return Type
+}
+
+func (k *Source) String() string {
+	return fmt.Sprintf("%s/%s", api.SOURCE, Type)
+}
+
+func (k *Source) Init(context api.Context) error {
+	k.name = context.Name()
+	return nil
+}
+
+func (k *Source) Start() error {
+	c := k.config
+	var logger franz.Logger
+
+	opts := []kgo.Opt{
+		kgo.SeedBrokers(c.Brokers...),
+		kgo.ConsumeRegex(),
+		kgo.WithLogger(&logger),
+	}
+
+	// set topics; they are always consumed as regular expressions (kgo.ConsumeRegex above)
+	var confTopic []string
+	if k.config.Topic != "" {
+		confTopic = append(confTopic, k.config.Topic)
+	}
+	if len(k.config.Topics) > 0 {
+		confTopic = append(confTopic, k.config.Topics...)
+	}
+	opts = append(opts, kgo.ConsumeTopics(confTopic...))
+
+	// set group id
+	if k.config.GroupId != "" {
+		opts = append(opts, kgo.ConsumerGroup(k.config.GroupId))
+	}
+
+	// set client id
+	if k.config.ClientId != "" {
+		opts = append(opts, kgo.ClientID(k.config.ClientId))
+	}
+
+	if k.config.FetchMaxWait != 0 {
+		opts = append(opts, kgo.FetchMaxWait(k.config.FetchMaxWait))
+	}
+
+	if k.config.FetchMaxBytes != 0 {
+		opts = append(opts, kgo.FetchMaxBytes(k.config.FetchMaxBytes))
+	}
+
+	if k.config.FetchMinBytes != 0 {
+		opts = append(opts, kgo.FetchMinBytes(k.config.FetchMinBytes))
+	}
+
+	if k.config.FetchMaxPartitionBytes != 0 {
+		opts = append(opts, kgo.FetchMaxPartitionBytes(k.config.FetchMaxPartitionBytes))
+	}
+
+	// set auto commit
+	if !k.config.EnableAutoCommit {
+		opts = append(opts, kgo.DisableAutoCommit())
+	}
+
+	if k.config.AutoCommitInterval != 0 {
+		opts = append(opts, kgo.AutoCommitInterval(k.config.AutoCommitInterval))
+	}
+
+	if k.config.AutoOffsetReset != "" {
+		opts = append(opts, kgo.ConsumeResetOffset(getAutoOffset(c.AutoOffsetReset)))
+	}
+
+	if c.SASL.Enabled {
+		if mch := franz.GetMechanism(c.SASL); mch != nil {
+			opts = append(opts, kgo.SASL(mch))
+		}
+	}
+
+	// new client
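+	// Note: kgo.NewClient only assembles and validates the options; franz-go
+	// dials the brokers lazily, so a bad broker address typically surfaces as
+	// an error on the first poll in consume() rather than here.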
+	cl, err := kgo.NewClient(opts...)
+	if err != nil {
+		log.Error("kgo.NewClient error: %s", err)
+		return err
+	}
+
+	k.client = cl
+	return nil
+}
+
+func (k *Source) Stop() {
+	k.closeOnce.Do(func() {
+		close(k.done)
+
+		if k.client != nil {
+			k.client.Close()
+		}
+	})
+}
+
+func (k *Source) ProductLoop(productFunc api.ProductFunc) {
+	log.Info("%s start product loop", k.String())
+
+	if k.client == nil {
+		log.Error("kafka consumer not initialized yet")
+		return
+	}
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < k.config.Worker; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-k.done:
+					return
+
+				default:
+					err := k.consume(productFunc)
+					if err != nil {
+						log.Error("%+v", err)
+					}
+				}
+			}
+		}()
+	}
+
+	wg.Wait()
+}
+
+func (k *Source) consume(productFunc api.ProductFunc) error {
+	fetches := k.client.PollFetches(context.Background())
+	if fetches.IsClientClosed() {
+		return nil
+	}
+
+	if errs := fetches.Errors(); len(errs) > 0 {
+		// All errors are retried internally when fetching, but non-retriable errors are
+		// returned from polls so that users can notice and take action.
+		return errors.Errorf("consumer read message error: %v", errs)
+	}
+
+	iter := fetches.RecordIter()
+	for !iter.Done() {
+		record := iter.Next()
+
+		e := k.eventPool.Get()
+		header := e.Header()
+		if k.config.AddonMeta != nil && *k.config.AddonMeta {
+			if header == nil {
+				header = make(map[string]interface{})
+			}
+
+			// set default metadata
+			header[fKafka] = map[string]interface{}{
+				fOffset:    record.Offset,
+				fPartition: record.Partition,
+				fTimestamp: record.Timestamp.Format(time.RFC3339),
+				fTopic:     record.Topic,
+			}
+			for _, h := range record.Headers {
+				header[h.Key] = string(h.Value)
+			}
+		}
+
+		meta := e.Meta()
+		if !k.config.EnableAutoCommit {
+			// set fields to metadata for the manual kafka commit in Commit()
+			if meta == nil {
+				meta = event.NewDefaultMeta()
+			}
+			meta.Set(fOffset, record.Offset)
+			meta.Set(fPartition, record.Partition)
+			meta.Set(fTopic, record.Topic)
+			meta.Set(fLeaderEpoch, record.LeaderEpoch)
+		}
+
+		e.Fill(meta, header, record.Value)
+
+		productFunc(e)
+	}
+
+	return nil
+}
+
+func (k *Source) Commit(events []api.Event) {
+	// commit when the sink acks
+	if !k.config.EnableAutoCommit {
+		var records []*kgo.Record
+		for _, e := range events {
+			meta := e.Meta()
+
+			mTopic, exist := meta.Get(fTopic)
+			if !exist {
+				continue
+			}
+			mPartition, exist := meta.Get(fPartition)
+			if !exist {
+				continue
+			}
+			mOffset, exist := meta.Get(fOffset)
+			if !exist {
+				continue
+			}
+			mLeaderEpoch, exist := meta.Get(fLeaderEpoch)
+			if !exist {
+				continue
+			}
+
+			records = append(records, &kgo.Record{
+				Topic:       mTopic.(string),
+				Partition:   mPartition.(int32),
+				Offset:      mOffset.(int64),
+				LeaderEpoch: mLeaderEpoch.(int32),
+			})
+		}
+		if len(records) > 0 {
+			err := k.client.CommitRecords(context.Background(), records...)
+			if err != nil {
+				log.Error("consumer manual commit message error: %v", err)
+			}
+		}
+	}
+
+	k.eventPool.PutAll(events)
+}
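
With enableAutoCommit: false, offsets only reach Kafka through Commit(),
which Loggie calls once the sink acknowledges the events; a crash before the
ack means the records are re-polled, so delivery is at-least-once rather than
exactly-once. A sketch of such a pipeline, assuming Loggie's usual pipelines
layout (the broker address, topic patterns, and dev sink are placeholders):

    pipelines:
      - name: kafkaAtLeastOnce
        sources:
          - type: franzKafka
            name: ingest
            brokers: ["127.0.0.1:9092"]
            topics: ["app-.*", "audit-.*"]
            groupId: loggie
            enableAutoCommit: false
            addonMeta: true
        sink:
          type: dev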