Commit 6a7bac8

Author: Varun
Merge pull request IBM#1160 from thomaslee/tom_backoff_func
*.Retry.BackoffFunc
2 parents a3e71cf + a047cd7 commit 6a7bac8

9 files changed: +231 −17 lines changed
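
This merge wires an optional `Retry.BackoffFunc` hook into the Metadata, Producer and Consumer retry paths, so callers can compute the backoff for each retry dynamically instead of always sleeping for the fixed `Retry.Backoff` duration. As a rough sketch of how the new hooks might be used (the `exponentialBackoff` helper, the base/cap durations and the `github.com/IBM/sarama` import path are illustrative assumptions, not part of this change):

package main

import (
    "time"

    "github.com/IBM/sarama" // assumed import path; adjust to your module
)

// exponentialBackoff is an illustrative helper (not part of this commit):
// it doubles a base delay per retry and caps the result at maxBackoff.
func exponentialBackoff(base, maxBackoff time.Duration) func(retries, maxRetries int) time.Duration {
    return func(retries, maxRetries int) time.Duration {
        backoff := base << uint(retries)
        if backoff > maxBackoff || backoff <= 0 {
            backoff = maxBackoff
        }
        return backoff
    }
}

func main() {
    config := sarama.NewConfig()
    // The Metadata and Producer hooks receive (retries, maxRetries), per config.go below.
    config.Metadata.Retry.BackoffFunc = exponentialBackoff(250*time.Millisecond, 5*time.Second)
    config.Producer.Retry.BackoffFunc = exponentialBackoff(100*time.Millisecond, 2*time.Second)
    // The Consumer hook receives only the retry count.
    config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
        return exponentialBackoff(2*time.Second, 30*time.Second)(retries, 0)
    }
    _ = config // pass to sarama.NewClient / NewAsyncProducer / NewConsumer as usual
}

Returning 0 from a hook skips the sleep entirely in the producer and metadata paths (note the `backoff > 0` guards below); the tests in this diff rely on that to keep the suite fast.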

async_producer.go

+15 −2

@@ -483,6 +483,19 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan
     return input
 }
 
+func (pp *partitionProducer) backoff(retries int) {
+    var backoff time.Duration
+    if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
+        maxRetries := pp.parent.conf.Producer.Retry.Max
+        backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
+    } else {
+        backoff = pp.parent.conf.Producer.Retry.Backoff
+    }
+    if backoff > 0 {
+        time.Sleep(backoff)
+    }
+}
+
 func (pp *partitionProducer) dispatch() {
     // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
     // on the first message
@@ -517,7 +530,7 @@ func (pp *partitionProducer) dispatch() {
         if msg.retries > pp.highWatermark {
             // a new, higher, retry level; handle it and then back off
             pp.newHighWatermark(msg.retries)
-            time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+            pp.backoff(msg.retries)
         } else if pp.highWatermark > 0 {
             // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
             if msg.retries < pp.highWatermark {
@@ -545,7 +558,7 @@ func (pp *partitionProducer) dispatch() {
         if pp.brokerProducer == nil {
             if err := pp.updateLeader(); err != nil {
                 pp.parent.returnError(msg, err)
-                time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+                pp.backoff(msg.retries)
                 continue
             }
             Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())

async_producer_test.go

+68

@@ -6,6 +6,7 @@ import (
     "os"
     "os/signal"
     "sync"
+    "sync/atomic"
     "testing"
     "time"
 )
@@ -547,6 +548,73 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
     closeProducer(t, producer)
 }
 
+func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
+    seedBroker := NewMockBroker(t, 1)
+    leader1 := NewMockBroker(t, 2)
+    leader2 := NewMockBroker(t, 3)
+
+    metadataLeader1 := new(MetadataResponse)
+    metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+    metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+    seedBroker.Returns(metadataLeader1)
+
+    config := NewConfig()
+    config.Producer.Flush.Messages = 1
+    config.Producer.Return.Successes = true
+    config.Producer.Retry.Max = 4
+
+    backoffCalled := make([]int32, config.Producer.Retry.Max+1)
+    config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
+        atomic.AddInt32(&backoffCalled[retries-1], 1)
+        return 0
+    }
+    producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+    prodNotLeader := new(ProduceResponse)
+    prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+
+    prodSuccess := new(ProduceResponse)
+    prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+
+    metadataLeader2 := new(MetadataResponse)
+    metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+    metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+
+    leader1.Returns(prodNotLeader)
+    seedBroker.Returns(metadataLeader2)
+    leader2.Returns(prodNotLeader)
+    seedBroker.Returns(metadataLeader1)
+    leader1.Returns(prodNotLeader)
+    seedBroker.Returns(metadataLeader1)
+    leader1.Returns(prodNotLeader)
+    seedBroker.Returns(metadataLeader2)
+    leader2.Returns(prodSuccess)
+
+    expectResults(t, producer, 1, 0)
+
+    producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+    leader2.Returns(prodSuccess)
+    expectResults(t, producer, 1, 0)
+
+    seedBroker.Close()
+    leader1.Close()
+    leader2.Close()
+    closeProducer(t, producer)
+
+    for i := 0; i < config.Producer.Retry.Max; i++ {
+        if atomic.LoadInt32(&backoffCalled[i]) != 1 {
+            t.Errorf("expected one retry attempt #%d", i)
+        }
+    }
+    if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 {
+        t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max)
+    }
+}
+
 func TestAsyncProducerOutOfRetries(t *testing.T) {
     t.Skip("Enable once bug #294 is fixed.")

client.go

+17 −3

@@ -710,8 +710,11 @@ func (client *client) refreshMetadata() error {
 func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
     retry := func(err error) error {
         if attemptsRemaining > 0 {
+            backoff := client.computeBackoff(attemptsRemaining)
             Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
-            time.Sleep(client.conf.Metadata.Retry.Backoff)
+            if backoff > 0 {
+                time.Sleep(backoff)
+            }
             return client.tryRefreshMetadata(topics, attemptsRemaining-1)
         }
         return err
@@ -839,11 +842,22 @@ func (client *client) cachedController() *Broker {
     return client.brokers[client.controllerID]
 }
 
+func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
+    if client.conf.Metadata.Retry.BackoffFunc != nil {
+        maxRetries := client.conf.Metadata.Retry.Max
+        retries := maxRetries - attemptsRemaining
+        return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
+    } else {
+        return client.conf.Metadata.Retry.Backoff
+    }
+}
+
 func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
     retry := func(err error) (*FindCoordinatorResponse, error) {
         if attemptsRemaining > 0 {
-            Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
-            time.Sleep(client.conf.Metadata.Retry.Backoff)
+            backoff := client.computeBackoff(attemptsRemaining)
+            Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
+            time.Sleep(backoff)
             return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
         }
         return nil, err
client_test.go

+38

@@ -3,6 +3,7 @@ package sarama
 import (
     "io"
     "sync"
+    "sync/atomic"
     "testing"
     "time"
 )
@@ -260,6 +261,43 @@ func TestClientGetOffset(t *testing.T) {
     safeClose(t, client)
 }
 
+func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
+    seedBroker := NewMockBroker(t, 1)
+
+    metadataResponse1 := new(MetadataResponse)
+    seedBroker.Returns(metadataResponse1)
+
+    retryCount := int32(0)
+
+    config := NewConfig()
+    config.Metadata.Retry.Max = 1
+    config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
+        atomic.AddInt32(&retryCount, 1)
+        return 0
+    }
+    client, err := NewClient([]string{seedBroker.Addr()}, config)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    metadataUnknownTopic := new(MetadataResponse)
+    metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
+    seedBroker.Returns(metadataUnknownTopic)
+    seedBroker.Returns(metadataUnknownTopic)
+
+    if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
+        t.Error("ErrUnknownTopicOrPartition expected, got", err)
+    }
+
+    safeClose(t, client)
+    seedBroker.Close()
+
+    actualRetryCount := atomic.LoadInt32(&retryCount)
+    if actualRetryCount != 1 {
+        t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount)
+    }
+}
+
 func TestClientReceivingUnknownTopic(t *testing.T) {
     seedBroker := NewMockBroker(t, 1)

config.go

+12

@@ -92,6 +92,10 @@ type Config struct {
             // How long to wait for leader election to occur before retrying
             // (default 250ms). Similar to the JVM's `retry.backoff.ms`.
             Backoff time.Duration
+            // Called to compute backoff time dynamically. Useful for implementing
+            // more sophisticated backoff strategies. This takes precedence over
+            // `Backoff` if set.
+            BackoffFunc func(retries, maxRetries int) time.Duration
         }
         // How frequently to refresh the cluster metadata in the background.
         // Defaults to 10 minutes. Set to 0 to disable. Similar to
@@ -179,6 +183,10 @@ type Config struct {
            // (default 100ms). Similar to the `retry.backoff.ms` setting of the
            // JVM producer.
            Backoff time.Duration
+           // Called to compute backoff time dynamically. Useful for implementing
+           // more sophisticated backoff strategies. This takes precedence over
+           // `Backoff` if set.
+           BackoffFunc func(retries, maxRetries int) time.Duration
        }
    }
 
@@ -237,6 +245,10 @@ type Config struct {
            // How long to wait after a failing to read from a partition before
            // trying again (default 2s).
            Backoff time.Duration
+           // Called to compute backoff time dynamically. Useful for implementing
+           // more sophisticated backoff strategies. This takes precedence over
+           // `Backoff` if set.
+           BackoffFunc func(retries int) time.Duration
        }
 
        // Fetch is the namespace for controlling how many bytes are retrieved by any
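
All three doc comments above describe the same precedence rule: a non-nil `BackoffFunc` wins over the fixed `Backoff` duration. A standalone sketch of that rule under an illustrative name (`resolveBackoff` is not part of sarama; the real resolution lives in the `backoff`/`computeBackoff` helpers elsewhere in this diff):

package example

import "time"

// resolveBackoff mirrors the documented precedence: if BackoffFunc is set it
// is consulted, otherwise the fixed Backoff duration is used unchanged.
func resolveBackoff(backoffFunc func(retries, maxRetries int) time.Duration, fixed time.Duration, retries, maxRetries int) time.Duration {
    if backoffFunc != nil {
        return backoffFunc(retries, maxRetries)
    }
    return fixed
}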

consumer.go

+16 −1

@@ -314,6 +314,8 @@ type partitionConsumer struct {
 
     fetchSize int32
     offset    int64
+
+    retries int32
 }
 
 var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
@@ -332,12 +334,21 @@ func (child *partitionConsumer) sendError(err error) {
     }
 }
 
+func (child *partitionConsumer) computeBackoff() time.Duration {
+    if child.conf.Consumer.Retry.BackoffFunc != nil {
+        retries := atomic.AddInt32(&child.retries, 1)
+        return child.conf.Consumer.Retry.BackoffFunc(int(retries))
+    } else {
+        return child.conf.Consumer.Retry.Backoff
+    }
+}
+
 func (child *partitionConsumer) dispatcher() {
     for range child.trigger {
         select {
         case <-child.dying:
             close(child.trigger)
-        case <-time.After(child.conf.Consumer.Retry.Backoff):
+        case <-time.After(child.computeBackoff()):
             if child.broker != nil {
                 child.consumer.unrefBrokerConsumer(child.broker)
                 child.broker = nil
@@ -451,6 +462,10 @@ feederLoop:
     for response := range child.feeder {
         msgs, child.responseResult = child.parseResponse(response)
 
+        if child.responseResult == nil {
+            atomic.StoreInt32(&child.retries, 0)
+        }
+
         for i, msg := range msgs {
         messageSelect:
             select {
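
Unlike the client and producer paths, the partition consumer is not told how many attempts remain, so it keeps its own atomic `retries` counter: `computeBackoff` increments it on every retry and the feeder loop resets it to zero once a response parses cleanly. A `Consumer.Retry.BackoffFunc` therefore sees 1, 2, 3, ... across a failure streak. One plausible use is a capped linear ramp; the helper name, durations and import path below are illustrative assumptions, not part of this change:

package example

import (
    "time"

    "github.com/IBM/sarama" // assumed import path; adjust to your module
)

// consumerConfigWithLinearBackoff returns a config whose consumer retry delay
// grows linearly with the retry count and is capped at ten seconds.
func consumerConfigWithLinearBackoff() *sarama.Config {
    config := sarama.NewConfig()
    config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
        backoff := time.Duration(retries) * 500 * time.Millisecond // 0.5s, 1s, 1.5s, ...
        if backoff > 10*time.Second {
            backoff = 10 * time.Second
        }
        return backoff
    }
    return config
}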

consumer_test.go

+34 −8

@@ -5,6 +5,7 @@ import (
     "os"
     "os/signal"
     "sync"
+    "sync/atomic"
     "testing"
     "time"
 )
@@ -180,9 +181,7 @@ func TestConsumerDuplicate(t *testing.T) {
     broker0.Close()
 }
 
-// If consumer fails to refresh metadata it keeps retrying with frequency
-// specified by `Config.Consumer.Retry.Backoff`.
-func TestConsumerLeaderRefreshError(t *testing.T) {
+func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
     // Given
     broker0 := NewMockBroker(t, 100)
 
@@ -200,11 +199,6 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
         SetMessage("my_topic", 0, 123, testMsg),
     })
 
-    config := NewConfig()
-    config.Net.ReadTimeout = 100 * time.Millisecond
-    config.Consumer.Retry.Backoff = 200 * time.Millisecond
-    config.Consumer.Return.Errors = true
-    config.Metadata.Retry.Max = 0
     c, err := NewConsumer([]string{broker0.Addr()}, config)
     if err != nil {
         t.Fatal(err)
@@ -258,6 +252,38 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
     broker0.Close()
 }
 
+// If consumer fails to refresh metadata it keeps retrying with frequency
+// specified by `Config.Consumer.Retry.Backoff`.
+func TestConsumerLeaderRefreshError(t *testing.T) {
+    config := NewConfig()
+    config.Net.ReadTimeout = 100 * time.Millisecond
+    config.Consumer.Retry.Backoff = 200 * time.Millisecond
+    config.Consumer.Return.Errors = true
+    config.Metadata.Retry.Max = 0
+
+    runConsumerLeaderRefreshErrorTestWithConfig(t, config)
+}
+
+func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
+    var calls int32 = 0
+
+    config := NewConfig()
+    config.Net.ReadTimeout = 100 * time.Millisecond
+    config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
+        atomic.AddInt32(&calls, 1)
+        return 200 * time.Millisecond
+    }
+    config.Consumer.Return.Errors = true
+    config.Metadata.Retry.Max = 0
+
+    runConsumerLeaderRefreshErrorTestWithConfig(t, config)
+
+    // we expect at least one call to our backoff function
+    if calls == 0 {
+        t.Fail()
+    }
+}
+
 func TestConsumerInvalidTopic(t *testing.T) {
     // Given
     broker0 := NewMockBroker(t, 100)

offset_manager.go

+10 −1

@@ -120,6 +120,14 @@ func (om *offsetManager) Close() error {
     return nil
 }
 
+func (om *offsetManager) computeBackoff(retries int) time.Duration {
+    if om.conf.Metadata.Retry.BackoffFunc != nil {
+        return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
+    } else {
+        return om.conf.Metadata.Retry.Backoff
+    }
+}
+
 func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
     broker, err := om.coordinator()
     if err != nil {
@@ -161,10 +169,11 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
         if retries <= 0 {
             return 0, "", block.Err
         }
+        backoff := om.computeBackoff(retries)
         select {
         case <-om.closing:
             return 0, "", block.Err
-        case <-time.After(om.conf.Metadata.Retry.Backoff):
+        case <-time.After(backoff):
         }
         return om.fetchInitialOffset(topic, partition, retries-1)
     default:
