Skip to content

Commit 5f78d90

Browse files
Willem van Bergenwvanbergen
Willem van Bergen
authored and committed
Add high water mark offset support to the consumer.
1 parent d0c297e commit 5f78d90

File tree

4 files changed

+76
-11
lines changed

4 files changed

+76
-11
lines changed

consumer.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sarama
33
import (
44
"fmt"
55
"sync"
6+
"sync/atomic"
67
"time"
78
)
89

@@ -255,6 +256,11 @@ type PartitionConsumer interface {
255256
// errors are logged and not returned over this channel. If you want to implement any custom error
256257
// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
257258
Errors() <-chan *ConsumerError
259+
260+
// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
261+
// be used for the next message that will be produced. You can use this to determine how far behind
262+
// the processing is.
263+
HighWaterMarkOffset() int64
258264
}
259265

260266
type partitionConsumer struct {
@@ -268,8 +274,9 @@ type partitionConsumer struct {
268274
errors chan *ConsumerError
269275
trigger, dying chan none
270276

271-
fetchSize int32
272-
offset int64
277+
fetchSize int32
278+
offset int64
279+
highWaterMarkOffset int64
273280
}
274281

275282
func (child *partitionConsumer) sendError(err error) {
@@ -391,6 +398,10 @@ func (child *partitionConsumer) Close() error {
391398
return nil
392399
}
393400

401+
// HighWaterMarkOffset returns the current value of child.highWaterMarkOffset.
// The field is read with an atomic load, so the value is read without taking
// a lock.
func (child *partitionConsumer) HighWaterMarkOffset() int64 {
402+
return atomic.LoadInt64(&child.highWaterMarkOffset)
403+
}
404+
394405
func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
395406
block := response.GetBlock(child.topic, child.partition)
396407
if block == nil {
@@ -422,6 +433,7 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
422433

423434
// we got messages, reset our fetch size in case it was increased for a previous request
424435
child.fetchSize = child.conf.Consumer.Fetch.Default
436+
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
425437

426438
incomplete := false
427439
atLeastOne := false

consumer_test.go

+20-6
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestConsumerOffsetManual(t *testing.T) {
5959
leader.Close()
6060
}
6161

62-
func TestConsumerLatestOffset(t *testing.T) {
62+
func TestConsumerOffsetNewest(t *testing.T) {
6363
seedBroker := newMockBroker(t, 1)
6464
leader := newMockBroker(t, 2)
6565

@@ -69,15 +69,17 @@ func TestConsumerLatestOffset(t *testing.T) {
6969
seedBroker.Returns(metadataResponse)
7070

7171
offsetResponseNewest := new(OffsetResponse)
72-
offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
72+
offsetResponseNewest.AddTopicPartition("my_topic", 0, 10)
7373
leader.Returns(offsetResponseNewest)
7474

7575
offsetResponseOldest := new(OffsetResponse)
76-
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
76+
offsetResponseOldest.AddTopicPartition("my_topic", 0, 7)
7777
leader.Returns(offsetResponseOldest)
7878

7979
fetchResponse := new(FetchResponse)
80-
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
80+
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10)
81+
block := fetchResponse.GetBlock("my_topic", 0)
82+
block.HighWaterMarkOffset = 14
8183
leader.Returns(fetchResponse)
8284

8385
master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
@@ -91,12 +93,24 @@ func TestConsumerLatestOffset(t *testing.T) {
9193
t.Fatal(err)
9294
}
9395

96+
msg := <-consumer.Messages()
97+
98+
// the first delivered message should carry the newest offset returned in the OffsetResponse
99+
if msg.Offset != 10 {
100+
t.Error("Latest message offset not fetched correctly:", msg.Offset)
101+
}
102+
103+
if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
104+
t.Errorf("Expected high water mark offset 14, found %d", hwmo)
105+
}
106+
94107
leader.Close()
95108
safeClose(t, consumer)
96109
safeClose(t, master)
97110

98-
// we deliver one message, so it should be one higher than we return in the OffsetResponse
99-
if consumer.(*partitionConsumer).offset != 0x010102 {
111+
// We deliver one message, so it should be one higher than we return in the OffsetResponse.
112+
// This way it is set correctly for the next FetchRequest.
113+
if consumer.(*partitionConsumer).offset != 11 {
100114
t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
101115
}
102116
}

functional_consumer_test.go

+34
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,37 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
2323

2424
safeClose(t, consumer)
2525
}
26+
27+
// TestConsumerHighWaterMarkOffset produces one message to topic "test.1",
// consumes partition 0 of that topic starting at OffsetOldest, waits for one
// message, and then compares the partition consumer's high water mark offset
// against the produced offset plus one.
func TestConsumerHighWaterMarkOffset(t *testing.T) {
28+
checkKafkaAvailability(t)
29+
30+
p, err := NewSyncProducer(kafkaBrokers, nil)
31+
if err != nil {
32+
t.Fatal(err)
33+
}
34+
defer safeClose(t, p)
35+
36+
_, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
37+
if err != nil {
38+
t.Fatal(err)
39+
}
40+
41+
c, err := NewConsumer(kafkaBrokers, nil)
42+
if err != nil {
43+
t.Fatal(err)
44+
}
45+
defer safeClose(t, c)
46+
47+
pc, err := c.ConsumePartition("test.1", 0, OffsetOldest)
48+
if err != nil {
49+
t.Fatal(err)
50+
}
51+
52+
// Wait for one message so that at least one fetch response has been handled
53+
// before the high water mark offset is read.
<-pc.Messages()
53+
54+
// NOTE(review): a mismatch is only logged with t.Logf, so this check can never
// fail the test. Presumably this is lenient on purpose because other writers
// may produce to the same topic on a shared broker — confirm, or use t.Errorf.
if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
55+
t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
56+
}
57+
58+
safeClose(t, pc)
59+
}

mocks/consumer.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mocks
22

33
import (
44
"sync"
5+
"sync/atomic"
56

67
"github.com/Shopify/sarama"
78
)
@@ -175,13 +176,13 @@ type PartitionConsumer struct {
175176
consumed bool
176177
errorsShouldBeDrained bool
177178
messagesShouldBeDrained bool
179+
highWaterMarkOffset int64
178180
}
179181

180182
func (pc *PartitionConsumer) handleExpectations() {
181183
pc.l.Lock()
182184
defer pc.l.Unlock()
183185

184-
var offset int64
185186
for ex := range pc.expectations {
186187
if ex.Err != nil {
187188
pc.errors <- &sarama.ConsumerError{
@@ -190,11 +191,11 @@ func (pc *PartitionConsumer) handleExpectations() {
190191
Err: ex.Err,
191192
}
192193
} else {
193-
offset++
194+
atomic.AddInt64(&pc.highWaterMarkOffset, 1)
194195

195196
ex.Msg.Topic = pc.topic
196197
ex.Msg.Partition = pc.partition
197-
ex.Msg.Offset = offset
198+
ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
198199

199200
pc.messages <- ex.Msg
200201
}
@@ -274,6 +275,10 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
274275
return pc.messages
275276
}
276277

278+
// HighWaterMarkOffset returns the mock's high water mark offset: the value of
// pc.highWaterMarkOffset, read atomically, plus one. handleExpectations
// increments that counter for each message expectation and assigns the new
// value as the message's offset, so the result is one past the offset of the
// last message handled.
func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
279+
return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
280+
}
281+
277282
///////////////////////////////////////////////////
278283
// Expectation API
279284
///////////////////////////////////////////////////

0 commit comments

Comments
 (0)