Skip to content
This repository has been archived by the owner on May 13, 2019. It is now read-only.

Commit

Permalink
Update gitignore handling
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Mar 13, 2015
1 parent 518443a commit debb4c7
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 1 deletion.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
kafka
stressproducer
1 change: 1 addition & 0 deletions tools/consoleconsumer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
consoleconsumer
76 changes: 76 additions & 0 deletions tools/consoleconsumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"

"github.com/Shopify/sarama"
)

var (
brokerList = flag.String("brokers", "localhost:9092", "The comma separated list of brokers in the Kafka cluster")
topic = flag.String("topic", "", "The topic to consume")
partition = flag.Int("partition", 0, "The partition to consume")
offset = flag.String("offset", "oldest", "The offset to start with. Can be `oldest`, `newest`, or an actual offset")
verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging")

logger = log.New(os.Stderr, "", log.LstdFlags)
)

func main() {
flag.Parse()

if *verbose {
sarama.Logger = logger
}

var (
initialOffset int64
offsetError error
)
switch *offset {
case "oldest":
initialOffset = sarama.OffsetOldest
case "newest":
initialOffset = sarama.OffsetNewest
default:
initialOffset, offsetError = strconv.ParseInt(*offset, 10, 64)
}

if offsetError != nil {
logger.Fatalln("Invalid initial offset:", *offset)
}

c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
if err != nil {
logger.Fatalln(err)
}

pc, err := c.ConsumePartition(*topic, int32(*partition), initialOffset)
if err != nil {
logger.Fatalln(err)
}

go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Kill, os.Interrupt)
<-signals
pc.AsyncClose()
}()

for msg := range pc.Messages() {
fmt.Printf("Offset: %d\n", msg.Offset)
fmt.Printf("Key: %s\n", string(msg.Key))
fmt.Printf("Value: %s\n", string(msg.Value))
fmt.Println()
}

if err := c.Close(); err != nil {
fmt.Println("Failed to close consumer: ", err)
}
}
1 change: 1 addition & 0 deletions tools/stressproducer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
stressproducer
File renamed without changes.

0 comments on commit debb4c7

Please sign in to comment.