Skip to content

Commit

Permalink
Implement MongoDB plugin Part1: skeleton and ConfigStore (cadence-wo…
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Nov 12, 2021
1 parent d61a3b3 commit 46b84be
Show file tree
Hide file tree
Showing 35 changed files with 1,502 additions and 32 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Also use `docker-compose -f ./docker/dev/cassandra.yml down` to stop and clean u
* Alternatively, use `./docker/dev/cassandra-esv7-kafka.yml` for Cassandra, ElasticSearch(v7) and Kafka/ZooKeeper dependencies
* Alternatively, use `./docker/dev/mysql-esv7-kafka.yml` for MySQL, ElasticSearch(v7) and Kafka/ZooKeeper dependencies
* Alternatively, use `./docker/dev/cassandra-opensearch-kafka.yml` for Cassandra, OpenSearch(compatible with ElasticSearch v7) and Kafka/ZooKeeper dependencies
* Alternatively, use `./docker/dev/mongo-esv7-kafka.yml` for MongoDB, ElasticSearch(v7) and Kafka/ZooKeeper dependencies

### 3. Schema installation
Based on the above dependency setup, you also need to install the schemas.
Expand Down
3 changes: 0 additions & 3 deletions common/persistence/nosql/nosqlPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,7 @@ func (s *testCluster) Config() config.Persistence {

// SetupTestDatabase from PersistenceTestCluster interface
func (s *testCluster) SetupTestDatabase() {
// the keyspace is not created yet, so use empty and let the NoSQL DB to decide how to connect
s.cfg.Keyspace = ""
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger())
s.cfg.Keyspace = s.keyspace // change it back

if err != nil {
log.Fatal(err)
Expand Down
16 changes: 3 additions & 13 deletions common/persistence/nosql/nosqlplugin/cassandra/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func (db *cdb) SetupTestDatabase(schemaBaseDir string) error {
return err
}
if schemaBaseDir == "" {
cadencePackageDir, err := getCadencePackageDir()
var err error
schemaBaseDir, err = nosqlplugin.GetDefaultTestSchemaDir(testSchemaDir)
if err != nil {
log.Fatal(err)
return err
}
schemaBaseDir = cadencePackageDir + testSchemaDir
}

err = db.loadSchema([]string{"schema.cql"}, schemaBaseDir)
Expand Down Expand Up @@ -88,16 +88,6 @@ func (db *cdb) loadVisibilitySchema(fileNames []string, schemaBaseDir string) er
return nil
}

func getCadencePackageDir() (string, error) {
cadencePackageDir, err := os.Getwd()
if err != nil {
panic(err)
}
cadenceIndex := strings.LastIndex(cadencePackageDir, "/cadence/")
cadencePackageDir = cadencePackageDir[:cadenceIndex+len("/cadence/")]
return cadencePackageDir, err
}

func (db *cdb) TeardownTestDatabase() error {
err := dropCassandraKeyspace(db.session, db.cfg.Keyspace)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
)

const (
// version is the clustering key(DESC order) so this query will always return the record with largest version
templateSelectLatestConfig = `SELECT row_type, version, timestamp, values, encoding FROM cluster_config WHERE row_type = ? LIMIT 1;`

templateInsertConfig = `INSERT INTO cluster_config (row_type, version, timestamp, values, encoding) VALUES (?, ?, ?, ?, ?) IF NOT EXISTS;`
//for version value, x + 1 where x is the cached copy version.
)

func (db *cdb) InsertConfig(ctx context.Context, row *persistence.InternalConfigStoreEntry) error {
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/nosql/nosqlplugin/cassandra/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ func (p *plugin) CreateDB(cfg *config.NoSQL, logger log.Logger) (nosqlplugin.DB,

// CreateAdminDB initialize the AdminDB object
func (p *plugin) CreateAdminDB(cfg *config.NoSQL, logger log.Logger) (nosqlplugin.AdminDB, error) {
// the keyspace is not created yet, so use empty and let the Cassandra connect
keyspace := cfg.Keyspace
cfg.Keyspace = ""
// change it back
defer func() {
cfg.Keyspace = keyspace
}()

return p.doCreateDB(cfg, logger)
}

Expand Down
45 changes: 45 additions & 0 deletions common/persistence/nosql/nosqlplugin/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2021 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package nosqlplugin

import (
"os"
"strings"
)

func getCadencePackageDir() (string, error) {
cadencePackageDir, err := os.Getwd()
if err != nil {
panic(err)
}
cadenceIndex := strings.LastIndex(cadencePackageDir, "/cadence/")
cadencePackageDir = cadencePackageDir[:cadenceIndex+len("/cadence/")]
return cadencePackageDir, err
}

func GetDefaultTestSchemaDir(testSchemaRelativePath string) (string, error) {
cadencePackageDir, err := getCadencePackageDir()
if err != nil {
return "", err
}
return cadencePackageDir + testSchemaRelativePath, nil
}
8 changes: 6 additions & 2 deletions common/persistence/nosql/nosqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ type (
}

/**
* VisibilityCRUD is for visibility storage
* VisibilityCRUD is for visibility using database.
* Database visibility usually is no longer recommended. AdvancedVisibility(with Kafka+ElasticSearch) is more powerful and scalable.
* Feel free to skip this interface for any NoSQL plugin(use TODO() in the implementation)
*
* Recommendation: use one table with multiple indexes
*
Expand Down Expand Up @@ -510,15 +512,17 @@ type (
}

/***
* configStoreCRUD is for storing dynamic configuration parameters
* ConfigStoreCRUD is for storing dynamic configuration parameters
*
* Recommendation: one table
*
* Significant columns:
* domain: partition key(row_type), range key(version)
*/
ConfigStoreCRUD interface {
// InsertConfig insert a config entry with version. Return nosqlplugin.NewConditionFailure if the same version of the row_type is existing
InsertConfig(ctx context.Context, row *persistence.InternalConfigStoreEntry) error
// SelectLatestConfig returns the config entry of the row_type with the largest(latest) version value
SelectLatestConfig(ctx context.Context, rowType int) (*persistence.InternalConfigStoreEntry, error)
}
)
71 changes: 71 additions & 0 deletions common/persistence/nosql/nosqlplugin/mongodb/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2021 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package mongodb

import (
"context"
"io/ioutil"

"go.mongodb.org/mongo-driver/bson"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
)

var _ nosqlplugin.AdminDB = (*mdb)(nil)

const (
testSchemaDir = "schema/mongodb/"
)

func (db *mdb) SetupTestDatabase(schemaBaseDir string) error {
if schemaBaseDir == "" {
var err error
schemaBaseDir, err = nosqlplugin.GetDefaultTestSchemaDir(testSchemaDir)
if err != nil {
return err
}
}

schemaFile := schemaBaseDir + "cadence/schema.json"
byteValues, err := ioutil.ReadFile(schemaFile)
if err != nil {
return err
}
var commands []interface{}
err = bson.UnmarshalExtJSON(byteValues, false, &commands)
if err != nil {
return err
}
for _, cmd := range commands {
result := db.dbConn.RunCommand(context.Background(), cmd)
if result.Err() != nil {
return err
}
}
return nil
}

func (db *mdb) TeardownTestDatabase() error {
result := db.dbConn.RunCommand(context.Background(), bson.D{{"dropDatabase", 1}})
err := result.Err()
return err
}
70 changes: 70 additions & 0 deletions common/persistence/nosql/nosqlplugin/mongodb/configStore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package mongodb

import (
"context"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/schema/mongodb/cadence"
)

func (db *mdb) InsertConfig(ctx context.Context, row *persistence.InternalConfigStoreEntry) error {
collection := db.dbConn.Collection(cadence.ClusterConfigCollectionName)
doc := cadence.ClusterConfigCollectionEntry{
RowType: row.RowType,
Version: row.Version,
UnixTimestampSeconds: row.Timestamp.Unix(),
Data: row.Values.Data,
DataEncoding: row.Values.GetEncodingString(),
}
_, err := collection.InsertOne(ctx, doc)
if mongo.IsDuplicateKeyError(err) {
return nosqlplugin.NewConditionFailure("InsertConfig operation failed because of version collision")
}
return err
}

func (db *mdb) SelectLatestConfig(ctx context.Context, rowType int) (*persistence.InternalConfigStoreEntry, error) {
filter := bson.D{{"rowtype", rowType}}
queryOptions := options.FindOneOptions{}
queryOptions.SetSort(bson.D{{"version", -1}})

collection := db.dbConn.Collection(cadence.ClusterConfigCollectionName)
var result cadence.ClusterConfigCollectionEntry
err := collection.FindOne(ctx, filter, &queryOptions).Decode(&result)
if err != nil {
return nil, err
}
return &persistence.InternalConfigStoreEntry{
RowType: rowType,
Version: result.Version,
Timestamp: time.Unix(result.UnixTimestampSeconds, 0),
Values: persistence.NewDataBlob(result.Data, common.EncodingType(result.DataEncoding)),
}, nil
}
49 changes: 49 additions & 0 deletions common/persistence/nosql/nosqlplugin/mongodb/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package mongodb

import (
"context"

"go.mongodb.org/mongo-driver/mongo"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
)

// mdb represents a logical connection to MongoDB database
type mdb struct {
client *mongo.Client
dbConn *mongo.Database
cfg *config.NoSQL
logger log.Logger
}

var _ nosqlplugin.DB = (*mdb)(nil)

func (db *mdb) Close() {
db.client.Disconnect(context.Background())
}

func (db *mdb) PluginName() string {
return PluginName
}
Loading

0 comments on commit 46b84be

Please sign in to comment.