Skip to content

Commit

Permalink
feat: client id implementation for consumer and producer (#38)
Browse files Browse the repository at this point in the history
* feat: client id implementation for consumer and producer

* feat: update readme

* feat: update example for lean implementation on examples

---------

Co-authored-by: nihat.alim <[email protected]>
  • Loading branch information
nihatalim and nihat.alim authored Jun 19, 2023
1 parent 0a81fc7 commit 084afce
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 9 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func main() {
| config | description | default | example |
|------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------|
| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | info | |
| `consumer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#Dialer) | | |
| `consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * |
| `consumer.duration` | Work duration exception consumer actively consuming messages | | 20s, 15m, 1h |
| `consumer.topic` | Exception topic names | | exception-topic |
Expand All @@ -120,6 +121,7 @@ func main() {
| `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.RebalanceTimeout) | 30s | |
| `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.StartOffset) | earliest | |
| `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.RetentionTime) | 24h | |
| `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#Transport) | | |
| `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#Writer.BatchSize) | 100 | |
| `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#Writer.BatchTimeout) | 1s | |
| `sasl.enabled` | It enables sasl authentication mechanism | false | |
Expand Down
10 changes: 6 additions & 4 deletions internal/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ func newConsumer(kafkaConfig *kafka.Config) *kafkaConsumer {
RetentionTime: kafkaConfig.Consumer.RetentionTime,
}

readerConfig.Dialer = &segmentio.Dialer{
ClientID: kafkaConfig.Consumer.ClientID,
}

if kafkaConfig.SASL.Enabled {
readerConfig.Dialer = &segmentio.Dialer{
TLS: NewTLSConfig(kafkaConfig.SASL),
SASLMechanism: Mechanism(kafkaConfig.SASL),
}
readerConfig.Dialer.TLS = NewTLSConfig(kafkaConfig.SASL)
readerConfig.Dialer.SASLMechanism = Mechanism(kafkaConfig.SASL)

if kafkaConfig.SASL.Rack != "" {
readerConfig.GroupBalancers = []segmentio.GroupBalancer{segmentio.RackAffinityGroupBalancer{Rack: kafkaConfig.SASL.Rack}}
Expand Down
13 changes: 8 additions & 5 deletions internal/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ func newProducer(kafkaConfig *kafka.Config) Producer {
AllowAutoTopicCreation: true,
}

transport := &segmentio.Transport{
ClientID: kafkaConfig.ClientID,
}

if kafkaConfig.SASL.Enabled {
producer.Transport = &segmentio.Transport{
TLS: NewTLSConfig(kafkaConfig.SASL),
SASL: Mechanism(kafkaConfig.SASL),
ClientID: kafkaConfig.ClientID,
}
transport.TLS = NewTLSConfig(kafkaConfig.SASL)
transport.SASL = Mechanism(kafkaConfig.SASL)
}

producer.Transport = transport

return &kafkaProducer{
w: producer,
cfg: kafkaConfig,
Expand Down
1 change: 1 addition & 0 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type SASLConfig struct {
}

type ConsumerConfig struct {
ClientID string `yaml:"clientId"`
GroupID string `yaml:"groupId"`
Topic string `yaml:"topic"`
DeadLetterTopic string `yaml:"deadLetterTopic"`
Expand Down

0 comments on commit 084afce

Please sign in to comment.