Skip to content

Commit

Permalink
feat: integrate config v2 and implement simple tx backbone (arana-db#90)
Browse files Browse the repository at this point in the history
* feat: integrate config v2 and implement simple tx backbone

* add ut
  • Loading branch information
jjeffcaii authored Mar 21, 2022
1 parent c6a2522 commit e1ecf8f
Show file tree
Hide file tree
Showing 38 changed files with 2,663 additions and 1,387 deletions.
68 changes: 24 additions & 44 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package main

import (
"context"
"encoding/json"
"os"
"os/signal"
"syscall"
Expand All @@ -34,15 +33,13 @@ import (
)

import (
"github.com/arana-db/arana/pkg/config"
"github.com/arana-db/arana/pkg/boot"
"github.com/arana-db/arana/pkg/constants"
"github.com/arana-db/arana/pkg/executor"
"github.com/arana-db/arana/pkg/filters"
"github.com/arana-db/arana/pkg/mysql"
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/resource"
"github.com/arana-db/arana/pkg/server"
"github.com/arana-db/arana/third_party/pools"
"github.com/arana-db/arana/pkg/util/log"
)

var (
Expand All @@ -63,9 +60,19 @@ var (
Short: "start arana",

Run: func(cmd *cobra.Command, args []string) {
conf := config.Load(configPath)
provider := boot.NewFileProvider(configPath)
if err := boot.Boot(context.Background(), provider); err != nil {
log.Fatal("start failed: %v", err)
return
}

for _, filterConf := range conf.Filters {
filters, err := provider.ListFilters(context.Background())
if err != nil {
log.Fatal("start failed: %v", err)
return
}

for _, filterConf := range filters {
factory := filter.GetFilterFactory(filterConf.Name)
if factory == nil {
panic(errors.Errorf("there is no filter factory for filter: %s", filterConf.Name))
Expand All @@ -77,48 +84,21 @@ var (
filter.RegisterFilter(f.GetName(), f)
}

executors := make(map[string]proto.Executor)
for _, executorConf := range conf.Executors {
executor := executor.NewRedirectExecutor(executorConf)

for i := 0; i < len(executorConf.Filters); i++ {
filterName := executorConf.Filters[i]
f := filter.GetFilter(filterName)
if f != nil {
preFilter, ok := f.(proto.PreFilter)
if ok {
executor.AddPreFilter(preFilter)
}
postFilter, ok := f.(proto.PostFilter)
if ok {
executor.AddPostFilter(postFilter)
}
}
}
executors[executorConf.Name] = executor
}

resource.InitDataSourceManager(conf.DataSources, func(config json.RawMessage) pools.Factory {
collector, err := mysql.NewConnector(config)
if err != nil {
panic(err)
}
return collector.NewBackendConnection
})
propeller := server.NewServer()

for _, listenerConf := range conf.Listeners {
listenersConf, err := provider.ListListeners(context.Background())
if err != nil {
log.Fatal("start failed: %v", err)
return
}

for _, listenerConf := range listenersConf {
listener, err := mysql.NewListener(listenerConf)
if err != nil {
panic(err)
}
executor := executors[listenerConf.Executor]
if executor == nil {
panic(errors.Errorf("executor: %s is not exists for listener: %s:%d",
listenerConf.Executor,
listenerConf.SocketAddress.Address,
listenerConf.SocketAddress.Port))
log.Fatalf("create listener failed: %v", err)
return
}
executor := executor.NewRedirectExecutor()
listener.SetExecutor(executor)
propeller.AddListener(listener)
}
Expand Down
103 changes: 58 additions & 45 deletions docker/conf/config.yaml
Original file line number Diff line number Diff line change
@@ -1,48 +1,61 @@
listeners:
- protocol_type: mysql
socket_address:
address: 0.0.0.0
port: 13306
config:
users:
dksl: "123456"
kind: ConfigMap
apiVersion: "1.0"
metadata:
name: arana-config
data:
listeners:
- protocol_type: mysql
server_version: 5.7.0
executor: redirect
socket_address:
address: 0.0.0.0
port: 13306

tenants:
- name: arana
users:
- username: arana
password: "123456"
- username: dksl
password: "123456"

executors:
- name: redirect
mode: readwritesplitting
data_sources:
- master:
name: employees
slaves:
- name: employees1
weight: 50
- name: employees2
weight: 50
clusters:
- name: employees
type: mysql
sql_max_limit: -1
tenant: arana
conn_props:
capacity: 10
max_capacity: 20
idle_timeout: 60
groups:
- name: employees_0000
nodes:
- name: arana-node-1
host: arana-mysql
port: 3306
username: root
password: "123456"
database: employees
weight: r10w10
labels:
zone: shanghai
conn_props:
readTimeout: "1s"
writeTimeout: "1s"
parseTime: true
loc: Local
charset: utf8mb4,utf8

data_source_cluster:
- role: master
type: mysql
name: employees
capacity: 10
max_capacity: 20
idle_timeout: 60s
conf:
dsn: root:123456@tcp(arana-mysql:3306)/employees?timeout=11s&readTimeout=11s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8
- role: slave
type: mysql
name: employees1
capacity: 10
max_capacity: 20
idle_timeout: 60s
conf:
dsn: root:123456@tcp(arana-mysql:3306)/employees?timeout=11s&readTimeout=1s&writeTimeout=11s&parseTime=true&loc=Local&charset=utf8mb4,utf8
- role: slave
type: mysql
name: employees2
capacity: 10
max_capacity: 20
idle_timeout: 60s
conf:
dsn: root:123456@tcp(arana-mysql:3306)/employees?timeout=11s&readTimeout=1s&writeTimeout=11s&parseTime=true&loc=Local&charset=utf8mb4,utf8
sharding_rule:
tables:
- name: employees.student
allow_full_scan: true
db_rules:
tbl_rules:
- column: uid
expr: modShard(32)
topology:
db_pattern: employees_0000
tbl_Pattern: student_${0000...0031}
attributes:
sqlMaxLimit: -1
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/arana-db/parser v0.1.1
github.com/bwmarrin/snowflake v0.3.0
github.com/cespare/xxhash/v2 v2.1.2
github.com/dop251/goja v0.0.0-20220102113305-2298ace6d09d
github.com/dubbogo/gost v1.11.23-0.20220113102152-a2ef9b809a45
Expand All @@ -23,4 +24,5 @@ require (
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
golang.org/x/net v0.0.0-20211105192438-b53810dc28af
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -739,6 +741,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
134 changes: 134 additions & 0 deletions pkg/boot/boot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package boot

import (
"context"
)

import (
"github.com/pkg/errors"
)

import (
"github.com/arana-db/arana/pkg/config"
"github.com/arana-db/arana/pkg/proto/rule"
"github.com/arana-db/arana/pkg/runtime"
"github.com/arana-db/arana/pkg/runtime/namespace"
"github.com/arana-db/arana/pkg/runtime/optimize"
"github.com/arana-db/arana/pkg/security"
"github.com/arana-db/arana/pkg/util/log"
)

func Boot(ctx context.Context, provider Discovery) error {
if err := provider.Init(ctx); err != nil {
return err
}

clusters, err := provider.ListClusters(ctx)
if err != nil {
return err
}

for _, cluster := range clusters {
var (
c *Cluster
ns *namespace.Namespace
)

if c, err = provider.GetCluster(ctx, cluster); err != nil {
continue
}
if ns, err = buildNamespace(ctx, provider, cluster); err != nil {
log.Errorf("build namespace %s failed: %v", cluster, err)
continue
}
if err = namespace.Register(ns); err != nil {
log.Errorf("register namespace %s failed: %v", cluster, err)
continue
}
log.Infof("register namespace %s successfully", cluster)
security.DefaultTenantManager().PutCluster(c.Tenant, cluster)
}

var tenants []string
if tenants, err = provider.ListTenants(ctx); err != nil {
return errors.Wrap(err, "no tenants found")
}

for _, tenant := range tenants {
var t *config.Tenant
if t, err = provider.GetTenant(ctx, tenant); err != nil {
log.Errorf("failed to get tenant %s: %v", tenant, err)
continue
}
for _, it := range t.Users {
security.DefaultTenantManager().PutUser(tenant, it)
}
}

return nil
}

func buildNamespace(ctx context.Context, provider Discovery, cluster string) (*namespace.Namespace, error) {
var (
groups []string
err error
)

if groups, err = provider.ListGroups(ctx, cluster); err != nil {
return nil, err
}

var initCmds []namespace.Command
for _, group := range groups {
var nodes []string
if nodes, err = provider.ListNodes(ctx, cluster, group); err != nil {
return nil, err
}
for _, it := range nodes {
var node *config.Node
if node, err = provider.GetNode(ctx, cluster, group, it); err != nil {
return nil, errors.WithStack(err)
}
initCmds = append(initCmds, namespace.UpsertDB(group, runtime.NewAtomDB(node)))
}
}

var tables []string
if tables, err = provider.ListTables(ctx, cluster); err != nil {
return nil, errors.WithStack(err)
}

var ru rule.Rule
for _, table := range tables {
var vt *rule.VTable
if vt, err = provider.GetTable(ctx, cluster, table); err != nil {
return nil, err
}
if vt == nil {
log.Warnf("no such table %s", table)
continue
}
ru.SetVTable(table, vt)
}
initCmds = append(initCmds, namespace.UpdateRule(&ru))

return namespace.New(cluster, optimize.GetOptimizer(), initCmds...), nil
}
Loading

0 comments on commit e1ecf8f

Please sign in to comment.