Skip to content

Commit

Permalink
changed queue.buffering.backpressure.threshold default (confluentinc#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mhowlett authored and edenhill committed Jun 15, 2018
1 parent 52c0a71 commit 47e4732
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ linger.ms | P | |
message.send.max.retries | P | 0 .. 10000000 | 2 | How many times to retry sending a failing MessageSet. **Note:** retrying may cause reordering. <br>*Type: integer*
retries | P | | | Alias for `message.send.max.retries`
retry.backoff.ms | P | 1 .. 300000 | 100 | The backoff time in milliseconds before retrying a protocol request. <br>*Type: integer*
queue.buffering.backpressure.threshold | P | 0 .. 1000000 | 10 | The threshold of outstanding not yet transmitted requests needed to backpressure the producer's message accumulator. A lower number yields larger and more effective batches. <br>*Type: integer*
queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines. <br>*Type: integer*
compression.codec | P | none, gzip, snappy, lz4 | none | compression codec to use for compressing message sets. This is the default value for all topics, may be overriden by the topic configuration property `compression.codec`. <br>*Type: enum value*
compression.type | P | | | Alias for `compression.codec`
batch.num.messages | P | 1 .. 1000000 | 10000 | Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by message.max.bytes. <br>*Type: integer*
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2603,7 +2603,7 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
* avoid having to acquire the lock in the typical case
* (do_timeout_scan==0). */
if (unlikely(!do_timeout_scan &&
rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt) >
rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt) >=
rkb->rkb_rk->rk_conf.queue_backpressure_thres))
return 0;

Expand Down
13 changes: 9 additions & 4 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -784,12 +784,17 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"The backoff time in milliseconds before retrying a protocol request.",
1, 300*1000, 100 },

{ _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.backpressure.threshold",
{ _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.backpressure.threshold",
_RK_C_INT, _RK(queue_backpressure_thres),
"The threshold of outstanding not yet transmitted requests "
"The threshold of outstanding not yet transmitted broker requests "
"needed to backpressure the producer's message accumulator. "
"A lower number yields larger and more effective batches.",
0, 1000000, 10 },
"If the number of not yet transmitted requests equals or exceeds "
"this number, produce request creation that would have otherwise "
"been triggered (for example, in accordance with linger.ms) will be "
"delayed. A lower number yields larger and more effective batches. "
"A higher value can improve latency when using compression on slow "
"machines.",
1, 1000000, 1 },

{ _RK_GLOBAL|_RK_PRODUCER, "compression.codec", _RK_C_S2I,
_RK(compression_codec),
Expand Down

0 comments on commit 47e4732

Please sign in to comment.