Skip to content

Commit

Permalink
feat: use new dataset api with streaming style (arana-db#196)
Browse files Browse the repository at this point in the history
* add license for dockerfile

* feat: use new dataset api with streaming style

* fix review

* fix review

Co-authored-by: AlexStocks <[email protected]>
  • Loading branch information
jjeffcaii and AlexStocks authored May 30, 2022
1 parent c761426 commit 8210501
Show file tree
Hide file tree
Showing 56 changed files with 3,295 additions and 2,569 deletions.
7 changes: 3 additions & 4 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ data:
type: mysql
sql_max_limit: -1
tenant: arana
conn_props:
capacity: 10
max_capacity: 20
idle_timeout: 60
groups:
- name: employees_0000
nodes:
Expand All @@ -57,6 +53,9 @@ data:
labels:
zone: shanghai
conn_props:
capacity: 8
max_capacity: 64
idle_timeout: "30m"
readTimeout: "1s"
writeTimeout: "1s"
parseTime: true
Expand Down
86 changes: 70 additions & 16 deletions pkg/config/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package config
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"regexp"
"strconv"
"time"
)

import (
Expand Down Expand Up @@ -77,31 +79,24 @@ type (
Type DataSourceType `yaml:"type" json:"type"`
SqlMaxLimit int `default:"-1" yaml:"sql_max_limit" json:"sql_max_limit,omitempty"`
Tenant string `yaml:"tenant" json:"tenant"`
ConnProps *ConnProp `yaml:"conn_props" json:"conn_props,omitempty"`
Groups []*Group `yaml:"groups" json:"groups"`
}

ConnProp struct {
Capacity int `yaml:"capacity" json:"capacity,omitempty"` // connection pool capacity
MaxCapacity int `yaml:"max_capacity" json:"max_capacity,omitempty"` // max connection pool capacity
IdleTimeout int `yaml:"idle_timeout" json:"idle_timeout,omitempty"` // close backend direct connection after idle_timeout
}

Group struct {
Name string `yaml:"name" json:"name"`
Nodes []*Node `yaml:"nodes" json:"nodes"`
}

Node struct {
Name string `validate:"required" yaml:"name" json:"name"`
Host string `validate:"required" yaml:"host" json:"host"`
Port int `validate:"required" yaml:"port" json:"port"`
Username string `validate:"required" yaml:"username" json:"username"`
Password string `validate:"required" yaml:"password" json:"password"`
Database string `validate:"required" yaml:"database" json:"database"`
ConnProps map[string]string `yaml:"conn_props" json:"conn_props,omitempty"`
Weight string `default:"r10w10" yaml:"weight" json:"weight"`
Labels map[string]string `yaml:"labels" json:"labels,omitempty"`
Name string `validate:"required" yaml:"name" json:"name"`
Host string `validate:"required" yaml:"host" json:"host"`
Port int `validate:"required" yaml:"port" json:"port"`
Username string `validate:"required" yaml:"username" json:"username"`
Password string `validate:"required" yaml:"password" json:"password"`
Database string `validate:"required" yaml:"database" json:"database"`
ConnProps map[string]interface{} `yaml:"conn_props" json:"conn_props,omitempty"`
Weight string `default:"r10w10" yaml:"weight" json:"weight"`
Labels map[string]string `yaml:"labels" json:"labels,omitempty"`
}

ShardingRule struct {
Expand Down Expand Up @@ -230,3 +225,62 @@ func Validate(cfg *Configuration) error {
v := validator.New()
return v.Struct(cfg)
}

// GetConnPropCapacity parses the capacity of backend connection pool, return default value if failed.
func GetConnPropCapacity(connProps map[string]interface{}, defaultValue int) int {
capacity, ok := connProps["capacity"]
if !ok {
return defaultValue
}
n, _ := strconv.Atoi(fmt.Sprint(capacity))
if n < 1 {
return defaultValue
}
return n
}

// GetConnPropMaxCapacity parses the max capacity of backend connection pool, return default value if failed.
func GetConnPropMaxCapacity(connProps map[string]interface{}, defaultValue int) int {
var (
maxCapacity interface{}
ok bool
)

if maxCapacity, ok = connProps["max_capacity"]; !ok {
if maxCapacity, ok = connProps["maxCapacity"]; !ok {
return defaultValue
}
}
n, _ := strconv.Atoi(fmt.Sprint(maxCapacity))
if n < 1 {
return defaultValue
}
return n
}

// GetConnPropIdleTime parses the idle time of backend connection pool, return default value if failed.
func GetConnPropIdleTime(connProps map[string]interface{}, defaultValue time.Duration) time.Duration {
var (
idleTime interface{}
ok bool
)

if idleTime, ok = connProps["idle_time"]; !ok {
if idleTime, ok = connProps["idleTime"]; !ok {
return defaultValue
}
}

s := fmt.Sprint(idleTime)
d, _ := time.ParseDuration(s)
if d > 0 {
return d
}

n, _ := strconv.Atoi(s)
if n < 1 {
return defaultValue
}

return time.Duration(n) * time.Second
}
4 changes: 0 additions & 4 deletions pkg/config/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ func TestDataSourceClustersConf(t *testing.T) {
assert.Equal(t, DBMySQL, dataSourceCluster.Type)
assert.Equal(t, -1, dataSourceCluster.SqlMaxLimit)
assert.Equal(t, "arana", dataSourceCluster.Tenant)
assert.NotNil(t, dataSourceCluster.ConnProps)
assert.Equal(t, 10, dataSourceCluster.ConnProps.Capacity)
assert.Equal(t, 20, dataSourceCluster.ConnProps.MaxCapacity)
assert.Equal(t, 60, dataSourceCluster.ConnProps.IdleTimeout)

assert.Equal(t, 1, len(dataSourceCluster.Groups))
group := dataSourceCluster.Groups[0]
Expand Down
63 changes: 63 additions & 0 deletions pkg/dataset/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dataset

import (
"github.com/arana-db/arana/pkg/proto"
)

type pipeOption []func(proto.Dataset) proto.Dataset

func Filter(predicate PredicateFunc) Option {
return func(option *pipeOption) {
*option = append(*option, func(prev proto.Dataset) proto.Dataset {
return FilterDataset{
Dataset: prev,
Predicate: predicate,
}
})
}
}

func Map(generateFields FieldsFunc, transform TransformFunc) Option {
return func(option *pipeOption) {
*option = append(*option, func(dataset proto.Dataset) proto.Dataset {
return &TransformDataset{
Dataset: dataset,
FieldsGetter: generateFields,
Transform: transform,
}
})
}
}

type Option func(*pipeOption)

func Pipe(root proto.Dataset, options ...Option) proto.Dataset {
var o pipeOption
for _, it := range options {
it(&o)
}

next := root
for _, it := range o {
next = it(next)
}

return next
}
33 changes: 33 additions & 0 deletions pkg/dataset/dataset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dataset

import (
"errors"
)

import (
perrors "github.com/pkg/errors"
)

var errLengthNotMatched = errors.New("dataset: the length of dest values doesn't match")

// IsLengthNotMatchedError returns true if target error is length-not-matched error.
func IsLengthNotMatchedError(err error) bool {
return perrors.Is(err, errLengthNotMatched)
}
52 changes: 52 additions & 0 deletions pkg/dataset/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dataset

import (
"github.com/pkg/errors"
)

import (
"github.com/arana-db/arana/pkg/proto"
)

var _ proto.Dataset = (*FilterDataset)(nil)

type PredicateFunc func(proto.Row) bool

type FilterDataset struct {
proto.Dataset
Predicate PredicateFunc
}

func (f FilterDataset) Next() (proto.Row, error) {
if f.Predicate == nil {
return f.Dataset.Next()
}

row, err := f.Dataset.Next()
if err != nil {
return nil, errors.WithStack(err)
}

if !f.Predicate(row) {
return f.Next()
}

return row, nil
}
Loading

0 comments on commit 8210501

Please sign in to comment.