Skip to content

Commit

Permalink
consume: add group flag to consume via consumer group
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Apr 24, 2020
1 parent 92d0622 commit 8022ae5
Showing 1 changed file with 167 additions and 114 deletions.
281 changes: 167 additions & 114 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"os"
Expand All @@ -20,11 +21,13 @@ import (
)

var (
offsetFlag string
raw bool
follow bool
schemaCache *avro.SchemaCache
keyfmt *prettyjson.Formatter
offsetFlag string
groupFlag string
groupCommitFlag bool
raw bool
follow bool
schemaCache *avro.SchemaCache
keyfmt *prettyjson.Formatter

protoType string
keyProtoType string
Expand All @@ -44,6 +47,8 @@ func init() {
consumeCmd.Flags().StringVar(&protoType, "proto-type", "", "Fully qualified name of the proto message type. Example: com.test.SampleMessage")
consumeCmd.Flags().StringVar(&keyProtoType, "key-proto-type", "", "Fully qualified name of the proto key type. Example: com.test.SampleMessage")
consumeCmd.Flags().Int32SliceVarP(&flagPartitions, "partitions", "p", []int32{}, "Partitions to consume from")
consumeCmd.Flags().StringVarP(&groupFlag, "group", "g", "", "Consumer Group to use for consume")
consumeCmd.Flags().BoolVar(&groupCommitFlag, "commit", false, "Commit Group offset after receiving messages. Works only if consuming as Consumer Group")

keyfmt = prettyjson.NewFormatter()
keyfmt.Newline = " " // Replace newline with space to avoid condensed output.
Expand Down Expand Up @@ -97,132 +102,180 @@ var consumeCmd = &cobra.Command{
topic := args[0]
client := getClient()

consumer, err := sarama.NewConsumerFromClient(client)
if groupFlag != "" {
withConsumerGroup(client, topic, groupFlag)
} else {
withoutConsumerGroup(client, topic, offset)
}

},
}

type g struct{}

func (g *g) Setup(s sarama.ConsumerGroupSession) error {
return nil
}

func (g *g) Cleanup(s sarama.ConsumerGroupSession) error {
return nil
}

func (g *g) ConsumeClaim(s sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

mu := sync.Mutex{} // Synchronizes stderr and stdout.
for msg := range claim.Messages() {
handleMessage(msg, &mu)
if groupCommitFlag {
s.MarkMessage(msg, "")
}
}
return nil
}

func withConsumerGroup(client sarama.Client, topic, group string) {
cg, err := sarama.NewConsumerGroupFromClient(group, client)
if err != nil {
errorExit("Failed to create consumer group: %v", err)
}

err = cg.Consume(context.Background(), []string{topic}, &g{})
if err != nil {
errorExit("Error on consume: %v", err)
}
}

func withoutConsumerGroup(client sarama.Client, topic string, offset int64) {
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
errorExit("Unable to create consumer from client: %v\n", err)
}

var partitions []int32
if len(flagPartitions) == 0 {
partitions, err = consumer.Partitions(topic)
if err != nil {
errorExit("Unable to create consumer from client: %v\n", err)
errorExit("Unable to get partitions: %v\n", err)
}
} else {
partitions = flagPartitions
}

schemaCache = getSchemaCache()

wg := sync.WaitGroup{}
mu := sync.Mutex{} // Synchronizes stderr and stdout.
for _, partition := range partitions {

wg.Add(1)

var partitions []int32
if len(flagPartitions) == 0 {
partitions, err = consumer.Partitions(topic)
go func(partition int32, offset int64) {
req := &sarama.OffsetRequest{
Version: int16(1),
}
req.AddBlock(topic, partition, int64(-1), int32(0))
ldr, err := client.Leader(topic, partition)
if err != nil {
errorExit("Unable to get partitions: %v\n", err)
errorExit("Unable to get leader: %v\n", err)
}
} else {
partitions = flagPartitions

offsets, err := getAvailableOffsetsRetry(ldr, req, offsetsRetry)
if err != nil {
errorExit("Unable to get available offsets: %v\n", err)
}
followOffset := offsets.GetBlock(topic, partition).Offset - 1

if follow && followOffset > 0 {
offset = followOffset
fmt.Fprintf(os.Stderr, "Starting on partition %v with offset %v\n", partition, offset)
}

pc, err := consumer.ConsumePartition(topic, partition, offset)
if err != nil {
errorExit("Unable to consume partition: %v %v %v %v\n", topic, partition, offset, err)
}

for msg := range pc.Messages() {
handleMessage(msg, &mu)
}
wg.Done()
}(partition, offset)
}
wg.Wait()
}

func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
var stderr bytes.Buffer

var dataToDisplay []byte
var keyToDisplay []byte
var err error

if protoType != "" {
dataToDisplay, err = protoDecode(reg, msg.Value, protoType)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v\n", err)
}

keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err)
}
} else {
dataToDisplay, err = avroDecode(msg.Value)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}

schemaCache = getSchemaCache()
keyToDisplay, err = avroDecode(msg.Key)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}
}

wg := sync.WaitGroup{}
mu := sync.Mutex{} // Synchronizes stderr and stdout.
for _, partition := range partitions {
if !raw {
formatted, err := prettyjson.Format(dataToDisplay)
if err == nil {
dataToDisplay = formatted
}

wg.Add(1)
w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)

go func(partition int32, offset int64) {
req := &sarama.OffsetRequest{
Version: int16(1),
}
req.AddBlock(topic, partition, int64(-1), int32(0))
ldr, err := client.Leader(topic, partition)
if err != nil {
errorExit("Unable to get leader: %v\n", err)
}
if len(msg.Headers) > 0 {
fmt.Fprintf(w, "Headers:\n")
}

offsets, err := getAvailableOffsetsRetry(ldr, req, offsetsRetry)
if err != nil {
errorExit("Unable to get available offsets: %v\n", err)
for _, hdr := range msg.Headers {
var hdrValue string
// Try to detect azure eventhub-specific encoding
if len(hdr.Value) > 0 {
switch hdr.Value[0] {
case 161:
hdrValue = string(hdr.Value[2 : 2+hdr.Value[1]])
case 131:
hdrValue = strconv.FormatUint(binary.BigEndian.Uint64(hdr.Value[1:9]), 10)
default:
hdrValue = string(hdr.Value)
}
followOffset := offsets.GetBlock(topic, partition).Offset - 1
}

if follow && followOffset > 0 {
offset = followOffset
fmt.Fprintf(os.Stderr, "Starting on partition %v with offset %v\n", partition, offset)
}
fmt.Fprintf(w, "\tKey: %v\tValue: %v\n", string(hdr.Key), hdrValue)

pc, err := consumer.ConsumePartition(topic, partition, offset)
if err != nil {
errorExit("Unable to consume partition: %v %v %v %v\n", topic, partition, offset, err)
}
}

for msg := range pc.Messages() {
var stderr bytes.Buffer

var dataToDisplay []byte
var keyToDisplay []byte

if protoType != "" {
dataToDisplay, err = protoDecode(reg, msg.Value, protoType)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v\n", err)
}

keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err)
}
} else {
dataToDisplay, err = avroDecode(msg.Value)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}

keyToDisplay, err = avroDecode(msg.Key)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}
}

if !raw {
formatted, err := prettyjson.Format(dataToDisplay)
if err == nil {
dataToDisplay = formatted
}

w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)

if len(msg.Headers) > 0 {
fmt.Fprintf(w, "Headers:\n")
}

for _, hdr := range msg.Headers {
var hdrValue string
// Try to detect azure eventhub-specific encoding
if len(hdr.Value) > 0 {
switch hdr.Value[0] {
case 161:
hdrValue = string(hdr.Value[2 : 2+hdr.Value[1]])
case 131:
hdrValue = strconv.FormatUint(binary.BigEndian.Uint64(hdr.Value[1:9]), 10)
default:
hdrValue = string(hdr.Value)
}
}

fmt.Fprintf(w, "\tKey: %v\tValue: %v\n", string(hdr.Key), hdrValue)

}

if msg.Key != nil && len(msg.Key) > 0 {
fmt.Fprintf(w, "Key:\t%v\n", formatKey(keyToDisplay))
}
fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp)
w.Flush()
}

mu.Lock()
stderr.WriteTo(os.Stderr)
_, _ = colorable.NewColorableStdout().Write(dataToDisplay)
fmt.Print("\n")
mu.Unlock()
}
wg.Done()
}(partition, offset)
if msg.Key != nil && len(msg.Key) > 0 {
fmt.Fprintf(w, "Key:\t%v\n", formatKey(keyToDisplay))
}
wg.Wait()
fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp)
w.Flush()
}

mu.Lock()
stderr.WriteTo(os.Stderr)
_, _ = colorable.NewColorableStdout().Write(dataToDisplay)
fmt.Print("\n")
mu.Unlock()

},
}

// proto to JSON
Expand Down

0 comments on commit 8022ae5

Please sign in to comment.