Skip to content

Commit

Permalink
refactor: catalog managers (GreptimeTeam#2405)
Browse files Browse the repository at this point in the history
* feat: rename catalog::local to catalog::memory

* refactor: catalog managers

* chore: license header
  • Loading branch information
killme2008 authored Sep 15, 2023
1 parent 364b99a commit 43e3c94
Show file tree
Hide file tree
Showing 41 changed files with 598 additions and 520 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ members = [
"src/mito",
"src/mito2",
"src/object-store",
"src/operator",
"src/partition",
"src/promql",
"src/query",
Expand Down Expand Up @@ -148,6 +149,7 @@ meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
mito = { path = "src/mito" }
mito2 = { path = "src/mito2" }
operator = { path = "src/operator" }
object-store = { path = "src/object-store" }
partition = { path = "src/partition" }
promql = { path = "src/promql" }
Expand Down
1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ meta-client = { workspace = true }
metrics.workspace = true
moka = { version = "0.11", features = ["future"] }
parking_lot = "0.12"
partition.workspace = true
regex.workspace = true
serde.workspace = true
serde_json = "1.0"
Expand Down
3 changes: 3 additions & 0 deletions src/catalog/src/remote.rs → src/catalog/src/kvbackend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use std::sync::Arc;
pub use client::{CachedMetaKvBackend, MetaKvBackend};

mod client;
mod manager;

#[cfg(feature = "testing")]
pub mod mock;
pub use manager::KvBackendCatalogManager;

/// KvBackend cache invalidator
#[async_trait::async_trait]
pub trait KvCacheInvalidator: Send + Sync {
async fn invalidate_key(&self, key: &[u8]);
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use catalog::error::{
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult,
TableMetadataManagerSnafu,
};
use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
use catalog::remote::KvCacheInvalidatorRef;
use catalog::CatalogManager;
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
Expand All @@ -40,19 +33,26 @@ use common_telemetry::debug;
use futures_util::TryStreamExt;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::TableRef;

use crate::table::DistTable;
use crate::error::{
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult,
TableMetadataManagerSnafu,
};
use crate::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
use crate::kvbackend::KvCacheInvalidatorRef;
use crate::CatalogManager;

/// Access all existing catalog, schema and tables.
///
/// The result comes from two source, all the user tables are presented in
/// a kv-backend which persists the metadata of a table. And system tables
/// comes from [SystemCatalog], which is static and read-only.
#[derive(Clone)]
pub struct FrontendCatalogManager {
pub struct KvBackendCatalogManager {
// TODO(LFC): Maybe use a real implementation for Standalone mode.
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
// is implemented by RaftEngine. Maybe we need a cache for it?
Expand All @@ -65,9 +65,10 @@ pub struct FrontendCatalogManager {
}

#[async_trait::async_trait]
impl CacheInvalidator for FrontendCatalogManager {
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> MetaResult<()> {
let key: TableNameKey = (&table_name).into();

self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
Expand Down Expand Up @@ -102,7 +103,7 @@ impl CacheInvalidator for FrontendCatalogManager {
}
}

impl FrontendCatalogManager {
impl KvBackendCatalogManager {
pub fn new(
backend: KvBackendRef,
backend_cache_invalidator: KvCacheInvalidatorRef,
Expand Down Expand Up @@ -139,7 +140,7 @@ impl FrontendCatalogManager {
}

#[async_trait::async_trait]
impl CatalogManager for FrontendCatalogManager {
impl CatalogManager for KvBackendCatalogManager {
async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
let stream = self
.table_metadata_manager
Expand Down Expand Up @@ -278,7 +279,7 @@ impl CatalogManager for FrontendCatalogManager {
/// - information_schema.columns
#[derive(Clone)]
struct SystemCatalog {
catalog_manager: Weak<FrontendCatalogManager>,
catalog_manager: Weak<KvBackendCatalogManager>,
}

impl SystemCatalog {
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use crate::error::Result;

pub mod error;
pub mod information_schema;
pub mod local;
pub mod kvbackend;
pub mod memory;
mod metrics;
pub mod remote;
pub mod system;
pub mod table_source;

Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/local.rs → src/catalog/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod memory;
pub mod manager;

pub use memory::{new_memory_catalog_manager, MemoryCatalogManager};
pub use manager::{new_memory_catalog_manager, MemoryCatalogManager};
File renamed without changes.
2 changes: 1 addition & 1 deletion src/catalog/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod tests {
use session::context::QueryContext;

use super::*;
use crate::local::MemoryCatalogManager;
use crate::memory::MemoryCatalogManager;

#[test]
fn test_validate_table_ref() {
Expand Down
5 changes: 2 additions & 3 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;

use catalog::remote::CachedMetaKvBackend;
use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use client::client_manager::DatanodeClients;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
Expand All @@ -25,7 +25,6 @@ use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use either::Either;
use frontend::catalog::FrontendCatalogManager;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::logical_optimizer::LogicalOptimizer;
Expand Down Expand Up @@ -253,7 +252,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {

let datanode_clients = Arc::new(DatanodeClients::default());

let catalog_list = FrontendCatalogManager::new(
let catalog_list = KvBackendCatalogManager::new(
cached_meta_backend.clone(),
cached_meta_backend.clone(),
datanode_clients,
Expand Down
5 changes: 2 additions & 3 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use catalog::remote::DummyKvCacheInvalidator;
use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager};
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
Expand All @@ -26,7 +26,6 @@ use common_telemetry::logging::LoggingOptions;
use datanode::datanode::builder::DatanodeBuilder;
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
use datanode::region_server::RegionServer;
use frontend::catalog::FrontendCatalogManager;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::service_config::{
Expand Down Expand Up @@ -313,7 +312,7 @@ impl StartCommand {
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();

let catalog_manager = FrontendCatalogManager::new(
let catalog_manager = KvBackendCatalogManager::new(
kv_store.clone(),
Arc::new(DummyKvCacheInvalidator),
Arc::new(StandaloneDatanodeManager(region_server.clone())),
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::local::MemoryCatalogManager;
use catalog::memory::MemoryCatalogManager;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::WalConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use async_trait::async_trait;
use catalog::remote::KvCacheInvalidatorRef;
use catalog::kvbackend::KvCacheInvalidatorRef;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/heartbeat/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use api::v1::meta::HeartbeatResponse;
use catalog::remote::KvCacheInvalidator;
use catalog::kvbackend::KvCacheInvalidator;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::time::Duration;
use api::v1::meta::Role;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::remote::CachedMetaKvBackend;
use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use catalog::CatalogManagerRef;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
Expand Down Expand Up @@ -80,7 +80,6 @@ pub use standalone::StandaloneDatanodeManager;

use self::distributed::DistRegionRequestHandler;
use self::standalone::StandaloneTableMetadataCreator;
use crate::catalog::FrontendCatalogManager;
use crate::delete::{Deleter, DeleterRef};
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
Expand Down Expand Up @@ -151,7 +150,7 @@ impl Instance {
) -> Result<Self> {
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));

let catalog_manager = FrontendCatalogManager::new(
let catalog_manager = KvBackendCatalogManager::new(
meta_backend.clone(),
meta_backend.clone(),
datanode_clients.clone(),
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#![feature(assert_matches)]
#![feature(trait_upcasting)]

pub mod catalog;
pub(crate) mod delete;
pub mod error;
pub mod expr_factory;
Expand All @@ -31,5 +30,7 @@ mod server;
pub mod service_config;
pub mod statement;
pub mod table;
#[cfg(test)]
pub(crate) mod tests;

pub const MAX_VALUE: &str = "MAXVALUE";
2 changes: 1 addition & 1 deletion src/frontend/src/req_convert/delete/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod tests {
use store_api::storage::RegionId;

use super::*;
use crate::table::test::{create_partition_rule_manager, new_test_table_info};
use crate::tests::{create_partition_rule_manager, new_test_table_info};

async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/req_convert/insert/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod tests {
use store_api::storage::RegionId;

use super::*;
use crate::table::test::{create_partition_rule_manager, new_test_table_info};
use crate::tests::{create_partition_rule_manager, new_test_table_info};

async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sql::ast::Value as SqlValue;
use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, Partitions};
use sql::statements::sql_value_to_value;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
Expand All @@ -48,7 +49,6 @@ use crate::error::{
DeserializePartitionSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
};
use crate::table::DistTable;
use crate::{expr_factory, MAX_VALUE};

impl StatementExecutor {
Expand Down
Loading

0 comments on commit 43e3c94

Please sign in to comment.