From 4dfed115167bb2ed35d8428e4b6c7a0d2e59020f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Mon, 8 Oct 2018 22:59:01 +0200 Subject: [PATCH] add offset flag; move ConsumePartition into goroutine apparently ConsumePartition itself takes a little time, so it should be executed in the goroutine as well to avoid delaying the startup of the goroutines. --- cmd/kaf/consume.go | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 03df3e5c..f8679ea8 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -1,19 +1,18 @@ package main import ( - "bytes" "fmt" - "os" "sync" - "io" - "github.com/birdayz/sarama" "github.com/spf13/cobra" ) +var offsetFlag string + func init() { rootCmd.AddCommand(consumeCmd) + consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest. Default: newest") } var consumeCmd = &cobra.Command{ @@ -21,6 +20,21 @@ var consumeCmd = &cobra.Command{ Short: "Consume messages", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { + + var offset int64 + switch offsetFlag { + case "oldest": + offset = sarama.OffsetOldest + case "newest": + offset = sarama.OffsetNewest + default: + // TODO: normally we would parse this to int64 but it's + // difficult as we can have multiple partitions. need to + // find a way to give offsets from CLI with a good + // syntax. + offset = sarama.OffsetNewest + } + topic := args[0] client, err := getClient() if err != nil { @@ -38,20 +52,22 @@ var consumeCmd = &cobra.Command{ } wg := sync.WaitGroup{} for _, partition := range partitions { - pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) - if err != nil { - panic(err) - } wg.Add(1) - go func() { + go func(partition int32) { + pc, err := consumer.ConsumePartition(topic, partition, offset) + if err != nil { + panic(err) + } + for msg := range pc.Messages() { - io.Copy(os.Stdout, bytes.NewReader(msg.Value)) + // io.Copy(os.Stdout, bytes.NewReader(msg.Value)) + fmt.Println(string(msg.Value)) fmt.Println() } wg.Done() - }() + }(partition) } wg.Wait()