diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tests/cassandra_tool_cqlclient_test.go b/common/persistence/nosql/nosqlplugin/cassandra/tests/cassandra_tool_cqlclient_test.go index 5318ba4f825..20b349b4726 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tests/cassandra_tool_cqlclient_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tests/cassandra_tool_cqlclient_test.go @@ -40,7 +40,7 @@ type ( } ) -var _ test.DB = (*cassandra.CqlClient)(nil) +var _ test.DB = (cassandra.CqlClient)(nil) func TestCQLClientTestSuite(t *testing.T) { testflags.RequireCassandra(t) diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tests/cassandra_tool_setupTask_test.go b/common/persistence/nosql/nosqlplugin/cassandra/tests/cassandra_tool_setupTask_test.go index 5141949adef..f5a773ffa6e 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tests/cassandra_tool_setupTask_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tests/cassandra_tool_setupTask_test.go @@ -37,7 +37,7 @@ import ( type ( SetupSchemaTestSuite struct { test.SetupSchemaTestBase - client *cassandra.CqlClient + client cassandra.CqlClient } ) diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tests/utils.go b/common/persistence/nosql/nosqlplugin/cassandra/tests/utils.go index 5dc5486c610..4decd456be1 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tests/utils.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tests/utils.go @@ -28,7 +28,7 @@ import ( // NOTE: change this when moving the test files around during refactoring const rootRelativePath = "../../../../../../" -func NewTestCQLClient(keyspace string) (*cassandra.CqlClient, error) { +func NewTestCQLClient(keyspace string) (cassandra.CqlClient, error) { return cassandra.NewCQLClient(&cassandra.CQLClientConfig{ Hosts: environment.GetCassandraAddress(), Port: cassandra.DefaultCassandraPort, diff --git a/tools/cassandra/cqlclient.go b/tools/cassandra/cqlclient.go index 0e6a39caf32..09187af1c3c 100644 --- a/tools/cassandra/cqlclient.go +++ b/tools/cassandra/cqlclient.go @@ -31,7 +31,27 @@ import ( ) type ( - CqlClient struct { + CqlClient interface { + CreateDatabase(name string) error + DropDatabase(name string) error + CreateKeyspace(name string) error + CreateNTSKeyspace(name string, datacenter string) error + DropKeyspace(name string) error + DropAllTables() error + CreateSchemaVersionTables() error + ReadSchemaVersion() (string, error) + UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error + WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error + ExecDDLQuery(stmt string, args ...interface{}) error + Close() + ListTables() ([]string, error) + listTypes() ([]string, error) + dropTable(name string) error + dropType(name string) error + dropAllTablesTypes() error + } + + CqlClientImpl struct { nReplicas int session gocql.Session cfg *CQLClientConfig @@ -89,13 +109,13 @@ const ( `WITH replication = { 'class' : 'NetworkTopologyStrategy', '%v' : %v};` ) -var _ schema.SchemaClient = (*CqlClient)(nil) +var _ schema.SchemaClient = (*CqlClientImpl)(nil) // NewCQLClient returns a new instance of CQLClient -func NewCQLClient(cfg *CQLClientConfig) (*CqlClient, error) { +func NewCQLClient(cfg *CQLClientConfig) (CqlClient, error) { var err error - cqlClient := new(CqlClient) + cqlClient := new(CqlClientImpl) cqlClient.cfg = cfg cqlClient.nReplicas = cfg.NumReplicas cqlClient.session, err = gocql.GetRegisteredClient().CreateSession(gocql.ClusterConfig{ @@ -117,35 +137,35 @@ func NewCQLClient(cfg *CQLClientConfig) (*CqlClient, error) { return cqlClient, nil } -func (client *CqlClient) CreateDatabase(name string) error { +func (client *CqlClientImpl) CreateDatabase(name string) error { return client.CreateKeyspace(name) } -func (client *CqlClient) DropDatabase(name string) error { +func (client *CqlClientImpl) DropDatabase(name string) error { return client.DropKeyspace(name) } -// CreateKeyspace creates a cassandra Keyspace if it doesn't exist -func (client *CqlClient) CreateKeyspace(name string) error { +// CreateNTSKeyspace creates a cassandra Keyspace if it doesn't exist using network topology strategy +func (client *CqlClientImpl) CreateKeyspace(name string) error { return client.ExecDDLQuery(fmt.Sprintf(createKeyspaceCQL, name, client.nReplicas)) } // CreateNTSKeyspace creates a cassandra Keyspace if it doesn't exist using network topology strategy -func (client *CqlClient) CreateNTSKeyspace(name string, datacenter string) error { +func (client *CqlClientImpl) CreateNTSKeyspace(name string, datacenter string) error { return client.ExecDDLQuery(fmt.Sprintf(createNTSKeyspaceCQL, name, datacenter, client.nReplicas)) } // DropKeyspace drops a Keyspace -func (client *CqlClient) DropKeyspace(name string) error { +func (client *CqlClientImpl) DropKeyspace(name string) error { return client.ExecDDLQuery(fmt.Sprintf("DROP KEYSPACE %v", name)) } -func (client *CqlClient) DropAllTables() error { +func (client *CqlClientImpl) DropAllTables() error { return client.dropAllTablesTypes() } // CreateSchemaVersionTables sets up the schema version tables -func (client *CqlClient) CreateSchemaVersionTables() error { +func (client *CqlClientImpl) CreateSchemaVersionTables() error { if err := client.ExecDDLQuery(createSchemaVersionTableCQL); err != nil { return err } @@ -153,7 +173,7 @@ func (client *CqlClient) CreateSchemaVersionTables() error { } // ReadSchemaVersion returns the current schema version for the Keyspace -func (client *CqlClient) ReadSchemaVersion() (string, error) { +func (client *CqlClientImpl) ReadSchemaVersion() (string, error) { query := client.session.Query(readSchemaVersionCQL, client.cfg.Keyspace) iter := query.Iter() var version string @@ -168,13 +188,13 @@ func (client *CqlClient) ReadSchemaVersion() (string, error) { } // UpdateSchemaVersion updates the schema version for the Keyspace -func (client *CqlClient) UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error { +func (client *CqlClientImpl) UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error { query := client.session.Query(writeSchemaVersionCQL, client.cfg.Keyspace, time.Now(), newVersion, minCompatibleVersion) return query.Exec() } // WriteSchemaUpdateLog adds an entry to the schema update history table -func (client *CqlClient) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error { +func (client *CqlClientImpl) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error { now := time.Now().UTC() query := client.session.Query(writeSchemaUpdateHistoryCQL) query.Bind(now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, desc) @@ -182,19 +202,19 @@ func (client *CqlClient) WriteSchemaUpdateLog(oldVersion string, newVersion stri } // ExecDDLQuery executes a cql statement -func (client *CqlClient) ExecDDLQuery(stmt string, args ...interface{}) error { +func (client *CqlClientImpl) ExecDDLQuery(stmt string, args ...interface{}) error { return client.session.Query(stmt, args...).Exec() } // Close closes the cql client -func (client *CqlClient) Close() { +func (client *CqlClientImpl) Close() { if client.session != nil { client.session.Close() } } // ListTables lists the table names in a Keyspace -func (client *CqlClient) ListTables() ([]string, error) { +func (client *CqlClientImpl) ListTables() ([]string, error) { query := client.session.Query(listTablesCQL, client.cfg.Keyspace) iter := query.Iter() var names []string @@ -209,7 +229,7 @@ func (client *CqlClient) ListTables() ([]string, error) { } // listTypes lists the User defined types in a Keyspace -func (client *CqlClient) listTypes() ([]string, error) { +func (client *CqlClientImpl) listTypes() ([]string, error) { qry := client.session.Query(listTypesCQL, client.cfg.Keyspace) iter := qry.Iter() var names []string @@ -224,18 +244,18 @@ func (client *CqlClient) listTypes() ([]string, error) { } // dropTable drops a given table from the Keyspace -func (client *CqlClient) dropTable(name string) error { +func (client *CqlClientImpl) dropTable(name string) error { return client.ExecDDLQuery(fmt.Sprintf("DROP TABLE %v", name)) } // dropType drops a given type from the Keyspace -func (client *CqlClient) dropType(name string) error { +func (client *CqlClientImpl) dropType(name string) error { return client.ExecDDLQuery(fmt.Sprintf("DROP TYPE %v", name)) } // dropAllTablesTypes deletes all tables/types in the // Keyspace without deleting the Keyspace -func (client *CqlClient) dropAllTablesTypes() error { +func (client *CqlClientImpl) dropAllTablesTypes() error { tables, err := client.ListTables() if err != nil { return err