Skip to content

Commit

Permalink
use view table
Browse files Browse the repository at this point in the history
  • Loading branch information
Veeupup committed Mar 31, 2022
1 parent cd8dcdc commit 0176db9
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 29 deletions.
10 changes: 4 additions & 6 deletions common/meta/types/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ pub struct TableMeta {
pub engine_options: HashMap<String, String>,
pub options: HashMap<String, String>,
pub created_on: DateTime<Utc>,
pub view: Option<TableView>
}

impl TableInfo {
Expand Down Expand Up @@ -173,7 +172,6 @@ impl Default for TableMeta {
engine_options: HashMap::new(),
options: HashMap::new(),
created_on: Utc::now(),
view: None,
}
}
}
Expand All @@ -182,8 +180,8 @@ impl Display for TableMeta {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"Engine: {}={:?}, Schema: {}, Options: {:?} CreatedOn: {:?} IsView: {:?}",
self.engine, self.engine_options, self.schema, self.options, self.created_on, self.view
"Engine: {}={:?}, Schema: {}, Options: {:?} CreatedOn: {:?}",
self.engine, self.engine_options, self.schema, self.options, self.created_on
)
}
}
Expand All @@ -192,8 +190,8 @@ impl Display for TableInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"DB.Table: {}, Table: {}-{}, Engine: {}, IsView: {:?}",
self.desc, self.name, self.ident, self.meta.engine, self.meta.view
"DB.Table: {}, Table: {}-{}, Engine: {}",
self.desc, self.name, self.ident, self.meta.engine
)
}
}
Expand Down
5 changes: 4 additions & 1 deletion query/src/interpreters/interpreter_view_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use common_streams::SendableDataBlockStream;
use common_planners::CreateViewPlan;
use super::InsertInterpreter;
use crate::catalogs::Catalog;
use std::collections::HashMap;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -88,14 +89,16 @@ impl Interpreter for CreateViewInterpreter {
impl CreateViewInterpreter {
async fn create_view(&self) -> Result<SendableDataBlockStream> {
let catalog = self.ctx.get_catalog();
let mut options = HashMap::new();
options.insert("query".to_string(), self.plan.subquery.clone());
let plan = CreateTableReq {
if_not_exists: self.plan.if_not_exists,
tenant: self.plan.tenant.clone(),
db: self.plan.db.clone(),
table: self.plan.viewname.clone(),
table_meta: TableMeta {
engine: "VIEW".to_string(),
view: Some(TableView { subquery:self.plan.subquery.clone() }),
options,
..Default::default()
}
};
Expand Down
36 changes: 19 additions & 17 deletions query/src/sql/statements/query/query_schema_joined_analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ use crate::sql::statements::query::query_schema_joined::JoinedSchema;
use crate::sql::statements::AnalyzableStatement;
use crate::sql::statements::AnalyzedResult;
use crate::sql::statements::DfQueryStatement;
use crate::storages::Table;
use crate::storages::view::ViewTable;
use crate::storages::view::view_table::VIEW_ENGINE;
use crate::storages::view::view_table::QUERY;

pub struct JoinedSchemaAnalyzer {
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -101,28 +105,26 @@ impl JoinedSchemaAnalyzer {
// TODO(Winter): await query_context.get_table
let (database, table) = self.resolve_table(&item.name)?;
let read_table = self.ctx.get_table(&database, &table).await?;

let tbl_info = read_table.get_table_info();
// TODO(veeupup) make view use subquery logic
if let Some(view) = &tbl_info.meta.view { // view, not table
// parse and make it subquery
let sql = view.subquery.clone();
let (statements, _) = DfParser::parse_sql(&sql)?;
if statements.len() == 1 {
if let DfStatement::Query(subquery) = &statements[0] {
match subquery.analyze(self.ctx.clone()).await? {
AnalyzedResult::SelectQuery(state) => {
let viewname = tbl_info.name.clone();
let alias = vec![viewname];
return JoinedSchema::from_subquery(state, alias);
},
_ => {}

if tbl_info.engine() == VIEW_ENGINE {
if let Some(query) = tbl_info.options().get(QUERY) {
let (statements, _) = DfParser::parse_sql(query.as_str())?;
if statements.len() == 1 {
if let DfStatement::Query(subquery) = &statements[0] {
match subquery.analyze(self.ctx.clone()).await? {
AnalyzedResult::SelectQuery(state) => {
let alias = vec![tbl_info.name.clone()];
return JoinedSchema::from_subquery(state, alias);
},
_ => {}
}
}
}
}
return Err(ErrorCode::LogicalError(
Err(ErrorCode::LogicalError(
"Logical error, subquery analyzed data must be SelectQuery, it's a bug.",
));
))
}else {
match &item.alias {
None => {
Expand Down
1 change: 1 addition & 0 deletions query/src/storages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod index;
pub mod memory;
pub mod null;
pub mod system;
pub mod view;

mod s3;
mod storage_context;
Expand Down
5 changes: 3 additions & 2 deletions query/src/storages/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::configs::Config;
use crate::storages::fuse::FuseTable;
use crate::storages::github::GithubTable;
use crate::storages::memory::MemoryTable;
use crate::storages::view::ViewTable;
use crate::storages::null::NullTable;
use crate::storages::StorageContext;
use crate::storages::Table;
Expand Down Expand Up @@ -109,8 +110,8 @@ impl StorageFactory {
// so we will need a table engine to present VIEW type
// TODO(veeupup) add a new table engine storage
creators.insert("VIEW".to_string(), Storage {
creator: Arc::new(MemoryTable::try_create),
descriptor: Arc::new(MemoryTable::description),
creator: Arc::new(ViewTable::try_create),
descriptor: Arc::new(ViewTable::description),
});

StorageFactory {
Expand Down
12 changes: 9 additions & 3 deletions query/src/storages/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ pub trait Table: Sync + Send {
&self,
_ctx: Arc<QueryContext>,
_push_downs: Option<Extras>,
) -> Result<(Statistics, Partitions)>;
) -> Result<(Statistics, Partitions)> {
unimplemented!()
}

fn table_args(&self) -> Option<Vec<Expression>> {
None
Expand All @@ -89,14 +91,18 @@ pub trait Table: Sync + Send {
&self,
_ctx: Arc<QueryContext>,
plan: &ReadDataSourcePlan,
) -> Result<SendableDataBlockStream>;
) -> Result<SendableDataBlockStream> {
unimplemented!()
}

fn read2(
&self,
_: Arc<QueryContext>,
_: &ReadDataSourcePlan,
_: &mut NewPipeline,
) -> Result<()>;
) -> Result<()> {
unimplemented!()
}

async fn append_data(
&self,
Expand Down
16 changes: 16 additions & 0 deletions query/src/storages/view/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod view_table;
pub use view_table::ViewTable;
83 changes: 83 additions & 0 deletions query/src/storages/view/view_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_datablocks::DataBlock;
use common_datavalues::ColumnRef;
use common_exception::Result;
use common_infallible::Mutex;
use common_infallible::RwLock;
use common_meta_types::TableInfo;
use common_planners::Extras;
use common_planners::Partitions;
use common_planners::ReadDataSourcePlan;
use common_planners::Statistics;
use common_planners::TruncateTablePlan;
use common_streams::SendableDataBlockStream;

use crate::pipelines::new::processors::port::OutputPort;
use crate::pipelines::new::processors::processor::ProcessorPtr;
use crate::pipelines::new::processors::SyncSource;
use crate::pipelines::new::processors::SyncSourcer;
use crate::pipelines::new::NewPipeline;
use crate::pipelines::new::SourcePipeBuilder;
use crate::sessions::QueryContext;
use crate::storages::StorageContext;
use crate::storages::StorageDescription;
use crate::storages::Table;

pub struct ViewTable {
table_info: TableInfo,
pub query: String,
}

pub const VIEW_ENGINE: &str = "VIEW";
pub const QUERY: &str = "query";

impl ViewTable {
pub fn try_create(ctx: StorageContext, table_info: TableInfo) -> Result<Box<dyn Table>> {
let query = table_info.options().get(QUERY).cloned();
if let Some(query) = query {
Ok(Box::new(ViewTable {
query,
table_info,
}))
}else {
Err(ErrorCode::LogicalError("Need `query` when creating ViewTable"))
}
}

pub fn description() -> StorageDescription {
StorageDescription {
engine_name: "VIEW".to_string(),
comment: "VIEW STORAGE (LOGICAL VIEW)".to_string(),
}
}
}

#[async_trait::async_trait]
impl Table for ViewTable {
fn as_any(&self) -> &dyn Any {
self
}

fn get_table_info(&self) -> &TableInfo {
&self.table_info
}
}

0 comments on commit 0176db9

Please sign in to comment.