Skip to content

Commit

Permalink
fix hang when deleting invalid batch task
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Feb 4, 2016
1 parent bdd2f59 commit 8a07363
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ There was a breaking change to the `define` command, see [#173](https://github.c
- [#185](https://github.com/influxdata/kapacitor/issues/185): Fix panic in define command with invalid dbrp value.
- [#195](https://github.com/influxdata/kapacitor/issues/195): Fix panic in where node.
- [#208](https://github.com/influxdata/kapacitor/issues/208): Add default stats dbrp to default subscription excludes.
- [#203](https://github.com/influxdata/kapacitor/issues/203): Fix hang when deleting invalid batch task.

## v0.10.0 [2016-01-26]

Expand Down
33 changes: 28 additions & 5 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func (s *SourceBatchNode) Start() {
}
}

// Abort calls Abort on every child of this source node.
// Each child is type-asserted to *BatchNode; a child of any other
// concrete type makes the assertion panic.
func (s *SourceBatchNode) Abort() {
	children := s.children
	for i := range children {
		child := children[i].(*BatchNode)
		child.Abort()
	}
}

func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {
queries := make([][]string, len(s.children))
for i, b := range s.children {
Expand All @@ -101,13 +107,15 @@ type BatchNode struct {
queryMu sync.Mutex
queryErr chan error
closing chan struct{}
aborting chan struct{}
}

func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode, l *log.Logger) (*BatchNode, error) {
bn := &BatchNode{
node: node{Node: n, et: et, logger: l},
b: n,
closing: make(chan struct{}),
node: node{Node: n, et: et, logger: l},
b: n,
closing: make(chan struct{}),
aborting: make(chan struct{}),
}
bn.node.runF = bn.runBatch
bn.node.stopF = bn.stopBatch
Expand Down Expand Up @@ -191,6 +199,10 @@ func (b *BatchNode) Start() {
}()
}

// Abort closes the aborting channel, which unblocks every select that is
// receiving from b.aborting (the query loop and the run function both do so
// and return an "aborted" error).
//
// Closing an already-closed channel panics in Go, so Abort must be called at
// most once per BatchNode.
func (b *BatchNode) Abort() {
close(b.aborting)
}

func (b *BatchNode) Queries(start, stop time.Time) []string {
now := time.Now()
if stop.IsZero() {
Expand Down Expand Up @@ -229,6 +241,8 @@ func (b *BatchNode) doQuery() error {
select {
case <-b.closing:
return nil
case <-b.aborting:
return errors.New("batch doQuery aborted")
case now := <-tickC:

// Update times for query
Expand Down Expand Up @@ -295,12 +309,21 @@ func (b *BatchNode) runBatch([]byte) error {
b.queryMu.Lock()
if b.queryErr != nil {
b.queryMu.Unlock()
queryErr = <-b.queryErr
select {
case queryErr = <-b.queryErr:
case <-b.aborting:
queryErr = errors.New("batch queryErr aborted")
}
} else {
b.queryMu.Unlock()
}

err := <-errC
var err error
select {
case err = <-errC:
case <-b.aborting:
err = errors.New("batch run aborted")
}
if queryErr != nil {
return queryErr
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func doDefine(args []string) error {

// Reload task if enabled.
if ti.Enabled {
doReload([]string{*dname})
return doReload([]string{*dname})
}
}

Expand Down
17 changes: 13 additions & 4 deletions cmd/kapacitord/run/server_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,23 @@ func (s *Server) HTTPPost(url string, content []byte) (results string, err error
if err != nil {
return "", err
}
body := string(MustReadAll(resp.Body))
body := MustReadAll(resp.Body)
type response struct {
Error string `json:"Error"`
}
d := json.NewDecoder(bytes.NewReader(body))
rp := response{}
d.Decode(&rp)
if rp.Error != "" {
return "", errors.New(rp.Error)
}
switch resp.StatusCode {
case http.StatusBadRequest:
return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, string(body))
case http.StatusOK, http.StatusNoContent:
return body, nil
return string(body), nil
default:
return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, string(body))
}
}

Expand Down
45 changes: 45 additions & 0 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,51 @@ batch
}
}

// TestServer_InvalidBatchTask is the regression test for issue #203: a batch
// task whose query references a database/retention policy outside of its
// declared DBRPs must fail to enable with a descriptive error, and deleting
// that task afterwards must complete without error.
func TestServer_InvalidBatchTask(t *testing.T) {
	c := NewConfig()
	c.InfluxDB.Enabled = true
	// The fake InfluxDB is given a handler that returns a nil response for
	// every query.
	db := NewInfluxDB(func(q string) *client.Response {
		return nil
	})
	c.InfluxDB.URLs = []string{db.URL()}
	s := OpenServer(c)
	defer s.Close()

	name := "testInvalidBatchTask"
	ttype := "batch"
	dbrps := []kapacitor.DBRP{{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}}
	// The query deliberately targets unknowndb.unknownrp, which is not in dbrps.
	tick := `
batch
.query(' SELECT value from unknowndb.unknownrp.cpu ')
.period(5ms)
.every(5ms)
.mapReduce(influxql.count('value'))
.httpOut('count')
`

	r, err := s.DefineTask(name, ttype, tick, dbrps)
	if err != nil {
		t.Fatal(err)
	}
	if r != "" {
		t.Fatal("unexpected result", r)
	}

	// Enabling must be rejected. A nil error would mean the invalid task was
	// accepted, so it is treated as a failure instead of being silently
	// tolerated.
	_, err = s.EnableTask(name)
	expErr := `batch query is not allowed to request data from "unknowndb"."unknownrp"`
	if err == nil {
		t.Fatalf("expected error enabling invalid batch task: exp %s", expErr)
	}
	if err.Error() != expErr {
		t.Fatalf("unexpected err: got %v exp %s", err, expErr)
	}

	// Deleting the task is the operation that used to hang.
	if err := s.DeleteTask(name); err != nil {
		t.Fatal(err)
	}
}

func TestServer_RecordReplayStream(t *testing.T) {
s := OpenDefaultServer()
defer s.Close()
Expand Down
7 changes: 3 additions & 4 deletions services/task_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,19 @@ func (ts *Service) Open() error {

// Start each enabled task
for _, name := range enabledTasks {
ts.logger.Println("D! enabling task on startup", name)
ts.logger.Println("D! starting enabled task on startup", name)
t, err := ts.Load(name)
ts.logger.Println("D! loaded", name)
if err != nil {
ts.logger.Printf("E! error loading enabled task %s, err: %s\n", name, err)
return nil
}
err = ts.StartTask(t)
ts.logger.Println("D! started", name)
if err != nil {
ts.logger.Printf("E! error starting enabled task %s, err: %s\n", name, err)
} else {
ts.logger.Println("D! started task during startup", name)
numEnabledTasks++
}
ts.logger.Println("D! enabled task on startup", name)
}

// Set expvars
Expand Down Expand Up @@ -617,6 +615,7 @@ func (ts *Service) StartTask(t *kapacitor.Task) error {
err := et.StartBatching()
if err != nil {
ts.SaveLastError(t.Name, err.Error())
ts.TaskMaster.StopTask(t.Name)
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (et *ExecutingTask) StartBatching() error {

err := et.checkDBRPs(batcher)
if err != nil {
batcher.Abort()
return err
}

Expand Down

0 comments on commit 8a07363

Please sign in to comment.