Skip to content

Commit

Permalink
fix bug in window buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Oct 7, 2015
1 parent 10b887c commit c789f65
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 42 deletions.
6 changes: 2 additions & 4 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,15 +544,13 @@ func doDelete(args []string) error {
var paramName string
switch args[0] {
case "task":
baseURL = "http://localhost:9092/task"
baseURL = "http://localhost:9092/task?"
paramName = "name"
case "recording":
baseURL = "http://localhost:9092/recording"
baseURL = "http://localhost:9092/recording?"
paramName = "rid"
}

l.Println(args)

for _, arg := range args[1:] {
v := url.Values{}
v.Add(paramName, arg)
Expand Down
14 changes: 11 additions & 3 deletions edge.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package kapacitor

import (
"errors"
"fmt"
"log"
"os"
"sync"

"github.com/influxdb/kapacitor/wlog"
"github.com/influxdb/kapacitor/models"
"github.com/influxdb/kapacitor/pipeline"
"github.com/influxdb/kapacitor/wlog"
)

type StreamCollector interface {
Expand Down Expand Up @@ -92,7 +93,12 @@ func (e *Edge) NextMaps() *MapResult {

func (e *Edge) recover(errp *error) {
if r := recover(); r != nil {
*errp = fmt.Errorf("%s", r)
msg := fmt.Sprintf("%s", r)
if msg == "send on closed channel" {
*errp = errors.New(msg)
} else {
panic(r)
}
}
}

Expand Down Expand Up @@ -143,8 +149,10 @@ func (s *streamItr) Next() (time int64, value interface{}) {
if i >= len(s.batch) {
return -1, nil
}

p := s.batch[i]
if p == nil {
return -1, nil
}
time = p.Time.Unix()
value = p.Fields[s.field]
return
Expand Down
8 changes: 6 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"fmt"
"log"
"os"
"runtime"

"github.com/influxdb/kapacitor/wlog"
"github.com/influxdb/kapacitor/pipeline"
"github.com/influxdb/kapacitor/wlog"
)

// A node that can be in an executor.
Expand Down Expand Up @@ -65,11 +66,14 @@ func (n *node) start() {
// Handle panic in runF
r := recover()
if r != nil {
err = fmt.Errorf("%s: %s", n.Name(), r)
trace := make([]byte, 512)
n := runtime.Stack(trace, false)
err = fmt.Errorf("%s: Trace:%s", r, string(trace[:n]))
}
// Propogate error up
if err != nil {
n.closeParentEdges()
n.logger.Println("E!", err)
}
n.errCh <- err
}()
Expand Down
8 changes: 8 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap
"404", // Catch all 404
"POST", "/", true, true, h.serve404,
},
Route{
"404", // Catch all 404
"DELETE", "/", true, true, h.serve404,
},
Route{
"404", // Catch all 404
"HEAD", "/", true, true, h.serve404,
},
})

return h
Expand Down
10 changes: 5 additions & 5 deletions services/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
)

type Config struct {
URLs []string
Username string
Password string
Timeout time.Duration
Precision string
URLs []string `toml:"urls"`
Username string `toml:"username"`
Password string `toml:"password"`
Timeout time.Duration `toml:"timeout"`
Precision string `toml:"precision"`
}

func NewConfig() Config {
Expand Down
20 changes: 18 additions & 2 deletions task_master.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package kapacitor

import (
"log"
"net"
"os"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/kapacitor/pipeline"
"github.com/influxdb/kapacitor/services/httpd"
"github.com/influxdb/kapacitor/wlog"
)

// An execution framework for a set of tasks.
Expand All @@ -29,6 +32,8 @@ type TaskMaster struct {

// Executing tasks
tasks map[string]*ExecutingTask

logger *log.Logger
}

// Create a new Executor with a given clock.
Expand All @@ -40,6 +45,7 @@ func NewTaskMaster() *TaskMaster {
forks: make(map[string]*Edge),
batches: make(map[string]*Edge),
tasks: make(map[string]*ExecutingTask),
logger: wlog.New(os.Stderr, "[tm] ", log.LstdFlags),
}
}

Expand Down Expand Up @@ -74,6 +80,7 @@ func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error) {
}

tm.tasks[et.Task.Name] = et
tm.logger.Println("I! Started task:", t.Name)

return et, nil
}
Expand All @@ -89,13 +96,17 @@ func (tm *TaskMaster) StopTask(name string) {
tm.DelFork(et.Task.Name)
}
et.stop()
tm.logger.Println("I! Stopped task:", name)
}
}

func (tm *TaskMaster) runForking() {
for p := tm.in.NextPoint(); p != nil; p = tm.in.NextPoint() {
for _, out := range tm.forks {
out.CollectPoint(p)
for name, out := range tm.forks {
err := out.CollectPoint(p)
if err != nil {
tm.StopTask(name)
}
}
}
for _, out := range tm.forks {
Expand All @@ -112,6 +123,11 @@ func (tm *TaskMaster) NewFork(name string) *Edge {
tm.forks[name] = e
return e
}

func (tm *TaskMaster) DelFork(name string) {
fork := tm.forks[name]
delete(tm.forks, name)
if fork != nil {
fork.Close()
}
}
35 changes: 13 additions & 22 deletions window.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kapacitor

import (
"fmt"
"log"
"sync"
"time"

Expand Down Expand Up @@ -39,7 +40,7 @@ func (w *WindowNode) runWindow() error {
wnd := windows[p.Group]
if wnd == nil {
wnd = &window{
buf: &windowBuffer{},
buf: &windowBuffer{logger: w.logger},
nextEmit: p.Time.Add(w.w.Every),
period: w.w.Period,
every: w.w.Every,
Expand Down Expand Up @@ -76,10 +77,14 @@ type windowBuffer struct {
start int
stop int
size int
logger *log.Logger
}

// Insert a single point into the buffer.
func (b *windowBuffer) insert(p *models.Point) {
if p == nil {
panic("do not insert nil values")
}
b.Lock()
defer b.Unlock()
if b.size == cap(b.window) {
Expand All @@ -96,7 +101,7 @@ func (b *windowBuffer) insert(p *models.Point) {
} else {
n := 0
n += copy(w, b.window[b.start:])
n += copy(w[b.start+1:], b.window[:b.stop])
n += copy(w[b.size-b.start:], b.window[:b.stop])
if n != b.size {
panic(fmt.Sprintf("did not copy all the data: copied: %d size: %d start: %d stop: %d\n", n, b.size, b.start, b.stop))
}
Expand All @@ -119,23 +124,6 @@ func (b *windowBuffer) insert(p *models.Point) {
}
b.size++
b.stop++
//b.check()
}

func (b *windowBuffer) check() {
if b.start < b.stop {
if b.size != b.stop-b.start {
panic(fmt.Sprintf("invalid size: size: %d start: %d stop: %d len: %d\n", b.size, b.start, b.stop, len(b.window)))
}
} else if b.start > b.stop {
if b.size != len(b.window)-b.start+b.stop {
panic(fmt.Sprintf("invalid size: size: %d start: %d stop: %d len: %d\n", b.size, b.start, b.stop, len(b.window)))
}
} else {
if b.size != 0 && b.size != len(b.window) {
panic(fmt.Sprintf("invalid size: size: %d start: %d stop: %d len: %d\n", b.size, b.start, b.stop, len(b.window)))
}
}
}

// Purge expired data from the window.
Expand Down Expand Up @@ -170,15 +158,17 @@ func (b *windowBuffer) purge(oldest time.Time) {
b.size = b.stop - b.start
}
}
//b.check()
}

// Returns a copy of the current buffer.
func (b *windowBuffer) points() []*models.Point {
b.Lock()
defer b.Unlock()
buf := make([]*models.Point, b.size)
if b.start <= b.stop {
if b.size == 0 {
return buf
}
if b.stop > b.start {
for i, p := range b.window[b.start:b.stop] {
buf[i] = p
}
Expand All @@ -190,7 +180,8 @@ func (b *windowBuffer) points() []*models.Point {
j++
}
for i := 0; i < b.stop; i++ {
buf[j] = b.window[i]
p := b.window[i]
buf[j] = p
j++
}
}
Expand Down
19 changes: 15 additions & 4 deletions window_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package kapacitor

import (
"log"
"os"
"testing"
"time"

"github.com/influxdb/kapacitor/models"
"github.com/stretchr/testify/assert"
)

var logger = log.New(os.Stderr, "[window] ", log.LstdFlags|log.Lshortfile)

func TestWindowBuffer(t *testing.T) {
assert := assert.New(t)

buf := &windowBuffer{}
buf := &windowBuffer{logger: logger}

size := 100

Expand Down Expand Up @@ -48,15 +52,22 @@ func TestWindowBuffer(t *testing.T) {
assert.Equal(0, buf.size)

// fill buffer again
for i := 1; i <= size; i++ {
oldest := time.Unix(int64(size), 0)
for i := 1; i <= size*2; i++ {

t := time.Unix(int64(i), 0)
t := time.Unix(int64(i+size), 0)
p := models.NewPoint("TestWindowBuffer", models.NilGroup, nil, nil, t)
buf.insert(p)

assert.Equal(i, buf.size)

points := buf.points()
assert.Equal(i, len(points))
if assert.Equal(i, len(points)) {
for i, p := range points {
if assert.NotNil(p, "i:%d", i) {
assert.True(!p.Time.Before(oldest), "Point %s is not after oldest time %s", p.Time, oldest)
}
}
}
}
}

0 comments on commit c789f65

Please sign in to comment.