Skip to content

Commit

Permalink
ARROW-4628: [Rust] [DataFusion] Implement type coercion query optimiz…
Browse files Browse the repository at this point in the history
…er rule

This PR refactors the existing type coercion logic, moving it out of the SQL query planner and into an optimizer rule. It also makes the coercion logic more complete and improves the unit tests.

It also converts more functions to return `Result` instead of calling `unwrap`, removes some dead code, and eliminates some duplicated code by introducing a new `utils` file in the `optimizer` module.

Author: Andy Grove <[email protected]>

Closes apache#3939 from andygrove/type_coercion and squashes the following commits:

d94d8fd <Andy Grove> use correct error type
f8b25e9 <Andy Grove> manual merge with latest from master
30ef339 <Andy Grove> rebase
9844792 <Andy Grove> Roll back some changes to reduce scope of PR
1a76ca3 <Andy Grove> Implement type coercion optimizer rule
  • Loading branch information
andygrove authored and kszucs committed Mar 24, 2019
1 parent 7901f8e commit 483fd8c
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 74 deletions.
12 changes: 10 additions & 2 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::execution::relation::{DataSourceRelation, Relation};
use crate::logicalplan::*;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::type_coercion::TypeCoercionRule;
use crate::optimizer::utils;
use crate::sql::parser::{DFASTNode, DFParser};
use crate::sql::planner::{SchemaProvider, SqlToRel};
Expand Down Expand Up @@ -107,8 +108,15 @@ impl ExecutionContext {

/// Optimize the logical plan by applying optimizer rules
fn optimize(&self, plan: &LogicalPlan) -> Result<Arc<LogicalPlan>> {
let mut rule = ProjectionPushDown::new();
Ok(rule.optimize(plan)?)
let rules: Vec<Box<OptimizerRule>> = vec![
Box::new(ProjectionPushDown::new()),
Box::new(TypeCoercionRule::new()),
];
let mut plan = Arc::new(plan.clone());
for mut rule in rules {
plan = rule.optimize(&plan)?;
}
Ok(plan)
}

/// Execute a logical plan and produce a Relation (a schema-aware iterator over a series
Expand Down
25 changes: 13 additions & 12 deletions rust/datafusion/src/logicalplan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,53 +450,54 @@ impl fmt::Debug for LogicalPlan {
}
}

pub fn can_coerce_from(left: &DataType, other: &DataType) -> bool {
pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
use self::DataType::*;
match left {
Int8 => match other {
match type_into {
Int8 => match type_from {
Int8 => true,
_ => false,
},
Int16 => match other {
Int16 => match type_from {
Int8 | Int16 => true,
_ => false,
},
Int32 => match other {
Int32 => match type_from {
Int8 | Int16 | Int32 => true,
_ => false,
},
Int64 => match other {
Int64 => match type_from {
Int8 | Int16 | Int32 | Int64 => true,
_ => false,
},
UInt8 => match other {
UInt8 => match type_from {
UInt8 => true,
_ => false,
},
UInt16 => match other {
UInt16 => match type_from {
UInt8 | UInt16 => true,
_ => false,
},
UInt32 => match other {
UInt32 => match type_from {
UInt8 | UInt16 | UInt32 => true,
_ => false,
},
UInt64 => match other {
UInt64 => match type_from {
UInt8 | UInt16 | UInt32 | UInt64 => true,
_ => false,
},
Float32 => match other {
Float32 => match type_from {
Int8 | Int16 | Int32 | Int64 => true,
UInt8 | UInt16 | UInt32 | UInt64 => true,
Float32 => true,
_ => false,
},
Float64 => match other {
Float64 => match type_from {
Int8 | Int16 | Int32 | Int64 => true,
UInt8 | UInt16 | UInt32 | UInt64 => true,
Float32 | Float64 => true,
_ => false,
},
Utf8 => true,
_ => false,
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/datafusion/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
pub mod optimizer;
pub mod projection_push_down;
pub mod type_coercion;
pub mod utils;
2 changes: 1 addition & 1 deletion rust/datafusion/src/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Query optimizer traits
use crate::error::Result;
use crate::logicalplan::LogicalPlan;
use arrow::error::Result;
use std::sync::Arc;

/// An optimizer rule performs a transformation on a logical plan to produce an optimized logical plan.
Expand Down
42 changes: 11 additions & 31 deletions rust/datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
//! Projection Push Down optimizer rule ensures that only referenced columns are
//! loaded into memory
use crate::error::{ExecutionError, Result};
use crate::logicalplan::Expr;
use crate::logicalplan::LogicalPlan;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use arrow::datatypes::{Field, Schema};
use arrow::error::{ArrowError, Result};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

Expand Down Expand Up @@ -56,7 +57,7 @@ impl ProjectionPushDown {
schema,
} => {
// collect all columns referenced by projection expressions
self.collect_exprs(&expr, accum);
utils::exprlist_to_column_indices(&expr, accum);

// push projection down
let input = self.optimize_plan(&input, accum, mapping)?;
Expand All @@ -72,7 +73,7 @@ impl ProjectionPushDown {
}
LogicalPlan::Selection { expr, input } => {
// collect all columns referenced by filter expression
self.collect_expr(expr, accum);
utils::expr_to_column_indices(expr, accum);

// push projection down
let input = self.optimize_plan(&input, accum, mapping)?;
Expand All @@ -92,8 +93,8 @@ impl ProjectionPushDown {
schema,
} => {
// collect all columns referenced by grouping and aggregate expressions
self.collect_exprs(&group_expr, accum);
self.collect_exprs(&aggr_expr, accum);
utils::exprlist_to_column_indices(&group_expr, accum);
utils::exprlist_to_column_indices(&aggr_expr, accum);

// push projection down
let input = self.optimize_plan(&input, accum, mapping)?;
Expand All @@ -115,7 +116,7 @@ impl ProjectionPushDown {
schema,
} => {
// collect all columns referenced by sort expressions
self.collect_exprs(&expr, accum);
utils::exprlist_to_column_indices(&expr, accum);

// push projection down
let input = self.optimize_plan(&input, accum, mapping)?;
Expand Down Expand Up @@ -161,7 +162,9 @@ impl ProjectionPushDown {
// can rewrite expressions as we walk back up the tree

if mapping.len() != 0 {
return Err(ArrowError::ComputeError("illegal state".to_string()));
return Err(ExecutionError::InternalError(
"illegal state".to_string(),
));
}

for i in 0..schema.fields().len() {
Expand Down Expand Up @@ -190,29 +193,6 @@ impl ProjectionPushDown {
}
}

fn collect_exprs(&self, expr: &Vec<Expr>, accum: &mut HashSet<usize>) {
expr.iter().for_each(|e| self.collect_expr(e, accum));
}

fn collect_expr(&self, expr: &Expr, accum: &mut HashSet<usize>) {
match expr {
Expr::Column(i) => {
accum.insert(*i);
}
Expr::Literal(_) => { /* not needed */ }
Expr::IsNull(e) => self.collect_expr(e, accum),
Expr::IsNotNull(e) => self.collect_expr(e, accum),
Expr::BinaryExpr { left, right, .. } => {
self.collect_expr(left, accum);
self.collect_expr(right, accum);
}
Expr::Cast { expr, .. } => self.collect_expr(expr, accum),
Expr::Sort { expr, .. } => self.collect_expr(expr, accum),
Expr::AggregateFunction { args, .. } => self.collect_exprs(args, accum),
Expr::ScalarFunction { args, .. } => self.collect_exprs(args, accum),
}
}

fn rewrite_exprs(
&self,
expr: &Vec<Expr>,
Expand Down Expand Up @@ -269,7 +249,7 @@ impl ProjectionPushDown {
fn new_index(&self, mapping: &HashMap<usize, usize>, i: &usize) -> Result<usize> {
match mapping.get(i) {
Some(j) => Ok(*j),
_ => Err(ArrowError::ComputeError(
_ => Err(ExecutionError::InternalError(
"Internal error computing new column index".to_string(),
)),
}
Expand Down
Loading

0 comments on commit 483fd8c

Please sign in to comment.