diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 11ccc80a..d1256d67 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "encoding/binary" "fmt" "os" @@ -20,11 +21,13 @@ import ( ) var ( - offsetFlag string - raw bool - follow bool - schemaCache *avro.SchemaCache - keyfmt *prettyjson.Formatter + offsetFlag string + groupFlag string + groupCommitFlag bool + raw bool + follow bool + schemaCache *avro.SchemaCache + keyfmt *prettyjson.Formatter protoType string keyProtoType string @@ -44,6 +47,8 @@ func init() { consumeCmd.Flags().StringVar(&protoType, "proto-type", "", "Fully qualified name of the proto message type. Example: com.test.SampleMessage") consumeCmd.Flags().StringVar(&keyProtoType, "key-proto-type", "", "Fully qualified name of the proto key type. Example: com.test.SampleMessage") consumeCmd.Flags().Int32SliceVarP(&flagPartitions, "partitions", "p", []int32{}, "Partitions to consume from") + consumeCmd.Flags().StringVarP(&groupFlag, "group", "g", "", "Consumer Group to use for consume") + consumeCmd.Flags().BoolVar(&groupCommitFlag, "commit", false, "Commit Group offset after receiving messages. Works only if consuming as Consumer Group") keyfmt = prettyjson.NewFormatter() keyfmt.Newline = " " // Replace newline with space to avoid condensed output. @@ -97,132 +102,180 @@ var consumeCmd = &cobra.Command{ topic := args[0] client := getClient() - consumer, err := sarama.NewConsumerFromClient(client) + if groupFlag != "" { + withConsumerGroup(client, topic, groupFlag) + } else { + withoutConsumerGroup(client, topic, offset) + } + + }, +} + +type g struct{} + +func (g *g) Setup(s sarama.ConsumerGroupSession) error { + return nil +} + +func (g *g) Cleanup(s sarama.ConsumerGroupSession) error { + return nil +} + +func (g *g) ConsumeClaim(s sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + + mu := sync.Mutex{} // Synchronizes stderr and stdout. + for msg := range claim.Messages() { + handleMessage(msg, &mu) + if groupCommitFlag { + s.MarkMessage(msg, "") + } + } + return nil +} + +func withConsumerGroup(client sarama.Client, topic, group string) { + cg, err := sarama.NewConsumerGroupFromClient(group, client) + if err != nil { + errorExit("Failed to create consumer group: %v", err) + } + + err = cg.Consume(context.Background(), []string{topic}, &g{}) + if err != nil { + errorExit("Error on consume: %v", err) + } +} + +func withoutConsumerGroup(client sarama.Client, topic string, offset int64) { + consumer, err := sarama.NewConsumerFromClient(client) + if err != nil { + errorExit("Unable to create consumer from client: %v\n", err) + } + + var partitions []int32 + if len(flagPartitions) == 0 { + partitions, err = consumer.Partitions(topic) if err != nil { - errorExit("Unable to create consumer from client: %v\n", err) + errorExit("Unable to get partitions: %v\n", err) } + } else { + partitions = flagPartitions + } + + schemaCache = getSchemaCache() + + wg := sync.WaitGroup{} + mu := sync.Mutex{} // Synchronizes stderr and stdout. + for _, partition := range partitions { + + wg.Add(1) - var partitions []int32 - if len(flagPartitions) == 0 { - partitions, err = consumer.Partitions(topic) + go func(partition int32, offset int64) { + req := &sarama.OffsetRequest{ + Version: int16(1), + } + req.AddBlock(topic, partition, int64(-1), int32(0)) + ldr, err := client.Leader(topic, partition) if err != nil { - errorExit("Unable to get partitions: %v\n", err) + errorExit("Unable to get leader: %v\n", err) } - } else { - partitions = flagPartitions + + offsets, err := getAvailableOffsetsRetry(ldr, req, offsetsRetry) + if err != nil { + errorExit("Unable to get available offsets: %v\n", err) + } + followOffset := offsets.GetBlock(topic, partition).Offset - 1 + + if follow && followOffset > 0 { + offset = followOffset + fmt.Fprintf(os.Stderr, "Starting on partition %v with offset %v\n", partition, offset) + } + + pc, err := consumer.ConsumePartition(topic, partition, offset) + if err != nil { + errorExit("Unable to consume partition: %v %v %v %v\n", topic, partition, offset, err) + } + + for msg := range pc.Messages() { + handleMessage(msg, &mu) + } + wg.Done() + }(partition, offset) + } + wg.Wait() +} + +func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) { + var stderr bytes.Buffer + + var dataToDisplay []byte + var keyToDisplay []byte + var err error + + if protoType != "" { + dataToDisplay, err = protoDecode(reg, msg.Value, protoType) + if err != nil { + fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v\n", err) + } + + keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType) + if err != nil { + fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err) + } + } else { + dataToDisplay, err = avroDecode(msg.Value) + if err != nil { + fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err) } - schemaCache = getSchemaCache() + keyToDisplay, err = avroDecode(msg.Key) + if err != nil { + fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err) + } + } - wg := sync.WaitGroup{} - mu := sync.Mutex{} // Synchronizes stderr and stdout. - for _, partition := range partitions { + if !raw { + formatted, err := prettyjson.Format(dataToDisplay) + if err == nil { + dataToDisplay = formatted + } - wg.Add(1) + w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags) - go func(partition int32, offset int64) { - req := &sarama.OffsetRequest{ - Version: int16(1), - } - req.AddBlock(topic, partition, int64(-1), int32(0)) - ldr, err := client.Leader(topic, partition) - if err != nil { - errorExit("Unable to get leader: %v\n", err) - } + if len(msg.Headers) > 0 { + fmt.Fprintf(w, "Headers:\n") + } - offsets, err := getAvailableOffsetsRetry(ldr, req, offsetsRetry) - if err != nil { - errorExit("Unable to get available offsets: %v\n", err) + for _, hdr := range msg.Headers { + var hdrValue string + // Try to detect azure eventhub-specific encoding + if len(hdr.Value) > 0 { + switch hdr.Value[0] { + case 161: + hdrValue = string(hdr.Value[2 : 2+hdr.Value[1]]) + case 131: + hdrValue = strconv.FormatUint(binary.BigEndian.Uint64(hdr.Value[1:9]), 10) + default: + hdrValue = string(hdr.Value) } - followOffset := offsets.GetBlock(topic, partition).Offset - 1 + } - if follow && followOffset > 0 { - offset = followOffset - fmt.Fprintf(os.Stderr, "Starting on partition %v with offset %v\n", partition, offset) - } + fmt.Fprintf(w, "\tKey: %v\tValue: %v\n", string(hdr.Key), hdrValue) - pc, err := consumer.ConsumePartition(topic, partition, offset) - if err != nil { - errorExit("Unable to consume partition: %v %v %v %v\n", topic, partition, offset, err) - } + } - for msg := range pc.Messages() { - var stderr bytes.Buffer - - var dataToDisplay []byte - var keyToDisplay []byte - - if protoType != "" { - dataToDisplay, err = protoDecode(reg, msg.Value, protoType) - if err != nil { - fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v\n", err) - } - - keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType) - if err != nil { - fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err) - } - } else { - dataToDisplay, err = avroDecode(msg.Value) - if err != nil { - fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err) - } - - keyToDisplay, err = avroDecode(msg.Key) - if err != nil { - fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err) - } - } - - if !raw { - formatted, err := prettyjson.Format(dataToDisplay) - if err == nil { - dataToDisplay = formatted - } - - w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags) - - if len(msg.Headers) > 0 { - fmt.Fprintf(w, "Headers:\n") - } - - for _, hdr := range msg.Headers { - var hdrValue string - // Try to detect azure eventhub-specific encoding - if len(hdr.Value) > 0 { - switch hdr.Value[0] { - case 161: - hdrValue = string(hdr.Value[2 : 2+hdr.Value[1]]) - case 131: - hdrValue = strconv.FormatUint(binary.BigEndian.Uint64(hdr.Value[1:9]), 10) - default: - hdrValue = string(hdr.Value) - } - } - - fmt.Fprintf(w, "\tKey: %v\tValue: %v\n", string(hdr.Key), hdrValue) - - } - - if msg.Key != nil && len(msg.Key) > 0 { - fmt.Fprintf(w, "Key:\t%v\n", formatKey(keyToDisplay)) - } - fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp) - w.Flush() - } - - mu.Lock() - stderr.WriteTo(os.Stderr) - _, _ = colorable.NewColorableStdout().Write(dataToDisplay) - fmt.Print("\n") - mu.Unlock() - } - wg.Done() - }(partition, offset) + if msg.Key != nil && len(msg.Key) > 0 { + fmt.Fprintf(w, "Key:\t%v\n", formatKey(keyToDisplay)) } - wg.Wait() + fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp) + w.Flush() + } + + mu.Lock() + stderr.WriteTo(os.Stderr) + _, _ = colorable.NewColorableStdout().Write(dataToDisplay) + fmt.Print("\n") + mu.Unlock() - }, } // proto to JSON