Skip to content

Commit

Permalink
Merge pull request databendlabs#7080 from b41sh/feat-access-tuple
Browse files Browse the repository at this point in the history
feat(planner): support tuple map access pushdown to storage
  • Loading branch information
mergify[bot] authored Aug 13, 2022
2 parents d61140a + 33c42e1 commit c1cd0bf
Show file tree
Hide file tree
Showing 35 changed files with 772 additions and 233 deletions.
49 changes: 49 additions & 0 deletions common/datavalues/src/data_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_exception::ErrorCode;
use common_exception::Result;

use crate::types::data_type::DataType;
use crate::types::data_type::DataTypeImpl;
use crate::DataField;
use crate::TypeDeserializerImpl;

Expand Down Expand Up @@ -141,6 +142,54 @@ impl DataSchema {
Self::new_from(fields, self.meta().clone())
}

/// project with inner columns by path.
pub fn inner_project(&self, path_indices: &BTreeMap<usize, Vec<usize>>) -> Self {
let paths: Vec<Vec<usize>> = path_indices.values().cloned().collect();
let fields = paths
.iter()
.map(|path| Self::traverse_paths(self.fields(), path).unwrap())
.collect();
Self::new_from(fields, self.meta().clone())
}

/// Resolve one path of indices to a `DataField`, descending into struct
/// (tuple) fields and materializing the flattened inner name
/// (`parent:child`) at each level.
///
/// Errors when the path is empty, when an index is out of range, or when
/// the path descends into a non-struct field.
fn traverse_paths(fields: &[DataField], path: &[usize]) -> Result<DataField> {
    if path.is_empty() {
        return Err(ErrorCode::BadArguments("path should not be empty"));
    }
    // Guard the index instead of panicking on an out-of-range path.
    let field = fields.get(path[0]).ok_or_else(|| {
        ErrorCode::BadArguments(format!(
            "path index {} is out of range, only {} fields are available",
            path[0],
            fields.len()
        ))
    })?;
    if path.len() == 1 {
        return Ok(field.clone());
    }

    let field_name = field.name();
    if let DataTypeImpl::Struct(struct_type) = &field.data_type() {
        let inner_types = struct_type.types();
        // Inner fields are named `parent:child`; unnamed tuples fall back
        // to the positional index as the child name.
        let inner_names = match struct_type.names() {
            Some(inner_names) => inner_names
                .iter()
                .map(|name| format!("{}:{}", field_name, name.to_lowercase()))
                .collect::<Vec<_>>(),
            None => (0..inner_types.len())
                .map(|i| format!("{}:{}", field_name, i))
                .collect::<Vec<_>>(),
        };

        let inner_fields = inner_names
            .iter()
            .zip(inner_types.iter())
            // `inner_name` is already a borrowed String; no clone needed.
            .map(|(inner_name, inner_type)| DataField::new(inner_name, inner_type.clone()))
            .collect::<Vec<DataField>>();
        return Self::traverse_paths(&inner_fields, &path[1..]);
    }
    let valid_fields: Vec<String> = fields.iter().map(|f| f.name().clone()).collect();
    Err(ErrorCode::BadArguments(format!(
        "Unable to get field paths. Valid fields: {:?}",
        valid_fields
    )))
}

/// project will do column pruning.
#[must_use]
pub fn project_by_fields(&self, fields: Vec<DataField>) -> Self {
Expand Down
1 change: 1 addition & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub use plan_list::ListPlan;
pub use plan_node::PlanNode;
pub use plan_node_builder::PlanBuilder;
pub use plan_node_extras::Extras;
pub use plan_node_extras::Projection;
pub use plan_node_rewriter::PlanRewriter;
pub use plan_node_rewriter::RewriteHelper;
pub use plan_node_stage::StageKind;
Expand Down
3 changes: 2 additions & 1 deletion common/planners/src/plan_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_datavalues::DataSchemaRef;
use common_meta_app::schema::TableIdent;

use crate::Expression;
use crate::Projection;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct DeletePlan {
Expand All @@ -27,7 +28,7 @@ pub struct DeletePlan {
pub table_name: String,
pub table_id: TableIdent,
pub selection: Option<Expression>,
pub projection: Vec<usize>,
pub projection: Projection,
}

impl DeletePlan {
Expand Down
44 changes: 43 additions & 1 deletion common/planners/src/plan_node_extras.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,55 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fmt::Formatter;

use crate::Expression;

/// How a scan projects columns: either whole top-level columns, or paths
/// into the inner columns of tuple (struct) columns so that only the
/// accessed leaves are read from storage.
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub enum Projection {
    /// column indices of the table
    Columns(Vec<usize>),
    /// inner column indices for tuple data type with inner columns.
    /// the key is the column_index of ColumnEntry.
    /// the value is the path indices of inner columns.
    InnerColumns(BTreeMap<usize, Vec<usize>>),
}

impl Projection {
pub fn len(&self) -> usize {
match self {
Projection::Columns(indices) => indices.len(),
Projection::InnerColumns(path_indices) => path_indices.len(),
}
}

pub fn is_empty(&self) -> bool {
match self {
Projection::Columns(indices) => indices.is_empty(),
Projection::InnerColumns(path_indices) => path_indices.is_empty(),
}
}
}

impl Debug for Projection {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        // Both variants render as a plain list; InnerColumns shows only the
        // path values, not the map keys.
        match self {
            Projection::Columns(indices) => write!(f, "{:?}", indices),
            Projection::InnerColumns(path_indices) => {
                write!(f, "{:?}", path_indices.values().collect::<Vec<_>>())
            }
        }
    }
}

/// Extras is a wrapper for push down items.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Default)]
pub struct Extras {
/// Optional column indices to use as a projection
pub projection: Option<Vec<usize>>,
pub projection: Option<Projection>,
/// Optional filter expression plan
/// split_conjunctions by `and` operator
pub filters: Vec<Expression>,
Expand Down
7 changes: 4 additions & 3 deletions common/planners/src/plan_read_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_meta_app::schema::TableInfo;
use crate::Expression;
use crate::Extras;
use crate::Partitions;
use crate::Projection;
use crate::StageTableInfo;
use crate::Statistics;

Expand Down Expand Up @@ -61,7 +62,7 @@ pub struct ReadDataSourcePlan {
/// Required fields to scan.
///
/// After optimization, only a sub set of the fields in `table_info.schema().fields` are needed.
/// The key is the index of the field in original `table_info.schema().fields`.
/// The key is the column_index of `ColumnEntry` in `Metadata`.
///
/// If it is None, one should use `table_info.schema().fields()`.
pub scan_fields: Option<BTreeMap<usize, DataField>>,
Expand Down Expand Up @@ -93,7 +94,7 @@ impl ReadDataSourcePlan {
.unwrap_or_else(|| self.source_info.schema().fields_map())
}

pub fn projections(&self) -> Vec<usize> {
pub fn projections(&self) -> Projection {
let default_proj = || {
(0..self.source_info.schema().fields().len())
.into_iter()
Expand All @@ -107,7 +108,7 @@ impl ReadDataSourcePlan {
{
prj.clone()
} else {
default_proj()
Projection::Columns(default_proj())
}
}
}
123 changes: 80 additions & 43 deletions common/storages/fuse/src/fuse_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use common_arrow::parquet::metadata::SchemaDescriptor;
use common_arrow::parquet::schema::types::ParquetType;
use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::arrow::datatypes::Field as ArrowField;
use common_arrow::arrow::datatypes::Schema as ArrowSchema;
use common_exception::ErrorCode;
use common_exception::Result;
use common_fuse_meta::meta::Compression;
use common_planners::PartInfo;
use common_planners::PartInfoPtr;
use common_planners::Projection;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct ColumnMeta {
Expand Down Expand Up @@ -93,11 +95,84 @@ impl FusePartInfo {
}
}

/// The leaf-column tree of a schema: one `ColumnLeaf` per top-level field,
/// in declaration order, with nested struct fields as children.
#[derive(Debug, Clone)]
pub struct ColumnLeaves {
    pub column_leaves: Vec<ColumnLeaf>,
}

impl ColumnLeaves {
    /// Build the leaf-column tree from an arrow schema, assigning leaf ids
    /// in depth-first order (the order leaves are laid out in storage).
    pub fn new_from_schema(schema: &ArrowSchema) -> Self {
        let mut leaf_id = 0;
        let mut column_leaves = Vec::with_capacity(schema.fields.len());

        for field in &schema.fields {
            let column_leaf = Self::traverse_fields_dfs(field, &mut leaf_id);
            column_leaves.push(column_leaf);
        }

        Self { column_leaves }
    }

    /// Depth-first traversal: a struct field becomes a parent node whose
    /// `leaf_ids` aggregate all of its descendants; any other field is a
    /// single leaf and consumes the next id.
    fn traverse_fields_dfs(field: &ArrowField, leaf_id: &mut usize) -> ColumnLeaf {
        match &field.data_type {
            ArrowType::Struct(inner_fields) => {
                let mut child_column_leaves = Vec::with_capacity(inner_fields.len());
                let mut child_leaf_ids = Vec::with_capacity(inner_fields.len());
                for inner_field in inner_fields {
                    let child_column_leaf = Self::traverse_fields_dfs(inner_field, leaf_id);
                    child_leaf_ids.extend(child_column_leaf.leaf_ids.clone());
                    child_column_leaves.push(child_column_leaf);
                }
                ColumnLeaf::new(field.clone(), child_leaf_ids, Some(child_column_leaves))
            }
            _ => {
                let column_leaf = ColumnLeaf::new(field.clone(), vec![*leaf_id], None);
                *leaf_id += 1;
                column_leaf
            }
        }
    }

    /// Resolve a projection to the column leaves it selects.
    ///
    /// Returns an `Err` (instead of panicking) when an inner-column path
    /// does not resolve to a leaf.
    pub fn get_by_projection<'a>(&'a self, proj: &'a Projection) -> Result<Vec<&ColumnLeaf>> {
        match proj {
            Projection::Columns(indices) => Ok(indices
                .iter()
                .map(|idx| &self.column_leaves[*idx])
                .collect()),
            Projection::InnerColumns(path_indices) => path_indices
                .values()
                // Collecting into Result propagates the first bad path.
                .map(|path| Self::traverse_path(&self.column_leaves, path))
                .collect(),
        }
    }

    /// Walk `path` down the leaf tree, returning the addressed leaf.
    fn traverse_path<'a>(
        column_leaves: &'a [ColumnLeaf],
        path: &'a [usize],
    ) -> Result<&'a ColumnLeaf> {
        // Guard against an empty path rather than panicking on `path[0]`.
        if path.is_empty() {
            return Err(ErrorCode::LogicalError(
                "path should not be empty".to_string(),
            ));
        }
        let column_leaf = &column_leaves[path[0]];
        if path.len() > 1 {
            return match &column_leaf.children {
                Some(children) => Self::traverse_path(children, &path[1..]),
                None => Err(ErrorCode::LogicalError(format!(
                    "Cannot get column_leaf by path: {:?}",
                    path
                ))),
            };
        }
        Ok(column_leaf)
    }
}

/// `ColumnLeaf` contains all the leaf column ids of the column.
/// For the nested types, it may contain more than one leaf column.
#[derive(Debug, Clone)]
pub struct ColumnLeaf {
pub name: String,
pub field: ArrowField,
// `leaf_ids` is the indices of all the leaf columns in DFS order,
// through which we can find the meta information of the leaf columns.
pub leaf_ids: Vec<usize>,
Expand All @@ -106,49 +181,11 @@ pub struct ColumnLeaf {
}

impl ColumnLeaf {
pub fn new(name: String, leaf_ids: Vec<usize>, children: Option<Vec<ColumnLeaf>>) -> Self {
pub fn new(field: ArrowField, leaf_ids: Vec<usize>, children: Option<Vec<ColumnLeaf>>) -> Self {
Self {
name,
field,
leaf_ids,
children,
}
}
}

/// Build the leaf columns of a parquet schema, one `ColumnLeaf` per
/// top-level field, with leaf ids assigned in depth-first order.
pub fn build_column_leaves(schema: &SchemaDescriptor) -> Vec<ColumnLeaf> {
    let mut leaf_id = 0;
    let mut column_leaves = Vec::with_capacity(schema.fields().len());

    for field in schema.fields() {
        let column_leaf = traverse_fields_dfs(field, &mut leaf_id);
        column_leaves.push(column_leaf);
    }

    // The vector is already owned: return it directly instead of cloning
    // it with `to_vec()`.
    column_leaves
}

/// Depth-first walk of a parquet type: primitives become single leaves
/// (consuming the next leaf id), groups aggregate the leaf ids of all
/// their descendants.
fn traverse_fields_dfs(field: &ParquetType, leaf_id: &mut usize) -> ColumnLeaf {
    match field {
        ParquetType::PrimitiveType(ty) => {
            // A primitive is exactly one leaf: take the next id.
            let leaf = ColumnLeaf::new(ty.field_info.name.clone(), vec![*leaf_id], None);
            *leaf_id += 1;
            leaf
        }
        ParquetType::GroupType {
            field_info, fields, ..
        } => {
            // A group owns its children's leaves; recurse and aggregate.
            let mut children = Vec::with_capacity(fields.len());
            let mut leaf_ids = Vec::with_capacity(fields.len());
            for child in fields {
                let child_leaf = traverse_fields_dfs(child, leaf_id);
                leaf_ids.extend(child_leaf.leaf_ids.iter().copied());
                children.push(child_leaf);
            }
            ColumnLeaf::new(field_info.name.clone(), leaf_ids, Some(children))
        }
    }
}
Loading

0 comments on commit c1cd0bf

Please sign in to comment.