From 5c1951b48749783c9e932859c937139d0a88cb55 Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Sat, 9 Apr 2022 17:03:27 +0800 Subject: [PATCH] [ISSUE #62] Feat configcenter (#120) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor: 配置中心模块重构 * feat: 添加etcd & file的配置存储实现 * feat: feat issue #62 * feat: feat issue #62 * fix: fix pr comment * fix: fix test fail * style: fix import style * fix: fix code style --- .gitignore | 2 +- cmd/cmd.go | 2 +- docker/conf/bootstrap.yaml | 7 + docker/docker-compose.yaml | 3 +- go.mod | 10 +- go.sum | 47 +- pkg/boot/discovery.go | 523 +++++++++++++++++- pkg/boot/discovery_over_etcd.go | 117 ---- pkg/boot/discovery_over_file.go | 448 --------------- ...ry_over_file_test.go => discovery_test.go} | 9 +- pkg/boot/options.go | 29 + pkg/boot/plugin.go | 26 + pkg/config/api.go | 107 ++++ pkg/config/config.go | 359 +++++------- pkg/config/etcd.go | 109 ---- pkg/config/etcd/etcd.go | 159 ++++++ pkg/config/etcd/etcd_test.go | 99 ++++ pkg/config/etcd_test.go | 172 ------ pkg/config/file/file.go | 107 ++++ pkg/config/model.go | 224 ++++++++ pkg/config/{config_test.go => model_test.go} | 12 +- pkg/util/file/file.go | 33 ++ testdata/fake_bootstrap.yaml | 7 + 23 files changed, 1524 insertions(+), 1087 deletions(-) create mode 100644 docker/conf/bootstrap.yaml delete mode 100644 pkg/boot/discovery_over_etcd.go delete mode 100644 pkg/boot/discovery_over_file.go rename pkg/boot/{discovery_over_file_test.go => discovery_test.go} (88%) create mode 100644 pkg/boot/options.go create mode 100644 pkg/boot/plugin.go create mode 100644 pkg/config/api.go delete mode 100644 pkg/config/etcd.go create mode 100644 pkg/config/etcd/etcd.go create mode 100644 pkg/config/etcd/etcd_test.go delete mode 100644 pkg/config/etcd_test.go create mode 100644 pkg/config/file/file.go create mode 100644 pkg/config/model.go rename pkg/config/{config_test.go => model_test.go} (93%) create mode 100644 pkg/util/file/file.go create mode 100644 testdata/fake_bootstrap.yaml diff --git a/.gitignore b/.gitignore index 4cc7af8e..c2e81686 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,7 @@ /coverage.txt # Dependency directories (remove the comment below to include it) -# vendor/ +vendor/ .idea/ .tmp/ diff --git a/cmd/cmd.go b/cmd/cmd.go index 3a54a3ed..e5502f1b 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -58,7 +58,7 @@ var ( Short: "start arana", Run: func(cmd *cobra.Command, args []string) { - provider := boot.NewFileProvider(configPath) + provider := boot.NewProvider(configPath) if err := boot.Boot(context.Background(), provider); err != nil { log.Fatal("start failed: %v", err) return diff --git a/docker/conf/bootstrap.yaml b/docker/conf/bootstrap.yaml new file mode 100644 index 00000000..dbbbc445 --- /dev/null +++ b/docker/conf/bootstrap.yaml @@ -0,0 +1,7 @@ +config: + name: file + options: + path: ./config.yaml + # name: etcd + # options: + # endpoints: "http://localhost:2382" \ No newline at end of file diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 6853fb29..fbbf6d05 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -22,13 +22,14 @@ services: arana: image: arana:latest container_name: arana - command: sh -c "./arana start -c config.yaml" + command: sh -c "./arana start -c bootstrap.yaml" networks: - local ports: - "13306:13306" volumes: - ./conf/config.yaml:/app/config.yaml + - ./conf/bootstrap.yaml:/app/bootstrap.yaml depends_on: - mysql networks: diff --git a/go.mod b/go.mod index 6d8d8347..f8e654b7 100644 --- a/go.mod +++ b/go.mod @@ -6,26 +6,34 @@ require ( github.com/arana-db/parser v0.2.1 github.com/bwmarrin/snowflake v0.3.0 github.com/cespare/xxhash/v2 v2.1.2 - github.com/creasty/defaults v1.5.2 // indirect github.com/dop251/goja v0.0.0-20220102113305-2298ace6d09d github.com/dubbogo/gost v1.11.23-0.20220113102152-a2ef9b809a45 github.com/ghodss/yaml v1.0.0 github.com/go-sql-driver/mysql v1.6.0 + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.5.0 + github.com/google/go-cmp v0.5.6 // indirect github.com/hashicorp/golang-lru v0.5.4 github.com/kr/pretty v0.3.0 // indirect github.com/lestrrat-go/strftime v1.0.5 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/olekukonko/tablewriter v0.0.5 github.com/pkg/errors v0.9.1 + github.com/prometheus/common v0.28.0 // indirect github.com/rogpeppe/go-internal v1.8.0 // indirect github.com/spf13/cobra v1.2.1 github.com/stretchr/testify v1.7.0 github.com/testcontainers/testcontainers-go v0.12.0 github.com/tidwall/gjson v1.14.0 + go.etcd.io/etcd/api/v3 v3.5.1 + go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/server/v3 v3.5.0-alpha.0 go.uber.org/atomic v1.9.0 go.uber.org/zap v1.19.1 + golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect golang.org/x/net v0.0.0-20211108170745-6635138e15ea golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + google.golang.org/genproto v0.0.0-20211104193956-4c6863e31247 // indirect + google.golang.org/grpc v1.42.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) diff --git a/go.sum b/go.sum index d09ece3e..d682afab 100644 --- a/go.sum +++ b/go.sum @@ -154,6 +154,11 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5 h1:xD/lrqdvwsc+O2bjSSi3YqY73Ke3LAiSCx49aCesA0E= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= @@ -253,8 +258,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= -github.com/creasty/defaults v1.5.2 h1:/VfB6uxpyp6h0fr7SPp7n8WJBoV8jfxQXPCnkVSjyls= -github.com/creasty/defaults v1.5.2/go.mod h1:FPZ+Y0WNrbqOVw+c6av63eyHUAl6pMHZwqLPvXUZGfY= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ= github.com/cznic/strutil v0.0.0-20171016134553-529a34b1c186/go.mod h1:AHHPPPXTw0h6pVabbcbyGRK1DckRn7r/STdZEeIDzZc= @@ -312,6 +315,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw= @@ -338,6 +343,7 @@ github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o= +github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= @@ -381,8 +387,9 @@ github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= @@ -424,8 +431,9 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -721,8 +729,9 @@ github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= -github.com/prometheus/client_golang v1.9.0 h1:Rrch9mh17XcxvEu9D9DEpb4isxjGBtcevQjKvxPRQIU= github.com/prometheus/client_golang v1.9.0/go.mod h1:FqZLKOZnGdFAhOK4nqGHa7D66IdsO+O441Eve7ptJDU= +github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ= +github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= @@ -740,8 +749,10 @@ github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+ github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= -github.com/prometheus/common v0.15.0 h1:4fgOnadei3EZvgRwxJ7RMpG1k1pOZth5Pc13tyspaKM= github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= +github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= +github.com/prometheus/common v0.28.0 h1:vGVfV9KrDTvWt5boZO0I19g2E3CsWfpPPKZM9dt3mEw= +github.com/prometheus/common v0.28.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/procfs v0.0.0-20180125133057-cb4147076ac7/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -878,15 +889,17 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mI go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489 h1:1JFLBqwIgdyHN1ZtgjTBwO+blA6gVOmZurpiMEsETKo= go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3CSBatqGNg7GRmsnfLWtoW60w4eDYfh7vHDg= go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw= -go.etcd.io/etcd/api/v3 v3.5.0 h1:GsV3S+OfZEOCNXdtNkBSR7kgLobAa/SO6tCxRa0GAYw= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= +go.etcd.io/etcd/api/v3 v3.5.1 h1:v28cktvBq+7vGyJXF8G+rWJmj+1XUmMtqcLnH8hDocM= +go.etcd.io/etcd/api/v3 v3.5.1/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= go.etcd.io/etcd/client/pkg/v3 v3.5.0 h1:2aQv6F436YnN7I4VbI8PPYrBhu+SmrTaADcf8Mi/6PU= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU= go.etcd.io/etcd/client/v2 v2.305.0 h1:ftQ0nOOHMcbMS3KIaDQ0g5Qcd6bhaBrQT6b89DfwLTs= go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= -go.etcd.io/etcd/client/v3 v3.5.0-alpha.0 h1:dr1EOILak2pu4Nf5XbRIOCNIBjcz6UmkQd7hHRXwxaM= go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8= +go.etcd.io/etcd/client/v3 v3.5.0 h1:62Eh0XOro+rDwkrypAGDfgmNh5Joq+z+W9HZdlXMzek= +go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0= go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0 h1:3yLUEC0nFCxw/RArImOyRUI4OAFbg4PFpBbAhSNzKNY= go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY= go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0 h1:DvYJotxV9q1Lkn7pknzAbFO/CLtCVidCr2K9qRLJ8pA= @@ -904,6 +917,7 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -942,8 +956,9 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 h1:/UOmuWzQfxxo9UtlXMwuQU8CMgg1eZXqTRwkSQJWKOI= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1032,6 +1047,7 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211108170745-6635138e15ea h1:FosBMXtOc8Tp9Hbo4ltl1WJSrTVewZU8MPnTPY2HdH8= golang.org/x/net v0.0.0-20211108170745-6635138e15ea/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1046,6 +1062,7 @@ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1143,6 +1160,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3 h1:T6tyxxvHMj2L1R2kZg0uNMpS8ZhB9lRa9XRGTCSA65w= golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1309,8 +1328,9 @@ google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= -google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c h1:wtujag7C+4D6KMoulW9YauvK2lgdvCMS260jsqqBXr0= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= +google.golang.org/genproto v0.0.0-20211104193956-4c6863e31247 h1:ZONpjmFT5e+I/0/xE3XXbG5OIvX2hRYzol04MhKBl2E= +google.golang.org/genproto v0.0.0-20211104193956-4c6863e31247/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1338,8 +1358,10 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A= +google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -1420,12 +1442,10 @@ k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAG k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8= -modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8= modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254= modernc.org/lex v1.0.0/go.mod h1:G6rxMTy3cH2iA0iXL/HRRv4Znu8MK4higxph/lE7ypk= modernc.org/lexer v1.0.0/go.mod h1:F/Dld0YKYdZCLQ7bD0USbWL4YKCyTDRDHiDTOs0q0vk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= -modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/parser v1.0.0/go.mod h1:H20AntYJ2cHHL6MHthJ8LZzXCdDCHMWt1KZXtIMjejA= modernc.org/parser v1.0.2/go.mod h1:TXNq3HABP3HMaqLK7brD1fLA/LfN0KS6JxZn71QdDqs= @@ -1433,7 +1453,6 @@ modernc.org/scanner v1.0.1/go.mod h1:OIzD2ZtjYk6yTuyqZr57FmifbM9fIH74SumloSsajuE modernc.org/sortutil v1.0.0/go.mod h1:1QO0q8IlIlmjBIwm6t/7sof874+xCfZouyqZMLIAtxM= modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= -modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= modernc.org/y v1.0.1/go.mod h1:Ho86I+LVHEI+LYXoUKlmOMAM1JTXOCfj8qi1T8PsClE= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/boot/discovery.go b/pkg/boot/discovery.go index fefd76b0..7ec92423 100644 --- a/pkg/boot/discovery.go +++ b/pkg/boot/discovery.go @@ -19,35 +19,550 @@ package boot import ( "context" + "fmt" + "io/ioutil" + "path/filepath" + "regexp" + "sort" + "strconv" + "strings" + "sync" +) + +import ( + "github.com/pkg/errors" + + "gopkg.in/yaml.v3" ) import ( "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/pkg/proto/rule" + rrule "github.com/arana-db/arana/pkg/runtime/rule" + "github.com/arana-db/arana/pkg/util/file" + "github.com/arana-db/arana/pkg/util/log" ) +var _ Discovery = (*discovery)(nil) + +var ( + _regexpTable *regexp.Regexp + _regexpTableOnce sync.Once +) + +var ( + _regexpRuleExpr *regexp.Regexp + _regexpRuleExprSync sync.Once +) + +func getTableRegexp() *regexp.Regexp { + _regexpTableOnce.Do(func() { + _regexpTable = regexp.MustCompile("([a-zA-Z0-9\\-_]+)\\.([a-zA-Z0-9\\\\-_]+)") + }) + return _regexpTable +} + +func getRuleExprRegexp() *regexp.Regexp { + _regexpRuleExprSync.Do(func() { + _regexpRuleExpr = regexp.MustCompile(`([a-zA-Z0-9_]+)\(\s*([0-9]|[1-9][0-9]+)?\s*\)`) + }) + return _regexpRuleExpr +} + type Cluster struct { Tenant string Type config.DataSourceType } type Discovery interface { + + // Init 初始化 Init(ctx context.Context) error + // ListTenants ListTenants(ctx context.Context) ([]string, error) + // GetTable + GetTenant(ctx context.Context, tenant string) (*config.Tenant, error) + + // ListListeners ListListeners(ctx context.Context) ([]*config.Listener, error) + // ListFilters ListFilters(ctx context.Context) ([]*config.Filter, error) + // ListClusters lists the cluster names. ListClusters(ctx context.Context) ([]string, error) + // GetCluster + GetCluster(ctx context.Context, cluster string) (*Cluster, error) // ListGroups lists the group names. ListGroups(ctx context.Context, cluster string) ([]string, error) + // ListNodes lists the node names. ListNodes(ctx context.Context, cluster, group string) ([]string, error) - // ListTables lists the table names. - ListTables(ctx context.Context, cluster string) ([]string, error) // GetNode returns the node info. GetNode(ctx context.Context, cluster, group, node string) (*config.Node, error) + + // ListTables lists the table names. + ListTables(ctx context.Context, cluster string) ([]string, error) // GetTable returns the table info. GetTable(ctx context.Context, cluster, table string) (*rule.VTable, error) - GetTenant(ctx context.Context, tenant string) (*config.Tenant, error) - GetCluster(ctx context.Context, cluster string) (*Cluster, error) +} + +type discovery struct { + path string + options *BootOptions + c *config.Center +} + +func (fp *discovery) Init(ctx context.Context) error { + if err := fp.loadBootOptions(); err != nil { + return err + } + + if err := fp.initConfigCenter(); err != nil { + return err + } + + return nil +} + +func (fp *discovery) loadBootOptions() error { + content, err := ioutil.ReadFile(fp.path) + if err != nil { + err = errors.Wrap(err, "failed to load config") + return err + } + + if !file.IsYaml(fp.path) { + err = errors.Errorf("invalid config file format: %s", filepath.Ext(fp.path)) + return err + } + + var cfg BootOptions + if err = yaml.Unmarshal(content, &cfg); err != nil { + err = errors.Wrapf(err, "failed to unmarshal config") + return err + } + + fp.options = &cfg + return nil +} + +func (fp *discovery) initConfigCenter() error { + c, err := config.NewCenter(*fp.options.Config) + if err != nil { + return err + } + + fp.c = c + + return nil +} + +func (fp *discovery) GetCluster(ctx context.Context, cluster string) (*Cluster, error) { + exist, ok := fp.loadCluster(cluster) + if !ok { + return nil, nil + } + + return &Cluster{ + Tenant: exist.Tenant, + Type: exist.Type, + }, nil +} + +func (fp *discovery) ListTenants(ctx context.Context) ([]string, error) { + + cfg, err := fp.c.Load() + if err != nil { + return nil, err + } + + var tenants []string + for _, it := range cfg.Data.Tenants { + tenants = append(tenants, it.Name) + } + return tenants, nil +} + +func (fp *discovery) GetTenant(ctx context.Context, tenant string) (*config.Tenant, error) { + cfg, err := fp.c.Load() + if err != nil { + return nil, err + } + + for _, it := range cfg.Data.Tenants { + if it.Name == tenant { + return it, nil + } + } + return nil, nil +} + +func (fp *discovery) ListListeners(ctx context.Context) ([]*config.Listener, error) { + cfg, err := fp.c.Load() + if err != nil { + return nil, err + } + + return cfg.Data.Listeners, nil +} + +func (fp *discovery) ListFilters(ctx context.Context) ([]*config.Filter, error) { + cfg, err := fp.c.Load() + if err != nil { + return nil, err + } + + return cfg.Data.Filters, nil +} + +func (fp *discovery) ListClusters(ctx context.Context) ([]string, error) { + cfg, err := fp.c.Load() + if err != nil { + return nil, err + } + + clusters := make([]string, 0, len(cfg.Data.DataSourceClusters)) + for _, it := range cfg.Data.DataSourceClusters { + clusters = append(clusters, it.Name) + } + + return clusters, nil +} + +func (fp *discovery) ListGroups(ctx context.Context, cluster string) ([]string, error) { + bingo, ok := fp.loadCluster(cluster) + if !ok { + return nil, nil + } + groups := make([]string, 0, len(bingo.Groups)) + for _, it := range bingo.Groups { + groups = append(groups, it.Name) + } + + return groups, nil +} + +func (fp *discovery) ListNodes(ctx context.Context, cluster, group string) ([]string, error) { + bingo, ok := fp.loadGroup(cluster, group) + if !ok { + return nil, nil + } + + var nodes []string + for _, it := range bingo.Nodes { + nodes = append(nodes, it.Name) + } + + return nodes, nil +} + +func (fp *discovery) ListTables(ctx context.Context, cluster string) ([]string, error) { + cfg, err := fp.c.Load() + if err != nil { + return nil, err + } + + var tables []string + for _, it := range fp.loadTables(cfg, cluster) { + _, tb, _ := parseTable(it.Name) + tables = append(tables, tb) + } + sort.Strings(tables) + return tables, nil +} + +func (fp *discovery) GetNode(ctx context.Context, cluster, group, node string) (*config.Node, error) { + bingo, ok := fp.loadGroup(cluster, group) + if !ok { + return nil, nil + } + for _, it := range bingo.Nodes { + if it.Name == node { + return it, nil + } + } + return nil, nil +} + +func (fp *discovery) GetTable(ctx context.Context, cluster, table string) (*rule.VTable, error) { + cfg, err := fp.c.Load() + if err != nil { + return nil, err + } + + exist, ok := fp.loadTables(cfg, cluster)[table] + if !ok { + return nil, nil + } + var vt rule.VTable + + var ( + topology rule.Topology + dbFormat, tbFormat string + dbBegin, tbBegin int + dbEnd, tbEnd int + ) + + if exist.Topology != nil { + if len(exist.Topology.DbPattern) > 0 { + if dbFormat, dbBegin, dbEnd, err = parseTopology(exist.Topology.DbPattern); err != nil { + return nil, errors.WithStack(err) + } + } + if len(exist.Topology.TblPattern) > 0 { + if tbFormat, tbBegin, tbEnd, err = parseTopology(exist.Topology.TblPattern); err != nil { + return nil, errors.WithStack(err) + } + } + } + topology.SetRender(getRender(dbFormat), getRender(tbFormat)) + + var ( + keys map[string]struct{} + dbSharder, tbSharder map[string]rule.ShardComputer + ) + for _, it := range exist.DbRules { + var shd rule.ShardComputer + if shd, err = toSharder(it); err != nil { + return nil, err + } + if dbSharder == nil { + dbSharder = make(map[string]rule.ShardComputer) + } + if keys == nil { + keys = make(map[string]struct{}) + } + dbSharder[it.Column] = shd + keys[it.Column] = struct{}{} + } + + for _, it := range exist.TblRules { + var shd rule.ShardComputer + if shd, err = toSharder(it); err != nil { + return nil, err + } + if tbSharder == nil { + tbSharder = make(map[string]rule.ShardComputer) + } + if keys == nil { + keys = make(map[string]struct{}) + } + tbSharder[it.Column] = shd + keys[it.Column] = struct{}{} + } + + for k := range keys { + var ( + shd rule.ShardComputer + dbMetadata, tbMetadata *rule.ShardMetadata + ) + if shd, ok = dbSharder[k]; ok { + dbMetadata = &rule.ShardMetadata{ + + Computer: shd, + Stepper: rule.DefaultNumberStepper, + } + if dbBegin >= 0 && dbEnd >= 0 { + dbMetadata.Steps = 1 + dbEnd - dbBegin + } + } + if shd, ok = tbSharder[k]; ok { + tbMetadata = &rule.ShardMetadata{ + Computer: shd, + Stepper: rule.DefaultNumberStepper, + } + if tbBegin >= 0 && tbEnd >= 0 { + tbMetadata.Steps = 1 + tbEnd - tbBegin + } + } + vt.SetShardMetadata(k, dbMetadata, tbMetadata) + + tpRes := make(map[int][]int) + rng, _ := tbMetadata.Stepper.Ascend(0, tbMetadata.Steps) + for rng.HasNext() { + var ( + seed = rng.Next() + dbIdx = -1 + tbIdx = -1 + ) + if dbMetadata != nil { + if dbIdx, err = dbMetadata.Computer.Compute(seed); err != nil { + return nil, errors.WithStack(err) + } + } + if tbMetadata != nil { + if tbIdx, err = tbMetadata.Computer.Compute(seed); err != nil { + return nil, errors.WithStack(err) + } + } + tpRes[dbIdx] = append(tpRes[dbIdx], tbIdx) + } + + for dbIndex, tbIndexes := range tpRes { + topology.SetTopology(dbIndex, tbIndexes...) + } + } + + if exist.AllowFullScan { + vt.SetAllowFullScan(true) + } + + // TODO: process attributes + _ = exist.Attributes["sql_max_limit"] + + vt.SetTopology(&topology) + + return &vt, nil +} + +func (fp *discovery) loadCluster(cluster string) (*config.DataSourceCluster, bool) { + cfg, err := fp.c.Load() + if err != nil { + return nil, false + } + + for _, it := range cfg.Data.DataSourceClusters { + if it.Name == cluster { + return it, true + } + } + return nil, false +} + +func (fp *discovery) loadGroup(cluster, group string) (*config.Group, bool) { + bingo, ok := fp.loadCluster(cluster) + if !ok { + return nil, false + } + for _, it := range bingo.Groups { + if it.Name == group { + return it, true + } + } + return nil, false +} + +func (fp *discovery) loadTables(cfg *config.Configuration, cluster string) map[string]*config.Table { + + var tables map[string]*config.Table + for _, it := range cfg.Data.ShardingRule.Tables { + db, tb, err := parseTable(it.Name) + if err != nil { + log.Warnf("skip parsing table rule: %v", err) + continue + } + if db != cluster { + continue + } + if tables == nil { + tables = make(map[string]*config.Table) + } + tables[tb] = it + } + return tables +} + +var ( + _regexpTopology *regexp.Regexp + _regexpTopologyOnce sync.Once +) + +func getTopologyRegexp() *regexp.Regexp { + _regexpTopologyOnce.Do(func() { + _regexpTopology = regexp.MustCompile(`\${(?P[0-9]+)\.\.\.(?P[0-9]+)}`) + }) + return _regexpTopology +} + +func parseTopology(input string) (format string, begin, end int, err error) { + mats := getTopologyRegexp().FindAllStringSubmatch(input, -1) + + if len(mats) < 1 { + format = input + begin = -1 + end = -1 + return + } + + if len(mats) > 1 { + err = errors.Errorf("invalid topology expression: %s", input) + return + } + + var ( + beginStr, endStr string + ) + for i := 1; i < len(mats[0]); i++ { + switch getTopologyRegexp().SubexpNames()[i] { + case "begin": + beginStr = mats[0][i] + case "end": + endStr = mats[0][i] + } + } + + if len(beginStr) != len(endStr) { + err = errors.Errorf("invalid topology expression: %s", input) + return + } + + format = getTopologyRegexp().ReplaceAllString(input, fmt.Sprintf(`%%0%dd`, len(beginStr))) + begin, _ = strconv.Atoi(strings.TrimLeft(beginStr, "0")) + end, _ = strconv.Atoi(strings.TrimLeft(endStr, "0")) + return +} + +func toSharder(input *config.Rule) (rule.ShardComputer, error) { + mat := getRuleExprRegexp().FindStringSubmatch(input.Expr) + if len(mat) != 3 { + return nil, errors.Errorf("invalid shard rule: %s", input.Expr) + } + + var ( + computer rule.ShardComputer + method = mat[1] + n, _ = strconv.Atoi(mat[2]) + ) + + switch method { + case string(rrule.ModShard): + computer = rrule.NewModShard(n) + case string(rrule.HashMd5Shard): + computer = rrule.NewHashMd5Shard(n) + case string(rrule.HashBKDRShard): + computer = rrule.NewHashBKDRShard(n) + case string(rrule.HashCrc32Shard): + computer = rrule.NewHashCrc32Shard(n) + default: + return nil, errors.Errorf("invalid shard rule: %s", input.Expr) + } + return computer, nil +} + +func getRender(format string) func(int) string { + if strings.ContainsRune(format, '%') { + return func(i int) string { + return fmt.Sprintf(format, i) + } + } + return func(i int) string { + return format + } +} + +func parseTable(input string) (db, tbl string, err error) { + mat := getTableRegexp().FindStringSubmatch(input) + if len(mat) < 1 { + err = errors.Errorf("invalid table name: %s", input) + return + } + db = mat[1] + tbl = mat[2] + return +} + +func NewProvider(path string) Discovery { + return &discovery{ + path: path, + } } diff --git a/pkg/boot/discovery_over_etcd.go b/pkg/boot/discovery_over_etcd.go deleted file mode 100644 index 9cc7dd06..00000000 --- a/pkg/boot/discovery_over_etcd.go +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package boot - -import ( - "context" - "time" -) - -import ( - etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3" - - "github.com/pkg/errors" -) - -import ( - "github.com/arana-db/arana/pkg/config" - "github.com/arana-db/arana/pkg/proto/rule" -) - -var _ Discovery = (*EtcdProvider)(nil) - -type EtcdProvider struct { - rootPath string - client *etcdv3.Client -} - -func (ep *EtcdProvider) ListTenants(ctx context.Context) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) ListListeners(ctx context.Context) ([]*config.Listener, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) ListFilters(ctx context.Context) ([]*config.Filter, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) ListClusters(ctx context.Context) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) ListGroups(ctx context.Context, cluster string) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) ListNodes(ctx context.Context, cluster, group string) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) ListTables(ctx context.Context, cluster string) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) GetNode(ctx context.Context, cluster, group, node string) (*config.Node, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) GetTable(ctx context.Context, cluster, table string) (*rule.VTable, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) GetTenant(ctx context.Context, tenant string) (*config.Tenant, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) GetCluster(ctx context.Context, cluster string) (*Cluster, error) { - //TODO implement me - panic("implement me") -} - -func (ep *EtcdProvider) Init(ctx context.Context) error { - //TODO implement me - panic("implement me") -} - -func NewEtcdProvider(rootPath string, endpoints ...string) (Discovery, error) { - c, err := etcdv3.NewConfigClientWithErr( - etcdv3.WithName(etcdv3.RegistryETCDV3Client), - etcdv3.WithTimeout(10*time.Second), - etcdv3.WithEndpoints(endpoints...), - ) - if err != nil { - return nil, errors.Wrap(err, "cannot connect etcd") - } - - return &EtcdProvider{ - rootPath: rootPath, - client: c, - }, nil -} diff --git a/pkg/boot/discovery_over_file.go b/pkg/boot/discovery_over_file.go deleted file mode 100644 index 7af203a9..00000000 --- a/pkg/boot/discovery_over_file.go +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package boot - -import ( - "context" - "fmt" - "regexp" - "sort" - "strconv" - "strings" - "sync" -) - -import ( - "github.com/pkg/errors" -) - -import ( - "github.com/arana-db/arana/pkg/config" - "github.com/arana-db/arana/pkg/proto/rule" - rrule "github.com/arana-db/arana/pkg/runtime/rule" - "github.com/arana-db/arana/pkg/util/log" -) - -var _ Discovery = (*fileDiscovery)(nil) - -var ( - _regexpTable *regexp.Regexp - _regexpTableOnce sync.Once -) - -var ( - _regexpRuleExpr *regexp.Regexp - _regexpRuleExprSync sync.Once -) - -func getTableRegexp() *regexp.Regexp { - _regexpTableOnce.Do(func() { - _regexpTable = regexp.MustCompile("([a-zA-Z0-9\\-_]+)\\.([a-zA-Z0-9\\\\-_]+)") - }) - return _regexpTable -} - -func getRuleExprRegexp() *regexp.Regexp { - _regexpRuleExprSync.Do(func() { - _regexpRuleExpr = regexp.MustCompile(`([a-zA-Z0-9_]+)\(\s*([0-9]|[1-9][0-9]+)?\s*\)`) - }) - return _regexpRuleExpr -} - -type fileDiscovery struct { - sync.Once - path string - c *config.Configuration -} - -func (fp *fileDiscovery) GetCluster(ctx context.Context, cluster string) (*Cluster, error) { - exist, ok := fp.loadCluster(cluster) - if !ok { - return nil, nil - } - - return &Cluster{ - Tenant: exist.Tenant, - Type: exist.Type, - }, nil -} - -func (fp *fileDiscovery) Init(ctx context.Context) (err error) { - fp.Do(func() { - fp.c, err = config.ParseV2(fp.path) - }) - return -} - -func (fp *fileDiscovery) ListTenants(ctx context.Context) ([]string, error) { - var tenants []string - for _, it := range fp.c.Data.Tenants { - tenants = append(tenants, it.Name) - } - return tenants, nil -} - -func (fp *fileDiscovery) GetTenant(ctx context.Context, tenant string) (*config.Tenant, error) { - for _, it := range fp.c.Data.Tenants { - if it.Name == tenant { - return it, nil - } - } - return nil, nil -} - -func (fp *fileDiscovery) ListListeners(ctx context.Context) ([]*config.Listener, error) { - return fp.c.Data.Listeners, nil -} - -func (fp *fileDiscovery) ListFilters(ctx context.Context) ([]*config.Filter, error) { - return fp.c.Data.Filters, nil -} - -func (fp *fileDiscovery) ListClusters(ctx context.Context) ([]string, error) { - clusters := make([]string, 0, len(fp.c.Data.DataSourceClusters)) - for _, it := range fp.c.Data.DataSourceClusters { - clusters = append(clusters, it.Name) - } - - return clusters, nil -} - -func (fp *fileDiscovery) ListGroups(ctx context.Context, cluster string) ([]string, error) { - bingo, ok := fp.loadCluster(cluster) - if !ok { - return nil, nil - } - groups := make([]string, 0, len(bingo.Groups)) - for _, it := range bingo.Groups { - groups = append(groups, it.Name) - } - - return groups, nil -} - -func (fp *fileDiscovery) ListNodes(ctx context.Context, cluster, group string) ([]string, error) { - bingo, ok := fp.loadGroup(cluster, group) - if !ok { - return nil, nil - } - - var nodes []string - for _, it := range bingo.Nodes { - nodes = append(nodes, it.Name) - } - - return nodes, nil -} - -func (fp *fileDiscovery) ListTables(ctx context.Context, cluster string) ([]string, error) { - var tables []string - for _, it := range fp.loadTables(cluster) { - _, tb, _ := parseTable(it.Name) - tables = append(tables, tb) - } - sort.Strings(tables) - return tables, nil -} - -func (fp *fileDiscovery) GetNode(ctx context.Context, cluster, group, node string) (*config.Node, error) { - bingo, ok := fp.loadGroup(cluster, group) - if !ok { - return nil, nil - } - for _, it := range bingo.Nodes { - if it.Name == node { - return it, nil - } - } - return nil, nil -} - -func (fp *fileDiscovery) GetTable(ctx context.Context, cluster, table string) (*rule.VTable, error) { - exist, ok := fp.loadTables(cluster)[table] - if !ok { - return nil, nil - } - var vt rule.VTable - - var ( - topology rule.Topology - err error - - dbFormat, tbFormat string - dbBegin, tbBegin int - dbEnd, tbEnd int - ) - - if exist.Topology != nil { - if len(exist.Topology.DbPattern) > 0 { - if dbFormat, dbBegin, dbEnd, err = parseTopology(exist.Topology.DbPattern); err != nil { - return nil, errors.WithStack(err) - } - } - if len(exist.Topology.TblPattern) > 0 { - if tbFormat, tbBegin, tbEnd, err = parseTopology(exist.Topology.TblPattern); err != nil { - return nil, errors.WithStack(err) - } - } - } - topology.SetRender(getRender(dbFormat), getRender(tbFormat)) - - var ( - keys map[string]struct{} - dbSharder, tbSharder map[string]rule.ShardComputer - ) - for _, it := range exist.DbRules { - var shd rule.ShardComputer - if shd, err = toSharder(it); err != nil { - return nil, err - } - if dbSharder == nil { - dbSharder = make(map[string]rule.ShardComputer) - } - if keys == nil { - keys = make(map[string]struct{}) - } - dbSharder[it.Column] = shd - keys[it.Column] = struct{}{} - } - - for _, it := range exist.TblRules { - var shd rule.ShardComputer - if shd, err = toSharder(it); err != nil { - return nil, err - } - if tbSharder == nil { - tbSharder = make(map[string]rule.ShardComputer) - } - if keys == nil { - keys = make(map[string]struct{}) - } - tbSharder[it.Column] = shd - keys[it.Column] = struct{}{} - } - - for k := range keys { - var ( - shd rule.ShardComputer - dbMetadata, tbMetadata *rule.ShardMetadata - ) - if shd, ok = dbSharder[k]; ok { - dbMetadata = &rule.ShardMetadata{ - - Computer: shd, - Stepper: rule.DefaultNumberStepper, - } - if dbBegin >= 0 && dbEnd >= 0 { - dbMetadata.Steps = 1 + dbEnd - dbBegin - } - } - if shd, ok = tbSharder[k]; ok { - tbMetadata = &rule.ShardMetadata{ - Computer: shd, - Stepper: rule.DefaultNumberStepper, - } - if tbBegin >= 0 && tbEnd >= 0 { - tbMetadata.Steps = 1 + tbEnd - tbBegin - } - } - vt.SetShardMetadata(k, dbMetadata, tbMetadata) - - tpRes := make(map[int][]int) - rng, _ := tbMetadata.Stepper.Ascend(0, tbMetadata.Steps) - for rng.HasNext() { - var ( - seed = rng.Next() - dbIdx = -1 - tbIdx = -1 - ) - if dbMetadata != nil { - if dbIdx, err = dbMetadata.Computer.Compute(seed); err != nil { - return nil, errors.WithStack(err) - } - } - if tbMetadata != nil { - if tbIdx, err = tbMetadata.Computer.Compute(seed); err != nil { - return nil, errors.WithStack(err) - } - } - tpRes[dbIdx] = append(tpRes[dbIdx], tbIdx) - } - - for dbIndex, tbIndexes := range tpRes { - topology.SetTopology(dbIndex, tbIndexes...) - } - } - - if exist.AllowFullScan { - vt.SetAllowFullScan(true) - } - - // TODO: process attributes - _ = exist.Attributes["sql_max_limit"] - - vt.SetTopology(&topology) - - return &vt, nil -} - -func (fp *fileDiscovery) loadCluster(cluster string) (*config.DataSourceCluster, bool) { - for _, it := range fp.c.Data.DataSourceClusters { - if it.Name == cluster { - return it, true - } - } - return nil, false -} - -func (fp *fileDiscovery) loadGroup(cluster, group string) (*config.Group, bool) { - bingo, ok := fp.loadCluster(cluster) - if !ok { - return nil, false - } - for _, it := range bingo.Groups { - if it.Name == group { - return it, true - } - } - return nil, false -} - -func (fp *fileDiscovery) loadTables(cluster string) map[string]*config.Table { - var tables map[string]*config.Table - for _, it := range fp.c.Data.ShardingRule.Tables { - db, tb, err := parseTable(it.Name) - if err != nil { - log.Warnf("skip parsing table rule: %v", err) - continue - } - if db != cluster { - continue - } - if tables == nil { - tables = make(map[string]*config.Table) - } - tables[tb] = it - } - return tables -} - -var ( - _regexpTopology *regexp.Regexp - _regexpTopologyOnce sync.Once -) - -func getTopologyRegexp() *regexp.Regexp { - _regexpTopologyOnce.Do(func() { - _regexpTopology = regexp.MustCompile(`\${(?P[0-9]+)\.\.\.(?P[0-9]+)}`) - }) - return _regexpTopology -} - -func parseTopology(input string) (format string, begin, end int, err error) { - mats := getTopologyRegexp().FindAllStringSubmatch(input, -1) - - if len(mats) < 1 { - format = input - begin = -1 - end = -1 - return - } - - if len(mats) > 1 { - err = errors.Errorf("invalid topology expression: %s", input) - return - } - - var ( - beginStr, endStr string - ) - for i := 1; i < len(mats[0]); i++ { - switch getTopologyRegexp().SubexpNames()[i] { - case "begin": - beginStr = mats[0][i] - case "end": - endStr = mats[0][i] - } - } - - if len(beginStr) != len(endStr) { - err = errors.Errorf("invalid topology expression: %s", input) - return - } - - format = getTopologyRegexp().ReplaceAllString(input, fmt.Sprintf(`%%0%dd`, len(beginStr))) - begin, _ = strconv.Atoi(strings.TrimLeft(beginStr, "0")) - end, _ = strconv.Atoi(strings.TrimLeft(endStr, "0")) - return -} - -func toSharder(input *config.Rule) (rule.ShardComputer, error) { - mat := getRuleExprRegexp().FindStringSubmatch(input.Expr) - if len(mat) != 3 { - return nil, errors.Errorf("invalid shard rule: %s", input.Expr) - } - - var ( - computer rule.ShardComputer - method = mat[1] - n, _ = strconv.Atoi(mat[2]) - ) - - switch method { - case string(rrule.ModShard): - computer = rrule.NewModShard(n) - case string(rrule.HashMd5Shard): - computer = rrule.NewHashMd5Shard(n) - case string(rrule.HashBKDRShard): - computer = rrule.NewHashBKDRShard(n) - case string(rrule.HashCrc32Shard): - computer = rrule.NewHashCrc32Shard(n) - default: - return nil, errors.Errorf("invalid shard rule: %s", input.Expr) - } - return computer, nil -} - -func getRender(format string) func(int) string { - if strings.ContainsRune(format, '%') { - return func(i int) string { - return fmt.Sprintf(format, i) - } - } - return func(i int) string { - return format - } -} - -func parseTable(input string) (db, tbl string, err error) { - mat := getTableRegexp().FindStringSubmatch(input) - if len(mat) < 1 { - err = errors.Errorf("invalid table name: %s", input) - return - } - db = mat[1] - tbl = mat[2] - return -} - -func NewFileProvider(path string) Discovery { - return &fileDiscovery{ - path: path, - } -} diff --git a/pkg/boot/discovery_over_file_test.go b/pkg/boot/discovery_test.go similarity index 88% rename from pkg/boot/discovery_over_file_test.go rename to pkg/boot/discovery_test.go index 007ce924..b7ad477b 100644 --- a/pkg/boot/discovery_over_file_test.go +++ b/pkg/boot/discovery_test.go @@ -31,9 +31,14 @@ import ( ) func TestFileProvider(t *testing.T) { - provider := NewFileProvider(testdata.Path("fake_config.yaml")) + provider := NewProvider(testdata.Path("fake_bootstrap.yaml")).(*discovery) - err := provider.Init(context.Background()) + err := provider.loadBootOptions() + assert.NoError(t, err, "should init ok") + + provider.options.Config.Options["path"] = testdata.Path("fake_config.yaml") + + err = provider.initConfigCenter() assert.NoError(t, err, "should init ok") clusters, err := provider.ListClusters(context.Background()) diff --git a/pkg/boot/options.go b/pkg/boot/options.go new file mode 100644 index 00000000..0a82aff9 --- /dev/null +++ b/pkg/boot/options.go @@ -0,0 +1,29 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package boot + +import ( + "github.com/arana-db/arana/pkg/config" +) + +type BootOptions struct { + Config *config.ConfigOptions `yaml:"config"` +} diff --git a/pkg/boot/plugin.go b/pkg/boot/plugin.go new file mode 100644 index 00000000..fe07a7d1 --- /dev/null +++ b/pkg/boot/plugin.go @@ -0,0 +1,26 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package boot + +import ( + _ "github.com/arana-db/arana/pkg/config/etcd" + _ "github.com/arana-db/arana/pkg/config/file" +) diff --git a/pkg/config/api.go b/pkg/config/api.go new file mode 100644 index 00000000..e8f0f4b3 --- /dev/null +++ b/pkg/config/api.go @@ -0,0 +1,107 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package config + +import ( + "errors" + "fmt" + "io" +) + +type ( + // ProtocolType protocol type enum + ProtocolType int32 +) + +const ( + DefaultConfigPath = "/arana-db/config" + DefaultConfigMetadataPath = "/arana-db/config/metadata" + DefaultConfigDataListenersPath = "/arana-db/config/data/listeners" + DefaultConfigDataFiltersPath = "/arana-db/config/data/filters" + DefaultConfigDataSourceClustersPath = "/arana-db/config/data/dataSourceClusters" + DefaultConfigDataShardingRulePath = "/arana-db/config/data/shardingRule" + DefaultConfigDataTenantsPath = "/arana-db/config/data/tenants" +) + +const ( + Http ProtocolType = iota + MySQL +) + +const ( + _ DataSourceType = "" + DBMySQL DataSourceType = "mysql" + DBPostgreSQL DataSourceType = "postgresql" +) + +var ( + slots map[string]StoreOperate = make(map[string]StoreOperate) + storeOperate StoreOperate +) + +func GetStoreOperate() (StoreOperate, error) { + if storeOperate != nil { + return storeOperate, nil + } + + return nil, errors.New("StoreOperate not init") +} + +func Init(name string, options map[string]interface{}) error { + + s, exist := slots[name] + if !exist { + return fmt.Errorf("StoreOperate solt=[%s] not exist", name) + } + + storeOperate = s + + return storeOperate.Init(options) +} + +//Register register store plugin +func Register(s StoreOperate) { + if _, ok := slots[s.Name()]; ok { + panic(fmt.Errorf("StoreOperate=[%s] already exist", s.Name())) + } + + slots[s.Name()] = s +} + +//StoreOperate config storage related plugins +type StoreOperate interface { + io.Closer + + //Init plugin initialization + Init(options map[string]interface{}) error + + //Save save a configuration data + Save(key string, val []byte) error + + //Get get a configuration + Get(key string) ([]byte, error) + + //Watch Monitor changes of the key + Watch(key string) (<-chan []byte, error) + + //Name plugin name + Name() string +} diff --git a/pkg/config/config.go b/pkg/config/config.go index a1bc3bb1..99e5b726 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,274 +1,211 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. */ package config import ( - "bytes" + "context" "encoding/json" - "io/ioutil" - "path/filepath" - "regexp" - "strconv" + "errors" + "fmt" + "sync" + "sync/atomic" ) import ( - "github.com/creasty/defaults" - - "github.com/ghodss/yaml" - - "github.com/pkg/errors" -) - -type ( - // ProtocolType protocol type enum - ProtocolType int32 + "github.com/tidwall/gjson" ) -const ( - Http ProtocolType = iota - Mysql -) +type Changeable interface { + Name() string + Sign() string +} -const ( - _ DataSourceType = iota - DBMysql - DBPostgreSql -) +type Observer func() -type ( - // DataSourceType is the data source type - DataSourceType int +type ConfigOptions struct { + StoreName string `yaml:"name"` + Options map[string]interface{} `yaml:"options"` +} - // SocketAddress specify either a logical or physical address and port, which are - // used to tell server where to bind/listen, connect to upstream and find - // management servers - SocketAddress struct { - Address string `default:"0.0.0.0" yaml:"address" json:"address"` - Port int `default:"8881" yaml:"port" json:"port"` - } +type Center struct { + storeOperate StoreOperate + confHolder atomic.Value // 里面持有了最新的 *Configuration 对象 + lock sync.RWMutex + observers []Observer +} - Filter struct { - Name string `json:"name,omitempty"` - Config json.RawMessage `json:"config,omitempty"` +func NewCenter(options ConfigOptions) (*Center, error) { + if err := Init(options.StoreName, options.Options); err != nil { + return nil, err } - Configuration struct { - TypeMeta - Metadata map[string]interface{} `yaml:"metadata" json:"metadata"` - Data *Data `validate:"required" yaml:"data" json:"data"` + operate, err := GetStoreOperate() + if err != nil { + return nil, err } - TypeMeta struct { - Kind string `yaml:"kind" json:"kind,omitempty"` - APIVersion string `yaml:"apiVersion" json:"apiVersion,omitempty"` - } + return &Center{ + confHolder: atomic.Value{}, + lock: sync.RWMutex{}, + storeOperate: operate, + observers: make([]Observer, 0, 2), + }, nil +} - Data struct { - Filters []*Filter `yaml:"filters" json:"filters,omitempty"` - Listeners []*Listener `validate:"required" yaml:"listeners" json:"listeners"` - Tenants []*Tenant `validate:"required" yaml:"tenants" json:"tenants"` - DataSourceClusters []*DataSourceCluster `validate:"required" yaml:"clusters" json:"clusters"` - ShardingRule *ShardingRule `yaml:"sharding_rule,omitempty" json:"sharding_rule,omitempty"` - } +func (c *Center) Close() error { + return c.storeOperate.Close() +} - Tenant struct { - Name string `validate:"required" yaml:"name" json:"name"` - Users []*User `validate:"required" yaml:"users" json:"users"` - } +func (c *Center) Load() (*Configuration, error) { + return c.LoadContext(context.Background()) +} - DataSourceCluster struct { - Name string `yaml:"name" json:"name"` - Type DataSourceType `yaml:"type" json:"type"` - SqlMaxLimit int `default:"-1" yaml:"sql_max_limit" json:"sql_max_limit,omitempty"` - Tenant string `yaml:"tenant" json:"tenant"` - ConnProps *ConnProp `yaml:"conn_props" json:"conn_props,omitempty"` - Groups []*Group `yaml:"groups" json:"groups"` - } +func (c *Center) LoadContext(ctx context.Context) (*Configuration, error) { + val := c.confHolder.Load() + if val == nil { + cfg, err := c.loadFromStore(ctx) + if err != nil { + return nil, err + } - ConnProp struct { - Capacity int `yaml:"capacity" json:"capacity,omitempty"` // connection pool capacity - MaxCapacity int `yaml:"max_capacity" json:"max_capacity,omitempty"` // max connection pool capacity - IdleTimeout int `yaml:"idle_timeout" json:"idle_timeout,omitempty"` // close backend direct connection after idle_timeout + c.confHolder.Store(cfg) } - Group struct { - Name string `yaml:"name" json:"name"` - Nodes []*Node `yaml:"nodes" json:"nodes"` - } + val = c.confHolder.Load() + return val.(*Configuration), nil +} - Node struct { - Name string `yaml:"name" json:"name"` - Host string `yaml:"host" json:"host"` - Port int `yaml:"port" json:"port"` - Username string `yaml:"username" json:"username"` - Password string `yaml:"password" json:"password"` - Database string `yaml:"database" json:"database"` - ConnProps map[string]string `yaml:"conn_props" json:"conn_props,omitempty"` - Weight string `default:"r10w10" yaml:"weight" json:"weight"` - Labels map[string]string `yaml:"labels" json:"labels,omitempty"` - } +func (c *Center) loadFromStore(ctx context.Context) (*Configuration, error) { + operate := c.storeOperate - ShardingRule struct { - Tables []*Table `yaml:"tables" json:"tables"` + cfg := &Configuration{ + TypeMeta: TypeMeta{}, + Metadata: make(map[string]interface{}), + Data: &Data{ + Filters: make([]*Filter, 0), + Listeners: make([]*Listener, 0), + Tenants: make([]*Tenant, 0), + DataSourceClusters: make([]*DataSourceCluster, 0), + ShardingRule: &ShardingRule{}, + }, } - Listener struct { - ProtocolType string `yaml:"protocol_type" json:"protocol_type"` - SocketAddress *SocketAddress `yaml:"socket_address" json:"socket_address"` - ServerVersion string `yaml:"server_version" json:"server_version"` + metadataVal, err := operate.Get(DefaultConfigMetadataPath) + if err != nil { + return nil, err } - - User struct { - Username string `yaml:"username" json:"username"` - Password string `yaml:"password" json:"password"` + listenersVal, err := operate.Get(DefaultConfigDataListenersPath) + if err != nil { + return nil, err } - - Table struct { - Name string `yaml:"name" json:"name"` - AllowFullScan bool `yaml:"allow_full_scan" json:"allow_full_scan,omitempty"` - DbRules []*Rule `yaml:"db_rules" json:"db_rules"` - TblRules []*Rule `yaml:"tbl_rules" json:"tbl_rules"` - Topology *Topology `yaml:"topology" json:"topology"` - ShadowTopology *Topology `yaml:"shadow_topology" json:"shadow_topology"` - Attributes map[string]string `yaml:"attributes" json:"attributes"` + filtersVal, err := operate.Get(DefaultConfigDataFiltersPath) + if err != nil { + return nil, err } - - Rule struct { - Column string `yaml:"column" json:"column"` - Expr string `yaml:"expr" json:"expr"` + clustersVal, err := operate.Get(DefaultConfigDataSourceClustersPath) + if err != nil { + return nil, err } - - Topology struct { - DbPattern string `yaml:"db_pattern" json:"db_pattern"` - TblPattern string `yaml:"tbl_pattern" json:"tbl_pattern"` + shardingRuleVal, err := operate.Get(DefaultConfigDataShardingRulePath) + if err != nil { + return nil, err } -) - -func (c *Configuration) Init() error { - return defaults.Set(c) -} - -func ParseV2(path string) (*Configuration, error) { - content, err := ioutil.ReadFile(path) + tenantsVal, err := operate.Get(DefaultConfigDataTenantsPath) if err != nil { - return nil, errors.Wrap(err, "failed to load config") + return nil, err } - if !yamlFormat(path) { - return nil, errors.Errorf("invalid config file format: %s", filepath.Ext(path)) + if len(metadataVal) != 0 { + if err := json.Unmarshal(metadataVal, &cfg.Metadata); err != nil { + return nil, err + } } - - var cfg Configuration - if err = yaml.Unmarshal(content, &cfg); err != nil { - return nil, errors.Wrapf(err, "failed to unmarshal config") + if len(listenersVal) != 0 { + if err := json.Unmarshal(listenersVal, &cfg.Data.Listeners); err != nil { + return nil, err + } } - - if err = cfg.Init(); err != nil { - return nil, errors.Wrapf(err, "failed to init default config") + if len(filtersVal) != 0 { + if err := json.Unmarshal(filtersVal, &cfg.Data.Filters); err != nil { + return nil, err + } } - - return &cfg, nil -} - -// LoadV2 loads the configuration. -func LoadV2(path string) (*Configuration, error) { - configPath, _ := filepath.Abs(path) - cfg, err := ParseV2(configPath) - if err != nil { - return nil, errors.WithStack(err) + if len(clustersVal) != 0 { + if err := json.Unmarshal(clustersVal, &cfg.Data.DataSourceClusters); err != nil { + return nil, err + } + } + if len(tenantsVal) != 0 { + if err := json.Unmarshal(tenantsVal, &cfg.Data.Tenants); err != nil { + return nil, err + } + } + if len(shardingRuleVal) != 0 { + if err := json.Unmarshal(shardingRuleVal, cfg.Data.ShardingRule); err != nil { + return nil, err + } } return cfg, nil } -func yamlFormat(path string) bool { - ext := filepath.Ext(path) - if ext == ".yaml" || ext == ".yml" { - return true - } - return false +func (c *Center) Persist() error { + return c.PersistContext(context.Background()) } -func (t *DataSourceType) UnmarshalText(text []byte) error { - if t == nil { - return errors.New("can't unmarshal a nil *DataSourceType") - } - if !t.unmarshalText(bytes.ToLower(text)) { - return errors.Errorf("unrecognized datasource type: %q", text) +func (c *Center) PersistContext(ctx context.Context) error { + val := c.confHolder.Load() + if val == nil { + return errors.New("ConfHolder.load is nil") } - return nil -} -func (t *DataSourceType) unmarshalText(text []byte) bool { - dataSourceType := string(text) - switch dataSourceType { - case "mysql": - *t = DBMysql - case "postgresql": - *t = DBPostgreSql - default: - return false - } - return true -} + conf := val.(*Configuration) -var reg = regexp.MustCompile(`^[rR]([0-9]+)[wW]([0-9]+)$`) - -func (d *Node) GetReadAndWriteWeight() (int, int, error) { - items := reg.FindStringSubmatch(d.Weight) - if len(items) != 3 { - return 0, 0, errors.New("weight config should be r10w10") - } - readWeight, err := strconv.Atoi(items[1]) + configJson, err := json.Marshal(conf) if err != nil { - return 0, 0, err + return fmt.Errorf("config json.marshal failed %v err:", err) } - writeWeight, err := strconv.Atoi(items[2]) - if err != nil { - return 0, 0, err + + if err := c.storeOperate.Save(DefaultConfigMetadataPath, []byte(gjson.GetBytes(configJson, "metadata").String())); err != nil { + return err } - return readWeight, writeWeight, nil -} + if err := c.storeOperate.Save(DefaultConfigDataListenersPath, []byte(gjson.GetBytes(configJson, "data.listeners").String())); err != nil { + return err + } -func (d *Node) String() string { - b, _ := json.Marshal(d) - return string(b) -} + if err := c.storeOperate.Save(DefaultConfigDataFiltersPath, []byte(gjson.GetBytes(configJson, "data.filters").String())); err != nil { + return err + } -func (t *ProtocolType) UnmarshalText(text []byte) error { - if t == nil { - return errors.New("can't unmarshal a nil *ProtocolType") + if err := c.storeOperate.Save(DefaultConfigDataSourceClustersPath, []byte(gjson.GetBytes(configJson, "data.dataSourceClusters").String())); err != nil { + return err } - if !t.unmarshalText(bytes.ToLower(text)) { - return errors.Errorf("unrecognized protocol type: %q", text) + + if err := c.storeOperate.Save(DefaultConfigDataTenantsPath, []byte(gjson.GetBytes(configJson, "data.tenants").String())); err != nil { + return err } - return nil -} -func (t *ProtocolType) unmarshalText(text []byte) bool { - protocolType := string(text) - switch protocolType { - case "mysql": - *t = Mysql - case "http": - *t = Http - default: - return false + if err = c.storeOperate.Save(DefaultConfigDataShardingRulePath, []byte(gjson.GetBytes(configJson, "data.shardingRule").String())); err != nil { + return err } - return true + + return nil } diff --git a/pkg/config/etcd.go b/pkg/config/etcd.go deleted file mode 100644 index 53e8d166..00000000 --- a/pkg/config/etcd.go +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package config - -import ( - "encoding/json" - "time" -) - -import ( - etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3" - - "github.com/pkg/errors" - - "github.com/tidwall/gjson" -) - -const ( - defaultConfigPath = "/dubbo-go-arana/config" - defaultConfigDataListenersPath = "/dubbo-go-arana/config/data/listeners" - defaultConfigDataExecutorsPath = "/dubbo-go-arana/config/data/executors" - defaultConfigDataSourceClustersPath = "/dubbo-go-arana/config/data/dataSourceClusters" - defaultConfigDataShardingRulePath = "/dubbo-go-arana/config/data/shardingRule" -) - -type Client struct { - client *etcdv3.Client -} - -func NewClient(endpoint []string) (*Client, error) { - tmpClient, err := etcdv3.NewConfigClientWithErr( - etcdv3.WithName(etcdv3.RegistryETCDV3Client), - etcdv3.WithTimeout(10*time.Second), - etcdv3.WithEndpoints(endpoint...), - ) - if err != nil { - return nil, errors.Wrap(err, "failed to initialize etcd client") - } - return &Client{client: tmpClient}, nil -} - -// PutConfigToEtcd initialize local file config into etcd,only be used in when etcd don't hava data. -func (c *Client) PutConfigToEtcd(configPath string) error { - config, err := LoadV2(configPath) - if err != nil { - return errors.WithStack(err) - } - - configJson, err := json.Marshal(config) - if err != nil { - return errors.Errorf("config json.marshal failed %v err:", err) - } - - if err = c.client.Put(defaultConfigPath, string(configJson)); err != nil { - return err - } - - if err = c.client.Put(defaultConfigDataListenersPath, gjson.Get(string(configJson), "data.listeners").String()); err != nil { - return err - } - - if err = c.client.Put(defaultConfigDataExecutorsPath, gjson.Get(string(configJson), "data.executors").String()); err != nil { - return err - } - - if err = c.client.Put(defaultConfigDataSourceClustersPath, gjson.Get(string(configJson), "data.dataSourceClusters").String()); err != nil { - return err - } - - if err = c.client.Put(defaultConfigDataShardingRulePath, gjson.Get(string(configJson), "data.shardingRule").String()); err != nil { - return err - } - - return nil -} - -// LoadConfigFromEtcd get key value from etcd -func (c *Client) LoadConfigFromEtcd(configKey string) (string, error) { - resp, err := c.client.Get(configKey) - if err != nil { - return "", errors.Errorf("Get remote config fail error %v", err) - } - return resp, nil -} - -// UpdateConfigToEtcd update key value in etcd -func (c *Client) UpdateConfigToEtcd(configKey, configValue string) error { - - if err := c.client.Put(configKey, configValue); err != nil { - return err - } - - return nil -} diff --git a/pkg/config/etcd/etcd.go b/pkg/config/etcd/etcd.go new file mode 100644 index 00000000..6253d3b2 --- /dev/null +++ b/pkg/config/etcd/etcd.go @@ -0,0 +1,159 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package etcd + +import ( + "context" + "math" + "sync" + "time" +) + +import ( + etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3" + + "github.com/pkg/errors" + + "go.etcd.io/etcd/api/v3/mvccpb" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +import ( + "github.com/arana-db/arana/pkg/config" +) + +func init() { + config.Register(&storeOperate{}) +} + +type storeOperate struct { + client *etcdv3.Client + lock *sync.RWMutex + receivers map[string]*etcdWatcher + cancelList []context.CancelFunc +} + +func (c *storeOperate) Init(options map[string]interface{}) error { + endpoints, _ := options["endpoints"].([]string) + tmpClient, err := etcdv3.NewConfigClientWithErr( + etcdv3.WithName(etcdv3.RegistryETCDV3Client), + etcdv3.WithTimeout(10*time.Second), + etcdv3.WithEndpoints(endpoints...), + ) + if err != nil { + return errors.Wrap(err, "failed to initialize etcd client") + } + + c.client = tmpClient + c.lock = &sync.RWMutex{} + c.receivers = make(map[string]*etcdWatcher) + c.cancelList = make([]context.CancelFunc, 0, 2) + + return nil +} + +func (c *storeOperate) Save(key string, val []byte) error { + return c.client.Put(key, string(val)) +} + +func (c *storeOperate) Get(key string) ([]byte, error) { + v, err := c.client.Get(key) + if err != nil { + return nil, err + } + + return []byte(v), nil +} + +type etcdWatcher struct { + revision int64 + lock *sync.RWMutex + receivers []chan []byte + ch clientv3.WatchChan +} + +func newEtcdWatcher(ch clientv3.WatchChan) *etcdWatcher { + w := &etcdWatcher{ + revision: math.MinInt64, + lock: &sync.RWMutex{}, + receivers: make([]chan []byte, 0, 2), + ch: ch, + } + return w +} + +func (w *etcdWatcher) run(ctx context.Context) { + for { + select { + case resp := <-w.ch: + for i := range resp.Events { + event := resp.Events[i] + if event.Type == mvccpb.DELETE || event.Kv.ModRevision <= w.revision { + continue + } + w.revision = event.Kv.ModRevision + for p := range w.receivers { + w.receivers[p] <- event.Kv.Value + } + } + case <-ctx.Done(): + return + } + } +} + +func (c *storeOperate) Watch(key string) (<-chan []byte, error) { + defer c.lock.Unlock() + c.lock.Lock() + if _, ok := c.receivers[key]; !ok { + watchCh, err := c.client.Watch(key) + if err != nil { + return nil, err + } + w := newEtcdWatcher(watchCh) + ctx, cancel := context.WithCancel(context.Background()) + go w.run(ctx) + c.cancelList = append(c.cancelList, cancel) + c.receivers[key] = w + } + + w := c.receivers[key] + + defer w.lock.Unlock() + w.lock.Lock() + + rec := make(chan []byte) + c.receivers[key].receivers = append(c.receivers[key].receivers, rec) + return rec, nil +} + +func (c *storeOperate) Name() string { + return "etcd" +} + +func (c *storeOperate) Close() error { + for _, f := range c.cancelList { + f() + } + + return nil +} diff --git a/pkg/config/etcd/etcd_test.go b/pkg/config/etcd/etcd_test.go new file mode 100644 index 00000000..bd5c699a --- /dev/null +++ b/pkg/config/etcd/etcd_test.go @@ -0,0 +1,99 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package etcd + +import ( + "net/url" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/suite" + + "go.etcd.io/etcd/server/v3/embed" +) + +import ( + "github.com/arana-db/arana/testdata" +) + +const _defaultEtcdV3WorkDir = "/tmp/dubbo-go-arana/config" + +var _fakeConfigPath = testdata.Path("fake_bootstrap.yaml") + +type ClientTestSuite struct { + suite.Suite + + etcdConfig struct { + name string + endpoints []string + timeout time.Duration + heartbeat int + } + + etcd *embed.Etcd + client *storeOperate +} + +// start etcd server +func (suite *ClientTestSuite) SetupSuite() { + t := suite.T() + DefaultListenPeerURLs := "http://localhost:2382" + DefaultListenClientURLs := "http://localhost:2381" + lpurl, _ := url.Parse(DefaultListenPeerURLs) + lcurl, _ := url.Parse(DefaultListenClientURLs) + cfg := embed.NewConfig() + cfg.LPUrls = []url.URL{*lpurl} + cfg.LCUrls = []url.URL{*lcurl} + cfg.Dir = _defaultEtcdV3WorkDir + e, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + select { + case <-e.Server.ReadyNotify(): + t.Log("Server is ready!") + case <-time.After(60 * time.Second): + e.Server.Stop() // trigger a shutdown + t.Logf("Server took too long to start!") + } + + suite.etcd = e + return +} + +func TestClientSuite(t *testing.T) { + t.Skip("reimplement etcd") + suite.Run(t, &ClientTestSuite{ + etcdConfig: struct { + name string + endpoints []string + timeout time.Duration + heartbeat int + }{ + name: "test", + endpoints: []string{"localhost:2381"}, + timeout: time.Second, + heartbeat: 1, + }, + }) +} diff --git a/pkg/config/etcd_test.go b/pkg/config/etcd_test.go deleted file mode 100644 index dddb5fd1..00000000 --- a/pkg/config/etcd_test.go +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package config - -import ( - "encoding/json" - "net/url" - "os" - "testing" - "time" -) - -import ( - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/suite" - - "go.etcd.io/etcd/server/v3/embed" -) - -const defaultEtcdV3WorkDir = "/tmp/dubbo-go-arana/config" - -type ClientTestSuite struct { - suite.Suite - - etcdConfig struct { - name string - endpoints []string - timeout time.Duration - heartbeat int - } - - etcd *embed.Etcd - - client *Client -} - -// start etcd server -func (suite *ClientTestSuite) SetupSuite() { - t := suite.T() - DefaultListenPeerURLs := "http://localhost:2382" - DefaultListenClientURLs := "http://localhost:2381" - lpurl, _ := url.Parse(DefaultListenPeerURLs) - lcurl, _ := url.Parse(DefaultListenClientURLs) - cfg := embed.NewConfig() - cfg.LPUrls = []url.URL{*lpurl} - cfg.LCUrls = []url.URL{*lcurl} - cfg.Dir = defaultEtcdV3WorkDir - e, err := embed.StartEtcd(cfg) - if err != nil { - t.Fatal(err) - } - select { - case <-e.Server.ReadyNotify(): - t.Log("Server is ready!") - case <-time.After(60 * time.Second): - e.Server.Stop() // trigger a shutdown - t.Logf("Server took too long to start!") - } - - suite.etcd = e - return -} - -// stop etcd server -func (suite *ClientTestSuite) TearDownSuite() { - suite.etcd.Close() - if err := os.RemoveAll(defaultConfigPath); err != nil { - suite.FailNow(err.Error()) - } -} - -func (suite *ClientTestSuite) setUpClient() *Client { - c, err := NewClient(suite.etcdConfig.endpoints) - if err != nil { - suite.T().Fatal(err) - } - return c -} - -func (suite *ClientTestSuite) SetupTest() { - c := suite.setUpClient() - suite.client = c - return -} - -func (suite *ClientTestSuite) TestLoadConfigFromEtcd() { - t := suite.T() - c := suite.client - defer suite.client.client.Close() - - if err := c.PutConfigToEtcd(fakeConfigPath); err != nil { - t.Fatal(err) - } - - resp, err := c.LoadConfigFromEtcd(defaultConfigPath) - if err != nil { - t.Fatal(err) - } - config, err := LoadV2(fakeConfigPath) - assert.NoError(suite.T(), err) - configJson, _ := json.Marshal(config) - - if resp != string(configJson) { - t.Fatalf("expect %s but get %s", string(configJson), resp) - } -} - -func (suite *ClientTestSuite) TestUpdateConfigToEtcd() { - t := suite.T() - c := suite.client - defer suite.client.client.Close() - - resp, err := c.LoadConfigFromEtcd(defaultConfigDataExecutorsPath) - if err != nil { - t.Fatal(err) - } - jsonSlice := make([]map[string]interface{}, 1) - - json.Unmarshal([]byte(resp), &jsonSlice) - - jsonSlice[0]["name"] = "test" - - configJson, err := json.Marshal(jsonSlice) - if err != nil { - t.Fatal(err) - } - - err = c.UpdateConfigToEtcd(defaultConfigDataExecutorsPath, string(configJson)) - if err != nil { - t.Fatal(err) - } - - resp, err = c.LoadConfigFromEtcd(defaultConfigDataExecutorsPath) - if err != nil { - t.Fatal(err) - } - if resp != string(configJson) { - t.Fatalf("expect %s but get %s", configJson, resp) - } -} - -func TestClientSuite(t *testing.T) { - t.Skip("reimplement etcd") - suite.Run(t, &ClientTestSuite{ - etcdConfig: struct { - name string - endpoints []string - timeout time.Duration - heartbeat int - }{ - name: "test", - endpoints: []string{"localhost:2381"}, - timeout: time.Second, - heartbeat: 1, - }, - }) -} diff --git a/pkg/config/file/file.go b/pkg/config/file/file.go new file mode 100644 index 00000000..15a12688 --- /dev/null +++ b/pkg/config/file/file.go @@ -0,0 +1,107 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package file + +import ( + "encoding/json" + "fmt" + "sync" +) + +import ( + "github.com/tidwall/gjson" +) + +import ( + "github.com/arana-db/arana/pkg/config" +) + +func init() { + config.Register(&storeOperate{}) +} + +type storeOperate struct { + lock *sync.RWMutex + receivers map[string][]chan []byte + path string + cfgJson map[string]string +} + +func (s *storeOperate) Init(options map[string]interface{}) error { + s.lock = &sync.RWMutex{} + s.receivers = make(map[string][]chan []byte) + + s.path, _ = options["path"].(string) + + cfg, err := config.LoadV2(s.path) + if err != nil { + return err + } + configJson, err := json.Marshal(cfg) + if err != nil { + return fmt.Errorf("config json.marshal failed %v err:", err) + } + s.initCfgJsonMap(string(configJson)) + return nil +} + +func (s *storeOperate) initCfgJsonMap(val string) { + s.cfgJson = make(map[string]string) + + s.cfgJson[config.DefaultConfigMetadataPath] = gjson.Get(val, "metadata").String() + s.cfgJson[config.DefaultConfigDataTenantsPath] = gjson.Get(val, "data.tenants").String() + s.cfgJson[config.DefaultConfigDataFiltersPath] = gjson.Get(val, "data.filters").String() + s.cfgJson[config.DefaultConfigDataListenersPath] = gjson.Get(val, "data.listeners").String() + s.cfgJson[config.DefaultConfigDataSourceClustersPath] = gjson.Get(val, "data.clusters").String() + s.cfgJson[config.DefaultConfigDataShardingRulePath] = gjson.Get(val, "data.sharding_rule").String() +} + +func (s *storeOperate) Save(key string, val []byte) error { + return nil +} + +func (s *storeOperate) Get(key string) ([]byte, error) { + val := []byte(s.cfgJson[key]) + return val, nil +} + +//Watch TODO change notification through file inotify mechanism +func (s *storeOperate) Watch(key string) (<-chan []byte, error) { + defer s.lock.Unlock() + + if _, ok := s.receivers[key]; !ok { + s.receivers[key] = make([]chan []byte, 0, 2) + } + + rec := make(chan []byte) + + s.receivers[key] = append(s.receivers[key], rec) + + return rec, nil +} + +func (s *storeOperate) Name() string { + return "file" +} + +func (s *storeOperate) Close() error { + return nil +} diff --git a/pkg/config/model.go b/pkg/config/model.go new file mode 100644 index 00000000..87f2642e --- /dev/null +++ b/pkg/config/model.go @@ -0,0 +1,224 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package config + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "path/filepath" + "regexp" + "strconv" +) + +import ( + "github.com/ghodss/yaml" + + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/util/file" +) + +type ( + TypeMeta struct { + Kind string `yaml:"kind" json:"kind,omitempty"` + APIVersion string `yaml:"apiVersion" json:"apiVersion,omitempty"` + } + + Configuration struct { + TypeMeta + Metadata map[string]interface{} `yaml:"metadata" json:"metadata"` + Data *Data `validate:"required" yaml:"data" json:"data"` + } + + // DataSourceType is the data source type + DataSourceType string + + // SocketAddress specify either a logical or physical address and port, which are + // used to tell server where to bind/listen, connect to upstream and find + // management servers + SocketAddress struct { + Address string `default:"0.0.0.0" yaml:"address" json:"address"` + Port int `default:"8881" yaml:"port" json:"port"` + } + + Data struct { + Filters []*Filter `yaml:"filters" json:"filters,omitempty"` + Listeners []*Listener `validate:"required" yaml:"listeners" json:"listeners"` + Tenants []*Tenant `validate:"required" yaml:"tenants" json:"tenants"` + DataSourceClusters []*DataSourceCluster `validate:"required" yaml:"clusters" json:"clusters"` + ShardingRule *ShardingRule `yaml:"sharding_rule,omitempty" json:"sharding_rule,omitempty"` + } + + Filter struct { + Name string `json:"name,omitempty"` + Config json.RawMessage `json:"config,omitempty"` + } + + Tenant struct { + Name string `validate:"required" yaml:"name" json:"name"` + Users []*User `validate:"required" yaml:"users" json:"users"` + } + + DataSourceCluster struct { + Name string `yaml:"name" json:"name"` + Type DataSourceType `yaml:"type" json:"type"` + SqlMaxLimit int `default:"-1" yaml:"sql_max_limit" json:"sql_max_limit,omitempty"` + Tenant string `yaml:"tenant" json:"tenant"` + ConnProps *ConnProp `yaml:"conn_props" json:"conn_props,omitempty"` + Groups []*Group `yaml:"groups" json:"groups"` + } + + ConnProp struct { + Capacity int `yaml:"capacity" json:"capacity,omitempty"` // connection pool capacity + MaxCapacity int `yaml:"max_capacity" json:"max_capacity,omitempty"` // max connection pool capacity + IdleTimeout int `yaml:"idle_timeout" json:"idle_timeout,omitempty"` // close backend direct connection after idle_timeout + } + + Group struct { + Name string `yaml:"name" json:"name"` + Nodes []*Node `yaml:"nodes" json:"nodes"` + } + + Node struct { + Name string `yaml:"name" json:"name"` + Host string `yaml:"host" json:"host"` + Port int `yaml:"port" json:"port"` + Username string `yaml:"username" json:"username"` + Password string `yaml:"password" json:"password"` + Database string `yaml:"database" json:"database"` + ConnProps map[string]string `yaml:"conn_props" json:"conn_props,omitempty"` + Weight string `default:"r10w10" yaml:"weight" json:"weight"` + Labels map[string]string `yaml:"labels" json:"labels,omitempty"` + } + + ShardingRule struct { + Tables []*Table `yaml:"tables" json:"tables"` + } + + Listener struct { + ProtocolType string `yaml:"protocol_type" json:"protocol_type"` + SocketAddress *SocketAddress `yaml:"socket_address" json:"socket_address"` + ServerVersion string `yaml:"server_version" json:"server_version"` + } + + User struct { + Username string `yaml:"username" json:"username"` + Password string `yaml:"password" json:"password"` + } + + Table struct { + Name string `yaml:"name" json:"name"` + AllowFullScan bool `yaml:"allow_full_scan" json:"allow_full_scan,omitempty"` + DbRules []*Rule `yaml:"db_rules" json:"db_rules"` + TblRules []*Rule `yaml:"tbl_rules" json:"tbl_rules"` + Topology *Topology `yaml:"topology" json:"topology"` + ShadowTopology *Topology `yaml:"shadow_topology" json:"shadow_topology"` + Attributes map[string]string `yaml:"attributes" json:"attributes"` + } + + Rule struct { + Column string `yaml:"column" json:"column"` + Expr string `yaml:"expr" json:"expr"` + } + + Topology struct { + DbPattern string `yaml:"db_pattern" json:"db_pattern"` + TblPattern string `yaml:"tbl_pattern" json:"tbl_pattern"` + } +) + +func ParseV2(path string) (*Configuration, error) { + content, err := ioutil.ReadFile(path) + if err != nil { + return nil, errors.Wrap(err, "failed to load config") + } + + if !file.IsYaml(path) { + return nil, errors.Errorf("invalid config file format: %s", filepath.Ext(path)) + } + + var cfg Configuration + if err = yaml.Unmarshal(content, &cfg); err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal config") + } + + return &cfg, nil +} + +// LoadV2 loads the configuration. +func LoadV2(path string) (*Configuration, error) { + configPath, _ := filepath.Abs(path) + cfg, err := ParseV2(configPath) + if err != nil { + return nil, errors.WithStack(err) + } + return cfg, nil +} + +var reg = regexp.MustCompile(`^[rR]([0-9]+)[wW]([0-9]+)$`) + +func (d *Node) GetReadAndWriteWeight() (int, int, error) { + items := reg.FindStringSubmatch(d.Weight) + if len(items) != 3 { + return 0, 0, errors.New("weight config should be r10w10") + } + readWeight, err := strconv.Atoi(items[1]) + if err != nil { + return 0, 0, err + } + writeWeight, err := strconv.Atoi(items[2]) + if err != nil { + return 0, 0, err + } + + return readWeight, writeWeight, nil +} + +func (d *Node) String() string { + b, _ := json.Marshal(d) + return string(b) +} + +func (t *ProtocolType) UnmarshalText(text []byte) error { + if t == nil { + return errors.New("can't unmarshal a nil *ProtocolType") + } + if !t.unmarshalText(bytes.ToLower(text)) { + return errors.Errorf("unrecognized protocol type: %q", text) + } + return nil +} + +func (t *ProtocolType) unmarshalText(text []byte) bool { + protocolType := string(text) + switch protocolType { + case "mysql": + *t = MySQL + case "http": + *t = Http + default: + return false + } + return true +} diff --git a/pkg/config/config_test.go b/pkg/config/model_test.go similarity index 93% rename from pkg/config/config_test.go rename to pkg/config/model_test.go index c62c49a3..59f10023 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/model_test.go @@ -29,10 +29,10 @@ import ( "github.com/arana-db/arana/testdata" ) -var fakeConfigPath = testdata.Path("fake_config.yaml") +var FakeConfigPath = testdata.Path("fake_config.yaml") func TestMetadataConf(t *testing.T) { - conf, err := LoadV2(fakeConfigPath) + conf, err := LoadV2(FakeConfigPath) assert.NoError(t, err) assert.NotNil(t, conf) @@ -45,14 +45,14 @@ func TestMetadataConf(t *testing.T) { } func TestDataSourceClustersConf(t *testing.T) { - conf, err := LoadV2(fakeConfigPath) + conf, err := LoadV2(FakeConfigPath) assert.NoError(t, err) assert.NotEqual(t, nil, conf) assert.Equal(t, 1, len(conf.Data.DataSourceClusters)) dataSourceCluster := conf.Data.DataSourceClusters[0] assert.Equal(t, "employee", dataSourceCluster.Name) - assert.Equal(t, DBMysql, dataSourceCluster.Type) + assert.Equal(t, DBMySQL, dataSourceCluster.Type) assert.Equal(t, -1, dataSourceCluster.SqlMaxLimit) assert.Equal(t, "arana", dataSourceCluster.Tenant) assert.NotNil(t, dataSourceCluster.ConnProps) @@ -76,7 +76,7 @@ func TestDataSourceClustersConf(t *testing.T) { } func TestShardingRuleConf(t *testing.T) { - conf, err := LoadV2(fakeConfigPath) + conf, err := LoadV2(FakeConfigPath) assert.NoError(t, err) assert.NotEqual(t, nil, conf) @@ -121,5 +121,5 @@ func TestUnmarshalText(t *testing.T) { var text = []byte("mysql") err := protocolType.UnmarshalText(text) assert.Nil(t, err) - assert.Equal(t, Mysql, protocolType) + assert.Equal(t, MySQL, protocolType) } diff --git a/pkg/util/file/file.go b/pkg/util/file/file.go new file mode 100644 index 00000000..ab5edfb8 --- /dev/null +++ b/pkg/util/file/file.go @@ -0,0 +1,33 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package file + +import ( + "path/filepath" +) + +func IsYaml(path string) bool { + ext := filepath.Ext(path) + if ext == ".yaml" || ext == ".yml" { + return true + } + return false +} diff --git a/testdata/fake_bootstrap.yaml b/testdata/fake_bootstrap.yaml new file mode 100644 index 00000000..81d66ea0 --- /dev/null +++ b/testdata/fake_bootstrap.yaml @@ -0,0 +1,7 @@ +config: + name: file + options: + path: ./fake_config.yaml + # name: etcd + # options: + # endpoints: "http://localhost:2382" \ No newline at end of file