Skip to content

Commit

Permalink
[Advanced Visibility with SQL] Adding MySQL 8 schema and interface (t…
Browse files Browse the repository at this point in the history
…emporalio#3552)

* Create base schema for MySQL 8

* MySQL 8 schema changes to support advanced visibility

* Create base MySQL 8 db interface and configs
  • Loading branch information
rodrigozhou authored Jan 31, 2023
1 parent 416abfb commit fa429a5
Show file tree
Hide file tree
Showing 50 changed files with 1,520 additions and 19 deletions.
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,17 @@ install-schema-mysql: temporal-sql-tool
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(VISIBILITY_DB) setup-schema -v 0.0
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(VISIBILITY_DB) update-schema -d ./schema/mysql/v57/visibility/versioned

install-schema-mysql8: temporal-sql-tool
@printf $(COLOR) "Install MySQL schema..."
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(TEMPORAL_DB) drop -f
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(TEMPORAL_DB) create
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(TEMPORAL_DB) setup-schema -v 0.0
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(TEMPORAL_DB) update-schema -d ./schema/mysql/v8/temporal/versioned
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(VISIBILITY_DB) drop -f
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(VISIBILITY_DB) create
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(VISIBILITY_DB) setup-schema -v 0.0
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) --db $(VISIBILITY_DB) update-schema -d ./schema/mysql/v8/visibility/versioned

install-schema-postgresql: temporal-sql-tool
@printf $(COLOR) "Install Postgres schema..."
./temporal-sql-tool -u $(SQL_USER) --pw $(SQL_PASSWORD) -p 5432 --pl postgres --db $(TEMPORAL_DB) drop -f
Expand Down Expand Up @@ -430,6 +441,9 @@ start-es: temporal-server
start-mysql: temporal-server
./temporal-server --env development-mysql --allow-no-auth start

start-mysql8: temporal-server
./temporal-server --env development-mysql8 --allow-no-auth start

start-mysql-es: temporal-server
./temporal-server --env development-mysql-es --allow-no-auth start

Expand Down
4 changes: 2 additions & 2 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {

if options.DBPort == 0 {
switch options.SQLDBPluginName {
case mysql.PluginName:
case mysql.PluginName, mysql.PluginNameV8:
options.DBPort = environment.GetMySQLPort()
case postgresql.PluginName:
options.DBPort = environment.GetPostgreSQLPort()
Expand All @@ -148,7 +148,7 @@ func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
}
if options.DBHost == "" {
switch options.SQLDBPluginName {
case mysql.PluginName:
case mysql.PluginName, mysql.PluginNameV8:
options.DBHost = environment.GetMySQLAddress()
case postgresql.PluginName:
options.DBHost = environment.GetPostgreSQLAddress()
Expand Down
20 changes: 17 additions & 3 deletions common/persistence/persistence-tests/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ import (
)

const (
testMySQLUser = "temporal"
testMySQLPassword = "temporal"
testMySQLSchemaDir = "schema/mysql/v57"
testMySQLUser = "temporal"
testMySQLPassword = "temporal"
testMySQLSchemaDir = "schema/mysql/v57"
testMySQL8SchemaDir = "schema/mysql/v8"

testPostgreSQLUser = "temporal"
testPostgreSQLPassword = "temporal"
Expand All @@ -61,6 +62,19 @@ func GetMySQLTestClusterOption() *TestBaseOptions {
}
}

// GetMySQL8TestClusterOption return test options
func GetMySQL8TestClusterOption() *TestBaseOptions {
return &TestBaseOptions{
SQLDBPluginName: mysql.PluginNameV8,
DBUsername: testMySQLUser,
DBPassword: testMySQLPassword,
DBHost: environment.GetMySQLAddress(),
DBPort: environment.GetMySQLPort(),
SchemaDir: testMySQL8SchemaDir,
StoreType: config.StoreTypeSQL,
}
}

// GetPostgreSQLTestClusterOption return test options
func GetPostgreSQLTestClusterOption() *TestBaseOptions {
return &TestBaseOptions{
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/mysql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

"go.temporal.io/server/common/persistence/schema"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
mysqlschema "go.temporal.io/server/schema/mysql"
mysqlschemaV57 "go.temporal.io/server/schema/mysql/v57"
)

// db represents a logical connection to mysql database
Expand Down Expand Up @@ -120,9 +120,9 @@ func (mdb *db) DbName() string {
func (mdb *db) ExpectedVersion() string {
switch mdb.dbKind {
case sqlplugin.DbKindMain:
return mysqlschema.Version
return mysqlschemaV57.Version
case sqlplugin.DbKindVisibility:
return mysqlschema.VisibilityVersion
return mysqlschemaV57.VisibilityVersion
default:
panic(fmt.Sprintf("unknown db kind %v", mdb.dbKind))
}
Expand Down
101 changes: 101 additions & 0 deletions common/persistence/sql/sqlplugin/mysql/db_v8.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package mysql

import (
"context"
"fmt"

"github.com/jmoiron/sqlx"

"go.temporal.io/server/common/persistence/schema"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
mysqlschemaV8 "go.temporal.io/server/schema/mysql/v8"
)

// db represents a logical connection to mysql database
type dbV8 struct {
db
}

var _ sqlplugin.AdminDB = (*dbV8)(nil)
var _ sqlplugin.DB = (*dbV8)(nil)
var _ sqlplugin.Tx = (*dbV8)(nil)

// newDB returns an instance of DB, which is a logical
// connection to the underlying mysql database
func newDBV8(
dbKind sqlplugin.DbKind,
dbName string,
xdb *sqlx.DB,
tx *sqlx.Tx,
) *dbV8 {
mdb := &dbV8{
db: db{
dbKind: dbKind,
dbName: dbName,
db: xdb,
tx: tx,
},
}
mdb.conn = xdb
if tx != nil {
mdb.conn = tx
}
mdb.converter = &converter{}
return mdb
}

// BeginTx starts a new transaction and returns a reference to the Tx object
func (mdb *dbV8) BeginTx(ctx context.Context) (sqlplugin.Tx, error) {
xtx, err := mdb.db.db.BeginTxx(ctx, nil)
if err != nil {
return nil, err
}
return newDBV8(mdb.dbKind, mdb.dbName, mdb.db.db, xtx), nil
}

// PluginName returns the name of the mysql plugin
func (mdb *dbV8) PluginName() string {
return PluginNameV8
}

// ExpectedVersion returns expected version.
func (mdb *dbV8) ExpectedVersion() string {
switch mdb.dbKind {
case sqlplugin.DbKindMain:
return mysqlschemaV8.Version
case sqlplugin.DbKindVisibility:
return mysqlschemaV8.VisibilityVersion
default:
panic(fmt.Sprintf("unknown db kind %v", mdb.dbKind))
}
}

// VerifyVersion verify schema version is up to date
func (mdb *dbV8) VerifyVersion() error {
expectedVersion := mdb.ExpectedVersion()
return schema.VerifyCompatibleVersion(mdb, mdb.dbName, expectedVersion)
}
75 changes: 75 additions & 0 deletions common/persistence/sql/sqlplugin/mysql/plugin_v8.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package mysql

import (
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
"go.temporal.io/server/common/resolver"
)

const (
// PluginName is the name of the plugin
PluginNameV8 = "mysql8"
)

type pluginV8 struct {
plugin
}

var _ sqlplugin.Plugin = (*pluginV8)(nil)

func init() {
sql.RegisterPlugin(PluginNameV8, &pluginV8{})
}

// CreateDB initialize the db object
func (p *pluginV8) CreateDB(
dbKind sqlplugin.DbKind,
cfg *config.SQL,
r resolver.ServiceResolver,
) (sqlplugin.DB, error) {
conn, err := p.createDBConnection(cfg, r)
if err != nil {
return nil, err
}
db := newDBV8(dbKind, cfg.DatabaseName, conn, nil)
return db, nil
}

// CreateAdminDB initialize the db object
func (p *pluginV8) CreateAdminDB(
dbKind sqlplugin.DbKind,
cfg *config.SQL,
r resolver.ServiceResolver,
) (sqlplugin.AdminDB, error) {
conn, err := p.createDBConnection(cfg, r)
if err != nil {
return nil, err
}
db := newDBV8(dbKind, cfg.DatabaseName, conn, nil)
return db, nil
}
35 changes: 35 additions & 0 deletions common/persistence/tests/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ func TestMySQLVisibilityPersistenceSuite(t *testing.T) {
suite.Run(t, s)
}

func TestMySQL8VisibilityPersistenceSuite(t *testing.T) {
s := &VisibilityPersistenceSuite{
TestBase: persistencetests.NewTestBaseWithSQL(persistencetests.GetMySQL8TestClusterOption()),
}
suite.Run(t, s)
}

// TODO: Merge persistence-tests into the tests directory.

func TestMySQLHistoryV2PersistenceSuite(t *testing.T) {
Expand Down Expand Up @@ -182,6 +189,34 @@ func TestMySQLClusterMetadataPersistence(t *testing.T) {
suite.Run(t, s)
}

func TestMySQL8HistoryV2PersistenceSuite(t *testing.T) {
s := new(persistencetests.HistoryV2PersistenceSuite)
s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetMySQL8TestClusterOption())
s.TestBase.Setup(nil)
suite.Run(t, s)
}

func TestMySQL8MetadataPersistenceSuiteV2(t *testing.T) {
s := new(persistencetests.MetadataPersistenceSuiteV2)
s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetMySQL8TestClusterOption())
s.TestBase.Setup(nil)
suite.Run(t, s)
}

func TestMySQL8QueuePersistence(t *testing.T) {
s := new(persistencetests.QueuePersistenceSuite)
s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetMySQL8TestClusterOption())
s.TestBase.Setup(nil)
suite.Run(t, s)
}

func TestMySQL8ClusterMetadataPersistence(t *testing.T) {
s := new(persistencetests.ClusterMetadataManagerSuite)
s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetMySQL8TestClusterOption())
s.TestBase.Setup(nil)
suite.Run(t, s)
}

// SQL Store tests

func TestMySQLNamespaceSuite(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/tests/mysql_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
_ "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/resolver"
"go.temporal.io/server/common/shuffle"
"go.temporal.io/server/environment"
Expand All @@ -56,8 +56,8 @@ const (

// TODO hard code this dir for now
// need to merge persistence test config / initialization in one place
testMySQLExecutionSchema = "../../../schema/mysql/v57/temporal/schema.sql"
testMySQLVisibilitySchema = "../../../schema/mysql/v57/visibility/schema.sql"
testMySQLExecutionSchema = "../../../schema/mysql/v8/temporal/schema.sql"
testMySQLVisibilitySchema = "../../../schema/mysql/v8/visibility/schema.sql"
)

type (
Expand Down Expand Up @@ -100,7 +100,7 @@ func NewMySQLConfig() *config.SQL {
strconv.Itoa(environment.GetMySQLPort()),
),
ConnectProtocol: testMySQLConnectionProtocol,
PluginName: "mysql",
PluginName: mysql.PluginNameV8,
DatabaseName: testMySQLDatabaseNamePrefix + shuffle.String(testMySQLDatabaseNameSuffix),
}
}
Expand Down
Loading

0 comments on commit fa429a5

Please sign in to comment.