Skip to content

Commit

Permalink
feat: enhance advance aggregate query (arana-db#350)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii authored Aug 10, 2022
1 parent 4a7c72b commit dc102cc
Show file tree
Hide file tree
Showing 34 changed files with 1,604 additions and 107 deletions.
2 changes: 1 addition & 1 deletion pkg/config/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func TestGetStoreOperate(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
//mockStore := NewMockStoreOperate(ctrl)
// mockStore := NewMockStoreOperate(ctrl)
tests := []struct {
name string
want config.StoreOperate
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"github.com/arana-db/arana/testdata"
)

var FakeConfigPath = testdata.Path("fake_config.yaml")
var EmptyConfigPath = testdata.Path("fake_empty_config.yaml")
var (
FakeConfigPath = testdata.Path("fake_config.yaml")
EmptyConfigPath = testdata.Path("fake_empty_config.yaml")
)

var jsonConfig = `{
"kind":"ConfigMap",
Expand Down
8 changes: 4 additions & 4 deletions pkg/config/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func TestDataSourceClustersConf(t *testing.T) {
assert.Equal(t, "123456", node.Password)
assert.Equal(t, "employees_0000", node.Database)
assert.Equal(t, "r10w10", node.Weight)
//assert.Len(t, node.Labels, 1)
//assert.NotNil(t, node.ConnProps)
// assert.Len(t, node.Labels, 1)
// assert.NotNil(t, node.ConnProps)
}

func TestShardingRuleConf(t *testing.T) {
Expand All @@ -102,8 +102,8 @@ func TestShardingRuleConf(t *testing.T) {

assert.Equal(t, "employees_${0000..0003}", table.Topology.DbPattern)
assert.Equal(t, "student_${0000..0031}", table.Topology.TblPattern)
//assert.Equal(t, "employee_0000", table.ShadowTopology.DbPattern)
//assert.Equal(t, "__test_student_${0000...0007}", table.ShadowTopology.TblPattern)
// assert.Equal(t, "employee_0000", table.ShadowTopology.DbPattern)
// assert.Equal(t, "__test_student_${0000...0007}", table.ShadowTopology.TblPattern)
assert.Len(t, table.Attributes, 2)
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/dataset/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@ package dataset

import (
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/reduce"
)

type pipeOption []func(proto.Dataset) proto.Dataset

func Reduce(reducers map[int]reduce.Reducer) Option {
return func(option *pipeOption) {
*option = append(*option, func(dataset proto.Dataset) proto.Dataset {
return &ReduceDataset{
Dataset: dataset,
Reducers: reducers,
}
})
}
}

func Filter(predicate PredicateFunc) Option {
return func(option *pipeOption) {
*option = append(*option, func(prev proto.Dataset) proto.Dataset {
Expand Down
132 changes: 132 additions & 0 deletions pkg/dataset/reduce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dataset

import (
"database/sql"
"io"
)

import (
gxbig "github.com/dubbogo/gost/math/big"
"github.com/pkg/errors"
)

import (
"github.com/arana-db/arana/pkg/mysql/rows"
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/reduce"
)

type ReduceDataset struct {
proto.Dataset
Reducers map[int]reduce.Reducer // field_index -> aggregator
prev []proto.Value
binary bool
eof bool
}

func (ad *ReduceDataset) Next() (proto.Row, error) {
if ad.eof {
return nil, io.EOF
}

nextRow, err := ad.Dataset.Next()
if errors.Is(err, io.EOF) {
if ad.prev == nil {
return nil, io.EOF
}

ad.eof = true
fields, _ := ad.Dataset.Fields()
if ad.binary {
return rows.NewBinaryVirtualRow(fields, ad.prev), nil
} else {
return rows.NewTextVirtualRow(fields, ad.prev), nil
}
}

if err != nil {
return nil, errors.WithStack(err)
}
fields, _ := ad.Fields()
values := make([]proto.Value, len(fields))
if err = nextRow.Scan(values); err != nil {
return nil, errors.WithStack(err)
}

if ad.prev == nil {
ad.prev = values
ad.binary = nextRow.IsBinary()
return ad.Next()
}

var (
prevValue, nextValue *gxbig.Decimal
result *gxbig.Decimal
)
for i := range values {
red, ok := ad.Reducers[i]
if !ok {
continue
}
var (
prev = ad.prev[i]
next = values[i]
)

if next == nil {
continue
}
if prev == nil {
ad.prev[i] = next
continue
}

if prevValue, err = toValue(fields[i], prev); err != nil {
return nil, errors.WithStack(err)
}
if nextValue, err = toValue(fields[i], next); err != nil {
return nil, errors.WithStack(err)
}
if result, err = red.Decimal(prevValue, nextValue); err != nil {
return nil, errors.WithStack(err)
}

ad.prev[i] = result
}

return ad.Next()
}

func toValue(field proto.Field, input interface{}) (*gxbig.Decimal, error) {
switch v := input.(type) {
case *gxbig.Decimal:
return v, nil
case string:
return gxbig.NewDecFromString(v)
case int64:
return gxbig.NewDecFromInt(v), nil
}

var s sql.NullString
if err := s.Scan(input); err != nil {
return nil, errors.WithStack(err)
}
return gxbig.NewDecFromString(s.String)
}
6 changes: 3 additions & 3 deletions pkg/mysql/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func TestParseBinaryDateTime(t *testing.T) {
want driver.Value
}{
{num: 0, data: []byte(""), want: time.Time{}},
{num: 4, data: []byte{byte(230), byte(7), byte(1), byte(1)}, want: time.Date(2022, 01, 01, 0, 0, 0, 0, time.Local)},
{num: 7, data: []byte{byte(230), byte(7), byte(1), byte(1), byte(1), byte(1), byte(1)}, want: time.Date(2022, 01, 01, 1, 1, 1, 0, time.Local)},
{num: 11, data: []byte{byte(230), byte(7), byte(1), byte(1), byte(1), byte(1), byte(1), byte(87), byte(4), byte(0), byte(0)}, want: time.Date(2022, 01, 01, 1, 1, 1, 1111000, time.Local)},
{num: 4, data: []byte{byte(230), byte(7), byte(1), byte(1)}, want: time.Date(2022, 0o1, 0o1, 0, 0, 0, 0, time.Local)},
{num: 7, data: []byte{byte(230), byte(7), byte(1), byte(1), byte(1), byte(1), byte(1)}, want: time.Date(2022, 0o1, 0o1, 1, 1, 1, 0, time.Local)},
{num: 11, data: []byte{byte(230), byte(7), byte(1), byte(1), byte(1), byte(1), byte(1), byte(87), byte(4), byte(0), byte(0)}, want: time.Date(2022, 0o1, 0o1, 1, 1, 1, 1111000, time.Local)},
}
for i := 0; i < len(table); i++ {
res, err := parseBinaryDateTime(table[i].num, table[i].data, time.Local)
Expand Down
60 changes: 60 additions & 0 deletions pkg/reduce/max.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reduce

import (
"time"
)

import (
gxbig "github.com/dubbogo/gost/math/big"
)

var _ Reducer = (*maxReducer)(nil)

type maxReducer struct{}

func (m maxReducer) Int64(prev, next int64) (int64, error) {
if next > prev {
return next, nil
}
return prev, nil
}

func (m maxReducer) Float64(prev, next float64) (float64, error) {
if next > prev {
return next, nil
}
return prev, nil
}

func (m maxReducer) Decimal(prev, next *gxbig.Decimal) (*gxbig.Decimal, error) {
switch next.Compare(prev) {
case 1:
return next, nil
default:
return prev, nil
}
}

func (m maxReducer) Time(prev, next time.Time) (time.Time, error) {
if next.After(prev) {
return next, nil
}
return prev, nil
}
58 changes: 58 additions & 0 deletions pkg/reduce/min.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reduce

import (
"time"
)

import (
gxbig "github.com/dubbogo/gost/math/big"
)

type minReducer struct{}

func (minReducer) Int64(prev, next int64) (int64, error) {
if next < prev {
return next, nil
}
return prev, nil
}

func (minReducer) Float64(prev, next float64) (float64, error) {
if next < prev {
return next, nil
}
return prev, nil
}

func (minReducer) Decimal(prev, next *gxbig.Decimal) (*gxbig.Decimal, error) {
switch next.Compare(prev) {
case -1:
return next, nil
default:
return prev, nil
}
}

func (minReducer) Time(prev, next time.Time) (time.Time, error) {
if next.Before(prev) {
return next, nil
}
return prev, nil
}
45 changes: 45 additions & 0 deletions pkg/reduce/reduce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reduce

import (
"time"
)

import (
gxbig "github.com/dubbogo/gost/math/big"
)

type Reducer interface {
Int64(prev, next int64) (int64, error)
Float64(prev, next float64) (float64, error)
Decimal(prev, next *gxbig.Decimal) (*gxbig.Decimal, error)
Time(prev, next time.Time) (time.Time, error)
}

func Max() Reducer {
return maxReducer{}
}

func Min() Reducer {
return minReducer{}
}

func Sum() Reducer {
return sumReducer{}
}
Loading

0 comments on commit dc102cc

Please sign in to comment.