Skip to content

Commit

Permalink
Align binary math expression streams by time
Browse files Browse the repository at this point in the history
Also fills in missing values using the fill expression for any binary
aggregation.
  • Loading branch information
jsternberg committed Oct 18, 2016
1 parent 4776e8f commit 19a61db
Show file tree
Hide file tree
Showing 6 changed files with 1,547 additions and 451 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- [#6894](https://github.com/influxdata/influxdb/issues/6894): Support `INFLUX_USERNAME` and `INFLUX_PASSWORD` for setting username/password in the CLI.
- [#6896](https://github.com/influxdata/influxdb/issues/6896): Correctly read in input from a non-interactive stream for the CLI.
- [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent.
- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time.

### Bugfixes

Expand Down
60 changes: 60 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6657,6 +6657,66 @@ func TestServer_Query_IntoTarget(t *testing.T) {
}
}

// Ensure that binary operators of aggregates of separate fields, when a field is sometimes missing and sometimes present,
// result in values that are still properly time-aligned.
func TestServer_Query_IntoTarget_Sparse(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaClient.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}

writes := []string{
// All points have fields n and a. Field b is not present in all intervals.
// First 10s interval is missing field b. Result a_n should be (2+5)*(3+7) = 70, b_n is null.
fmt.Sprintf(`foo a=2,n=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`foo a=5,n=7 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
// Second 10s interval has field b. Result a_n = 11*17 = 187, b_n = 13*17 = 221.
fmt.Sprintf(`foo a=11,b=13,n=17 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:11Z").UnixNano()),
}

test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}

test.addQueries([]*Query{
&Query{
name: "into",
params: url.Values{"db": []string{"db0"}},
command: `SELECT sum(a) * sum(n) as a_n, sum(b) * sum(n) as b_n INTO baz FROM foo WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:01:00Z' GROUP BY time(10s)`,
exp: `{"results":[{"series":[{"name":"result","columns":["time","written"],"values":[["1970-01-01T00:00:00Z",2]]}]}]}`,
},
&Query{
name: "confirm results",
params: url.Values{"db": []string{"db0"}},
command: `SELECT * FROM baz`,
exp: `{"results":[{"series":[{"name":"baz","columns":["time","a_n","b_n"],"values":[["2000-01-01T00:00:00Z",70,null],["2000-01-01T00:00:10Z",187,221]]}]}]}`,
},
}...)

if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

// This test ensures that data is not duplicated with measurements
// of the same name.
func TestServer_Query_DuplicateMeasurements(t *testing.T) {
Expand Down
81 changes: 44 additions & 37 deletions influxql/internal/internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 19a61db

Please sign in to comment.