Skip to content

Commit

Permalink
Merge pull request influxdata#391 from influxdata/nc-issue#390
Browse files Browse the repository at this point in the history
Remove old mapReduce code
  • Loading branch information
Nathaniel Cook committed Mar 29, 2016
2 parents 3ee456a + e398695 commit cb2b95a
Show file tree
Hide file tree
Showing 25 changed files with 19 additions and 13,012 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ format a TICKscript file according to a common standard.

- [#299](https://github.com/influxdata/kapacitor/issues/299): Changes TICKscript chaining method operators and adds `tickfmt` binary.
- [#389](https://github.com/influxdata/kapacitor/pull/389): Adds benchmarks to Kapacitor for basic use cases.
- [#390](https://github.com/influxdata/kapacitor/issues/390): BREAKING: Remove old `.mapReduce` functions.


### Bugfixes
Expand Down
27 changes: 0 additions & 27 deletions edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type BatchCollector interface {
type Edge struct {
stream chan models.Point
batch chan models.Batch
reduce chan *MapResult

logger *log.Logger
aborted chan struct{}
Expand Down Expand Up @@ -72,8 +71,6 @@ func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, size i
e.stream = make(chan models.Point, size)
case pipeline.BatchEdge:
e.batch = make(chan models.Batch, size)
case pipeline.ReduceEdge:
e.reduce = make(chan *MapResult, size)
}
return e
}
Expand Down Expand Up @@ -123,9 +120,6 @@ func (e *Edge) Close() {
if e.batch != nil {
close(e.batch)
}
if e.reduce != nil {
close(e.reduce)
}
DeleteStatistics(e.statsKey)
}

Expand Down Expand Up @@ -171,17 +165,6 @@ func (e *Edge) NextBatch() (b models.Batch, ok bool) {
return
}

func (e *Edge) NextMaps() (m *MapResult, ok bool) {
select {
case <-e.aborted:
case m, ok = <-e.reduce:
if ok {
e.emitted.Add(1)
}
}
return
}

func (e *Edge) CollectPoint(p models.Point) error {
e.collected.Add(1)
e.incCollected(p.Group, p.Tags, p.Dimensions)
Expand All @@ -204,16 +187,6 @@ func (e *Edge) CollectBatch(b models.Batch) error {
}
}

func (e *Edge) CollectMaps(m *MapResult) error {
e.collected.Add(1)
select {
case <-e.aborted:
return ErrAborted
case e.reduce <- m:
return nil
}
}

// Increment the emitted count of the group for this edge.
func (e *Edge) incEmitted(group models.GroupID, tags models.Tags, dims []string) {
e.groupMu.Lock()
Expand Down
192 changes: 0 additions & 192 deletions functions.go

This file was deleted.

56 changes: 15 additions & 41 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,19 +1437,6 @@ func TestStream_InfluxQL(t *testing.T) {
}

var scriptTmpl = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
|window()
.period(10s)
.every(10s)
|mapReduce(influxql|{{ .Method }}({{ .Args }}))
{{ if .UsePointTimes }}.usePointTimes(){{ end }}
|httpOut('TestStream_InfluxQL')
`

var newScriptTmpl = `
stream
|from()
.measurement('cpu')
Expand Down Expand Up @@ -1875,35 +1862,22 @@ stream
t.Fatal(err)
}

newTmpl, err := template.New("script").Parse(newScriptTmpl)
if err != nil {
t.Fatal(err)
}

tmpls := []*template.Template{tmpl, newTmpl}

for _, tc := range testCases {
for i, tmpl := range tmpls {
if tc.Method == "distinct" && i == 0 {
// Skip legacy test for new behavior
continue
}
t.Log("Method:", tc.Method, i)
var script bytes.Buffer
if tc.Args == "" {
tc.Args = "'value'"
}
tmpl.Execute(&script, tc)
testStreamerWithOutput(
t,
"TestStream_InfluxQL",
script.String(),
13*time.Second,
tc.ER,
nil,
false,
)
}
t.Log("Method:", tc.Method)
var script bytes.Buffer
if tc.Args == "" {
tc.Args = "'value'"
}
tmpl.Execute(&script, tc)
testStreamerWithOutput(
t,
"TestStream_InfluxQL",
script.String(),
13*time.Second,
tc.ER,
nil,
false,
)
}
}

Expand Down
Loading

0 comments on commit cb2b95a

Please sign in to comment.