diff --git a/README.md b/README.md index 31997c1..eb7412d 100644 --- a/README.md +++ b/README.md @@ -103,21 +103,21 @@ Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configura ### Kafka Specific Configuration -| Variable | Type | Is Required | -|-------------------------------------|----------------------|-------------| -| `kafka.collectionTopicMapping` | map[string]string | yes | -| `kafka.brokers` | array | yes | -| `kafka.readTimeout` | integer | no | -| `kafka.compression` | integer | no | -| `kafka.writeTimeout` | integer | no | -| `kafka.producerBatchSize` | integer | yes | -| `kafka.producerBatchTickerDuration` | integer | yes | -| `kafka.requiredAcks` | integer | no | -| `kafka.secureConnection` | boolean (true/false) | no | -| `kafka.rootCAPath` | string | no | -| `kafka.interCAPath` | string | no | -| `kafka.scramUsername` | string | no | -| `kafka.scramPassword` | string | no | +| Variable | Type | Required | Default | +|-------------------------------------|-------------------|----------|---------| +| `kafka.collectionTopicMapping` | map[string]string | yes | | +| `kafka.brokers` | []string | yes | | +| `kafka.producerBatchSize` | integer | yes | | +| `kafka.producerBatchTickerDuration` | time.Duration | yes | | +| `kafka.readTimeout` | time.Duration | no | | +| `kafka.compression` | integer | no | | +| `kafka.writeTimeout` | time.Duration | no | | +| `kafka.requiredAcks` | integer | no | | +| `kafka.secureConnection` | bool | no | | +| `kafka.rootCAPath` | string | no | | +| `kafka.interCAPath` | string | no | | +| `kafka.scramUsername` | string | no | | +| `kafka.scramPassword` | string | no | | --- diff --git a/kafka/client.go b/kafka/client.go index 7c49a54..96e21fa 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -151,19 +151,18 @@ func (c *client) GetPartitions(topic string) ([]int, error) { func (c *client) Producer() *kafka.Writer { return &kafka.Writer{ - Addr: kafka.TCP(c.config.Brokers...), - Balancer: &kafka.Hash{}, - BatchSize: c.config.ProducerBatchSize, - BatchBytes: math.MaxInt, - BatchTimeout: 500 * time.Microsecond, - MaxAttempts: math.MaxInt, - ReadTimeout: c.config.ReadTimeout, - WriteTimeout: c.config.WriteTimeout, - RequiredAcks: kafka.RequiredAcks(c.config.RequiredAcks), - Logger: c.logger, - ErrorLogger: c.errorLogger, - Compression: kafka.Compression(c.config.GetCompression()), - AllowAutoTopicCreation: true, + Addr: kafka.TCP(c.config.Brokers...), + Balancer: &kafka.Hash{}, + BatchSize: c.config.ProducerBatchSize, + BatchBytes: math.MaxInt, + BatchTimeout: 500 * time.Microsecond, + MaxAttempts: math.MaxInt, + ReadTimeout: c.config.ReadTimeout, + WriteTimeout: c.config.WriteTimeout, + RequiredAcks: kafka.RequiredAcks(c.config.RequiredAcks), + Logger: c.logger, + ErrorLogger: c.errorLogger, + Compression: kafka.Compression(c.config.GetCompression()), } }