Skip to content

Commit

Permalink
Implement Postgres SQL Plugin (cadence-workflow#2889)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Dec 6, 2019
1 parent b972d8d commit cba849d
Show file tree
Hide file tree
Showing 23 changed files with 2,234 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/server/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/uber/cadence/common"
_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin
_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/postgres" // needed to load postgres plugin
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/tools/cassandra"
"github.com/uber/cadence/tools/sql"
Expand Down
1 change: 1 addition & 0 deletions cmd/tools/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"

_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin
_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/postgres" // needed to load postgres plugin
"github.com/uber/cadence/tools/sql"
)

Expand Down
132 changes: 132 additions & 0 deletions common/persistence/sql/sqlplugin/postgres/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package postgres

import (
"fmt"
"time"
)

const (
readSchemaVersionQuery = `SELECT curr_version from schema_version where db_name=$1`

writeSchemaVersionQuery = `INSERT into schema_version(db_name, creation_time, curr_version, min_compatible_version) VALUES ($1,$2,$3,$4)
ON CONFLICT (db_name) DO UPDATE
SET creation_time = excluded.creation_time,
curr_version = excluded.curr_version,
min_compatible_version = excluded.min_compatible_version;`

writeSchemaUpdateHistoryQuery = `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES($1,$2,$3,$4,$5,$6,$7)`

createSchemaVersionTableQuery = `CREATE TABLE schema_version(db_name VARCHAR(255) not null PRIMARY KEY, ` +
`creation_time TIMESTAMP, ` +
`curr_version VARCHAR(64), ` +
`min_compatible_version VARCHAR(64));`

createSchemaUpdateHistoryTableQuery = `CREATE TABLE schema_update_history(` +
`year int not null, ` +
`month int not null, ` +
`update_time TIMESTAMP not null, ` +
`description VARCHAR(255), ` +
`manifest_md5 VARCHAR(64), ` +
`new_version VARCHAR(64), ` +
`old_version VARCHAR(64), ` +
`PRIMARY KEY (year, month, update_time));`

// NOTE we have to use %v because somehow postgres doesn't work with ? here
// It's a small bug in sqlx library
// TODO https://github.com/uber/cadence/issues/2893
createDatabaseQuery = "CREATE database %v"

dropDatabaseQuery = "Drop database %v"

listTablesQuery = "select table_name from information_schema.tables where table_schema='public'"

dropTableQuery = "DROP TABLE %v"
)

// CreateSchemaVersionTables sets up the schema version tables
func (pdb *db) CreateSchemaVersionTables() error {
if err := pdb.Exec(createSchemaVersionTableQuery); err != nil {
return err
}
return pdb.Exec(createSchemaUpdateHistoryTableQuery)
}

// ReadSchemaVersion returns the current schema version for the keyspace
func (pdb *db) ReadSchemaVersion(database string) (string, error) {
var version string
err := pdb.db.Get(&version, readSchemaVersionQuery, database)
return version, err
}

// UpdateSchemaVersion updates the schema version for the keyspace
func (pdb *db) UpdateSchemaVersion(database string, newVersion string, minCompatibleVersion string) error {
return pdb.Exec(writeSchemaVersionQuery, database, time.Now(), newVersion, minCompatibleVersion)
}

// WriteSchemaUpdateLog adds an entry to the schema update history table
func (pdb *db) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error {
now := time.Now().UTC()
return pdb.Exec(writeSchemaUpdateHistoryQuery, now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, desc)
}

// Exec executes a sql statement
func (pdb *db) Exec(stmt string, args ...interface{}) error {
_, err := pdb.db.Exec(stmt, args...)
return err
}

// ListTables returns a list of tables in this database
func (pdb *db) ListTables(database string) ([]string, error) {
var tables []string
err := pdb.db.Select(&tables, fmt.Sprintf(listTablesQuery))
return tables, err
}

// DropTable drops a given table from the database
func (pdb *db) DropTable(name string) error {
return pdb.Exec(fmt.Sprintf(dropTableQuery, name))
}

// DropAllTables drops all tables from this database
func (pdb *db) DropAllTables(database string) error {
tables, err := pdb.ListTables(database)
if err != nil {
return err
}
for _, tab := range tables {
if err := pdb.DropTable(tab); err != nil {
return err
}
}
return nil
}

// CreateDatabase creates a database if it doesn't exist
func (pdb *db) CreateDatabase(name string) error {
return pdb.Exec(fmt.Sprintf(createDatabaseQuery, name))
}

// DropDatabase drops a database
func (pdb *db) DropDatabase(name string) error {
return pdb.Exec(fmt.Sprintf(dropDatabaseQuery, name))
}
90 changes: 90 additions & 0 deletions common/persistence/sql/sqlplugin/postgres/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package postgres

import (
"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

// db represents a logical connection to mysql database
type db struct {
db *sqlx.DB
tx *sqlx.Tx
conn sqlplugin.Conn
converter DataConverter
}

var _ sqlplugin.DB = (*db)(nil)
var _ sqlplugin.Tx = (*db)(nil)

// ErrDupEntry indicates a duplicate primary key i.e. the row already exists,
// check http://www.postgresql.org/docs/9.3/static/errcodes-appendix.html
const ErrDupEntry = "23505"

func (pdb *db) IsDupEntryError(err error) bool {
sqlErr, ok := err.(*pq.Error)
return ok && sqlErr.Code == ErrDupEntry
}

// NewDB returns an instance of DB, which is a logical
// connection to the underlying mysql database
// Fixme we need to ignore this Lint warning
func NewDB(xdb *sqlx.DB, tx *sqlx.Tx) *db {
mdb := &db{db: xdb, tx: tx}
mdb.conn = xdb
if tx != nil {
mdb.conn = tx
}
mdb.converter = &converter{}
return mdb
}

// BeginTx starts a new transaction and returns a reference to the Tx object
func (pdb *db) BeginTx() (sqlplugin.Tx, error) {
xtx, err := pdb.db.Beginx()
if err != nil {
return nil, err
}
return NewDB(pdb.db, xtx), nil
}

// Commit commits a previously started transaction
func (pdb *db) Commit() error {
return pdb.tx.Commit()
}

// Rollback triggers rollback of a previously started transaction
func (pdb *db) Rollback() error {
return pdb.tx.Rollback()
}

// Close closes the connection to the mysql db
func (pdb *db) Close() error {
return pdb.db.Close()
}

// PluginName returns the name of the mysql plugin
func (pdb *db) PluginName() string {
return PluginName
}
140 changes: 140 additions & 0 deletions common/persistence/sql/sqlplugin/postgres/domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package postgres

import (
"database/sql"
"errors"

"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

const (
createDomainQuery = `INSERT INTO
domains (id, name, is_global, data, data_encoding)
VALUES($1, $2, $3, $4, $5)`

updateDomainQuery = `UPDATE domains
SET name = $1, data = $2, data_encoding = $3
WHERE shard_id=54321 AND id = $4`

getDomainPart = `SELECT id, name, is_global, data, data_encoding FROM domains`

getDomainByIDQuery = getDomainPart + ` WHERE shard_id=$1 AND id = $2`
getDomainByNameQuery = getDomainPart + ` WHERE shard_id=$1 AND name = $2`

listDomainsQuery = getDomainPart + ` WHERE shard_id=$1 ORDER BY id LIMIT $2`
listDomainsRangeQuery = getDomainPart + ` WHERE shard_id=$1 AND id > $2 ORDER BY id LIMIT $3`

deleteDomainByIDQuery = `DELETE FROM domains WHERE shard_id=$1 AND id = $2`
deleteDomainByNameQuery = `DELETE FROM domains WHERE shard_id=$1 AND name = $2`

getDomainMetadataQuery = `SELECT notification_version FROM domain_metadata`
lockDomainMetadataQuery = `SELECT notification_version FROM domain_metadata FOR UPDATE`
updateDomainMetadataQuery = `UPDATE domain_metadata SET notification_version = $1 WHERE notification_version = $2`
)

const (
shardID = 54321
)

var errMissingArgs = errors.New("missing one or more args for API")

// InsertIntoDomain inserts a single row into domains table
func (pdb *db) InsertIntoDomain(row *sqlplugin.DomainRow) (sql.Result, error) {
return pdb.conn.Exec(createDomainQuery, row.ID, row.Name, row.IsGlobal, row.Data, row.DataEncoding)
}

// UpdateDomain updates a single row in domains table
func (pdb *db) UpdateDomain(row *sqlplugin.DomainRow) (sql.Result, error) {
return pdb.conn.Exec(updateDomainQuery, row.Name, row.Data, row.DataEncoding, row.ID)
}

// SelectFromDomain reads one or more rows from domains table
func (pdb *db) SelectFromDomain(filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
switch {
case filter.ID != nil || filter.Name != nil:
return pdb.selectFromDomain(filter)
case filter.PageSize != nil && *filter.PageSize > 0:
return pdb.selectAllFromDomain(filter)
default:
return nil, errMissingArgs
}
}

func (pdb *db) selectFromDomain(filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
var err error
var row sqlplugin.DomainRow
switch {
case filter.ID != nil:
err = pdb.conn.Get(&row, getDomainByIDQuery, shardID, *filter.ID)
case filter.Name != nil:
err = pdb.conn.Get(&row, getDomainByNameQuery, shardID, *filter.Name)
}
if err != nil {
return nil, err
}
return []sqlplugin.DomainRow{row}, err
}

func (pdb *db) selectAllFromDomain(filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) {
var err error
var rows []sqlplugin.DomainRow
switch {
case filter.GreaterThanID != nil:
err = pdb.conn.Select(&rows, listDomainsRangeQuery, shardID, *filter.GreaterThanID, *filter.PageSize)
default:
err = pdb.conn.Select(&rows, listDomainsQuery, shardID, filter.PageSize)
}
return rows, err
}

// DeleteFromDomain deletes a single row in domains table
func (pdb *db) DeleteFromDomain(filter *sqlplugin.DomainFilter) (sql.Result, error) {
var err error
var result sql.Result
switch {
case filter.ID != nil:
result, err = pdb.conn.Exec(deleteDomainByIDQuery, shardID, filter.ID)
default:
result, err = pdb.conn.Exec(deleteDomainByNameQuery, shardID, filter.Name)
}
return result, err
}

// LockDomainMetadata acquires a write lock on a single row in domain_metadata table
func (pdb *db) LockDomainMetadata() error {
var row sqlplugin.DomainMetadataRow
err := pdb.conn.Get(&row.NotificationVersion, lockDomainMetadataQuery)
return err
}

// SelectFromDomainMetadata reads a single row in domain_metadata table
func (pdb *db) SelectFromDomainMetadata() (*sqlplugin.DomainMetadataRow, error) {
var row sqlplugin.DomainMetadataRow
err := pdb.conn.Get(&row.NotificationVersion, getDomainMetadataQuery)
return &row, err
}

// UpdateDomainMetadata updates a single row in domain_metadata table
func (pdb *db) UpdateDomainMetadata(row *sqlplugin.DomainMetadataRow) (sql.Result, error) {
return pdb.conn.Exec(updateDomainMetadataQuery, row.NotificationVersion+1, row.NotificationVersion)
}
Loading

0 comments on commit cba849d

Please sign in to comment.