Skip to content

Commit

Permalink
sql: add shard_id to domains table (cadence-workflow#1683)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Apr 15, 2019
1 parent 6f3a9ba commit 0c50d08
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 26 deletions.
17 changes: 15 additions & 2 deletions common/persistence/sql/sqlMetadataManagerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,15 @@ func (m *sqlMetadataManagerV2) GetMetadata() (*persistence.GetMetadataResponse,
}

func (m *sqlMetadataManagerV2) ListDomains(request *persistence.ListDomainsRequest) (*persistence.ListDomainsResponse, error) {
rows, err := m.db.SelectFromDomain(&sqldb.DomainFilter{})
var pageToken *sqldb.UUID
if request.NextPageToken != nil {
token := sqldb.UUID(request.NextPageToken)
pageToken = &token
}
rows, err := m.db.SelectFromDomain(&sqldb.DomainFilter{
GreaterThanID: pageToken,
PageSize: &request.PageSize,
})
if err != nil {
if err == sql.ErrNoRows {
return &persistence.ListDomainsResponse{}, nil
Expand All @@ -335,5 +343,10 @@ func (m *sqlMetadataManagerV2) ListDomains(request *persistence.ListDomainsReque
domains = append(domains, resp)
}

return &persistence.ListDomainsResponse{Domains: domains}, nil
resp := &persistence.ListDomainsResponse{Domains: domains}
if len(rows) >= request.PageSize {
resp.NextPageToken = rows[len(rows)-1].ID
}

return resp, nil
}
44 changes: 30 additions & 14 deletions common/persistence/sql/storage/mysql/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ package mysql

import (
"database/sql"
"errors"

"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
)

const (
shardID = 54321

createDomainQry = `INSERT INTO domains (
id,
name,
Expand Down Expand Up @@ -81,7 +84,7 @@ const (
notification_version = :notification_version,
failover_notification_version = :failover_notification_version,
data = :data
WHERE name = :name AND id = :id`
WHERE shard_id=54321 AND name = :name AND id = :id`

getDomainPart = `SELECT
id,
Expand All @@ -102,20 +105,23 @@ const (
failover_notification_version,
data FROM domains
`
getDomainByIDQry = getDomainPart + `WHERE id = ?`
getDomainByNameQry = getDomainPart + `WHERE name = ?`
getDomainByIDQry = getDomainPart + `WHERE shard_id=? AND id = ?`
getDomainByNameQry = getDomainPart + `WHERE shard_id=? AND name = ?`

deleteDomainByIDQry = `DELETE FROM domains WHERE id = ?`
deleteDomainByNameQry = `DELETE FROM domains WHERE name = ?`
deleteDomainByIDQry = `DELETE FROM domains WHERE shard_id=? AND id = ?`
deleteDomainByNameQry = `DELETE FROM domains WHERE shard_id=? AND name = ?`

getDomainMetadataQry = `SELECT notification_version FROM domain_metadata`
lockDomainMetadataQry = `SELECT notification_version FROM domain_metadata FOR UPDATE`
updateDomainMetadataQry = `UPDATE domain_metadata SET notification_version = :notification_version + 1
WHERE notification_version = :notification_version`

listDomainsQry = getDomainPart
listDomainsQry = getDomainPart + ` WHERE shard_id=? ORDER BY id LIMIT ?`
listDomainsRangeQry = getDomainPart + ` WHERE shard_id=? AND id > ? ORDER BY id LIMIT ?`
)

var errMissingArgs = errors.New("missing one or more args for API")

// InsertIntoDomain inserts a single row into domains table
func (mdb *DB) InsertIntoDomain(row *sqldb.DomainRow) (sql.Result, error) {
return mdb.conn.NamedExec(createDomainQry, row)
Expand All @@ -128,30 +134,40 @@ func (mdb *DB) UpdateDomain(row *sqldb.DomainRow) (sql.Result, error) {

// SelectFromDomain reads one or more rows from domains table
func (mdb *DB) SelectFromDomain(filter *sqldb.DomainFilter) ([]sqldb.DomainRow, error) {
if filter.ID != nil || filter.Name != nil {
switch {
case filter.ID != nil || filter.Name != nil:
return mdb.selectFromDomain(filter)
case filter.PageSize != nil && *filter.PageSize > 0:
return mdb.selectAllFromDomain(filter)
default:
return nil, errMissingArgs
}
return mdb.selectAllFromDomain()
}

func (mdb *DB) selectFromDomain(filter *sqldb.DomainFilter) ([]sqldb.DomainRow, error) {
var err error
var row sqldb.DomainRow
switch {
case filter.ID != nil:
err = mdb.conn.Get(&row, getDomainByIDQry, *filter.ID)
err = mdb.conn.Get(&row, getDomainByIDQry, shardID, *filter.ID)
case filter.Name != nil:
err = mdb.conn.Get(&row, getDomainByNameQry, *filter.Name)
err = mdb.conn.Get(&row, getDomainByNameQry, shardID, *filter.Name)
}
if err != nil {
return nil, err
}
return []sqldb.DomainRow{row}, err
}

func (mdb *DB) selectAllFromDomain() ([]sqldb.DomainRow, error) {
func (mdb *DB) selectAllFromDomain(filter *sqldb.DomainFilter) ([]sqldb.DomainRow, error) {
var err error
var rows []sqldb.DomainRow
err := mdb.conn.Select(&rows, listDomainsQry)
switch {
case filter.GreaterThanID != nil:
err = mdb.conn.Select(&rows, listDomainsRangeQry, shardID, *filter.GreaterThanID, *filter.PageSize)
default:
err = mdb.conn.Select(&rows, listDomainsQry, shardID, filter.PageSize)
}
return rows, err
}

Expand All @@ -161,9 +177,9 @@ func (mdb *DB) DeleteFromDomain(filter *sqldb.DomainFilter) (sql.Result, error)
var result sql.Result
switch {
case filter.ID != nil:
result, err = mdb.conn.Exec(deleteDomainByIDQry, filter.ID)
result, err = mdb.conn.Exec(deleteDomainByIDQry, shardID, filter.ID)
default:
result, err = mdb.conn.Exec(deleteDomainByNameQry, filter.Name)
result, err = mdb.conn.Exec(deleteDomainByNameQry, shardID, filter.Name)
}
return result, err
}
Expand Down
6 changes: 4 additions & 2 deletions common/persistence/sql/storage/sqldb/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ type (
// Name will be used for WHERE condition. When both ID and Name are nil,
// no WHERE clause will be used
DomainFilter struct {
ID *UUID
Name *string
ID *UUID
Name *string
GreaterThanID *UUID
PageSize *int
}

// DomainMetadataRow represents a row in domain_metadata table
Expand Down
7 changes: 3 additions & 4 deletions host/integration_cross_dc_domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,6 @@ func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainE
}

func (s *integrationCrossDCSuite) TestIntegrationRegisterListDomains() {
if TestFlags.PersistenceType == config.StoreTypeSQL {
s.T().Skip("skipping until sql supports ListDomains pagination")
return
}
// re-initialize to enable global domain
s.TearDownTest()
s.setupTest(true, true)
Expand Down Expand Up @@ -420,6 +416,9 @@ func (s *integrationCrossDCSuite) TestIntegrationRegisterListDomains() {
domains := append(resp1.Domains, resp2.Domains...)

for _, resp := range domains {
if resp.DomainInfo.GetName() == common.SystemDomainName {
continue // this domain is always created by schema file
}
s.True(strings.HasPrefix(resp.DomainInfo.GetName(), domainNamePrefix))
ss := strings.Split(*resp.DomainInfo.Name, "-")
s.Equal(2, len(ss))
Expand Down
6 changes: 4 additions & 2 deletions schema/mysql/v56/cadence/schema.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE domains(
shard_id INT NOT NULL DEFAULT 54321,
/* domain */
id BINARY(16) PRIMARY KEY NOT NULL,
id BINARY(16) NOT NULL,
name VARCHAR(255) UNIQUE NOT NULL,
status INT NOT NULL,
description VARCHAR(255) NOT NULL,
Expand All @@ -19,8 +20,9 @@ CREATE TABLE domains(
is_global_domain TINYINT(1) NOT NULL,
/* domain_replication_config */
active_cluster_name VARCHAR(255) NOT NULL,
clusters BLOB
clusters BLOB,
/* end domain_replication_config */
PRIMARY KEY(shard_id, id)
);

CREATE TABLE domain_metadata (
Expand Down
6 changes: 4 additions & 2 deletions schema/mysql/v57/cadence/schema.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE domains(
shard_id INT NOT NULL DEFAULT 54321,
/* domain */
id BINARY(16) PRIMARY KEY NOT NULL,
id BINARY(16) NOT NULL,
name VARCHAR(255) UNIQUE NOT NULL,
status INT NOT NULL,
description VARCHAR(255) NOT NULL,
Expand All @@ -19,8 +20,9 @@ CREATE TABLE domains(
is_global_domain TINYINT(1) NOT NULL,
/* domain_replication_config */
active_cluster_name VARCHAR(255) NOT NULL,
clusters BLOB
clusters BLOB,
/* end domain_replication_config */
PRIMARY KEY(shard_id, id)
);

CREATE TABLE domain_metadata (
Expand Down

0 comments on commit 0c50d08

Please sign in to comment.