Skip to content

Commit

Permalink
use logging
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Oct 5, 2015
1 parent 26f6d5c commit caeec69
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 45 deletions.
4 changes: 1 addition & 3 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (a *AlertNode) runAlert() error {
}
}
}
default:
fmt.Println("Unhandled EdgeType")
}
return nil
}
Expand All @@ -109,7 +107,7 @@ func (a *AlertNode) check(p *models.Point) (bool, error) {
func (a *AlertNode) handlePost(pts []*models.Point) {
b, err := json.Marshal(pts)
if err != nil {
fmt.Println("failed to marshal points json")
a.l.Println("failed to marshal points json")
return
}
buf := bytes.NewBuffer(b)
Expand Down
20 changes: 8 additions & 12 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kapacitor

import (
"fmt"
"net/url"
"time"

Expand All @@ -19,7 +18,6 @@ type BatchNode struct {
}

func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
fmt.Println("new batch node ")
bn := &BatchNode{
node: node{Node: n, et: et},
b: n,
Expand All @@ -42,8 +40,6 @@ func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error)
return nil, err
}

fmt.Println("batch node created")

return bn, nil
}

Expand All @@ -61,7 +57,7 @@ func (b *BatchNode) Query(batch BatchCollector) {
// Connect
con, err := client.NewClient(b.conf)
if err != nil {
fmt.Println(err)
b.l.Println(err)
break
}
q := client.Query{
Expand All @@ -71,23 +67,23 @@ func (b *BatchNode) Query(batch BatchCollector) {
// Execute query
resp, err := con.Query(q)
if err != nil {
fmt.Println(err)
b.l.Println(err)
return
}

if resp.Err != nil {
fmt.Println(resp.Err)
b.l.Println(resp.Err)
return
}

// Collect batches
for _, res := range resp.Results {
if res.Err != nil {
fmt.Println(res.Err)
b.l.Println(res.Err)
return
}
for _, series := range res.Series {
b := make([]*models.Point, len(series.Values))
bch := make([]*models.Point, len(series.Values))
groupID := models.TagsToGroupID(
models.SortedKeys(series.Tags),
series.Tags,
Expand All @@ -99,22 +95,22 @@ func (b *BatchNode) Query(batch BatchCollector) {
if c == "time" {
t, err = time.Parse(time.RFC3339, v[i].(string))
if err != nil {
fmt.Println(err)
b.l.Println(err)
return
}
} else {
fields[c] = v[i]
}
}
b[i] = models.NewPoint(
bch[i] = models.NewPoint(
series.Name,
groupID,
series.Tags,
fields,
t,
)
}
batch.CollectBatch(b)
batch.CollectBatch(bch)
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"log"
"net/http"
"net/url"
"os"
Expand All @@ -22,6 +23,8 @@ var (
branch string
)

var l = log.New(os.Stderr, "[run] ", log.LstdFlags)

var usageStr = `
Usage: kapacitor [command] [args]
Expand Down Expand Up @@ -237,7 +240,7 @@ func doRecord(args []string) error {
if rp.Error != "" {
return errors.New(rp.Error)
}
fmt.Println(rp.ReplayID)
l.Println(rp.ReplayID)
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions cmd/kapacitord/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/influxdb/kapacitor/log_writer"
)

const logo = `
Expand Down Expand Up @@ -62,6 +63,12 @@ func (cmd *Command) Run(args ...string) error {
return err
}

// Set log level
err = log_writer.SetLevel(options.LogLevel)
if err != nil {
return err
}

// Print sweet Kapacitor logo.
fmt.Print(logo)

Expand Down Expand Up @@ -148,6 +155,7 @@ func (cmd *Command) ParseFlags(args ...string) (Options, error) {
fs.StringVar(&options.Hostname, "hostname", "", "")
fs.StringVar(&options.CPUProfile, "cpuprofile", "", "")
fs.StringVar(&options.MemProfile, "memprofile", "", "")
fs.StringVar(&options.LogLevel, "loglevel", "INFO", "")
fs.Usage = func() { fmt.Fprintln(cmd.Stderr, usage) }
if err := fs.Parse(args); err != nil {
return Options{}, err
Expand Down Expand Up @@ -209,6 +217,10 @@ run starts the Kapacitor server.
-pidfile <path>
Write process ID to a file.
-loglevel <level>
Sets the log level. One of debug,info,warn,error.
Default: info
`

// Options represents the command line options that can be parsed.
Expand All @@ -218,4 +230,5 @@ type Options struct {
Hostname string
CPUProfile string
MemProfile string
LogLevel string
}
17 changes: 9 additions & 8 deletions edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"sync"

"github.com/influxdb/kapacitor/log_writer"
"github.com/influxdb/kapacitor/models"
"github.com/influxdb/kapacitor/pipeline"
)
Expand All @@ -32,7 +33,7 @@ type Edge struct {
}

func newEdge(name string, t pipeline.EdgeType) *Edge {
l := log.New(os.Stderr, fmt.Sprintf("[edge:%s] ", name), log.LstdFlags)
l := log_writer.New(os.Stderr, fmt.Sprintf("[edge:%s] ", name), log.LstdFlags)
switch t {
case pipeline.StreamEdge:
return &Edge{stream: make(chan *models.Point), l: l}
Expand All @@ -50,7 +51,7 @@ func (e *Edge) Close() {
return
}
e.closed = true
e.l.Printf("closing c: %d e: %d\n", e.collected, e.emitted)
e.l.Printf("I@closing c: %d e: %d\n", e.collected, e.emitted)
if e.stream != nil {
close(e.stream)
}
Expand All @@ -63,7 +64,7 @@ func (e *Edge) Close() {
}

func (e *Edge) NextPoint() *models.Point {
e.l.Printf("next point c: %d e: %d\n", e.collected, e.emitted)
e.l.Printf("D@next point c: %d e: %d\n", e.collected, e.emitted)
p := <-e.stream
if p != nil {
e.emitted++
Expand All @@ -72,7 +73,7 @@ func (e *Edge) NextPoint() *models.Point {
}

func (e *Edge) NextBatch() []*models.Point {
e.l.Printf("next batch c: %d e: %d\n", e.collected, e.emitted)
e.l.Printf("D@next batch c: %d e: %d\n", e.collected, e.emitted)
b := <-e.batch
if b != nil {
e.emitted++
Expand All @@ -81,7 +82,7 @@ func (e *Edge) NextBatch() []*models.Point {
}

func (e *Edge) NextMaps() *MapResult {
e.l.Printf("next maps c: %d e: %d\n", e.collected, e.emitted)
e.l.Printf("D@next maps c: %d e: %d\n", e.collected, e.emitted)
m := <-e.reduce
if m != nil {
e.emitted++
Expand All @@ -97,23 +98,23 @@ func (e *Edge) recover(errp *error) {

func (e *Edge) CollectPoint(p *models.Point) (err error) {
defer e.recover(&err)
e.l.Printf("collect point c: %d e: %d\n", e.collected, e.emitted)
e.l.Printf("D@collect point c: %d e: %d\n", e.collected, e.emitted)
e.collected++
e.stream <- p
return
}

func (e *Edge) CollectBatch(b []*models.Point) (err error) {
defer e.recover(&err)
e.l.Printf("collect batch c: %d e: %d\n", e.collected, e.emitted)
e.l.Printf("D@collect batch c: %d e: %d\n", e.collected, e.emitted)
e.collected++
e.batch <- b
return
}

func (e *Edge) CollectMaps(m *MapResult) (err error) {
defer e.recover(&err)
e.l.Printf("collect maps c: %d e: %d\n", e.collected, e.emitted)
e.l.Printf("D@collect maps c: %d e: %d\n", e.collected, e.emitted)
e.collected++
e.reduce <- m
return
Expand Down
3 changes: 1 addition & 2 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integrations

import (
"fmt"
"log"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -103,8 +104,6 @@ batch
}
}
}
} else {
fmt.Println(result.Window)
}
}

Expand Down
7 changes: 2 additions & 5 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -290,8 +291,6 @@ stream
}
}
}
} else {
fmt.Println(result.Window)
}
}

Expand Down Expand Up @@ -393,8 +392,6 @@ errorCounts.join(viewCounts)
}
}
}
} else {
fmt.Println(result.Window)
}

}
Expand Down Expand Up @@ -648,7 +645,7 @@ stream
}

for _, tc := range testCases {
fmt.Println("Method:", tc.Method)
log.Println("Method:", tc.Method)
var script bytes.Buffer
tmpl.Execute(&script, tc)
clock, et, errCh, tm := testStreamer(
Expand Down
Loading

0 comments on commit caeec69

Please sign in to comment.