Skip to content

Commit 953231c

Browse files
committed
Add support for Kafka 0.11 in consumer
This change adds support for the new Records format introduced in Kafka 0.11. It supports record headers; however, it does not support transactions or idempotent messages.
1 parent cfb0c1d commit 953231c

7 files changed

+462
-122
lines changed

consumer.go

+107-38
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ type ConsumerMessage struct {
1414
Topic string
1515
Partition int32
1616
Offset int64
17-
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
18-
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
17+
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
18+
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
19+
Headers []*RecordHeader // only set if kafka is version 0.11+
1920
}
2021

2122
// ConsumerError is what is provided to the user when an error occurs.
@@ -478,44 +479,12 @@ feederLoop:
478479
close(child.errors)
479480
}
480481

481-
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
482-
block := response.GetBlock(child.topic, child.partition)
483-
if block == nil {
484-
return nil, ErrIncompleteResponse
485-
}
486-
487-
if block.Err != ErrNoError {
488-
return nil, block.Err
489-
}
490-
491-
if len(block.MsgSet.Messages) == 0 {
492-
// We got no messages. If we got a trailing one then we need to ask for more data.
493-
// Otherwise we just poll again and wait for one to be produced...
494-
if block.MsgSet.PartialTrailingMessage {
495-
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
496-
// we can't ask for more data, we've hit the configured limit
497-
child.sendError(ErrMessageTooLarge)
498-
child.offset++ // skip this one so we can keep processing future messages
499-
} else {
500-
child.fetchSize *= 2
501-
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
502-
child.fetchSize = child.conf.Consumer.Fetch.Max
503-
}
504-
}
505-
}
506-
507-
return nil, nil
508-
}
509-
510-
// we got messages, reset our fetch size in case it was increased for a previous request
511-
child.fetchSize = child.conf.Consumer.Fetch.Default
512-
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
513-
514-
incomplete := false
515-
prelude := true
482+
func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
516483
var messages []*ConsumerMessage
517-
for _, msgBlock := range block.MsgSet.Messages {
484+
var incomplete bool
485+
prelude := true
518486

487+
for _, msgBlock := range msgSet.Messages {
519488
for _, msg := range msgBlock.Messages() {
520489
offset := msg.Offset
521490
if msg.Msg.Version >= 1 {
@@ -542,7 +511,52 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
542511
incomplete = true
543512
}
544513
}
514+
}
515+
516+
if incomplete || len(messages) == 0 {
517+
return nil, ErrIncompleteResponse
518+
}
519+
return messages, nil
520+
}
521+
522+
func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*ConsumerMessage, error) {
523+
var messages []*ConsumerMessage
524+
var incomplete bool
525+
prelude := true
526+
batch := block.Records.recordBatch
527+
528+
for _, rec := range batch.Records {
529+
offset := batch.FirstOffset + rec.OffsetDelta
530+
if prelude && offset < child.offset {
531+
continue
532+
}
533+
prelude = false
534+
535+
millis := batch.FirstTimestamp + rec.TimestampDelta
536+
timestamp := time.Time{}
537+
if millis >= 0 {
538+
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
539+
}
545540

541+
if offset >= child.offset {
542+
messages = append(messages, &ConsumerMessage{
543+
Topic: child.topic,
544+
Partition: child.partition,
545+
Key: rec.Key,
546+
Value: rec.Value,
547+
Offset: offset,
548+
Timestamp: timestamp,
549+
Headers: rec.Headers,
550+
})
551+
child.offset = offset + 1
552+
} else {
553+
incomplete = true
554+
}
555+
556+
if child.offset > block.LastStableOffset {
557+
// We reached the end of closed transactions
558+
break
559+
}
546560
}
547561

548562
if incomplete || len(messages) == 0 {
@@ -551,6 +565,57 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
551565
return messages, nil
552566
}
553567

568+
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
569+
block := response.GetBlock(child.topic, child.partition)
570+
if block == nil {
571+
return nil, ErrIncompleteResponse
572+
}
573+
574+
if block.Err != ErrNoError {
575+
return nil, block.Err
576+
}
577+
578+
nRecs, err := block.Records.numRecords()
579+
if err != nil {
580+
return nil, err
581+
}
582+
if nRecs == 0 {
583+
partialTrailingMessage, err := block.Records.isPartial()
584+
if err != nil {
585+
return nil, err
586+
}
587+
// We got no messages. If we got a trailing one then we need to ask for more data.
588+
// Otherwise we just poll again and wait for one to be produced...
589+
if partialTrailingMessage {
590+
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
591+
// we can't ask for more data, we've hit the configured limit
592+
child.sendError(ErrMessageTooLarge)
593+
child.offset++ // skip this one so we can keep processing future messages
594+
} else {
595+
child.fetchSize *= 2
596+
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
597+
child.fetchSize = child.conf.Consumer.Fetch.Max
598+
}
599+
}
600+
}
601+
602+
return nil, nil
603+
}
604+
605+
// we got messages, reset our fetch size in case it was increased for a previous request
606+
child.fetchSize = child.conf.Consumer.Fetch.Default
607+
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
608+
609+
if control, err := block.Records.isControl(); err != nil || control {
610+
return nil, err
611+
}
612+
613+
if response.Version < 4 {
614+
return child.parseMessages(block.Records.msgSet)
615+
}
616+
return child.parseRecords(block)
617+
}
618+
554619
// brokerConsumer
555620

556621
type brokerConsumer struct {
@@ -740,6 +805,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
740805
request.Version = 3
741806
request.MaxBytes = MaxResponseSize
742807
}
808+
if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
809+
request.Version = 4
810+
request.Isolation = ReadUncommitted // We don't support transactions yet.
811+
}
743812

744813
for child := range bc.subscriptions {
745814
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)

consumer_test.go

+98-66
Original file line numberDiff line numberDiff line change
@@ -379,86 +379,118 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
379379
// requested, then such messages are ignored.
380380
func TestConsumerExtraOffsets(t *testing.T) {
381381
// Given
382-
broker0 := NewMockBroker(t, 0)
383-
fetchResponse1 := &FetchResponse{}
384-
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
385-
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
386-
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 3)
387-
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 4)
388-
fetchResponse2 := &FetchResponse{}
389-
fetchResponse2.AddError("my_topic", 0, ErrNoError)
390-
broker0.SetHandlerByMap(map[string]MockResponse{
391-
"MetadataRequest": NewMockMetadataResponse(t).
392-
SetBroker(broker0.Addr(), broker0.BrokerID()).
393-
SetLeader("my_topic", 0, broker0.BrokerID()),
394-
"OffsetRequest": NewMockOffsetResponse(t).
395-
SetOffset("my_topic", 0, OffsetNewest, 1234).
396-
SetOffset("my_topic", 0, OffsetOldest, 0),
397-
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
398-
})
382+
legacyFetchResponse := &FetchResponse{}
383+
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
384+
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
385+
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
386+
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
387+
newFetchResponse := &FetchResponse{Version: 4}
388+
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
389+
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
390+
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
391+
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
392+
newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
393+
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
394+
var offsetResponseVersion int16
395+
cfg := NewConfig()
396+
if fetchResponse1.Version >= 4 {
397+
cfg.Version = V0_11_0_0
398+
offsetResponseVersion = 1
399+
}
399400

400-
master, err := NewConsumer([]string{broker0.Addr()}, nil)
401-
if err != nil {
402-
t.Fatal(err)
403-
}
401+
broker0 := NewMockBroker(t, 0)
402+
fetchResponse2 := &FetchResponse{}
403+
fetchResponse2.Version = fetchResponse1.Version
404+
fetchResponse2.AddError("my_topic", 0, ErrNoError)
405+
broker0.SetHandlerByMap(map[string]MockResponse{
406+
"MetadataRequest": NewMockMetadataResponse(t).
407+
SetBroker(broker0.Addr(), broker0.BrokerID()).
408+
SetLeader("my_topic", 0, broker0.BrokerID()),
409+
"OffsetRequest": NewMockOffsetResponse(t).
410+
SetVersion(offsetResponseVersion).
411+
SetOffset("my_topic", 0, OffsetNewest, 1234).
412+
SetOffset("my_topic", 0, OffsetOldest, 0),
413+
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
414+
})
415+
416+
master, err := NewConsumer([]string{broker0.Addr()}, cfg)
417+
if err != nil {
418+
t.Fatal(err)
419+
}
404420

405-
// When
406-
consumer, err := master.ConsumePartition("my_topic", 0, 3)
407-
if err != nil {
408-
t.Fatal(err)
409-
}
421+
// When
422+
consumer, err := master.ConsumePartition("my_topic", 0, 3)
423+
if err != nil {
424+
t.Fatal(err)
425+
}
410426

411-
// Then: messages with offsets 1 and 2 are not returned even though they
412-
// are present in the response.
413-
assertMessageOffset(t, <-consumer.Messages(), 3)
414-
assertMessageOffset(t, <-consumer.Messages(), 4)
427+
// Then: messages with offsets 1 and 2 are not returned even though they
428+
// are present in the response.
429+
assertMessageOffset(t, <-consumer.Messages(), 3)
430+
assertMessageOffset(t, <-consumer.Messages(), 4)
415431

416-
safeClose(t, consumer)
417-
safeClose(t, master)
418-
broker0.Close()
432+
safeClose(t, consumer)
433+
safeClose(t, master)
434+
broker0.Close()
435+
}
419436
}
420437

421438
// It is fine if offsets of fetched messages are not sequential (although
422439
// strictly increasing!).
423440
func TestConsumerNonSequentialOffsets(t *testing.T) {
424441
// Given
425-
broker0 := NewMockBroker(t, 0)
426-
fetchResponse1 := &FetchResponse{}
427-
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5)
428-
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7)
429-
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 11)
430-
fetchResponse2 := &FetchResponse{}
431-
fetchResponse2.AddError("my_topic", 0, ErrNoError)
432-
broker0.SetHandlerByMap(map[string]MockResponse{
433-
"MetadataRequest": NewMockMetadataResponse(t).
434-
SetBroker(broker0.Addr(), broker0.BrokerID()).
435-
SetLeader("my_topic", 0, broker0.BrokerID()),
436-
"OffsetRequest": NewMockOffsetResponse(t).
437-
SetOffset("my_topic", 0, OffsetNewest, 1234).
438-
SetOffset("my_topic", 0, OffsetOldest, 0),
439-
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
440-
})
442+
legacyFetchResponse := &FetchResponse{}
443+
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
444+
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
445+
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
446+
newFetchResponse := &FetchResponse{Version: 4}
447+
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
448+
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
449+
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
450+
newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
451+
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
452+
var offsetResponseVersion int16
453+
cfg := NewConfig()
454+
if fetchResponse1.Version >= 4 {
455+
cfg.Version = V0_11_0_0
456+
offsetResponseVersion = 1
457+
}
441458

442-
master, err := NewConsumer([]string{broker0.Addr()}, nil)
443-
if err != nil {
444-
t.Fatal(err)
445-
}
459+
broker0 := NewMockBroker(t, 0)
460+
fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
461+
fetchResponse2.AddError("my_topic", 0, ErrNoError)
462+
broker0.SetHandlerByMap(map[string]MockResponse{
463+
"MetadataRequest": NewMockMetadataResponse(t).
464+
SetBroker(broker0.Addr(), broker0.BrokerID()).
465+
SetLeader("my_topic", 0, broker0.BrokerID()),
466+
"OffsetRequest": NewMockOffsetResponse(t).
467+
SetVersion(offsetResponseVersion).
468+
SetOffset("my_topic", 0, OffsetNewest, 1234).
469+
SetOffset("my_topic", 0, OffsetOldest, 0),
470+
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
471+
})
472+
473+
master, err := NewConsumer([]string{broker0.Addr()}, cfg)
474+
if err != nil {
475+
t.Fatal(err)
476+
}
446477

447-
// When
448-
consumer, err := master.ConsumePartition("my_topic", 0, 3)
449-
if err != nil {
450-
t.Fatal(err)
451-
}
478+
// When
479+
consumer, err := master.ConsumePartition("my_topic", 0, 3)
480+
if err != nil {
481+
t.Fatal(err)
482+
}
452483

453-
// Then: messages with offsets 1 and 2 are not returned even though they
454-
// are present in the response.
455-
assertMessageOffset(t, <-consumer.Messages(), 5)
456-
assertMessageOffset(t, <-consumer.Messages(), 7)
457-
assertMessageOffset(t, <-consumer.Messages(), 11)
484+
// Then: messages with non-sequential offsets 5, 7 and 11 are all
485+
// returned, even though their offsets are not consecutive.
486+
assertMessageOffset(t, <-consumer.Messages(), 5)
487+
assertMessageOffset(t, <-consumer.Messages(), 7)
488+
assertMessageOffset(t, <-consumer.Messages(), 11)
458489

459-
safeClose(t, consumer)
460-
safeClose(t, master)
461-
broker0.Close()
490+
safeClose(t, consumer)
491+
safeClose(t, master)
492+
broker0.Close()
493+
}
462494
}
463495

464496
// If leadership for a partition is changing then consumer resolves the new

0 commit comments

Comments
 (0)