From cba849dfb19f13d9d0dad03d5f87d8b6e9c78ce4 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 6 Dec 2019 11:53:22 -0800 Subject: [PATCH] Implement Postgres SQL Plugin (#2889) --- cmd/server/cadence.go | 1 + cmd/tools/sql/main.go | 1 + .../sql/sqlplugin/postgres/admin.go | 132 +++++++ .../persistence/sql/sqlplugin/postgres/db.go | 90 +++++ .../sql/sqlplugin/postgres/domain.go | 140 +++++++ .../sql/sqlplugin/postgres/events.go | 93 +++++ .../sql/sqlplugin/postgres/execution.go | 333 ++++++++++++++++ .../sql/sqlplugin/postgres/execution_maps.go | 371 ++++++++++++++++++ .../sql/sqlplugin/postgres/plugin.go | 93 +++++ .../postgres/postgres_server_test.go | 114 ++++++ .../sql/sqlplugin/postgres/queue.go | 113 ++++++ .../sql/sqlplugin/postgres/shard.go | 77 ++++ .../sql/sqlplugin/postgres/task.go | 185 +++++++++ .../sql/sqlplugin/postgres/typeconv.go | 62 +++ .../sql/sqlplugin/postgres/visibility.go | 227 +++++++++++ config/development_postgres.yaml | 128 ++++++ docker/buildkite/docker-compose-local.yml | 12 + docker/buildkite/docker-compose.yml | 13 + environment/env.go | 43 ++ go.mod | 3 +- go.sum | 4 +- schema/postgres/cadence/schema.sql | 2 +- .../postgres/cadence/versioned/v0.1/base.sql | 2 +- 23 files changed, 2234 insertions(+), 5 deletions(-) create mode 100644 common/persistence/sql/sqlplugin/postgres/admin.go create mode 100644 common/persistence/sql/sqlplugin/postgres/db.go create mode 100644 common/persistence/sql/sqlplugin/postgres/domain.go create mode 100644 common/persistence/sql/sqlplugin/postgres/events.go create mode 100644 common/persistence/sql/sqlplugin/postgres/execution.go create mode 100644 common/persistence/sql/sqlplugin/postgres/execution_maps.go create mode 100644 common/persistence/sql/sqlplugin/postgres/plugin.go create mode 100644 common/persistence/sql/sqlplugin/postgres/postgres_server_test.go create mode 100644 common/persistence/sql/sqlplugin/postgres/queue.go create mode 100644 
common/persistence/sql/sqlplugin/postgres/shard.go create mode 100644 common/persistence/sql/sqlplugin/postgres/task.go create mode 100644 common/persistence/sql/sqlplugin/postgres/typeconv.go create mode 100644 common/persistence/sql/sqlplugin/postgres/visibility.go create mode 100644 config/development_postgres.yaml diff --git a/cmd/server/cadence.go b/cmd/server/cadence.go index 49bdb757972..2675b718639 100644 --- a/cmd/server/cadence.go +++ b/cmd/server/cadence.go @@ -31,6 +31,7 @@ import ( "github.com/uber/cadence/common" _ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin + _ "github.com/uber/cadence/common/persistence/sql/sqlplugin/postgres" // needed to load postgres plugin "github.com/uber/cadence/common/service/config" "github.com/uber/cadence/tools/cassandra" "github.com/uber/cadence/tools/sql" diff --git a/cmd/tools/sql/main.go b/cmd/tools/sql/main.go index 2f01d5cc628..e361945c67e 100644 --- a/cmd/tools/sql/main.go +++ b/cmd/tools/sql/main.go @@ -24,6 +24,7 @@ import ( "os" _ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin + _ "github.com/uber/cadence/common/persistence/sql/sqlplugin/postgres" // needed to load postgres plugin "github.com/uber/cadence/tools/sql" ) diff --git a/common/persistence/sql/sqlplugin/postgres/admin.go b/common/persistence/sql/sqlplugin/postgres/admin.go new file mode 100644 index 00000000000..1b833543b35 --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/admin.go @@ -0,0 +1,132 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package postgres + +import ( + "fmt" + "time" +) + +const ( + readSchemaVersionQuery = `SELECT curr_version from schema_version where db_name=$1` + + writeSchemaVersionQuery = `INSERT into schema_version(db_name, creation_time, curr_version, min_compatible_version) VALUES ($1,$2,$3,$4) + ON CONFLICT (db_name) DO UPDATE + SET creation_time = excluded.creation_time, + curr_version = excluded.curr_version, + min_compatible_version = excluded.min_compatible_version;` + + writeSchemaUpdateHistoryQuery = `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES($1,$2,$3,$4,$5,$6,$7)` + + createSchemaVersionTableQuery = `CREATE TABLE schema_version(db_name VARCHAR(255) not null PRIMARY KEY, ` + + `creation_time TIMESTAMP, ` + + `curr_version VARCHAR(64), ` + + `min_compatible_version VARCHAR(64));` + + createSchemaUpdateHistoryTableQuery = `CREATE TABLE schema_update_history(` + + `year int not null, ` + + `month int not null, ` + + `update_time TIMESTAMP not null, ` + + `description VARCHAR(255), ` + + `manifest_md5 VARCHAR(64), ` + + `new_version VARCHAR(64), ` + + `old_version VARCHAR(64), ` + + `PRIMARY KEY (year, month, update_time));` + + // NOTE we have to use %v because somehow postgres doesn't work with ? 
here + // It's a small bug in sqlx library + // TODO https://github.com/uber/cadence/issues/2893 + createDatabaseQuery = "CREATE database %v" + + dropDatabaseQuery = "Drop database %v" + + listTablesQuery = "select table_name from information_schema.tables where table_schema='public'" + + dropTableQuery = "DROP TABLE %v" +) + +// CreateSchemaVersionTables sets up the schema version tables +func (pdb *db) CreateSchemaVersionTables() error { + if err := pdb.Exec(createSchemaVersionTableQuery); err != nil { + return err + } + return pdb.Exec(createSchemaUpdateHistoryTableQuery) +} + +// ReadSchemaVersion returns the current schema version for the keyspace +func (pdb *db) ReadSchemaVersion(database string) (string, error) { + var version string + err := pdb.db.Get(&version, readSchemaVersionQuery, database) + return version, err +} + +// UpdateSchemaVersion updates the schema version for the keyspace +func (pdb *db) UpdateSchemaVersion(database string, newVersion string, minCompatibleVersion string) error { + return pdb.Exec(writeSchemaVersionQuery, database, time.Now(), newVersion, minCompatibleVersion) +} + +// WriteSchemaUpdateLog adds an entry to the schema update history table +func (pdb *db) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error { + now := time.Now().UTC() + return pdb.Exec(writeSchemaUpdateHistoryQuery, now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, desc) +} + +// Exec executes a sql statement +func (pdb *db) Exec(stmt string, args ...interface{}) error { + _, err := pdb.db.Exec(stmt, args...) 
+ return err +} + +// ListTables returns a list of tables in this database +func (pdb *db) ListTables(database string) ([]string, error) { + var tables []string + err := pdb.db.Select(&tables, fmt.Sprintf(listTablesQuery)) + return tables, err +} + +// DropTable drops a given table from the database +func (pdb *db) DropTable(name string) error { + return pdb.Exec(fmt.Sprintf(dropTableQuery, name)) +} + +// DropAllTables drops all tables from this database +func (pdb *db) DropAllTables(database string) error { + tables, err := pdb.ListTables(database) + if err != nil { + return err + } + for _, tab := range tables { + if err := pdb.DropTable(tab); err != nil { + return err + } + } + return nil +} + +// CreateDatabase creates a database if it doesn't exist +func (pdb *db) CreateDatabase(name string) error { + return pdb.Exec(fmt.Sprintf(createDatabaseQuery, name)) +} + +// DropDatabase drops a database +func (pdb *db) DropDatabase(name string) error { + return pdb.Exec(fmt.Sprintf(dropDatabaseQuery, name)) +} diff --git a/common/persistence/sql/sqlplugin/postgres/db.go b/common/persistence/sql/sqlplugin/postgres/db.go new file mode 100644 index 00000000000..334cb3b98da --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/db.go @@ -0,0 +1,90 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package postgres + +import ( + "github.com/jmoiron/sqlx" + "github.com/lib/pq" + + "github.com/uber/cadence/common/persistence/sql/sqlplugin" +) + +// db represents a logical connection to mysql database +type db struct { + db *sqlx.DB + tx *sqlx.Tx + conn sqlplugin.Conn + converter DataConverter +} + +var _ sqlplugin.DB = (*db)(nil) +var _ sqlplugin.Tx = (*db)(nil) + +// ErrDupEntry indicates a duplicate primary key i.e. the row already exists, +// check http://www.postgresql.org/docs/9.3/static/errcodes-appendix.html +const ErrDupEntry = "23505" + +func (pdb *db) IsDupEntryError(err error) bool { + sqlErr, ok := err.(*pq.Error) + return ok && sqlErr.Code == ErrDupEntry +} + +// NewDB returns an instance of DB, which is a logical +// connection to the underlying mysql database +// Fixme we need to ignore this Lint warning +func NewDB(xdb *sqlx.DB, tx *sqlx.Tx) *db { + mdb := &db{db: xdb, tx: tx} + mdb.conn = xdb + if tx != nil { + mdb.conn = tx + } + mdb.converter = &converter{} + return mdb +} + +// BeginTx starts a new transaction and returns a reference to the Tx object +func (pdb *db) BeginTx() (sqlplugin.Tx, error) { + xtx, err := pdb.db.Beginx() + if err != nil { + return nil, err + } + return NewDB(pdb.db, xtx), nil +} + +// Commit commits a previously started transaction +func (pdb *db) Commit() error { + return pdb.tx.Commit() +} + +// Rollback triggers rollback of a previously started transaction +func (pdb *db) Rollback() error { + return pdb.tx.Rollback() +} + +// 
Close closes the connection to the postgres db +func (pdb *db) Close() error { + return pdb.db.Close() +} + +// PluginName returns the name of the postgres plugin +func (pdb *db) PluginName() string { + return PluginName +} diff --git a/common/persistence/sql/sqlplugin/postgres/domain.go b/common/persistence/sql/sqlplugin/postgres/domain.go new file mode 100644 index 00000000000..f037d9cb781 --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/domain.go @@ -0,0 +1,140 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package postgres + +import ( + "database/sql" + "errors" + + "github.com/uber/cadence/common/persistence/sql/sqlplugin" +) + +const ( + createDomainQuery = `INSERT INTO + domains (id, name, is_global, data, data_encoding) + VALUES($1, $2, $3, $4, $5)` + + updateDomainQuery = `UPDATE domains + SET name = $1, data = $2, data_encoding = $3 + WHERE shard_id=54321 AND id = $4` + + getDomainPart = `SELECT id, name, is_global, data, data_encoding FROM domains` + + getDomainByIDQuery = getDomainPart + ` WHERE shard_id=$1 AND id = $2` + getDomainByNameQuery = getDomainPart + ` WHERE shard_id=$1 AND name = $2` + + listDomainsQuery = getDomainPart + ` WHERE shard_id=$1 ORDER BY id LIMIT $2` + listDomainsRangeQuery = getDomainPart + ` WHERE shard_id=$1 AND id > $2 ORDER BY id LIMIT $3` + + deleteDomainByIDQuery = `DELETE FROM domains WHERE shard_id=$1 AND id = $2` + deleteDomainByNameQuery = `DELETE FROM domains WHERE shard_id=$1 AND name = $2` + + getDomainMetadataQuery = `SELECT notification_version FROM domain_metadata` + lockDomainMetadataQuery = `SELECT notification_version FROM domain_metadata FOR UPDATE` + updateDomainMetadataQuery = `UPDATE domain_metadata SET notification_version = $1 WHERE notification_version = $2` +) + +const ( + shardID = 54321 +) + +var errMissingArgs = errors.New("missing one or more args for API") + +// InsertIntoDomain inserts a single row into domains table +func (pdb *db) InsertIntoDomain(row *sqlplugin.DomainRow) (sql.Result, error) { + return pdb.conn.Exec(createDomainQuery, row.ID, row.Name, row.IsGlobal, row.Data, row.DataEncoding) +} + +// UpdateDomain updates a single row in domains table +func (pdb *db) UpdateDomain(row *sqlplugin.DomainRow) (sql.Result, error) { + return pdb.conn.Exec(updateDomainQuery, row.Name, row.Data, row.DataEncoding, row.ID) +} + +// SelectFromDomain reads one or more rows from domains table +func (pdb *db) SelectFromDomain(filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) { + switch { + case 
filter.ID != nil || filter.Name != nil: + return pdb.selectFromDomain(filter) + case filter.PageSize != nil && *filter.PageSize > 0: + return pdb.selectAllFromDomain(filter) + default: + return nil, errMissingArgs + } +} + +func (pdb *db) selectFromDomain(filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) { + var err error + var row sqlplugin.DomainRow + switch { + case filter.ID != nil: + err = pdb.conn.Get(&row, getDomainByIDQuery, shardID, *filter.ID) + case filter.Name != nil: + err = pdb.conn.Get(&row, getDomainByNameQuery, shardID, *filter.Name) + } + if err != nil { + return nil, err + } + return []sqlplugin.DomainRow{row}, err +} + +func (pdb *db) selectAllFromDomain(filter *sqlplugin.DomainFilter) ([]sqlplugin.DomainRow, error) { + var err error + var rows []sqlplugin.DomainRow + switch { + case filter.GreaterThanID != nil: + err = pdb.conn.Select(&rows, listDomainsRangeQuery, shardID, *filter.GreaterThanID, *filter.PageSize) + default: + err = pdb.conn.Select(&rows, listDomainsQuery, shardID, filter.PageSize) + } + return rows, err +} + +// DeleteFromDomain deletes a single row in domains table +func (pdb *db) DeleteFromDomain(filter *sqlplugin.DomainFilter) (sql.Result, error) { + var err error + var result sql.Result + switch { + case filter.ID != nil: + result, err = pdb.conn.Exec(deleteDomainByIDQuery, shardID, filter.ID) + default: + result, err = pdb.conn.Exec(deleteDomainByNameQuery, shardID, filter.Name) + } + return result, err +} + +// LockDomainMetadata acquires a write lock on a single row in domain_metadata table +func (pdb *db) LockDomainMetadata() error { + var row sqlplugin.DomainMetadataRow + err := pdb.conn.Get(&row.NotificationVersion, lockDomainMetadataQuery) + return err +} + +// SelectFromDomainMetadata reads a single row in domain_metadata table +func (pdb *db) SelectFromDomainMetadata() (*sqlplugin.DomainMetadataRow, error) { + var row sqlplugin.DomainMetadataRow + err := pdb.conn.Get(&row.NotificationVersion, 
getDomainMetadataQuery) + return &row, err +} + +// UpdateDomainMetadata updates a single row in domain_metadata table +func (pdb *db) UpdateDomainMetadata(row *sqlplugin.DomainMetadataRow) (sql.Result, error) { + return pdb.conn.Exec(updateDomainMetadataQuery, row.NotificationVersion+1, row.NotificationVersion) +} diff --git a/common/persistence/sql/sqlplugin/postgres/events.go b/common/persistence/sql/sqlplugin/postgres/events.go new file mode 100644 index 00000000000..10956a82801 --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/events.go @@ -0,0 +1,93 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package postgres + +import ( + "database/sql" + + "github.com/uber/cadence/common/persistence/sql/sqlplugin" +) + +const ( + // below are templates for history_node table + addHistoryNodesQuery = `INSERT INTO history_node (` + + `shard_id, tree_id, branch_id, node_id, txn_id, data, data_encoding) ` + + `VALUES (:shard_id, :tree_id, :branch_id, :node_id, :txn_id, :data, :data_encoding) ` + + getHistoryNodesQuery = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` + + `WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 and node_id < $5 ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT $6 ` + + deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 ` + + // below are templates for history_tree table + addHistoryTreeQuery = `INSERT INTO history_tree (` + + `shard_id, tree_id, branch_id, data, data_encoding) ` + + `VALUES (:shard_id, :tree_id, :branch_id, :data, :data_encoding) ` + + getHistoryTreeQuery = `SELECT branch_id, data, data_encoding FROM history_tree WHERE shard_id = $1 AND tree_id = $2 ` + + deleteHistoryTreeQuery = `DELETE FROM history_tree WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 ` +) + +// For history_node table: + +// InsertIntoHistoryNode inserts a row into history_node table +func (pdb *db) InsertIntoHistoryNode(row *sqlplugin.HistoryNodeRow) (sql.Result, error) { + // NOTE: MySQL 5.6 doesn't support clustering order, to work around, we let txn_id multiply by -1 + *row.TxnID *= -1 + return pdb.conn.NamedExec(addHistoryNodesQuery, row) +} + +// SelectFromHistoryNode reads one or more rows from history_node table +func (pdb *db) SelectFromHistoryNode(filter *sqlplugin.HistoryNodeFilter) ([]sqlplugin.HistoryNodeRow, error) { + var rows []sqlplugin.HistoryNodeRow + err := pdb.conn.Select(&rows, getHistoryNodesQuery, + filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID, *filter.MaxNodeID, *filter.PageSize) + 
// NOTE: since we let txn_id multiply by -1 when inserting, we have to revert it back here + for _, row := range rows { + *row.TxnID *= -1 + } + return rows, err +} + +// DeleteFromHistoryNode deletes one or more rows from history_node table +func (pdb *db) DeleteFromHistoryNode(filter *sqlplugin.HistoryNodeFilter) (sql.Result, error) { + return pdb.conn.Exec(deleteHistoryNodesQuery, filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID) +} + +// For history_tree table: + +// InsertIntoHistoryTree inserts a row into history_tree table +func (pdb *db) InsertIntoHistoryTree(row *sqlplugin.HistoryTreeRow) (sql.Result, error) { + return pdb.conn.NamedExec(addHistoryTreeQuery, row) +} + +// SelectFromHistoryTree reads one or more rows from history_tree table +func (pdb *db) SelectFromHistoryTree(filter *sqlplugin.HistoryTreeFilter) ([]sqlplugin.HistoryTreeRow, error) { + var rows []sqlplugin.HistoryTreeRow + err := pdb.conn.Select(&rows, getHistoryTreeQuery, filter.ShardID, filter.TreeID) + return rows, err +} + +// DeleteFromHistoryTree deletes one or more rows from history_tree table +func (pdb *db) DeleteFromHistoryTree(filter *sqlplugin.HistoryTreeFilter) (sql.Result, error) { + return pdb.conn.Exec(deleteHistoryTreeQuery, filter.ShardID, filter.TreeID, *filter.BranchID) +} diff --git a/common/persistence/sql/sqlplugin/postgres/execution.go b/common/persistence/sql/sqlplugin/postgres/execution.go new file mode 100644 index 00000000000..55fc2916c67 --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/execution.go @@ -0,0 +1,333 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package postgres + +import ( + "database/sql" + + "github.com/uber/cadence/common/persistence/sql/sqlplugin" +) + +const ( + executionsColumns = `shard_id, domain_id, workflow_id, run_id, next_event_id, last_write_version, data, data_encoding` + + createExecutionQuery = `INSERT INTO executions(` + executionsColumns + `) + VALUES(:shard_id, :domain_id, :workflow_id, :run_id, :next_event_id, :last_write_version, :data, :data_encoding)` + + updateExecutionQuery = `UPDATE executions SET + next_event_id = :next_event_id, last_write_version = :last_write_version, data = :data, data_encoding = :data_encoding + WHERE shard_id = :shard_id AND domain_id = :domain_id AND workflow_id = :workflow_id AND run_id = :run_id` + + getExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions + WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4` + + deleteExecutionQuery = `DELETE FROM executions + WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4` + + lockExecutionQueryBase = `SELECT next_event_id FROM executions + WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4` + + writeLockExecutionQuery = lockExecutionQueryBase + ` FOR UPDATE` + readLockExecutionQuery = lockExecutionQueryBase + ` FOR SHARE` + + createCurrentExecutionQuery = `INSERT INTO current_executions +(shard_id, domain_id, workflow_id, run_id, create_request_id, state, close_status, start_version, last_write_version) VALUES +(:shard_id, :domain_id, :workflow_id, :run_id, :create_request_id, :state, :close_status, :start_version, :last_write_version)` + + deleteCurrentExecutionQuery = "DELETE FROM current_executions WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4" + + getCurrentExecutionQuery = `SELECT +shard_id, domain_id, workflow_id, run_id, create_request_id, state, close_status, start_version, last_write_version +FROM current_executions WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3` + + 
lockCurrentExecutionJoinExecutionsQuery = `SELECT +ce.shard_id, ce.domain_id, ce.workflow_id, ce.run_id, ce.create_request_id, ce.state, ce.close_status, ce.start_version, e.last_write_version +FROM current_executions ce +INNER JOIN executions e ON e.shard_id = ce.shard_id AND e.domain_id = ce.domain_id AND e.workflow_id = ce.workflow_id AND e.run_id = ce.run_id +WHERE ce.shard_id = $1 AND ce.domain_id = $2 AND ce.workflow_id = $3 FOR UPDATE` + + lockCurrentExecutionQuery = getCurrentExecutionQuery + ` FOR UPDATE` + + updateCurrentExecutionsQuery = `UPDATE current_executions SET +run_id = :run_id, +create_request_id = :create_request_id, +state = :state, +close_status = :close_status, +start_version = :start_version, +last_write_version = :last_write_version +WHERE +shard_id = :shard_id AND +domain_id = :domain_id AND +workflow_id = :workflow_id +` + + getTransferTasksQuery = `SELECT task_id, data, data_encoding + FROM transfer_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3` + + createTransferTasksQuery = `INSERT INTO transfer_tasks(shard_id, task_id, data, data_encoding) + VALUES(:shard_id, :task_id, :data, :data_encoding)` + + deleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id = $2` + rangeDeleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3` + + createTimerTasksQuery = `INSERT INTO timer_tasks (shard_id, visibility_timestamp, task_id, data, data_encoding) + VALUES (:shard_id, :visibility_timestamp, :task_id, :data, :data_encoding)` + + getTimerTasksQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM timer_tasks + WHERE shard_id = $1 + AND ((visibility_timestamp >= $2 AND task_id >= $3) OR visibility_timestamp > $4) + AND visibility_timestamp < $5 + ORDER BY visibility_timestamp,task_id LIMIT $6` + + deleteTimerTaskQuery = `DELETE FROM timer_tasks WHERE shard_id = $1 AND visibility_timestamp = $2 AND task_id = $3` + 
rangeDeleteTimerTaskQuery = `DELETE FROM timer_tasks WHERE shard_id = $1 AND visibility_timestamp >= $2 AND visibility_timestamp < $3` + + createReplicationTasksQuery = `INSERT INTO replication_tasks (shard_id, task_id, data, data_encoding) + VALUES(:shard_id, :task_id, :data, :data_encoding)` + + getReplicationTasksQuery = `SELECT task_id, data, data_encoding FROM replication_tasks WHERE +shard_id = $1 AND +task_id > $2 AND +task_id <= $3 +ORDER BY task_id LIMIT $4` + + deleteReplicationTaskQuery = `DELETE FROM replication_tasks WHERE shard_id = $1 AND task_id = $2` + + getReplicationTasksDLQQuery = `SELECT task_id, data, data_encoding FROM replication_tasks_dlq WHERE +source_cluster_name = $1 AND +shard_id = $2 AND +task_id > $3 AND +task_id <= $4 +ORDER BY task_id LIMIT $5` + + bufferedEventsColumns = `shard_id, domain_id, workflow_id, run_id, data, data_encoding` + createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `) +VALUES (:shard_id, :domain_id, :workflow_id, :run_id, :data, :data_encoding)` + + deleteBufferedEventsQuery = `DELETE FROM buffered_events WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4` + getBufferedEventsQuery = `SELECT data, data_encoding FROM buffered_events WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4` + + insertReplicationTaskDLQQuery = ` +INSERT INTO replication_tasks_dlq + (source_cluster_name, + shard_id, + task_id, + data, + data_encoding) +VALUES (:source_cluster_name, + :shard_id, + :task_id, + :data, + :data_encoding) +` +) + +// InsertIntoExecutions inserts a row into executions table +func (pdb *db) InsertIntoExecutions(row *sqlplugin.ExecutionsRow) (sql.Result, error) { + return pdb.conn.NamedExec(createExecutionQuery, row) +} + +// UpdateExecutions updates a single row in executions table +func (pdb *db) UpdateExecutions(row *sqlplugin.ExecutionsRow) (sql.Result, error) { + return pdb.conn.NamedExec(updateExecutionQuery, row) +} + +// 
// SelectFromExecutions reads a single row from executions table
func (pdb *db) SelectFromExecutions(filter *sqlplugin.ExecutionsFilter) (*sqlplugin.ExecutionsRow, error) {
	var row sqlplugin.ExecutionsRow
	err := pdb.conn.Get(&row, getExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
	if err != nil {
		return nil, err
	}
	return &row, err
}

// DeleteFromExecutions deletes a single row from executions table
func (pdb *db) DeleteFromExecutions(filter *sqlplugin.ExecutionsFilter) (sql.Result, error) {
	return pdb.conn.Exec(deleteExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
}

// ReadLockExecutions acquires a read lock on a single row in executions table
// and returns that row's next event ID.
func (pdb *db) ReadLockExecutions(filter *sqlplugin.ExecutionsFilter) (int, error) {
	var nextEventID int
	err := pdb.conn.Get(&nextEventID, readLockExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
	return nextEventID, err
}

// WriteLockExecutions acquires a write lock on a single row in executions table
// and returns that row's next event ID.
func (pdb *db) WriteLockExecutions(filter *sqlplugin.ExecutionsFilter) (int, error) {
	var nextEventID int
	err := pdb.conn.Get(&nextEventID, writeLockExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
	return nextEventID, err
}

// InsertIntoCurrentExecutions inserts a single row into current_executions table
func (pdb *db) InsertIntoCurrentExecutions(row *sqlplugin.CurrentExecutionsRow) (sql.Result, error) {
	return pdb.conn.NamedExec(createCurrentExecutionQuery, row)
}

// UpdateCurrentExecutions updates a single row in current_executions table
func (pdb *db) UpdateCurrentExecutions(row *sqlplugin.CurrentExecutionsRow) (sql.Result, error) {
	return pdb.conn.NamedExec(updateCurrentExecutionsQuery, row)
}

// SelectFromCurrentExecutions reads a single row from current_executions table
// (keyed by shard/domain/workflow; the run ID in the filter is not used here).
func (pdb *db) SelectFromCurrentExecutions(filter *sqlplugin.CurrentExecutionsFilter) (*sqlplugin.CurrentExecutionsRow, error) {
	var row sqlplugin.CurrentExecutionsRow
	err := pdb.conn.Get(&row, getCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
	return &row, err
}

// DeleteFromCurrentExecutions deletes a single row in current_executions table
func (pdb *db) DeleteFromCurrentExecutions(filter *sqlplugin.CurrentExecutionsFilter) (sql.Result, error) {
	return pdb.conn.Exec(deleteCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
}

// LockCurrentExecutions acquires a write lock on a single row in current_executions table
func (pdb *db) LockCurrentExecutions(filter *sqlplugin.CurrentExecutionsFilter) (*sqlplugin.CurrentExecutionsRow, error) {
	var row sqlplugin.CurrentExecutionsRow
	err := pdb.conn.Get(&row, lockCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
	return &row, err
}

// LockCurrentExecutionsJoinExecutions joins a row in current_executions with executions table and acquires a
// write lock on the result
func (pdb *db) LockCurrentExecutionsJoinExecutions(filter *sqlplugin.CurrentExecutionsFilter) ([]sqlplugin.CurrentExecutionsRow, error) {
	var rows []sqlplugin.CurrentExecutionsRow
	err := pdb.conn.Select(&rows, lockCurrentExecutionJoinExecutionsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
	return rows, err
}

// InsertIntoTransferTasks inserts one or more rows into transfer_tasks table
func (pdb *db) InsertIntoTransferTasks(rows []sqlplugin.TransferTasksRow) (sql.Result, error) {
	return pdb.conn.NamedExec(createTransferTasksQuery, rows)
}

// SelectFromTransferTasks reads one or more rows from transfer_tasks table.
// NOTE(review): MinTaskID and MaxTaskID are dereferenced unconditionally;
// callers are assumed to always populate both — confirm against the interface contract.
func (pdb *db) SelectFromTransferTasks(filter *sqlplugin.TransferTasksFilter) ([]sqlplugin.TransferTasksRow, error) {
	var rows []sqlplugin.TransferTasksRow
	err := pdb.conn.Select(&rows, getTransferTasksQuery, filter.ShardID, *filter.MinTaskID, *filter.MaxTaskID)
	if err != nil {
		return nil, err
	}
	return rows, err
}

// DeleteFromTransferTasks deletes one or more rows from transfer_tasks table.
// A non-nil MinTaskID selects the range-delete path; otherwise a single task is
// deleted. NOTE(review): the range path assumes MaxTaskID is non-nil whenever
// MinTaskID is, and the single path assumes TaskID is non-nil — confirm callers.
func (pdb *db) DeleteFromTransferTasks(filter *sqlplugin.TransferTasksFilter) (sql.Result, error) {
	if filter.MinTaskID != nil {
		return pdb.conn.Exec(rangeDeleteTransferTaskQuery, filter.ShardID, *filter.MinTaskID, *filter.MaxTaskID)
	}
	return pdb.conn.Exec(deleteTransferTaskQuery, filter.ShardID, *filter.TaskID)
}

// InsertIntoTimerTasks inserts one or more rows into timer_tasks table.
// Timestamps are converted in place to the Postgres-storable form before binding.
func (pdb *db) InsertIntoTimerTasks(rows []sqlplugin.TimerTasksRow) (sql.Result, error) {
	for i := range rows {
		rows[i].VisibilityTimestamp = pdb.converter.ToPostgresDateTime(rows[i].VisibilityTimestamp)
	}
	return pdb.conn.NamedExec(createTimerTasksQuery, rows)
}

// SelectFromTimerTasks reads one or more rows from timer_tasks table.
// NOTE(review): this mutates the caller's filter — Min/MaxVisibilityTimestamp
// are overwritten with their converted values; confirm no caller reuses them.
func (pdb *db) SelectFromTimerTasks(filter *sqlplugin.TimerTasksFilter) ([]sqlplugin.TimerTasksRow, error) {
	var rows []sqlplugin.TimerTasksRow
	*filter.MinVisibilityTimestamp = pdb.converter.ToPostgresDateTime(*filter.MinVisibilityTimestamp)
	*filter.MaxVisibilityTimestamp = pdb.converter.ToPostgresDateTime(*filter.MaxVisibilityTimestamp)
	err := pdb.conn.Select(&rows, getTimerTasksQuery, filter.ShardID, *filter.MinVisibilityTimestamp,
		filter.TaskID, *filter.MinVisibilityTimestamp, *filter.MaxVisibilityTimestamp, *filter.PageSize)
	if err != nil {
		return nil, err
	}
	// Convert stored timestamps back to the canonical in-memory representation.
	for i := range rows {
		rows[i].VisibilityTimestamp = pdb.converter.FromPostgresDateTime(rows[i].VisibilityTimestamp)
	}
	return rows, err
}

// DeleteFromTimerTasks deletes one or more rows from timer_tasks table.
// A non-nil MinVisibilityTimestamp selects the range-delete path.
// NOTE(review): like SelectFromTimerTasks, this mutates the caller's filter in place.
func (pdb *db) DeleteFromTimerTasks(filter *sqlplugin.TimerTasksFilter) (sql.Result, error) {
	if filter.MinVisibilityTimestamp != nil {
		*filter.MinVisibilityTimestamp = pdb.converter.ToPostgresDateTime(*filter.MinVisibilityTimestamp)
		*filter.MaxVisibilityTimestamp = pdb.converter.ToPostgresDateTime(*filter.MaxVisibilityTimestamp)
		return pdb.conn.Exec(rangeDeleteTimerTaskQuery, filter.ShardID, *filter.MinVisibilityTimestamp, *filter.MaxVisibilityTimestamp)
	}
	*filter.VisibilityTimestamp = pdb.converter.ToPostgresDateTime(*filter.VisibilityTimestamp)
	return pdb.conn.Exec(deleteTimerTaskQuery, filter.ShardID, *filter.VisibilityTimestamp, filter.TaskID)
}

// InsertIntoBufferedEvents inserts one or more rows into buffered_events table
func (pdb *db) InsertIntoBufferedEvents(rows []sqlplugin.BufferedEventsRow) (sql.Result, error) {
	return pdb.conn.NamedExec(createBufferedEventsQuery, rows)
}

// SelectFromBufferedEvents reads one or more rows from buffered_events table.
// The identifying columns are backfilled from the filter onto each returned row
// (presumably the SELECT only projects the payload columns — verify against the query).
func (pdb *db) SelectFromBufferedEvents(filter *sqlplugin.BufferedEventsFilter) ([]sqlplugin.BufferedEventsRow, error) {
	var rows []sqlplugin.BufferedEventsRow
	err := pdb.conn.Select(&rows, getBufferedEventsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
	for i := 0; i < len(rows); i++ {
		rows[i].DomainID = filter.DomainID
		rows[i].WorkflowID = filter.WorkflowID
		rows[i].RunID = filter.RunID
		rows[i].ShardID = filter.ShardID
	}
	return rows, err
}

// DeleteFromBufferedEvents deletes one or more rows from buffered_events table
func (pdb *db) DeleteFromBufferedEvents(filter *sqlplugin.BufferedEventsFilter) (sql.Result, error) {
	return pdb.conn.Exec(deleteBufferedEventsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
}

// InsertIntoReplicationTasks inserts one or more rows into replication_tasks table
func (pdb *db) InsertIntoReplicationTasks(rows []sqlplugin.ReplicationTasksRow) (sql.Result, error) {
	return pdb.conn.NamedExec(createReplicationTasksQuery, rows)
}

// SelectFromReplicationTasks reads one or more rows from replication_tasks table
func (pdb *db) SelectFromReplicationTasks(filter *sqlplugin.ReplicationTasksFilter) ([]sqlplugin.ReplicationTasksRow, error) {
	var rows []sqlplugin.ReplicationTasksRow
	err := pdb.conn.Select(&rows, getReplicationTasksQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
	return rows, err
}

// DeleteFromReplicationTasks deletes one or more rows from replication_tasks table
func (pdb *db) DeleteFromReplicationTasks(shardID, taskID int) (sql.Result, error) {
	return pdb.conn.Exec(deleteReplicationTaskQuery, shardID, taskID)
}

// InsertIntoReplicationTasksDLQ inserts one or more rows into replication_tasks_dlq table
func (pdb *db) InsertIntoReplicationTasksDLQ(row *sqlplugin.ReplicationTaskDLQRow) (sql.Result, error) {
	return pdb.conn.NamedExec(insertReplicationTaskDLQQuery, row)
}

// SelectFromReplicationTasksDLQ reads one or more rows from replication_tasks_dlq table
func (pdb *db) SelectFromReplicationTasksDLQ(filter *sqlplugin.ReplicationTasksDLQFilter) ([]sqlplugin.ReplicationTasksRow, error) {
	var rows []sqlplugin.ReplicationTasksRow
	err := pdb.conn.Select(
		&rows, getReplicationTasksDLQQuery,
		filter.SourceClusterName,
		filter.ShardID,
		filter.MinTaskID,
		filter.MaxTaskID,
		filter.PageSize)
	return rows, err
}
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package postgres + +import ( + "database/sql" + "fmt" + "strings" + + "github.com/uber/cadence/common/persistence/sql/sqlplugin" +) + +const ( + deleteMapQueryTemplate = `DELETE FROM %v +WHERE +shard_id = $1 AND +domain_id = $2 AND +workflow_id = $3 AND +run_id = $4` + + // %[2]v is the columns of the value struct (i.e. no primary key columns), comma separated + // %[3]v should be %[2]v with colons prepended. + // i.e. %[3]v = ",".join(":" + s for s in %[2]v) + // So that this query can be used with BindNamed + // %[4]v should be the name of the key associated with the map + // e.g. 
for ActivityInfo it is "schedule_id" + setKeyInMapQueryTemplate = `INSERT INTO %[1]v +(shard_id, domain_id, workflow_id, run_id, %[4]v, %[2]v) +VALUES +(:shard_id, :domain_id, :workflow_id, :run_id, :%[4]v, %[3]v) +ON CONFLICT (shard_id, domain_id, workflow_id, run_id, %[4]v) DO UPDATE + SET shard_id = excluded.shard_id, + domain_id = excluded.domain_id, + workflow_id = excluded.workflow_id, + run_id = excluded.run_id, + %[4]v = excluded.%[4]v ` + + // %[2]v is the name of the key + deleteKeyInMapQueryTemplate = `DELETE FROM %[1]v +WHERE +shard_id = $1 AND +domain_id = $2 AND +workflow_id = $3 AND +run_id = $4 AND +%[2]v = $5` + + // %[1]v is the name of the table + // %[2]v is the name of the key + // %[3]v is the value columns, separated by commas + getMapQueryTemplate = `SELECT %[2]v, %[3]v FROM %[1]v +WHERE +shard_id = $1 AND +domain_id = $2 AND +workflow_id = $3 AND +run_id = $4` +) + +const ( + deleteAllSignalsRequestedSetQuery = `DELETE FROM signals_requested_sets +WHERE +shard_id = $1 AND +domain_id = $2 AND +workflow_id = $3 AND +run_id = $4 +` + + createSignalsRequestedSetQuery = `INSERT INTO signals_requested_sets +(shard_id, domain_id, workflow_id, run_id, signal_id) VALUES +(:shard_id, :domain_id, :workflow_id, :run_id, :signal_id) +ON CONFLICT (shard_id, domain_id, workflow_id, run_id, signal_id) DO NOTHING` + + deleteSignalsRequestedSetQuery = `DELETE FROM signals_requested_sets +WHERE +shard_id = $1 AND +domain_id = $2 AND +workflow_id = $3 AND +run_id = $4 AND +signal_id = $5` + + getSignalsRequestedSetQuery = `SELECT signal_id FROM signals_requested_sets WHERE +shard_id = $1 AND +domain_id = $2 AND +workflow_id = $3 AND +run_id = $4` +) + +func stringMap(a []string, f func(string) string) []string { + b := make([]string, len(a)) + for i, v := range a { + b[i] = f(v) + } + return b +} + +func makeDeleteMapQry(tableName string) string { + return fmt.Sprintf(deleteMapQueryTemplate, tableName) +} + +func makeSetKeyInMapQry(tableName string, 
nonPrimaryKeyColumns []string, mapKeyName string) string { + return fmt.Sprintf(setKeyInMapQueryTemplate, + tableName, + strings.Join(nonPrimaryKeyColumns, ","), + strings.Join(stringMap(nonPrimaryKeyColumns, func(x string) string { + return ":" + x + }), ","), + mapKeyName) +} + +func makeDeleteKeyInMapQry(tableName string, mapKeyName string) string { + return fmt.Sprintf(deleteKeyInMapQueryTemplate, + tableName, + mapKeyName) +} + +func makeGetMapQryTemplate(tableName string, nonPrimaryKeyColumns []string, mapKeyName string) string { + return fmt.Sprintf(getMapQueryTemplate, + tableName, + mapKeyName, + strings.Join(nonPrimaryKeyColumns, ",")) +} + +var ( + // Omit shard_id, run_id, domain_id, workflow_id, schedule_id since they're in the primary key + activityInfoColumns = []string{ + "data", + "data_encoding", + "last_heartbeat_details", + "last_heartbeat_updated_time", + } + activityInfoTableName = "activity_info_maps" + activityInfoKey = "schedule_id" + + deleteActivityInfoMapQry = makeDeleteMapQry(activityInfoTableName) + setKeyInActivityInfoMapQry = makeSetKeyInMapQry(activityInfoTableName, activityInfoColumns, activityInfoKey) + deleteKeyInActivityInfoMapQry = makeDeleteKeyInMapQry(activityInfoTableName, activityInfoKey) + getActivityInfoMapQry = makeGetMapQryTemplate(activityInfoTableName, activityInfoColumns, activityInfoKey) +) + +// ReplaceIntoActivityInfoMaps replaces one or more rows in activity_info_maps table +func (pdb *db) ReplaceIntoActivityInfoMaps(rows []sqlplugin.ActivityInfoMapsRow) (sql.Result, error) { + for i := range rows { + rows[i].LastHeartbeatUpdatedTime = pdb.converter.ToPostgresDateTime(rows[i].LastHeartbeatUpdatedTime) + } + return pdb.conn.NamedExec(setKeyInActivityInfoMapQry, rows) +} + +// SelectFromActivityInfoMaps reads one or more rows from activity_info_maps table +func (pdb *db) SelectFromActivityInfoMaps(filter *sqlplugin.ActivityInfoMapsFilter) ([]sqlplugin.ActivityInfoMapsRow, error) { + var rows 
[]sqlplugin.ActivityInfoMapsRow + err := pdb.conn.Select(&rows, getActivityInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) + for i := 0; i < len(rows); i++ { + rows[i].ShardID = int64(filter.ShardID) + rows[i].DomainID = filter.DomainID + rows[i].WorkflowID = filter.WorkflowID + rows[i].RunID = filter.RunID + rows[i].LastHeartbeatUpdatedTime = pdb.converter.FromPostgresDateTime(rows[i].LastHeartbeatUpdatedTime) + } + return rows, err +} + +// DeleteFromActivityInfoMaps deletes one or more rows from activity_info_maps table +func (pdb *db) DeleteFromActivityInfoMaps(filter *sqlplugin.ActivityInfoMapsFilter) (sql.Result, error) { + if filter.ScheduleID != nil { + return pdb.conn.Exec(deleteKeyInActivityInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID, *filter.ScheduleID) + } + return pdb.conn.Exec(deleteActivityInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) +} + +var ( + timerInfoColumns = []string{ + "data", + "data_encoding", + } + timerInfoTableName = "timer_info_maps" + timerInfoKey = "timer_id" + + deleteTimerInfoMapSQLQuery = makeDeleteMapQry(timerInfoTableName) + setKeyInTimerInfoMapSQLQuery = makeSetKeyInMapQry(timerInfoTableName, timerInfoColumns, timerInfoKey) + deleteKeyInTimerInfoMapSQLQuery = makeDeleteKeyInMapQry(timerInfoTableName, timerInfoKey) + getTimerInfoMapSQLQuery = makeGetMapQryTemplate(timerInfoTableName, timerInfoColumns, timerInfoKey) +) + +// ReplaceIntoTimerInfoMaps replaces one or more rows in timer_info_maps table +func (pdb *db) ReplaceIntoTimerInfoMaps(rows []sqlplugin.TimerInfoMapsRow) (sql.Result, error) { + return pdb.conn.NamedExec(setKeyInTimerInfoMapSQLQuery, rows) +} + +// SelectFromTimerInfoMaps reads one or more rows from timer_info_maps table +func (pdb *db) SelectFromTimerInfoMaps(filter *sqlplugin.TimerInfoMapsFilter) ([]sqlplugin.TimerInfoMapsRow, error) { + var rows []sqlplugin.TimerInfoMapsRow + err := pdb.conn.Select(&rows, 
getTimerInfoMapSQLQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) + for i := 0; i < len(rows); i++ { + rows[i].ShardID = int64(filter.ShardID) + rows[i].DomainID = filter.DomainID + rows[i].WorkflowID = filter.WorkflowID + rows[i].RunID = filter.RunID + } + return rows, err +} + +// DeleteFromTimerInfoMaps deletes one or more rows from timer_info_maps table +func (pdb *db) DeleteFromTimerInfoMaps(filter *sqlplugin.TimerInfoMapsFilter) (sql.Result, error) { + if filter.TimerID != nil { + return pdb.conn.Exec(deleteKeyInTimerInfoMapSQLQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID, *filter.TimerID) + } + return pdb.conn.Exec(deleteTimerInfoMapSQLQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) +} + +var ( + childExecutionInfoColumns = []string{ + "data", + "data_encoding", + } + childExecutionInfoTableName = "child_execution_info_maps" + childExecutionInfoKey = "initiated_id" + + deleteChildExecutionInfoMapQry = makeDeleteMapQry(childExecutionInfoTableName) + setKeyInChildExecutionInfoMapQry = makeSetKeyInMapQry(childExecutionInfoTableName, childExecutionInfoColumns, childExecutionInfoKey) + deleteKeyInChildExecutionInfoMapQry = makeDeleteKeyInMapQry(childExecutionInfoTableName, childExecutionInfoKey) + getChildExecutionInfoMapQry = makeGetMapQryTemplate(childExecutionInfoTableName, childExecutionInfoColumns, childExecutionInfoKey) +) + +// ReplaceIntoChildExecutionInfoMaps replaces one or more rows in child_execution_info_maps table +func (pdb *db) ReplaceIntoChildExecutionInfoMaps(rows []sqlplugin.ChildExecutionInfoMapsRow) (sql.Result, error) { + return pdb.conn.NamedExec(setKeyInChildExecutionInfoMapQry, rows) +} + +// SelectFromChildExecutionInfoMaps reads one or more rows from child_execution_info_maps table +func (pdb *db) SelectFromChildExecutionInfoMaps(filter *sqlplugin.ChildExecutionInfoMapsFilter) ([]sqlplugin.ChildExecutionInfoMapsRow, error) { + var rows 
[]sqlplugin.ChildExecutionInfoMapsRow + err := pdb.conn.Select(&rows, getChildExecutionInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) + for i := 0; i < len(rows); i++ { + rows[i].ShardID = int64(filter.ShardID) + rows[i].DomainID = filter.DomainID + rows[i].WorkflowID = filter.WorkflowID + rows[i].RunID = filter.RunID + } + return rows, err +} + +// DeleteFromChildExecutionInfoMaps deletes one or more rows from child_execution_info_maps table +func (pdb *db) DeleteFromChildExecutionInfoMaps(filter *sqlplugin.ChildExecutionInfoMapsFilter) (sql.Result, error) { + if filter.InitiatedID != nil { + return pdb.conn.Exec(deleteKeyInChildExecutionInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID, *filter.InitiatedID) + } + return pdb.conn.Exec(deleteChildExecutionInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) +} + +var ( + requestCancelInfoColumns = []string{ + "data", + "data_encoding", + } + requestCancelInfoTableName = "request_cancel_info_maps" + requestCancelInfoKey = "initiated_id" + + deleteRequestCancelInfoMapQry = makeDeleteMapQry(requestCancelInfoTableName) + setKeyInRequestCancelInfoMapQry = makeSetKeyInMapQry(requestCancelInfoTableName, requestCancelInfoColumns, requestCancelInfoKey) + deleteKeyInRequestCancelInfoMapQry = makeDeleteKeyInMapQry(requestCancelInfoTableName, requestCancelInfoKey) + getRequestCancelInfoMapQry = makeGetMapQryTemplate(requestCancelInfoTableName, requestCancelInfoColumns, requestCancelInfoKey) +) + +// ReplaceIntoRequestCancelInfoMaps replaces one or more rows in request_cancel_info_maps table +func (pdb *db) ReplaceIntoRequestCancelInfoMaps(rows []sqlplugin.RequestCancelInfoMapsRow) (sql.Result, error) { + return pdb.conn.NamedExec(setKeyInRequestCancelInfoMapQry, rows) +} + +// SelectFromRequestCancelInfoMaps reads one or more rows from request_cancel_info_maps table +func (pdb *db) SelectFromRequestCancelInfoMaps(filter 
*sqlplugin.RequestCancelInfoMapsFilter) ([]sqlplugin.RequestCancelInfoMapsRow, error) { + var rows []sqlplugin.RequestCancelInfoMapsRow + err := pdb.conn.Select(&rows, getRequestCancelInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) + for i := 0; i < len(rows); i++ { + rows[i].ShardID = int64(filter.ShardID) + rows[i].DomainID = filter.DomainID + rows[i].WorkflowID = filter.WorkflowID + rows[i].RunID = filter.RunID + } + return rows, err +} + +// DeleteFromRequestCancelInfoMaps deletes one or more rows from request_cancel_info_maps table +func (pdb *db) DeleteFromRequestCancelInfoMaps(filter *sqlplugin.RequestCancelInfoMapsFilter) (sql.Result, error) { + if filter.InitiatedID != nil { + return pdb.conn.Exec(deleteKeyInRequestCancelInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID, *filter.InitiatedID) + } + return pdb.conn.Exec(deleteRequestCancelInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) +} + +var ( + signalInfoColumns = []string{ + "data", + "data_encoding", + } + signalInfoTableName = "signal_info_maps" + signalInfoKey = "initiated_id" + + deleteSignalInfoMapQry = makeDeleteMapQry(signalInfoTableName) + setKeyInSignalInfoMapQry = makeSetKeyInMapQry(signalInfoTableName, signalInfoColumns, signalInfoKey) + deleteKeyInSignalInfoMapQry = makeDeleteKeyInMapQry(signalInfoTableName, signalInfoKey) + getSignalInfoMapQry = makeGetMapQryTemplate(signalInfoTableName, signalInfoColumns, signalInfoKey) +) + +// ReplaceIntoSignalInfoMaps replaces one or more rows in signal_info_maps table +func (pdb *db) ReplaceIntoSignalInfoMaps(rows []sqlplugin.SignalInfoMapsRow) (sql.Result, error) { + return pdb.conn.NamedExec(setKeyInSignalInfoMapQry, rows) +} + +// SelectFromSignalInfoMaps reads one or more rows from signal_info_maps table +func (pdb *db) SelectFromSignalInfoMaps(filter *sqlplugin.SignalInfoMapsFilter) ([]sqlplugin.SignalInfoMapsRow, error) { + var rows 
[]sqlplugin.SignalInfoMapsRow + err := pdb.conn.Select(&rows, getSignalInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) + for i := 0; i < len(rows); i++ { + rows[i].ShardID = int64(filter.ShardID) + rows[i].DomainID = filter.DomainID + rows[i].WorkflowID = filter.WorkflowID + rows[i].RunID = filter.RunID + } + return rows, err +} + +// DeleteFromSignalInfoMaps deletes one or more rows from signal_info_maps table +func (pdb *db) DeleteFromSignalInfoMaps(filter *sqlplugin.SignalInfoMapsFilter) (sql.Result, error) { + if filter.InitiatedID != nil { + return pdb.conn.Exec(deleteKeyInSignalInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID, *filter.InitiatedID) + } + return pdb.conn.Exec(deleteSignalInfoMapQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) +} + +// InsertIntoSignalsRequestedSets inserts one or more rows into signals_requested_sets table +func (pdb *db) InsertIntoSignalsRequestedSets(rows []sqlplugin.SignalsRequestedSetsRow) (sql.Result, error) { + return pdb.conn.NamedExec(createSignalsRequestedSetQuery, rows) +} + +// SelectFromSignalsRequestedSets reads one or more rows from signals_requested_sets table +func (pdb *db) SelectFromSignalsRequestedSets(filter *sqlplugin.SignalsRequestedSetsFilter) ([]sqlplugin.SignalsRequestedSetsRow, error) { + var rows []sqlplugin.SignalsRequestedSetsRow + err := pdb.conn.Select(&rows, getSignalsRequestedSetQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) + for i := 0; i < len(rows); i++ { + rows[i].ShardID = int64(filter.ShardID) + rows[i].DomainID = filter.DomainID + rows[i].WorkflowID = filter.WorkflowID + rows[i].RunID = filter.RunID + } + return rows, err +} + +// DeleteFromSignalsRequestedSets deletes one or more rows from signals_requested_sets table +func (pdb *db) DeleteFromSignalsRequestedSets(filter *sqlplugin.SignalsRequestedSetsFilter) (sql.Result, error) { + if filter.SignalID != nil { + return 
pdb.conn.Exec(deleteSignalsRequestedSetQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID, *filter.SignalID) + } + return pdb.conn.Exec(deleteAllSignalsRequestedSetQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) +} diff --git a/common/persistence/sql/sqlplugin/postgres/plugin.go b/common/persistence/sql/sqlplugin/postgres/plugin.go new file mode 100644 index 00000000000..6c0d9cce7ff --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/plugin.go @@ -0,0 +1,93 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package postgres + +import ( + "fmt" + "net" + + "github.com/iancoleman/strcase" + "github.com/jmoiron/sqlx" + + "github.com/uber/cadence/common/persistence/sql" + "github.com/uber/cadence/common/persistence/sql/sqlplugin" + "github.com/uber/cadence/common/service/config" +) + +const ( + // PluginName is the name of the plugin + PluginName = "postgres" + dataSourceNamePostgres = "user=%v password=%v host=%v port=%v dbname=%v sslmode=disable " +) + +type plugin struct{} + +var _ sqlplugin.Plugin = (*plugin)(nil) + +func init() { + sql.RegisterPlugin(PluginName, &plugin{}) +} + +// CreateDB initialize the db object +func (d *plugin) CreateDB(cfg *config.SQL) (sqlplugin.DB, error) { + conn, err := d.createDBConnection(cfg) + if err != nil { + return nil, err + } + db := NewDB(conn, nil) + return db, nil +} + +// CreateAdminDB initialize the adminDB object +func (d *plugin) CreateAdminDB(cfg *config.SQL) (sqlplugin.AdminDB, error) { + conn, err := d.createDBConnection(cfg) + if err != nil { + return nil, err + } + db := NewDB(conn, nil) + return db, nil +} + +// CreateDBConnection creates a returns a reference to a logical connection to the +// underlying SQL database. 
The returned object is to tied to a single +// SQL database and the object can be used to perform CRUD operations on +// the tables in the database +func (d *plugin) createDBConnection(cfg *config.SQL) (*sqlx.DB, error) { + host, port, err := net.SplitHostPort(cfg.ConnectAddr) + if err != nil{ + return nil, fmt.Errorf("invalid connect address, it must be in host:port format, %v, err: %v", cfg.ConnectAddr, err) + } + + dbName := cfg.DatabaseName + //NOTE: postgres doesn't allow to connect with empty dbName, the admin dbName is "postgres" + if dbName == "" { + dbName = "postgres" + } + db, err := sqlx.Connect(PluginName, fmt.Sprintf(dataSourceNamePostgres, cfg.User, cfg.Password, host, port, dbName)) + + if err != nil { + return nil, err + } + + // Maps struct names in CamelCase to snake without need for db struct tags. + db.MapperFunc(strcase.ToSnake) + return db, nil +} diff --git a/common/persistence/sql/sqlplugin/postgres/postgres_server_test.go b/common/persistence/sql/sqlplugin/postgres/postgres_server_test.go new file mode 100644 index 00000000000..eec78ffe2da --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/postgres_server_test.go @@ -0,0 +1,114 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package postgres

import (
	"testing"

	"github.com/stretchr/testify/suite"

	pt "github.com/uber/cadence/common/persistence/persistence-tests"
	"github.com/uber/cadence/environment"
)

// Credentials and schema location for the Postgres instance the persistence
// test suites run against; host/port come from the environment package.
const (
	testUser     = "postgres"
	testPassword = "cadence"
	testSchemaDir = "schema/postgres"
)

// getTestClusterOption builds the shared TestBaseOptions that point every
// persistence suite at the postgres plugin under test.
func getTestClusterOption() *pt.TestBaseOptions {
	return &pt.TestBaseOptions{
		SQLDBPluginName: PluginName,
		DBUsername:      testUser,
		DBPassword:      testPassword,
		DBHost:          environment.GetPostgresAddress(),
		DBPort:          environment.GetPostgresPort(),
		SchemaDir:       testSchemaDir,
	}
}

// Each TestSQL* function below runs one of the cross-backend persistence
// suites from persistence-tests against the postgres plugin.

func TestSQLHistoryV2PersistenceSuite(t *testing.T) {
	s := new(pt.HistoryV2PersistenceSuite)
	s.TestBase = pt.NewTestBaseWithSQL(getTestClusterOption())
	s.TestBase.Setup()
	suite.Run(t, s)
}

func TestSQLMatchingPersistenceSuite(t *testing.T) {
	s := new(pt.MatchingPersistenceSuite)
	s.TestBase = pt.NewTestBaseWithSQL(getTestClusterOption())
	s.TestBase.Setup()
	suite.Run(t, s)
}

func TestSQLMetadataPersistenceSuiteV2(t *testing.T) {
	s := new(pt.MetadataPersistenceSuiteV2)
	s.TestBase = pt.NewTestBaseWithSQL(getTestClusterOption())
	s.TestBase.Setup()
	suite.Run(t, s)
}

func TestSQLShardPersistenceSuite(t *testing.T) {
	s := new(pt.ShardPersistenceSuite)
	s.TestBase = pt.NewTestBaseWithSQL(getTestClusterOption())
	s.TestBase.Setup()
	suite.Run(t, s)
}

func TestSQLExecutionManagerSuite(t *testing.T) {
	s := new(pt.ExecutionManagerSuite)
	s.TestBase = pt.NewTestBaseWithSQL(getTestClusterOption())
	s.TestBase.Setup()
	suite.Run(t, s)
}

func TestSQLExecutionManagerWithEventsV2(t *testing.T) {
	s := new(pt.ExecutionManagerSuiteForEventsV2)
	s.TestBase = pt.NewTestBaseWithSQL(getTestClusterOption())
	s.TestBase.Setup()
	suite.Run(t, s)
}

func TestSQLVisibilityPersistenceSuite(t *testing.T) {
	s := new(pt.VisibilityPersistenceSuite)
	s.TestBase = pt.NewTestBaseWithSQL(getTestClusterOption())
	s.TestBase.Setup()
	suite.Run(t, s)
}

// TODO flaky test in buildkite
// https://github.com/uber/cadence/issues/2877
/*
FAIL: TestSQLQueuePersistence/TestDomainReplicationQueue (0.26s)
    queuePersistenceTest.go:102:
        	Error Trace:	queuePersistenceTest.go:102
        	Error:      	Not equal:
        	            	expected: 99
        	            	actual  : 98
        	Test:       	TestSQLQueuePersistence/TestDomainReplicationQueue
*/
//func TestSQLQueuePersistence(t *testing.T) {
//	s := new(pt.QueuePersistenceSuite)
//	s.TestBase = pt.NewTestBaseWithSQL(getTestClusterOption())
//	s.TestBase.Setup()
//	suite.Run(t, s)
//}
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package postgres

import (
	"database/sql"
	"encoding/json"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

const (
	templateEnqueueMessageQuery = `INSERT INTO queue (queue_type, message_id, message_payload) VALUES(:queue_type, :message_id, :message_payload)`
	// NOTE(review): the outer SELECT has no queue_type predicate, so the
	// FOR UPDATE may also lock rows of OTHER queue types whose message_id is
	// >= this queue's max — confirm whether this cross-queue locking is intended.
	templateGetLastMessageIDQuery = `SELECT message_id FROM queue WHERE message_id >= (SELECT message_id FROM queue WHERE queue_type=$1 ORDER BY message_id DESC LIMIT 1) FOR UPDATE`
	templateGetMessagesQuery = `SELECT message_id, message_payload FROM queue WHERE queue_type = $1 and message_id > $2 LIMIT $3`
	templateDeleteMessagesQuery = `DELETE FROM queue WHERE queue_type = $1 and message_id < $2`
	templateGetQueueMetadataQuery = `SELECT data from queue_metadata WHERE queue_type = $1`
	templateGetQueueMetadataForUpdateQuery = templateGetQueueMetadataQuery + ` FOR UPDATE`
	templateInsertQueueMetadataQuery = `INSERT INTO queue_metadata (queue_type, data) VALUES(:queue_type, :data)`
	templateUpdateQueueMetadataQuery = `UPDATE queue_metadata SET data = $1 WHERE queue_type = $2`
)

// InsertIntoQueue inserts a new row into queue table
func (pdb *db) InsertIntoQueue(row *sqlplugin.QueueRow) (sql.Result, error) {
	return pdb.conn.NamedExec(templateEnqueueMessageQuery, row)
}

// GetLastEnqueuedMessageIDForUpdate returns the last enqueued message ID,
// holding a row lock until the surrounding transaction ends.
func (pdb *db) GetLastEnqueuedMessageIDForUpdate(queueType common.QueueType) (int, error) {
	var lastMessageID int
	err := pdb.conn.Get(&lastMessageID, templateGetLastMessageIDQuery, queueType)
	return lastMessageID, err
}

// GetMessagesFromQueue retrieves messages from the queue with message_id
// strictly greater than lastMessageID, up to maxRows rows.
func (pdb *db) GetMessagesFromQueue(queueType common.QueueType, lastMessageID, maxRows int) ([]sqlplugin.QueueRow, error) {
	var rows []sqlplugin.QueueRow
	err := pdb.conn.Select(&rows, templateGetMessagesQuery, queueType, lastMessageID, maxRows)
	return rows, err
}

// DeleteMessagesBefore deletes messages before messageID (exclusive) from the queue
func (pdb *db) DeleteMessagesBefore(queueType common.QueueType, messageID int) (sql.Result, error) {
	return pdb.conn.Exec(templateDeleteMessagesQuery, queueType, messageID)
}

// InsertAckLevel inserts ack level as a JSON map of cluster name -> message ID.
// NOTE(review): there is no ON CONFLICT clause, so calling this when a
// metadata row already exists surfaces as a unique-violation error — presumably
// callers only invoke it for a fresh queue; verify.
func (pdb *db) InsertAckLevel(queueType common.QueueType, messageID int, clusterName string) error {
	clusterAckLevels := map[string]int{clusterName: messageID}
	data, err := json.Marshal(clusterAckLevels)
	if err != nil {
		return err
	}

	_, err = pdb.conn.NamedExec(templateInsertQueueMetadataQuery, sqlplugin.QueueMetadataRow{QueueType: queueType, Data: data})
	return err

}

// UpdateAckLevels updates cluster ack levels, replacing the stored JSON map wholesale.
func (pdb *db) UpdateAckLevels(queueType common.QueueType, clusterAckLevels map[string]int) error {
	data, err := json.Marshal(clusterAckLevels)
	if err != nil {
		return err
	}

	_, err = pdb.conn.Exec(templateUpdateQueueMetadataQuery, data, queueType)
	return err
}

// GetAckLevels returns ack levels for pulling clusters. A missing metadata row
// is not an error: it returns a nil map. When forUpdate is true the row is
// locked for the duration of the surrounding transaction.
func (pdb *db) GetAckLevels(queueType common.QueueType, forUpdate bool) (map[string]int, error) {
	queryStr := templateGetQueueMetadataQuery
	if forUpdate {
		queryStr = templateGetQueueMetadataForUpdateQuery
	}

	var data []byte
	err := pdb.conn.Get(&data, queryStr, queueType)
	if err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}

		return nil, err
	}

	var clusterAckLevels map[string]int
	if err := json.Unmarshal(data, &clusterAckLevels); err != nil {
		return nil, err
	}

	return clusterAckLevels, nil
}
+ +package postgres + +import ( + "database/sql" + + "github.com/uber/cadence/common/persistence/sql/sqlplugin" +) + +const ( + createShardQry = `INSERT INTO + shards (shard_id, range_id, data, data_encoding) VALUES ($1, $2, $3, $4)` + + getShardQry = `SELECT + shard_id, range_id, data, data_encoding + FROM shards WHERE shard_id = $1` + + updateShardQry = `UPDATE shards + SET range_id = $1, data = $2, data_encoding = $3 + WHERE shard_id = $4` + + lockShardQry = `SELECT range_id FROM shards WHERE shard_id = $1 FOR UPDATE` + readLockShardQry = `SELECT range_id FROM shards WHERE shard_id = $1 FOR SHARE` +) + +// InsertIntoShards inserts one or more rows into shards table +func (pdb *db) InsertIntoShards(row *sqlplugin.ShardsRow) (sql.Result, error) { + return pdb.conn.Exec(createShardQry, row.ShardID, row.RangeID, row.Data, row.DataEncoding) +} + +// UpdateShards updates one or more rows into shards table +func (pdb *db) UpdateShards(row *sqlplugin.ShardsRow) (sql.Result, error) { + return pdb.conn.Exec(updateShardQry, row.RangeID, row.Data, row.DataEncoding, row.ShardID) +} + +// SelectFromShards reads one or more rows from shards table +func (pdb *db) SelectFromShards(filter *sqlplugin.ShardsFilter) (*sqlplugin.ShardsRow, error) { + var row sqlplugin.ShardsRow + err := pdb.conn.Get(&row, getShardQry, filter.ShardID) + if err != nil { + return nil, err + } + return &row, err +} + +// ReadLockShards acquires a read lock on a single row in shards table +func (pdb *db) ReadLockShards(filter *sqlplugin.ShardsFilter) (int, error) { + var rangeID int + err := pdb.conn.Get(&rangeID, readLockShardQry, filter.ShardID) + return rangeID, err +} + +// WriteLockShards acquires a write lock on a single row in shards table +func (pdb *db) WriteLockShards(filter *sqlplugin.ShardsFilter) (int, error) { + var rangeID int + err := pdb.conn.Get(&rangeID, lockShardQry, filter.ShardID) + return rangeID, err +} diff --git a/common/persistence/sql/sqlplugin/postgres/task.go 
b/common/persistence/sql/sqlplugin/postgres/task.go new file mode 100644 index 00000000000..2fa4678e2da --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/task.go @@ -0,0 +1,185 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package postgres + +import ( + "database/sql" + "fmt" + + "github.com/uber/cadence/common/persistence/sql/sqlplugin" +) + +const ( + taskListCreatePart = `INTO task_lists(shard_id, domain_id, name, task_type, range_id, data, data_encoding) ` + + `VALUES (:shard_id, :domain_id, :name, :task_type, :range_id, :data, :data_encoding)` + + // (default range ID: initialRangeID == 1) + createTaskListQry = `INSERT ` + taskListCreatePart + + replaceTaskListQry = `INSERT ` + taskListCreatePart + + `ON CONFLICT (shard_id, domain_id, name, task_type) DO UPDATE +SET range_id = excluded.range_id, +data = excluded.data, +data_encoding = excluded.data_encoding` + + updateTaskListQry = `UPDATE task_lists SET +range_id = :range_id, +data = :data, +data_encoding = :data_encoding +WHERE +shard_id = :shard_id AND +domain_id = :domain_id AND +name = :name AND +task_type = :task_type +` + + listTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` + + `FROM task_lists ` + + `WHERE shard_id = $1 AND domain_id > $2 AND name > $3 AND task_type > $4 ORDER BY domain_id,name,task_type LIMIT $5` + + getTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` + + `FROM task_lists ` + + `WHERE shard_id = $1 AND domain_id = $2 AND name = $3 AND task_type = $4` + + deleteTaskListQry = `DELETE FROM task_lists WHERE shard_id=$1 AND domain_id=$2 AND name=$3 AND task_type=$4 AND range_id=$5` + + lockTaskListQry = `SELECT range_id FROM task_lists ` + + `WHERE shard_id = $1 AND domain_id = $2 AND name = $3 AND task_type = $4 FOR UPDATE` + + getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` + + `FROM tasks ` + + `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id > $4 AND task_id <= $5 ` + + ` ORDER BY task_id LIMIT $6` + + getTaskMinQry = `SELECT task_id, data, data_encoding ` + + `FROM tasks ` + + `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id > $4 ORDER BY task_id LIMIT $5` + + 
createTaskQry = `INSERT INTO ` + + `tasks(domain_id, task_list_name, task_type, task_id, data, data_encoding) ` + + `VALUES(:domain_id, :task_list_name, :task_type, :task_id, :data, :data_encoding)` + + deleteTaskQry = `DELETE FROM tasks ` + + `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id = $4` + + rangeDeleteTaskQry = `DELETE FROM tasks ` + + `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id IN (SELECT task_id FROM + tasks WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id <= $4 ` + + `ORDER BY domain_id,task_list_name,task_type,task_id LIMIT $5 )` +) + +// InsertIntoTasks inserts one or more rows into tasks table +func (pdb *db) InsertIntoTasks(rows []sqlplugin.TasksRow) (sql.Result, error) { + return pdb.conn.NamedExec(createTaskQry, rows) +} + +// SelectFromTasks reads one or more rows from tasks table +func (pdb *db) SelectFromTasks(filter *sqlplugin.TasksFilter) ([]sqlplugin.TasksRow, error) { + var err error + var rows []sqlplugin.TasksRow + switch { + case filter.MaxTaskID != nil: + err = pdb.conn.Select(&rows, getTaskMinMaxQry, filter.DomainID, + filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize) + default: + err = pdb.conn.Select(&rows, getTaskMinQry, filter.DomainID, + filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.PageSize) + } + if err != nil { + return nil, err + } + return rows, err +} + +// DeleteFromTasks deletes one or more rows from tasks table +func (pdb *db) DeleteFromTasks(filter *sqlplugin.TasksFilter) (sql.Result, error) { + if filter.TaskIDLessThanEquals != nil { + if filter.Limit == nil || *filter.Limit == 0 { + return nil, fmt.Errorf("missing limit parameter") + } + return pdb.conn.Exec(rangeDeleteTaskQry, + filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskIDLessThanEquals, *filter.Limit) + } + return pdb.conn.Exec(deleteTaskQry, filter.DomainID, filter.TaskListName, 
filter.TaskType, *filter.TaskID) +} + +// InsertIntoTaskLists inserts one or more rows into task_lists table +func (pdb *db) InsertIntoTaskLists(row *sqlplugin.TaskListsRow) (sql.Result, error) { + return pdb.conn.NamedExec(createTaskListQry, row) +} + +// ReplaceIntoTaskLists replaces one or more rows in task_lists table +func (pdb *db) ReplaceIntoTaskLists(row *sqlplugin.TaskListsRow) (sql.Result, error) { + return pdb.conn.NamedExec(replaceTaskListQry, row) +} + +// UpdateTaskLists updates a row in task_lists table +func (pdb *db) UpdateTaskLists(row *sqlplugin.TaskListsRow) (sql.Result, error) { + return pdb.conn.NamedExec(updateTaskListQry, row) +} + +// SelectFromTaskLists reads one or more rows from task_lists table +func (pdb *db) SelectFromTaskLists(filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) { + switch { + case filter.DomainID != nil && filter.Name != nil && filter.TaskType != nil: + return pdb.selectFromTaskLists(filter) + case filter.DomainIDGreaterThan != nil && filter.NameGreaterThan != nil && filter.TaskTypeGreaterThan != nil && filter.PageSize != nil: + return pdb.rangeSelectFromTaskLists(filter) + default: + return nil, fmt.Errorf("invalid set of query filter params") + } +} + +func (pdb *db) selectFromTaskLists(filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) { + var err error + var row sqlplugin.TaskListsRow + err = pdb.conn.Get(&row, getTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType) + if err != nil { + return nil, err + } + return []sqlplugin.TaskListsRow{row}, err +} + +func (pdb *db) rangeSelectFromTaskLists(filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) { + var err error + var rows []sqlplugin.TaskListsRow + err = pdb.conn.Select(&rows, listTaskListQry, + filter.ShardID, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize) + if err != nil { + return nil, err + } + for i := range rows { + 
rows[i].ShardID = filter.ShardID + } + return rows, nil +} + +// DeleteFromTaskLists deletes a row from task_lists table +func (pdb *db) DeleteFromTaskLists(filter *sqlplugin.TaskListsFilter) (sql.Result, error) { + return pdb.conn.Exec(deleteTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType, *filter.RangeID) +} + +// LockTaskLists locks a row in task_lists table +func (pdb *db) LockTaskLists(filter *sqlplugin.TaskListsFilter) (int64, error) { + var rangeID int64 + err := pdb.conn.Get(&rangeID, lockTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType) + return rangeID, err +} diff --git a/common/persistence/sql/sqlplugin/postgres/typeconv.go b/common/persistence/sql/sqlplugin/postgres/typeconv.go new file mode 100644 index 00000000000..a5e5e0b1f8c --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/typeconv.go @@ -0,0 +1,62 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package postgres + +import "time" + +var localZone, _ = time.Now().Zone() +var localOffset = getLocalOffset() + +type ( + // DataConverter defines the API for conversions to/from + // go types to postgres datatypes + // TODO https://github.com/uber/cadence/issues/2892 + // There are some reasons: + // application layer is not consistent with timezone: for example, + // in some cases we write timestamp with local timezone but when the time.Time + // is converted from "JSON"(from paging token), the timezone is missing + DataConverter interface { + ToPostgresDateTime(t time.Time) time.Time + FromPostgresDateTime(t time.Time) time.Time + } + converter struct{} +) + +// ToPostgresDateTime converts a time to Postgres datetime +func (c *converter) ToPostgresDateTime(t time.Time) time.Time { + zn, _ := t.Zone() + if zn != localZone { + nano := t.UnixNano() + t := time.Unix(0, nano) + return t + } + return t +} + +// FromPostgresDateTime converts postgres datetime and returns go time +func (c *converter) FromPostgresDateTime(t time.Time) time.Time { + return t.Add(-localOffset) +} + +func getLocalOffset() time.Duration { + _, offsetSecs := time.Now().Zone() + return time.Duration(offsetSecs) * time.Second +} diff --git a/common/persistence/sql/sqlplugin/postgres/visibility.go b/common/persistence/sql/sqlplugin/postgres/visibility.go new file mode 100644 index 00000000000..53c64e9830f --- /dev/null +++ b/common/persistence/sql/sqlplugin/postgres/visibility.go @@ -0,0 +1,227 @@ +// Copyright (c) 2019 Uber Technologies, Inc.
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package postgres + +import ( + "database/sql" + "errors" + "fmt" + "strings" + + "github.com/uber/cadence/common/persistence/sql/sqlplugin" +) + +const ( + templateCreateWorkflowExecutionStarted = `INSERT INTO executions_visibility (` + + `domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding) ` + + `VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (domain_id, run_id) DO NOTHING` + + templateCreateWorkflowExecutionClosed = `INSERT INTO executions_visibility (` + + `domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding) ` + + `VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (domain_id, run_id) DO UPDATE + SET workflow_id = excluded.workflow_id, + start_time = excluded.start_time, + execution_time = excluded.execution_time, + workflow_type_name = excluded.workflow_type_name, + close_time = excluded.close_time, + close_status = excluded.close_status, + history_length = excluded.history_length, + memo = excluded.memo, + encoding = excluded.encoding` + + // RunID condition is needed for correct pagination + templateConditions1 = ` AND domain_id = $1 + AND start_time >= $2 + AND start_time <= $3 + AND (run_id > $4 OR start_time < $5) + ORDER BY start_time DESC, run_id + LIMIT $6` + + templateConditions2 = ` AND domain_id = $2 + AND start_time >= $3 + AND start_time <= $4 + AND (run_id > $5 OR start_time < $6) + ORDER BY start_time DESC, run_id + LIMIT $7` + + templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding` + templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE close_status IS NULL ` + + templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, close_status, history_length + FROM executions_visibility WHERE close_status IS NOT NULL ` + + templateGetOpenWorkflowExecutions = templateOpenSelect + 
templateConditions1 + + templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditions1 + + templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = $1` + templateConditions2 + + templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = $1` + templateConditions2 + + templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = $1` + templateConditions2 + + templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = $1` + templateConditions2 + + templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND close_status = $1` + templateConditions2 + + templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length + FROM executions_visibility + WHERE domain_id = $1 AND close_status IS NOT NULL + AND run_id = $2` + + templateDeleteWorkflowExecution = "DELETE FROM executions_visibility WHERE domain_id=$1 AND run_id=$2" +) + +var errCloseParams = errors.New("missing one of {closeStatus, closeTime, historyLength} params") + +// InsertIntoVisibility inserts a row into visibility table. 
If a row already exists, +// it is left as such and no update will be made +func (pdb *db) InsertIntoVisibility(row *sqlplugin.VisibilityRow) (sql.Result, error) { + row.StartTime = pdb.converter.ToPostgresDateTime(row.StartTime) + return pdb.conn.Exec(templateCreateWorkflowExecutionStarted, + row.DomainID, + row.WorkflowID, + row.RunID, + row.StartTime, + row.ExecutionTime, + row.WorkflowTypeName, + row.Memo, + row.Encoding) +} + +// ReplaceIntoVisibility replaces an existing row if it exists or creates a new row in visibility table +func (pdb *db) ReplaceIntoVisibility(row *sqlplugin.VisibilityRow) (sql.Result, error) { + switch { + case row.CloseStatus != nil && row.CloseTime != nil && row.HistoryLength != nil: + row.StartTime = pdb.converter.ToPostgresDateTime(row.StartTime) + closeTime := pdb.converter.ToPostgresDateTime(*row.CloseTime) + return pdb.conn.Exec(templateCreateWorkflowExecutionClosed, + row.DomainID, + row.WorkflowID, + row.RunID, + row.StartTime, + row.ExecutionTime, + row.WorkflowTypeName, + closeTime, + *row.CloseStatus, + *row.HistoryLength, + row.Memo, + row.Encoding) + default: + return nil, errCloseParams + } +} + +// DeleteFromVisibility deletes a row from visibility table if it exists +func (pdb *db) DeleteFromVisibility(filter *sqlplugin.VisibilityFilter) (sql.Result, error) { + return pdb.conn.Exec(templateDeleteWorkflowExecution, filter.DomainID, filter.RunID) +} + +// SelectFromVisibility reads one or more rows from visibility table +func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlplugin.VisibilityRow, error) { + var err error + var rows []sqlplugin.VisibilityRow + if filter.MinStartTime != nil { + *filter.MinStartTime = pdb.converter.ToPostgresDateTime(*filter.MinStartTime) + } + if filter.MaxStartTime != nil { + *filter.MaxStartTime = pdb.converter.ToPostgresDateTime(*filter.MaxStartTime) + } + switch { + case filter.MinStartTime == nil && filter.RunID != nil && filter.Closed: + var row
sqlplugin.VisibilityRow + err = pdb.conn.Get(&row, templateGetClosedWorkflowExecution, filter.DomainID, *filter.RunID) + if err == nil { + rows = append(rows, row) + } + case filter.MinStartTime != nil && filter.WorkflowID != nil: + qry := templateGetOpenWorkflowExecutionsByID + if filter.Closed { + qry = templateGetClosedWorkflowExecutionsByID + } + err = pdb.conn.Select(&rows, + qry, + *filter.WorkflowID, + filter.DomainID, + pdb.converter.ToPostgresDateTime(*filter.MinStartTime), + pdb.converter.ToPostgresDateTime(*filter.MaxStartTime), + *filter.RunID, + *filter.MinStartTime, + *filter.PageSize) + case filter.MinStartTime != nil && filter.WorkflowTypeName != nil: + qry := templateGetOpenWorkflowExecutionsByType + if filter.Closed { + qry = templateGetClosedWorkflowExecutionsByType + } + err = pdb.conn.Select(&rows, + qry, + *filter.WorkflowTypeName, + filter.DomainID, + pdb.converter.ToPostgresDateTime(*filter.MinStartTime), + pdb.converter.ToPostgresDateTime(*filter.MaxStartTime), + *filter.RunID, + *filter.MaxStartTime, + *filter.PageSize) + case filter.MinStartTime != nil && filter.CloseStatus != nil: + err = pdb.conn.Select(&rows, + templateGetClosedWorkflowExecutionsByStatus, + *filter.CloseStatus, + filter.DomainID, + pdb.converter.ToPostgresDateTime(*filter.MinStartTime), + pdb.converter.ToPostgresDateTime(*filter.MaxStartTime), + *filter.RunID, + pdb.converter.ToPostgresDateTime(*filter.MaxStartTime), + *filter.PageSize) + case filter.MinStartTime != nil: + qry := templateGetOpenWorkflowExecutions + if filter.Closed { + qry = templateGetClosedWorkflowExecutions + } + minSt := pdb.converter.ToPostgresDateTime(*filter.MinStartTime) + maxSt := pdb.converter.ToPostgresDateTime(*filter.MaxStartTime) + err = pdb.conn.Select(&rows, + qry, + filter.DomainID, + minSt, + maxSt, + *filter.RunID, + maxSt, + *filter.PageSize) + default: + return nil, fmt.Errorf("invalid query filter") + } + if err != nil { + return nil, err + } + for i := range rows { + 
rows[i].StartTime = pdb.converter.FromPostgresDateTime(rows[i].StartTime) + rows[i].ExecutionTime = pdb.converter.FromPostgresDateTime(rows[i].ExecutionTime) + if rows[i].CloseTime != nil { + closeTime := pdb.converter.FromPostgresDateTime(*rows[i].CloseTime) + rows[i].CloseTime = &closeTime + } + rows[i].RunID = strings.TrimSpace(rows[i].RunID) + rows[i].WorkflowID = strings.TrimSpace(rows[i].WorkflowID) + } + return rows, err +} diff --git a/config/development_postgres.yaml b/config/development_postgres.yaml new file mode 100644 index 00000000000..406b4d27fb9 --- /dev/null +++ b/config/development_postgres.yaml @@ -0,0 +1,128 @@ +persistence: + defaultStore: postgres-default + visibilityStore: postgres-visibility + numHistoryShards: 4 + datastores: + postgres-default: + sql: + pluginName: "postgres" + databaseName: "cadence" + connectAddr: "127.0.0.1:5432" + connectProtocol: "tcp" + user: "postgres" + password: "cadence" + maxConns: 20 + maxIdleConns: 20 + maxConnLifetime: "1h" + postgres-visibility: + sql: + pluginName: "postgres" + databaseName: "cadence_visibility" + connectAddr: "127.0.0.1:5432" + connectProtocol: "tcp" + user: "postgres" + password: "cadence" + maxConns: 2 + maxIdleConns: 2 + maxConnLifetime: "1h" + +ringpop: + name: cadence + bootstrapMode: hosts + bootstrapHosts: ["127.0.0.1:7933", "127.0.0.1:7934", "127.0.0.1:7935"] + maxJoinDuration: 30s + +services: + frontend: + rpc: + port: 7933 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence" + pprof: + port: 7936 + + matching: + rpc: + port: 7935 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence" + pprof: + port: 7938 + + history: + rpc: + port: 7934 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence" + pprof: + port: 7937 + + worker: + rpc: + port: 7939 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence" + pprof: + port: 7940 + 
+clusterMetadata: + enableGlobalDomain: false + failoverVersionIncrement: 10 + masterClusterName: "active" + currentClusterName: "active" + clusterInformation: + active: + enabled: true + initialFailoverVersion: 0 + rpcName: "cadence-frontend" + rpcAddress: "localhost:7933" + +dcRedirectionPolicy: + policy: "noop" + toDC: "" + +archival: + history: + status: "enabled" + enableRead: true + provider: + filestore: + fileMode: "0666" + dirMode: "0766" + visibility: + status: "disabled" + enableRead: false + +domainDefaults: + archival: + history: + status: "enabled" + URI: "file:///tmp/cadence_archival/development" + visibility: + status: "disabled" + +kafka: + clusters: + test: + brokers: + - 127.0.0.1:9092 + topics: + cadence-visibility-dev: + cluster: test + cadence-visibility-dev-dlq: + cluster: test + +publicClient: + hostPort: "localhost:7933" diff --git a/docker/buildkite/docker-compose-local.yml b/docker/buildkite/docker-compose-local.yml index 1d7b3e02dcb..630a83ee808 100644 --- a/docker/buildkite/docker-compose-local.yml +++ b/docker/buildkite/docker-compose-local.yml @@ -24,6 +24,17 @@ services: aliases: - mysql + postgres: + image: postgres:12 + environment: + POSTGRES_PASSWORD: cadence + ports: + - "5432:5432" + networks: + services-network: + aliases: + - postgres + zookeeper: image: wurstmeister/zookeeper:3.4.6 ports: @@ -80,6 +91,7 @@ services: depends_on: - cassandra - mysql + - postgres volumes: - ../../:/cadence networks: diff --git a/docker/buildkite/docker-compose.yml b/docker/buildkite/docker-compose.yml index a10372e1189..a895ce7faa7 100644 --- a/docker/buildkite/docker-compose.yml +++ b/docker/buildkite/docker-compose.yml @@ -20,6 +20,17 @@ services: aliases: - mysql + postgres: + image: postgres:12 + environment: + POSTGRES_PASSWORD: cadence + ports: + - "5432:5432" + networks: + services-network: + aliases: + - postgres + zookeeper: image: wurstmeister/zookeeper:3.4.6 networks: @@ -56,6 +67,7 @@ services: environment: - 
"CASSANDRA_SEEDS=cassandra" - "MYSQL_SEEDS=mysql" + - "POSTGRES_SEEDS=postgres" - BUILDKITE_AGENT_ACCESS_TOKEN - BUILDKITE_JOB_ID - BUILDKITE_BUILD_ID @@ -63,6 +75,7 @@ services: depends_on: - cassandra - mysql + - postgres volumes: - ../../:/cadence - /usr/bin/buildkite-agent:/usr/bin/buildkite-agent diff --git a/environment/env.go b/environment/env.go index 02a34a96635..b3728c3563a 100644 --- a/environment/env.go +++ b/environment/env.go @@ -57,6 +57,13 @@ const ( ESPort = "ES_PORT" // ESDefaultPort ES default port ESDefaultPort = "9200" + + // PostgresSeeds env + PostgresSeeds = "POSTGRES_SEEDS" + // PostgresPort env + PostgresPort = "POSTGRES_PORT" + // PostgresDefaultPort Postgres default port + PostgresDefaultPort = "5432" ) // SetupEnv setup the necessary env @@ -89,6 +96,20 @@ func SetupEnv() { } } + if os.Getenv(PostgresSeeds) == "" { + err := os.Setenv(PostgresSeeds, Localhost) + if err != nil { + panic(fmt.Sprintf("error setting env %v", PostgresSeeds)) + } + } + + if os.Getenv(PostgresPort) == "" { + err := os.Setenv(PostgresPort, PostgresDefaultPort) + if err != nil { + panic(fmt.Sprintf("error setting env %v", PostgresPort)) + } + } + if os.Getenv(KafkaSeeds) == "" { err := os.Setenv(KafkaSeeds, Localhost) if err != nil { @@ -162,6 +183,28 @@ func GetMySQLPort() int { return p } +// GetPostgresAddress return the cassandra address +func GetPostgresAddress() string { + addr := os.Getenv(PostgresSeeds) + if addr == "" { + addr = Localhost + } + return addr +} + +// GetPostgresPort return the Postgres port +func GetPostgresPort() int { + port := os.Getenv(PostgresPort) + if port == "" { + port = PostgresDefaultPort + } + p, err := strconv.Atoi(port) + if err != nil { + panic(fmt.Sprintf("error getting env %v", PostgresPort)) + } + return p +} + // GetKafkaAddress return the kafka address func GetKafkaAddress() string { addr := os.Getenv(KafkaSeeds) diff --git a/go.mod b/go.mod index e821a5ccc74..8fcaae49a95 100644 --- a/go.mod +++ b/go.mod @@ -81,4 +81,5 
@@ require ( gopkg.in/yaml.v2 v2.2.2 ) -replace github.com/jmoiron/sqlx v1.2.0 => github.com/mfateev/sqlx v0.0.0-20180910213730-fa49b1cf03f7 +// TODO https://github.com/uber/cadence/issues/2863 +replace github.com/jmoiron/sqlx v1.2.0 => github.com/longquanzheng/sqlx v0.0.0-20191125235044-053e6130695c diff --git a/go.sum b/go.sum index 8c6e60e9c07..91bceef814f 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/longquanzheng/sqlx v0.0.0-20191125235044-053e6130695c h1:roq28TmqsdM4aPfYhSOBe2HHAbLKkcxAT1koIRB/Cvs= +github.com/longquanzheng/sqlx v0.0.0-20191125235044-053e6130695c/go.mod h1:0hGsexQ5Yab4iRNib+aRRwzqdLCK1XZTOArERUmxcJQ= github.com/m3db/prometheus_client_golang v0.8.1 h1:t7w/tcFws81JL1j5sqmpqcOyQOpH4RDOmIe3A3fdN3w= github.com/m3db/prometheus_client_golang v0.8.1/go.mod h1:8R/f1xYhXWq59KD/mbRqoBulXejss7vYtYzWmruNUwI= github.com/m3db/prometheus_client_model v0.1.0 h1:cg1+DiuyT6x8h9voibtarkH1KT6CmsewBSaBhe8wzLo= @@ -168,8 +170,6 @@ github.com/mattn/go-sqlite3 v1.11.0 h1:LDdKkqtYlom37fkvqs8rMPFKAMe8+SgjbwZ6ex1/A github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/mfateev/sqlx v0.0.0-20180910213730-fa49b1cf03f7 h1:YBIPjkTc/4t/3TqW2Wq5LxGMIugkKP+b96k3LpTQE+c= -github.com/mfateev/sqlx v0.0.0-20180910213730-fa49b1cf03f7/go.mod h1:8TWz2UQt6AHl0w84VLPA2vx6fpYC27UnvyHxGDlfzIw= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod 
h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/schema/postgres/cadence/schema.sql b/schema/postgres/cadence/schema.sql index eebaeed4bc3..a57a56b6e46 100644 --- a/schema/postgres/cadence/schema.sql +++ b/schema/postgres/cadence/schema.sql @@ -5,7 +5,7 @@ CREATE TABLE domains( -- data BYTEA NOT NULL, data_encoding VARCHAR(16) NOT NULL, - is_global SMALLINT NOT NULL, + is_global BOOLEAN NOT NULL, PRIMARY KEY(shard_id, id) ); diff --git a/schema/postgres/cadence/versioned/v0.1/base.sql b/schema/postgres/cadence/versioned/v0.1/base.sql index e5dd990fb4a..39bb8540ac6 100644 --- a/schema/postgres/cadence/versioned/v0.1/base.sql +++ b/schema/postgres/cadence/versioned/v0.1/base.sql @@ -5,7 +5,7 @@ CREATE TABLE domains( -- data BYTEA NOT NULL, data_encoding VARCHAR(16) NOT NULL, - is_global SMALLINT NOT NULL, + is_global BOOLEAN NOT NULL, PRIMARY KEY(shard_id, id) );