diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e8109fe0dd..562bd1b8795 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ - [#6894](https://github.com/influxdata/influxdb/issues/6894): Support `INFLUX_USERNAME` and `INFLUX_PASSWORD` for setting username/password in the CLI. - [#6896](https://github.com/influxdata/influxdb/issues/6896): Correctly read in input from a non-interactive stream for the CLI. - [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent. +- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time. ### Bugfixes diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index cd92f963368..ebb633035cc 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -6657,6 +6657,66 @@ func TestServer_Query_IntoTarget(t *testing.T) { } } +// Ensure that binary operators of aggregates of separate fields, when a field is sometimes missing and sometimes present, +// result in values that are still properly time-aligned. +func TestServer_Query_IntoTarget_Sparse(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig()) + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaClient.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + writes := []string{ + // All points have fields n and a. Field b is not present in all intervals. + // First 10s interval is missing field b. Result a_n should be (2+5)*(3+7) = 70, b_n is null. + fmt.Sprintf(`foo a=2,n=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()), + fmt.Sprintf(`foo a=5,n=7 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()), + // Second 10s interval has field b. Result a_n = 11*17 = 187, b_n = 13*17 = 221. + fmt.Sprintf(`foo a=11,b=13,n=17 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:11Z").UnixNano()), + } + + test := NewTest("db0", "rp0") + test.writes = Writes{ + &Write{data: strings.Join(writes, "\n")}, + } + + test.addQueries([]*Query{ + &Query{ + name: "into", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT sum(a) * sum(n) as a_n, sum(b) * sum(n) as b_n INTO baz FROM foo WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:01:00Z' GROUP BY time(10s)`, + exp: `{"results":[{"series":[{"name":"result","columns":["time","written"],"values":[["1970-01-01T00:00:00Z",2]]}]}]}`, + }, + &Query{ + name: "confirm results", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT * FROM baz`, + exp: `{"results":[{"series":[{"name":"baz","columns":["time","a_n","b_n"],"values":[["2000-01-01T00:00:00Z",70,null],["2000-01-01T00:00:10Z",187,221]]}]}]}`, + }, + }...) + + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + // This test ensures that data is not duplicated with measurements // of the same name. func TestServer_Query_DuplicateMeasurements(t *testing.T) { diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index 51e0a7f3e76..cfafbf43951 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -481,41 +481,48 @@ func init() { func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorInternal) } var fileDescriptorInternal = []byte{ - // 575 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x53, 0xdf, 0x6a, 0xdb, 0x3c, - 0x14, 0x47, 0x56, 0x9d, 0xda, 0xc7, 0x49, 0x93, 0xe8, 0xfb, 0x46, 0xc5, 0xae, 0x44, 0x56, 0x8a, - 0x2f, 0x46, 0x37, 0xca, 0x5e, 0x20, 0x5b, 0x5b, 0x08, 0x6c, 0x69, 0x69, 0x4a, 0xef, 0xb5, 0xe6, - 0xc4, 0x08, 0x14, 0x39, 0x93, 0xe4, 0x91, 0x3e, 0x73, 0x5f, 0x62, 0x48, 0x71, 0x9a, 0xac, 0x84, - 0xdd, 0xf9, 0x1c, 0x1d, 0xc9, 0xbf, 0x7f, 0x07, 0x4e, 0x95, 0xf1, 0x68, 0x8d, 0xd4, 0x9f, 0xb6, - 0x1f, 0x17, 0x2b, 0x5b, 0xfb, 0x9a, 0x65, 0xca, 0x2c, 0x74, 0xb3, 0xfe, 0xa5, 0x47, 0x2f, 0x04, - 0xd2, 0xbb, 0x5a, 0x19, 0xcf, 0xba, 0x70, 0x34, 0x95, 0x4b, 0xe4, 0x44, 0x24, 0x65, 0x1e, 0xaa, - 0x07, 0x59, 0x39, 0x9e, 0xbc, 0x56, 0x6a, 0x89, 0x9c, 0x8a, 0xa4, 0xa4, 0xac, 0x00, 0x3a, 0x55, - 0x9a, 0x1f, 0x89, 0xa4, 0xcc, 0xd8, 0x7b, 0xa0, 0xe3, 0x66, 0xcd, 0x53, 0x41, 0xcb, 0xe2, 0xb2, - 0x77, 0xb1, 0x7d, 0xf8, 0x62, 0xdc, 0xac, 0x19, 0x03, 0x18, 0x57, 0x95, 0xc5, 0x4a, 0x7a, 0x9c, - 0xf3, 0x8e, 0x20, 0x65, 0x2f, 0xf4, 0x6e, 0x74, 0x2d, 0xfd, 0xa3, 0xd4, 0x0d, 0xf2, 0x63, 0x41, - 0x4a, 0xc2, 0xfe, 0x87, 0xee, 0xc4, 0x78, 0xac, 0xd0, 0x6e, 0xba, 0x99, 0x20, 0x25, 0x65, 0xff, - 0x41, 0x31, 0xf3, 0x56, 0x99, 0x6a, 0xd3, 0xcc, 0x05, 0x29, 0xf3, 0x30, 0xfa, 0xb5, 0xae, 0x35, - 0x4a, 0xb3, 0xe9, 0x82, 0x20, 0x65, 0xc6, 0xce, 0x21, 0x9d, 0x79, 0xe9, 0x1d, 0x2f, 0x04, 0x29, - 0x8b, 0xcb, 0xd3, 0x1d, 0x8c, 0x89, 0x47, 0x2b, 0x7d, 0x6d, 0xe3, 0xf1, 0x48, 0x47, 0xb0, 0x6c, - 0x00, 0xd9, 0x95, 0xf4, 0xf2, 0xe1, 0x79, 0xb5, 0xa1, 0x9b, 0xbe, 0x41, 0x95, 0x1c, 0x44, 0x45, - 0x0f, 0xa1, 0x3a, 0x3a, 0x88, 0x2a, 0x0d, 0xa8, 0x46, 0x2f, 0x09, 0xf4, 0xb7, 0xff, 0xbf, 0x5d, - 0x79, 0x55, 0x1b, 0x17, 0x94, 0xbc, 0x5e, 0xaf, 0x2c, 0x27, 0xf1, 0x5e, 0xb1, 0x11, 0x2f, 0x11, - 0xb4, 0xcc, 0x99, 0x80, 0xce, 0x8d, 0x42, 0x3d, 0x77, 0x7c, 0x18, 0xc5, 0x1c, 0xec, 0x58, 0x3c, - 0x4a, 0x7b, 0x8f, 0x0b, 0x76, 0x0e, 0xc7, 0xb3, 0xba, 0xb1, 0x4f, 0xe8, 0x38, 0x8d, 0x23, 0xef, - 0x76, 0x23, 0x3f, 0x50, 0xba, 0xc6, 0xe2, 0x12, 0x8d, 0x67, 0x67, 0x90, 0x05, 0xe4, 0xf6, 0xb7, - 0xd4, 0x11, 0x60, 0x71, 0xc9, 0xf6, 0x14, 0x69, 0x4f, 0x02, 0xe7, 0x2b, 0xb5, 0x44, 0xe3, 0x02, - 0xb0, 0x68, 0x60, 0x34, 0xfa, 0x46, 0x69, 0x1d, 0xbd, 0x4a, 0xd9, 0x10, 0xf2, 0x50, 0xed, 0x5b, - 0x35, 0x84, 0xfc, 0x5b, 0x6d, 0xe6, 0x2a, 0xb0, 0x89, 0x3e, 0xe5, 0xa1, 0x35, 0xf3, 0xd2, 0xfa, - 0x98, 0x90, 0x3c, 0x8a, 0xd4, 0x87, 0xe3, 0x6b, 0x33, 0x8f, 0x0d, 0x88, 0x8d, 0x21, 0xe4, 0x63, - 0xf7, 0x84, 0x66, 0xae, 0x4c, 0x15, 0x4d, 0xca, 0x58, 0x0f, 0xd2, 0xef, 0x6a, 0xa9, 0x3c, 0xef, - 0xc6, 0x89, 0x13, 0xe8, 0xdc, 0x2e, 0x16, 0x0e, 0x3d, 0xef, 0x6d, 0xeb, 0xd9, 0xe6, 0xfc, 0x64, - 0xfb, 0xe4, 0xac, 0x1d, 0xe8, 0x6f, 0x07, 0xae, 0x70, 0xde, 0xac, 0x90, 0x0f, 0xa2, 0xda, 0x5f, - 0xa0, 0xbb, 0xa7, 0x81, 0x63, 0x67, 0x90, 0x4e, 0x3c, 0x2e, 0x1d, 0x27, 0xff, 0x90, 0x6a, 0x54, - 0x41, 0xb1, 0xaf, 0x5c, 0x9b, 0x8c, 0x9f, 0xd2, 0x61, 0x6b, 0xd1, 0x29, 0xf4, 0xef, 0xd1, 0xa3, - 0x09, 0x84, 0xef, 0x6a, 0xad, 0x9e, 0x9e, 0x63, 0x3c, 0xf2, 0xd7, 0x7d, 0xa1, 0xb1, 0xea, 0x41, - 0x7a, 0x8f, 0x15, 0xae, 0xdb, 0x40, 0x0c, 0x20, 0x9b, 0xb8, 0x07, 0x69, 0x2b, 0xf4, 0x6d, 0x18, - 0x3e, 0xee, 0x3c, 0x89, 0x7f, 0x69, 0xac, 0x8c, 0x1a, 0x92, 0x37, 0xec, 0xc3, 0xe3, 0x74, 0xf4, - 0x19, 0x7a, 0x7f, 0x25, 0x37, 0xd2, 0x47, 0xab, 0xd0, 0x4d, 0x77, 0x37, 0xe2, 0xde, 0x4e, 0xdb, - 0x1b, 0x1f, 0xa0, 0xd3, 0xa6, 0xa4, 0x00, 0xfa, 0x28, 0xf5, 0xde, 0x1e, 0x87, 0x98, 0x87, 0xa1, - 0xf4, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x1a, 0x06, 0x51, 0x0d, 0x11, 0x04, 0x00, 0x00, + // 685 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x54, 0xd1, 0x6a, 0xdc, 0x3a, + 0x10, 0xc5, 0xf6, 0x7a, 0x63, 0x6b, 0xb3, 0x37, 0xb9, 0x22, 0xf7, 0x46, 0x94, 0xd2, 0x1a, 0x3f, + 0x19, 0x4a, 0x37, 0x90, 0xd7, 0x42, 0x61, 0xdb, 0x24, 0xb0, 0xd0, 0x6e, 0x82, 0x1c, 0xf2, 0xae, + 0x66, 0x67, 0x8d, 0xc0, 0x2b, 0x6f, 0x65, 0xb9, 0x6c, 0xde, 0xfa, 0x1b, 0xfd, 0x86, 0x7e, 0x4c, + 0x7f, 0xa9, 0x68, 0x64, 0xaf, 0x9d, 0x14, 0x9a, 0x27, 0xcf, 0x39, 0x33, 0x92, 0x7c, 0x66, 0x8e, + 0x44, 0x4e, 0xa5, 0x32, 0xa0, 0x95, 0x28, 0xcf, 0xba, 0x60, 0xb6, 0xd5, 0x95, 0xa9, 0x68, 0x24, + 0xd5, 0xba, 0x6c, 0x76, 0x5f, 0xcb, 0xf4, 0x97, 0x4f, 0xc2, 0x9b, 0x4a, 0x2a, 0x43, 0x29, 0x19, + 0x2d, 0xc5, 0x06, 0x98, 0x97, 0xf8, 0x59, 0xcc, 0x31, 0xb6, 0xdc, 0xad, 0x28, 0x6a, 0xe6, 0x3b, + 0xce, 0xc6, 0xc8, 0xc9, 0x0d, 0xb0, 0x20, 0xf1, 0xb3, 0x80, 0x63, 0x4c, 0x8f, 0x49, 0xb0, 0x94, + 0x25, 0x1b, 0x25, 0x7e, 0x16, 0x71, 0x1b, 0xd2, 0xd7, 0x24, 0x98, 0x37, 0x3b, 0x16, 0x26, 0x41, + 0x36, 0x39, 0x9f, 0xce, 0xba, 0xf3, 0x66, 0xf3, 0x66, 0xc7, 0x6d, 0x86, 0xbe, 0x22, 0x64, 0x5e, + 0x14, 0x1a, 0x0a, 0x61, 0x60, 0xc5, 0xc6, 0x89, 0x97, 0x4d, 0xf9, 0x80, 0xb1, 0xf9, 0xab, 0xb2, + 0x12, 0xe6, 0x4e, 0x94, 0x0d, 0xb0, 0x83, 0xc4, 0xcb, 0x3c, 0x3e, 0x60, 0x68, 0x4a, 0x0e, 0x17, + 0xca, 0x40, 0x01, 0xda, 0x55, 0x44, 0x89, 0x97, 0x05, 0xfc, 0x11, 0x47, 0x13, 0x32, 0xc9, 0x8d, + 0x96, 0xaa, 0x70, 0x25, 0x71, 0xe2, 0x65, 0x31, 0x1f, 0x52, 0x76, 0x97, 0x0f, 0x55, 0x55, 0x82, + 0x50, 0xae, 0x84, 0x24, 0x5e, 0x16, 0xf1, 0x47, 0x1c, 0x7d, 0x4b, 0xc2, 0xdc, 0x08, 0x53, 0xb3, + 0x49, 0xe2, 0x65, 0x93, 0xf3, 0xd3, 0x5e, 0xcc, 0xc2, 0x80, 0x16, 0xa6, 0xd2, 0x98, 0xe6, 0xae, + 0x2a, 0xfd, 0xe9, 0xa1, 0x74, 0xfa, 0x82, 0x44, 0x17, 0xc2, 0x88, 0xdb, 0x87, 0xad, 0xeb, 0x69, + 0xc8, 0xf7, 0xf8, 0x89, 0x38, 0xff, 0x59, 0x71, 0xc1, 0xf3, 0xe2, 0x46, 0xcf, 0x8b, 0x0b, 0xff, + 0x14, 0x97, 0x7e, 0x1f, 0x91, 0xa3, 0x4e, 0xc6, 0xf5, 0xd6, 0xc8, 0x4a, 0xe1, 0x84, 0x2f, 0x77, + 0x5b, 0xcd, 0x3c, 0xdc, 0x12, 0x63, 0x3b, 0x61, 0x3b, 0x4f, 0x3f, 0x09, 0xb2, 0xd8, 0x0d, 0x30, + 0x23, 0xe3, 0x2b, 0x09, 0xe5, 0xaa, 0x66, 0xff, 0xe2, 0x90, 0x8f, 0xfb, 0xbe, 0xdc, 0x09, 0xcd, + 0x61, 0xcd, 0xdb, 0x3c, 0x3d, 0x23, 0x07, 0x79, 0xd5, 0xe8, 0x7b, 0xa8, 0x59, 0x80, 0xa5, 0xff, + 0xf5, 0xa5, 0x9f, 0x41, 0xd4, 0x8d, 0x86, 0x0d, 0x28, 0xc3, 0xbb, 0x2a, 0x3a, 0x23, 0x91, 0x95, + 0xaa, 0xbf, 0x89, 0x12, 0x75, 0x4d, 0xce, 0xe9, 0xa0, 0xe9, 0x6d, 0x86, 0xef, 0x6b, 0x6c, 0x3b, + 0x2f, 0xe4, 0x06, 0x54, 0x6d, 0x7f, 0x1f, 0x3d, 0x17, 0xf3, 0x01, 0x63, 0x05, 0x5d, 0xc9, 0xb2, + 0x44, 0x97, 0x85, 0x1c, 0x63, 0xfa, 0x92, 0xc4, 0xf6, 0x3b, 0xb4, 0x57, 0x4f, 0xd8, 0xec, 0xc7, + 0x4a, 0xad, 0xa4, 0x6d, 0x08, 0x5a, 0x2b, 0xe6, 0x3d, 0x61, 0xb3, 0xb9, 0x11, 0xda, 0xe0, 0x3d, + 0x88, 0x71, 0x36, 0x3d, 0x41, 0x19, 0x39, 0xb8, 0x54, 0x2b, 0xcc, 0x11, 0xcc, 0x75, 0xd0, 0xae, + 0x9b, 0xd7, 0xf7, 0xa0, 0x56, 0x52, 0x15, 0xe8, 0xa6, 0x88, 0xf7, 0x04, 0x3d, 0x21, 0xe1, 0x27, + 0xb9, 0x91, 0x86, 0x1d, 0xe2, 0x2a, 0x07, 0xe8, 0xff, 0x64, 0x7c, 0xbd, 0x5e, 0xd7, 0x60, 0xd8, + 0x14, 0xe9, 0x16, 0x59, 0x3e, 0x77, 0xe5, 0xff, 0x38, 0xde, 0x21, 0x7b, 0x7a, 0xde, 0x2e, 0x38, + 0x72, 0xa7, 0xe7, 0xfd, 0x8a, 0x0b, 0x58, 0x35, 0x5b, 0x60, 0xc7, 0x78, 0x74, 0x8b, 0xd2, 0x77, + 0xe4, 0x70, 0x30, 0x85, 0x9a, 0xbe, 0x21, 0xe1, 0xc2, 0xc0, 0xa6, 0x66, 0xde, 0xdf, 0x86, 0xe5, + 0x6a, 0xd2, 0x1f, 0x1e, 0x99, 0x0c, 0xe8, 0xce, 0xf5, 0x5f, 0x44, 0x0d, 0xad, 0x7f, 0xf6, 0x98, + 0x66, 0xe4, 0x88, 0x83, 0x01, 0x65, 0x7b, 0x78, 0x53, 0x95, 0xf2, 0xfe, 0x01, 0xad, 0x1f, 0xf3, + 0xa7, 0xf4, 0xfe, 0x2d, 0x0a, 0x9c, 0x03, 0xf1, 0x2d, 0x3a, 0x21, 0x21, 0x87, 0x02, 0x76, 0xad, + 0xd3, 0x1d, 0xb0, 0xe7, 0x2d, 0xea, 0x5b, 0xa1, 0x0b, 0x30, 0xad, 0xbf, 0xf7, 0x38, 0x7d, 0xdf, + 0xdb, 0x08, 0xff, 0xab, 0xd1, 0x02, 0xe7, 0xe9, 0x61, 0x5f, 0xf6, 0x78, 0xd0, 0x62, 0x7f, 0xd8, + 0xe2, 0x74, 0x4e, 0xa6, 0x8f, 0x6e, 0x38, 0xf6, 0x16, 0xb4, 0x84, 0x7a, 0xd9, 0xee, 0xd1, 0x41, + 0xbb, 0x05, 0xbe, 0xa2, 0xcb, 0x6e, 0x0b, 0x87, 0xd2, 0x19, 0x19, 0xbb, 0xcb, 0x60, 0x2f, 0xd0, + 0x9d, 0x28, 0xdb, 0xd7, 0xd5, 0x86, 0xf8, 0x90, 0xda, 0xc7, 0xc1, 0x77, 0xae, 0xb4, 0xf1, 0xef, + 0x00, 0x00, 0x00, 0xff, 0xff, 0x8c, 0x01, 0x5d, 0x11, 0xb2, 0x05, 0x00, 0x00, } diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 2f5431493a7..f76a80e1745 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -689,6 +689,7 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { } else { p.Nil = true } + case NullFill: p.Nil = true case NumberFill: @@ -1211,9 +1212,29 @@ func (itr *floatStreamFloatIterator) reduce() ([]FloatPoint, error) { // floatExprIterator executes a function to modify an existing point // for every output of the input iterator. type floatExprIterator struct { - left *bufFloatIterator - right *bufFloatIterator - fn floatExprFunc + left *bufFloatIterator + right *bufFloatIterator + fn floatExprFunc + points []FloatPoint // must be size 2 + storePrev bool +} + +func newFloatExprIterator(left, right FloatIterator, opt IteratorOptions, fn func(a, b float64) float64) *floatExprIterator { + var points []FloatPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []FloatPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToFloat(opt.FillValue) + points = []FloatPoint{{Value: value}, {Value: value}} + } + return &floatExprIterator{ + left: newBufFloatIterator(left), + right: newBufFloatIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *floatExprIterator) Stats() IteratorStats { @@ -1229,24 +1250,70 @@ func (itr *floatExprIterator) Close() error { } func (itr *floatExprIterator) Next() (*FloatPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + if a.Nil { + return a, nil + } else if b.Nil { + return b, nil + } + a.Value = itr.fn(a.Value, b.Value) + return a, nil + } - return itr.fn(a, b), nil } // floatExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type floatExprFunc func(a *FloatPoint, b *FloatPoint) *FloatPoint +type floatExprFunc func(a, b float64) float64 // floatReduceIntegerIterator executes a reducer for every interval and buffers the result. type floatReduceIntegerIterator struct { @@ -1449,9 +1516,29 @@ func (itr *floatStreamIntegerIterator) reduce() ([]IntegerPoint, error) { // floatIntegerExprIterator executes a function to modify an existing point // for every output of the input iterator. type floatIntegerExprIterator struct { - left *bufFloatIterator - right *bufFloatIterator - fn floatIntegerExprFunc + left *bufFloatIterator + right *bufFloatIterator + fn floatIntegerExprFunc + points []FloatPoint // must be size 2 + storePrev bool +} + +func newFloatIntegerExprIterator(left, right FloatIterator, opt IteratorOptions, fn func(a, b float64) int64) *floatIntegerExprIterator { + var points []FloatPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []FloatPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToFloat(opt.FillValue) + points = []FloatPoint{{Value: value}, {Value: value}} + } + return &floatIntegerExprIterator{ + left: newBufFloatIterator(left), + right: newBufFloatIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *floatIntegerExprIterator) Stats() IteratorStats { @@ -1467,24 +1554,74 @@ func (itr *floatIntegerExprIterator) Close() error { } func (itr *floatIntegerExprIterator) Next() (*IntegerPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &IntegerPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // floatIntegerExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type floatIntegerExprFunc func(a *FloatPoint, b *FloatPoint) *IntegerPoint +type floatIntegerExprFunc func(a, b float64) int64 // floatReduceStringIterator executes a reducer for every interval and buffers the result. type floatReduceStringIterator struct { @@ -1687,9 +1824,29 @@ func (itr *floatStreamStringIterator) reduce() ([]StringPoint, error) { // floatStringExprIterator executes a function to modify an existing point // for every output of the input iterator. type floatStringExprIterator struct { - left *bufFloatIterator - right *bufFloatIterator - fn floatStringExprFunc + left *bufFloatIterator + right *bufFloatIterator + fn floatStringExprFunc + points []FloatPoint // must be size 2 + storePrev bool +} + +func newFloatStringExprIterator(left, right FloatIterator, opt IteratorOptions, fn func(a, b float64) string) *floatStringExprIterator { + var points []FloatPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []FloatPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToFloat(opt.FillValue) + points = []FloatPoint{{Value: value}, {Value: value}} + } + return &floatStringExprIterator{ + left: newBufFloatIterator(left), + right: newBufFloatIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *floatStringExprIterator) Stats() IteratorStats { @@ -1705,24 +1862,74 @@ func (itr *floatStringExprIterator) Close() error { } func (itr *floatStringExprIterator) Next() (*StringPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &StringPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // floatStringExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type floatStringExprFunc func(a *FloatPoint, b *FloatPoint) *StringPoint +type floatStringExprFunc func(a, b float64) string // floatReduceBooleanIterator executes a reducer for every interval and buffers the result. type floatReduceBooleanIterator struct { @@ -1925,9 +2132,29 @@ func (itr *floatStreamBooleanIterator) reduce() ([]BooleanPoint, error) { // floatBooleanExprIterator executes a function to modify an existing point // for every output of the input iterator. type floatBooleanExprIterator struct { - left *bufFloatIterator - right *bufFloatIterator - fn floatBooleanExprFunc + left *bufFloatIterator + right *bufFloatIterator + fn floatBooleanExprFunc + points []FloatPoint // must be size 2 + storePrev bool +} + +func newFloatBooleanExprIterator(left, right FloatIterator, opt IteratorOptions, fn func(a, b float64) bool) *floatBooleanExprIterator { + var points []FloatPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []FloatPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToFloat(opt.FillValue) + points = []FloatPoint{{Value: value}, {Value: value}} + } + return &floatBooleanExprIterator{ + left: newBufFloatIterator(left), + right: newBufFloatIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *floatBooleanExprIterator) Stats() IteratorStats { @@ -1943,24 +2170,74 @@ func (itr *floatBooleanExprIterator) Close() error { } func (itr *floatBooleanExprIterator) Next() (*BooleanPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &BooleanPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // floatBooleanExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type floatBooleanExprFunc func(a *FloatPoint, b *FloatPoint) *BooleanPoint +type floatBooleanExprFunc func(a, b float64) bool // floatTransformIterator executes a function to modify an existing point for every // output of the input iterator. @@ -2774,6 +3051,7 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { } else { p.Nil = true } + case NullFill: p.Nil = true case NumberFill: @@ -3293,9 +3571,29 @@ func (itr *integerStreamFloatIterator) reduce() ([]FloatPoint, error) { // integerFloatExprIterator executes a function to modify an existing point // for every output of the input iterator. type integerFloatExprIterator struct { - left *bufIntegerIterator - right *bufIntegerIterator - fn integerFloatExprFunc + left *bufIntegerIterator + right *bufIntegerIterator + fn integerFloatExprFunc + points []IntegerPoint // must be size 2 + storePrev bool +} + +func newIntegerFloatExprIterator(left, right IntegerIterator, opt IteratorOptions, fn func(a, b int64) float64) *integerFloatExprIterator { + var points []IntegerPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []IntegerPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToInteger(opt.FillValue) + points = []IntegerPoint{{Value: value}, {Value: value}} + } + return &integerFloatExprIterator{ + left: newBufIntegerIterator(left), + right: newBufIntegerIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *integerFloatExprIterator) Stats() IteratorStats { @@ -3311,24 +3609,74 @@ func (itr *integerFloatExprIterator) Close() error { } func (itr *integerFloatExprIterator) Next() (*FloatPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &FloatPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // integerFloatExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type integerFloatExprFunc func(a *IntegerPoint, b *IntegerPoint) *FloatPoint +type integerFloatExprFunc func(a, b int64) float64 // integerReduceIntegerIterator executes a reducer for every interval and buffers the result. type integerReduceIntegerIterator struct { @@ -3531,9 +3879,29 @@ func (itr *integerStreamIntegerIterator) reduce() ([]IntegerPoint, error) { // integerExprIterator executes a function to modify an existing point // for every output of the input iterator. type integerExprIterator struct { - left *bufIntegerIterator - right *bufIntegerIterator - fn integerExprFunc + left *bufIntegerIterator + right *bufIntegerIterator + fn integerExprFunc + points []IntegerPoint // must be size 2 + storePrev bool +} + +func newIntegerExprIterator(left, right IntegerIterator, opt IteratorOptions, fn func(a, b int64) int64) *integerExprIterator { + var points []IntegerPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []IntegerPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToInteger(opt.FillValue) + points = []IntegerPoint{{Value: value}, {Value: value}} + } + return &integerExprIterator{ + left: newBufIntegerIterator(left), + right: newBufIntegerIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *integerExprIterator) Stats() IteratorStats { @@ -3549,24 +3917,70 @@ func (itr *integerExprIterator) Close() error { } func (itr *integerExprIterator) Next() (*IntegerPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + if a.Nil { + return a, nil + } else if b.Nil { + return b, nil + } + a.Value = itr.fn(a.Value, b.Value) + return a, nil + } - return itr.fn(a, b), nil } // integerExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type integerExprFunc func(a *IntegerPoint, b *IntegerPoint) *IntegerPoint +type integerExprFunc func(a, b int64) int64 // integerReduceStringIterator executes a reducer for every interval and buffers the result. type integerReduceStringIterator struct { @@ -3769,9 +4183,29 @@ func (itr *integerStreamStringIterator) reduce() ([]StringPoint, error) { // integerStringExprIterator executes a function to modify an existing point // for every output of the input iterator. type integerStringExprIterator struct { - left *bufIntegerIterator - right *bufIntegerIterator - fn integerStringExprFunc + left *bufIntegerIterator + right *bufIntegerIterator + fn integerStringExprFunc + points []IntegerPoint // must be size 2 + storePrev bool +} + +func newIntegerStringExprIterator(left, right IntegerIterator, opt IteratorOptions, fn func(a, b int64) string) *integerStringExprIterator { + var points []IntegerPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []IntegerPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToInteger(opt.FillValue) + points = []IntegerPoint{{Value: value}, {Value: value}} + } + return &integerStringExprIterator{ + left: newBufIntegerIterator(left), + right: newBufIntegerIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *integerStringExprIterator) Stats() IteratorStats { @@ -3787,34 +4221,84 @@ func (itr *integerStringExprIterator) Close() error { } func (itr *integerStringExprIterator) Next() (*StringPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil - } - return itr.fn(a, b), nil -} + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } -// integerStringExprFunc creates or modifies a point by combining two -// points. The point passed in may be modified and returned rather than -// allocating a new point if possible. One of the points may be nil, but at -// least one of the points will be non-nil. -type integerStringExprFunc func(a *IntegerPoint, b *IntegerPoint) *StringPoint + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } -// integerReduceBooleanIterator executes a reducer for every interval and buffers the result. -type integerReduceBooleanIterator struct { - input *bufIntegerIterator - create func() (IntegerPointAggregator, BooleanPointEmitter) - opt IteratorOptions - points []BooleanPoint -} + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } -// Stats returns stats from the input iterator. + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &StringPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + + } +} + +// integerStringExprFunc creates or modifies a point by combining two +// points. The point passed in may be modified and returned rather than +// allocating a new point if possible. One of the points may be nil, but at +// least one of the points will be non-nil. +type integerStringExprFunc func(a, b int64) string + +// integerReduceBooleanIterator executes a reducer for every interval and buffers the result. +type integerReduceBooleanIterator struct { + input *bufIntegerIterator + create func() (IntegerPointAggregator, BooleanPointEmitter) + opt IteratorOptions + points []BooleanPoint +} + +// Stats returns stats from the input iterator. func (itr *integerReduceBooleanIterator) Stats() IteratorStats { return itr.input.Stats() } // Close closes the iterator and all child iterators. @@ -4007,9 +4491,29 @@ func (itr *integerStreamBooleanIterator) reduce() ([]BooleanPoint, error) { // integerBooleanExprIterator executes a function to modify an existing point // for every output of the input iterator. type integerBooleanExprIterator struct { - left *bufIntegerIterator - right *bufIntegerIterator - fn integerBooleanExprFunc + left *bufIntegerIterator + right *bufIntegerIterator + fn integerBooleanExprFunc + points []IntegerPoint // must be size 2 + storePrev bool +} + +func newIntegerBooleanExprIterator(left, right IntegerIterator, opt IteratorOptions, fn func(a, b int64) bool) *integerBooleanExprIterator { + var points []IntegerPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []IntegerPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToInteger(opt.FillValue) + points = []IntegerPoint{{Value: value}, {Value: value}} + } + return &integerBooleanExprIterator{ + left: newBufIntegerIterator(left), + right: newBufIntegerIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *integerBooleanExprIterator) Stats() IteratorStats { @@ -4025,24 +4529,74 @@ func (itr *integerBooleanExprIterator) Close() error { } func (itr *integerBooleanExprIterator) Next() (*BooleanPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &BooleanPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // integerBooleanExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type integerBooleanExprFunc func(a *IntegerPoint, b *IntegerPoint) *BooleanPoint +type integerBooleanExprFunc func(a, b int64) bool // integerTransformIterator executes a function to modify an existing point for every // output of the input iterator. @@ -5361,9 +5915,29 @@ func (itr *stringStreamFloatIterator) reduce() ([]FloatPoint, error) { // stringFloatExprIterator executes a function to modify an existing point // for every output of the input iterator. type stringFloatExprIterator struct { - left *bufStringIterator - right *bufStringIterator - fn stringFloatExprFunc + left *bufStringIterator + right *bufStringIterator + fn stringFloatExprFunc + points []StringPoint // must be size 2 + storePrev bool +} + +func newStringFloatExprIterator(left, right StringIterator, opt IteratorOptions, fn func(a, b string) float64) *stringFloatExprIterator { + var points []StringPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []StringPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToString(opt.FillValue) + points = []StringPoint{{Value: value}, {Value: value}} + } + return &stringFloatExprIterator{ + left: newBufStringIterator(left), + right: newBufStringIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *stringFloatExprIterator) Stats() IteratorStats { @@ -5379,24 +5953,74 @@ func (itr *stringFloatExprIterator) Close() error { } func (itr *stringFloatExprIterator) Next() (*FloatPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &FloatPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // stringFloatExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type stringFloatExprFunc func(a *StringPoint, b *StringPoint) *FloatPoint +type stringFloatExprFunc func(a, b string) float64 // stringReduceIntegerIterator executes a reducer for every interval and buffers the result. type stringReduceIntegerIterator struct { @@ -5599,9 +6223,29 @@ func (itr *stringStreamIntegerIterator) reduce() ([]IntegerPoint, error) { // stringIntegerExprIterator executes a function to modify an existing point // for every output of the input iterator. type stringIntegerExprIterator struct { - left *bufStringIterator - right *bufStringIterator - fn stringIntegerExprFunc + left *bufStringIterator + right *bufStringIterator + fn stringIntegerExprFunc + points []StringPoint // must be size 2 + storePrev bool +} + +func newStringIntegerExprIterator(left, right StringIterator, opt IteratorOptions, fn func(a, b string) int64) *stringIntegerExprIterator { + var points []StringPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []StringPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToString(opt.FillValue) + points = []StringPoint{{Value: value}, {Value: value}} + } + return &stringIntegerExprIterator{ + left: newBufStringIterator(left), + right: newBufStringIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *stringIntegerExprIterator) Stats() IteratorStats { @@ -5617,24 +6261,74 @@ func (itr *stringIntegerExprIterator) Close() error { } func (itr *stringIntegerExprIterator) Next() (*IntegerPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &IntegerPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // stringIntegerExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type stringIntegerExprFunc func(a *StringPoint, b *StringPoint) *IntegerPoint +type stringIntegerExprFunc func(a, b string) int64 // stringReduceStringIterator executes a reducer for every interval and buffers the result. type stringReduceStringIterator struct { @@ -5837,9 +6531,29 @@ func (itr *stringStreamStringIterator) reduce() ([]StringPoint, error) { // stringExprIterator executes a function to modify an existing point // for every output of the input iterator. type stringExprIterator struct { - left *bufStringIterator - right *bufStringIterator - fn stringExprFunc + left *bufStringIterator + right *bufStringIterator + fn stringExprFunc + points []StringPoint // must be size 2 + storePrev bool +} + +func newStringExprIterator(left, right StringIterator, opt IteratorOptions, fn func(a, b string) string) *stringExprIterator { + var points []StringPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []StringPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToString(opt.FillValue) + points = []StringPoint{{Value: value}, {Value: value}} + } + return &stringExprIterator{ + left: newBufStringIterator(left), + right: newBufStringIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *stringExprIterator) Stats() IteratorStats { @@ -5855,24 +6569,70 @@ func (itr *stringExprIterator) Close() error { } func (itr *stringExprIterator) Next() (*StringPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + if a.Nil { + return a, nil + } else if b.Nil { + return b, nil + } + a.Value = itr.fn(a.Value, b.Value) + return a, nil + } - return itr.fn(a, b), nil } // stringExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type stringExprFunc func(a *StringPoint, b *StringPoint) *StringPoint +type stringExprFunc func(a, b string) string // stringReduceBooleanIterator executes a reducer for every interval and buffers the result. type stringReduceBooleanIterator struct { @@ -6075,9 +6835,29 @@ func (itr *stringStreamBooleanIterator) reduce() ([]BooleanPoint, error) { // stringBooleanExprIterator executes a function to modify an existing point // for every output of the input iterator. type stringBooleanExprIterator struct { - left *bufStringIterator - right *bufStringIterator - fn stringBooleanExprFunc + left *bufStringIterator + right *bufStringIterator + fn stringBooleanExprFunc + points []StringPoint // must be size 2 + storePrev bool +} + +func newStringBooleanExprIterator(left, right StringIterator, opt IteratorOptions, fn func(a, b string) bool) *stringBooleanExprIterator { + var points []StringPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []StringPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToString(opt.FillValue) + points = []StringPoint{{Value: value}, {Value: value}} + } + return &stringBooleanExprIterator{ + left: newBufStringIterator(left), + right: newBufStringIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *stringBooleanExprIterator) Stats() IteratorStats { @@ -6093,24 +6873,74 @@ func (itr *stringBooleanExprIterator) Close() error { } func (itr *stringBooleanExprIterator) Next() (*BooleanPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &BooleanPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // stringBooleanExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type stringBooleanExprFunc func(a *StringPoint, b *StringPoint) *BooleanPoint +type stringBooleanExprFunc func(a, b string) bool // stringTransformIterator executes a function to modify an existing point for every // output of the input iterator. @@ -7429,9 +8259,29 @@ func (itr *booleanStreamFloatIterator) reduce() ([]FloatPoint, error) { // booleanFloatExprIterator executes a function to modify an existing point // for every output of the input iterator. type booleanFloatExprIterator struct { - left *bufBooleanIterator - right *bufBooleanIterator - fn booleanFloatExprFunc + left *bufBooleanIterator + right *bufBooleanIterator + fn booleanFloatExprFunc + points []BooleanPoint // must be size 2 + storePrev bool +} + +func newBooleanFloatExprIterator(left, right BooleanIterator, opt IteratorOptions, fn func(a, b bool) float64) *booleanFloatExprIterator { + var points []BooleanPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []BooleanPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToBoolean(opt.FillValue) + points = []BooleanPoint{{Value: value}, {Value: value}} + } + return &booleanFloatExprIterator{ + left: newBufBooleanIterator(left), + right: newBufBooleanIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *booleanFloatExprIterator) Stats() IteratorStats { @@ -7447,24 +8297,74 @@ func (itr *booleanFloatExprIterator) Close() error { } func (itr *booleanFloatExprIterator) Next() (*FloatPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &FloatPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // booleanFloatExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type booleanFloatExprFunc func(a *BooleanPoint, b *BooleanPoint) *FloatPoint +type booleanFloatExprFunc func(a, b bool) float64 // booleanReduceIntegerIterator executes a reducer for every interval and buffers the result. type booleanReduceIntegerIterator struct { @@ -7667,9 +8567,29 @@ func (itr *booleanStreamIntegerIterator) reduce() ([]IntegerPoint, error) { // booleanIntegerExprIterator executes a function to modify an existing point // for every output of the input iterator. type booleanIntegerExprIterator struct { - left *bufBooleanIterator - right *bufBooleanIterator - fn booleanIntegerExprFunc + left *bufBooleanIterator + right *bufBooleanIterator + fn booleanIntegerExprFunc + points []BooleanPoint // must be size 2 + storePrev bool +} + +func newBooleanIntegerExprIterator(left, right BooleanIterator, opt IteratorOptions, fn func(a, b bool) int64) *booleanIntegerExprIterator { + var points []BooleanPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []BooleanPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToBoolean(opt.FillValue) + points = []BooleanPoint{{Value: value}, {Value: value}} + } + return &booleanIntegerExprIterator{ + left: newBufBooleanIterator(left), + right: newBufBooleanIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *booleanIntegerExprIterator) Stats() IteratorStats { @@ -7685,24 +8605,74 @@ func (itr *booleanIntegerExprIterator) Close() error { } func (itr *booleanIntegerExprIterator) Next() (*IntegerPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &IntegerPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // booleanIntegerExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type booleanIntegerExprFunc func(a *BooleanPoint, b *BooleanPoint) *IntegerPoint +type booleanIntegerExprFunc func(a, b bool) int64 // booleanReduceStringIterator executes a reducer for every interval and buffers the result. type booleanReduceStringIterator struct { @@ -7905,9 +8875,29 @@ func (itr *booleanStreamStringIterator) reduce() ([]StringPoint, error) { // booleanStringExprIterator executes a function to modify an existing point // for every output of the input iterator. type booleanStringExprIterator struct { - left *bufBooleanIterator - right *bufBooleanIterator - fn booleanStringExprFunc + left *bufBooleanIterator + right *bufBooleanIterator + fn booleanStringExprFunc + points []BooleanPoint // must be size 2 + storePrev bool +} + +func newBooleanStringExprIterator(left, right BooleanIterator, opt IteratorOptions, fn func(a, b bool) string) *booleanStringExprIterator { + var points []BooleanPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []BooleanPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToBoolean(opt.FillValue) + points = []BooleanPoint{{Value: value}, {Value: value}} + } + return &booleanStringExprIterator{ + left: newBufBooleanIterator(left), + right: newBufBooleanIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *booleanStringExprIterator) Stats() IteratorStats { @@ -7923,24 +8913,74 @@ func (itr *booleanStringExprIterator) Close() error { } func (itr *booleanStringExprIterator) Next() (*StringPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + p := &StringPoint{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil + } - return itr.fn(a, b), nil } // booleanStringExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type booleanStringExprFunc func(a *BooleanPoint, b *BooleanPoint) *StringPoint +type booleanStringExprFunc func(a, b bool) string // booleanReduceBooleanIterator executes a reducer for every interval and buffers the result. type booleanReduceBooleanIterator struct { @@ -8143,9 +9183,29 @@ func (itr *booleanStreamBooleanIterator) reduce() ([]BooleanPoint, error) { // booleanExprIterator executes a function to modify an existing point // for every output of the input iterator. type booleanExprIterator struct { - left *bufBooleanIterator - right *bufBooleanIterator - fn booleanExprFunc + left *bufBooleanIterator + right *bufBooleanIterator + fn booleanExprFunc + points []BooleanPoint // must be size 2 + storePrev bool +} + +func newBooleanExprIterator(left, right BooleanIterator, opt IteratorOptions, fn func(a, b bool) bool) *booleanExprIterator { + var points []BooleanPoint + switch opt.Fill { + case NullFill, PreviousFill: + points = []BooleanPoint{{Nil: true}, {Nil: true}} + case NumberFill: + value := castToBoolean(opt.FillValue) + points = []BooleanPoint{{Value: value}, {Value: value}} + } + return &booleanExprIterator{ + left: newBufBooleanIterator(left), + right: newBufBooleanIterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *booleanExprIterator) Stats() IteratorStats { @@ -8161,24 +9221,70 @@ func (itr *booleanExprIterator) Close() error { } func (itr *booleanExprIterator) Next() (*BooleanPoint, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + + if a.Nil { + return a, nil + } else if b.Nil { + return b, nil + } + a.Value = itr.fn(a.Value, b.Value) + return a, nil + } - return itr.fn(a, b), nil } // booleanExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type booleanExprFunc func(a *BooleanPoint, b *BooleanPoint) *BooleanPoint +type booleanExprFunc func(a, b bool) bool // booleanTransformIterator executes a function to modify an existing point for every // output of the input iterator. diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 16b84d5e5c5..09dcbb58317 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -1213,9 +1213,29 @@ func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, e // {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator executes a function to modify an existing point // for every output of the input iterator. type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator struct { - left *buf{{$k.Name}}Iterator - right *buf{{$k.Name}}Iterator - fn {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc + left *buf{{$k.Name}}Iterator + right *buf{{$k.Name}}Iterator + fn {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc + points []{{$k.Name}}Point // must be size 2 + storePrev bool +} + +func new{{$k.Name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator(left, right {{$k.Name}}Iterator, opt IteratorOptions, fn func(a, b {{$k.Type}}) {{$v.Type}}) *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator { + var points []{{$k.Name}}Point + switch opt.Fill { + case NullFill, PreviousFill: + points = []{{$k.Name}}Point{ {Nil: true}, {Nil: true} } + case NumberFill: + value := castTo{{$k.Name}}(opt.FillValue) + points = []{{$k.Name}}Point{ {Value: value}, {Value: value} } + } + return &{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator{ + left: newBuf{{$k.Name}}Iterator(left), + right: newBuf{{$k.Name}}Iterator(right), + points: points, + fn: fn, + storePrev: opt.Fill == PreviousFill, + } } func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Stats() IteratorStats { @@ -1231,24 +1251,83 @@ func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) C } func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Next() (*{{$v.Name}}Point, error) { - a, err := itr.left.Next() - if err != nil { - return nil, err - } - b, err := itr.right.Next() - if err != nil { - return nil, err - } else if a == nil && b == nil { - return nil, nil + for { + a, err := itr.left.Next() + if err != nil { + return nil, err + } + b, err := itr.right.Next() + if err != nil { + return nil, err + } + + if a == nil && b == nil { + return nil, nil + } else if itr.points == nil && (a == nil || b == nil ) { + return nil, nil + } + + if a != nil && b != nil { + if a.Time > b.Time { + itr.left.unread(a) + a = nil + } else if a.Time < b.Time { + itr.right.unread(b) + b = nil + } + } + + if a == nil || a.Nil { + if itr.points == nil { + continue + } + p := *b + p.Value = itr.points[0].Value + p.Nil = itr.points[0].Nil + a = &p + } else if b == nil || b.Nil { + if itr.points == nil { + continue + } + p := *a + p.Value = itr.points[1].Value + p.Nil = itr.points[1].Nil + b = &p + } + + if itr.storePrev { + itr.points[0], itr.points[1] = *a, *b + } + +{{if eq $k.Name $v.Name}} + if a.Nil { + return a, nil + } else if b.Nil { + return b, nil + } + a.Value = itr.fn(a.Value, b.Value) + return a, nil +{{else}} + p := &{{$v.Name}}Point{ + Name: a.Name, + Tags: a.Tags, + Time: a.Time, + Nil: a.Nil || b.Nil, + Aggregated: a.Aggregated, + } + if !p.Nil { + p.Value = itr.fn(a.Value, b.Value) + } + return p, nil +{{end}} } - return itr.fn(a, b), nil } // {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc creates or modifies a point by combining two // points. The point passed in may be modified and returned rather than // allocating a new point if possible. One of the points may be nil, but at // least one of the points will be non-nil. -type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc func(a *{{$k.Name}}Point, b *{{$k.Name}}Point) *{{$v.Name}}Point +type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc func(a, b {{$k.Type}}) {{$v.Type}} {{end}} // {{$k.name}}TransformIterator executes a function to modify an existing point for every diff --git a/influxql/select.go b/influxql/select.go index b7675f29b12..97c9d3f549f 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -879,30 +879,7 @@ func buildTransformIterator(lhs Iterator, rhs Iterator, op Token, ic IteratorCre default: return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs) } - return &floatExprIterator{ - left: newBufFloatIterator(left), - right: newBufFloatIterator(right), - fn: func(a *FloatPoint, b *FloatPoint) *FloatPoint { - if a != nil && b != nil { - if !a.Nil && !b.Nil { - a.Value = fn(a.Value, b.Value) - return a - } else if a.Nil { - return a - } else { - return b - } - } else if a != nil { - a.Value = float64(0) - a.Nil = true - return a - } else { - b.Value = float64(0) - b.Nil = true - return b - } - }, - }, nil + return newFloatExprIterator(left, right, opt, fn), nil case func(int64, int64) float64: left, ok := lhs.(IntegerIterator) if !ok { @@ -912,44 +889,7 @@ func buildTransformIterator(lhs Iterator, rhs Iterator, op Token, ic IteratorCre if !ok { return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", rhs) } - return &integerFloatExprIterator{ - left: newBufIntegerIterator(left), - right: newBufIntegerIterator(right), - fn: func(a *IntegerPoint, b *IntegerPoint) *FloatPoint { - if a == nil && b == nil { - return nil - } else if a == nil { - return &FloatPoint{ - Name: b.Name, - Tags: b.Tags, - Time: b.Time, - Aux: b.Aux, - Nil: true, - } - } else if b == nil { - return &FloatPoint{ - Name: a.Name, - Tags: a.Tags, - Time: a.Time, - Aux: a.Aux, - Nil: true, - } - } - - p := &FloatPoint{ - Name: a.Name, - Tags: a.Tags, - Time: a.Time, - Aux: a.Aux, - } - if !a.Nil && !b.Nil { - p.Value = fn(a.Value, b.Value) - } else { - p.Nil = true - } - return p - }, - }, nil + return newIntegerFloatExprIterator(left, right, opt, fn), nil case func(int64, int64) int64: left, ok := lhs.(IntegerIterator) if !ok { @@ -959,30 +899,7 @@ func buildTransformIterator(lhs Iterator, rhs Iterator, op Token, ic IteratorCre if !ok { return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", rhs) } - return &integerExprIterator{ - left: newBufIntegerIterator(left), - right: newBufIntegerIterator(right), - fn: func(a *IntegerPoint, b *IntegerPoint) *IntegerPoint { - if a != nil && b != nil { - if !a.Nil && !b.Nil { - a.Value = fn(a.Value, b.Value) - return a - } else if a.Nil { - return a - } else { - return b - } - } else if a != nil { - a.Value = int64(0) - a.Nil = true - return a - } else { - b.Value = int64(0) - b.Nil = true - return b - } - }, - }, nil + return newIntegerExprIterator(left, right, opt, fn), nil case func(float64, float64) bool: var left FloatIterator switch lhs := lhs.(type) { @@ -1003,44 +920,7 @@ func buildTransformIterator(lhs Iterator, rhs Iterator, op Token, ic IteratorCre default: return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs) } - return &floatBooleanExprIterator{ - left: newBufFloatIterator(left), - right: newBufFloatIterator(right), - fn: func(a *FloatPoint, b *FloatPoint) *BooleanPoint { - if a == nil && b == nil { - return nil - } else if a == nil { - return &BooleanPoint{ - Name: b.Name, - Tags: b.Tags, - Time: b.Time, - Aux: b.Aux, - Nil: true, - } - } else if b == nil { - return &BooleanPoint{ - Name: a.Name, - Tags: a.Tags, - Time: a.Time, - Aux: a.Aux, - Nil: true, - } - } - - p := &BooleanPoint{ - Name: a.Name, - Tags: a.Tags, - Time: a.Time, - Aux: a.Aux, - } - if !a.Nil && !b.Nil { - p.Value = fn(a.Value, b.Value) - } else { - p.Nil = true - } - return p - }, - }, nil + return newFloatBooleanExprIterator(left, right, opt, fn), nil case func(int64, int64) bool: left, ok := lhs.(IntegerIterator) if !ok { @@ -1050,44 +930,7 @@ func buildTransformIterator(lhs Iterator, rhs Iterator, op Token, ic IteratorCre if !ok { return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", rhs) } - return &integerBooleanExprIterator{ - left: newBufIntegerIterator(left), - right: newBufIntegerIterator(right), - fn: func(a *IntegerPoint, b *IntegerPoint) *BooleanPoint { - if a == nil && b == nil { - return nil - } else if a == nil { - return &BooleanPoint{ - Name: b.Name, - Tags: b.Tags, - Time: b.Time, - Aux: b.Aux, - Nil: true, - } - } else if b == nil { - return &BooleanPoint{ - Name: a.Name, - Tags: a.Tags, - Time: a.Time, - Aux: a.Aux, - Nil: true, - } - } - - p := &BooleanPoint{ - Name: a.Name, - Tags: a.Tags, - Time: a.Time, - Aux: a.Aux, - } - if !a.Nil && !b.Nil { - p.Value = fn(a.Value, b.Value) - } else { - p.Nil = true - } - return p - }, - }, nil + return newIntegerBooleanExprIterator(left, right, opt, fn), nil } return nil, fmt.Errorf("unable to construct transform iterator from %T and %T", lhs, rhs) }