Skip to content

Commit

Permalink
Upgrade DataFusion to 39.0 (lakehq#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
linhr authored Jun 18, 2024
1 parent 7fb619f commit 59b4400
Show file tree
Hide file tree
Showing 23 changed files with 231 additions and 282 deletions.
224 changes: 95 additions & 129 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ ryu = "1.0.18"
#
# The versions of the following dependencies are managed manually.
#
datafusion = { version = "38.0.0", features = ["serde"] }
datafusion-common = { version = "38.0.0", features = ["object_store"] }
datafusion-expr = "38.0.0"
datafusion-functions-array = "38.0.0"
datafusion = { version = "39.0.0", features = ["serde"] }
datafusion-common = { version = "39.0.0", features = ["object_store"] }
datafusion-expr = "39.0.0"
datafusion-functions-array = "39.0.0"
# auto-initialize: Changes [`Python::with_gil`] to automatically initialize the Python interpreter if needed.
# 0.21 breaks when datafusion has pyarrow enabled
pyo3 = { version = "0.21.2", features = ["auto-initialize", "serde"] }
# Should be the equivalent of enabling the pyarrow feature in datafusion since we already have pyo3 in the workspace
arrow = { version = "51.0.0", features = ["ffi"] }
arrow-cast = { version = "51.0.0" }
arrow = { version = "52.0.0", features = ["ffi"] }
arrow-cast = { version = "52.0.0" }
# We use a patched latest version of sqlparser. The version may be different from the one used in DataFusion.
sqlparser = { version = "0.46.0", features = ["serde", "visitor"] }
serde_arrow = { version = "0.11", features = ["arrow-51"] }
serde_arrow = { version = "0.11", features = ["arrow-52"] }

[patch.crates-io]
# Override dependencies to use our forked versions.
Expand Down
1 change: 1 addition & 0 deletions crates/framework-plan/src/extension/analyzer/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion::common::{Column, DataFusionError, Result, UnnestOptions};
use datafusion::functions::core::expr_ext::FieldAccessor;
use datafusion::logical_expr::builder::unnest_with_options;
use datafusion::logical_expr::{
col, Expr, ExprSchemable, LogicalPlan, Projection, ScalarUDF, ScalarUDFImpl,
Expand Down
7 changes: 2 additions & 5 deletions crates/framework-plan/src/extension/analyzer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{Expr, ScalarFunctionDefinition, ScalarUDF};
use datafusion::logical_expr::{Expr, ScalarUDF};

pub(crate) mod alias;
pub(crate) mod explode;
Expand All @@ -10,10 +10,7 @@ pub(crate) mod window;

fn expr_to_udf(expr: &Expr) -> Option<(&Arc<ScalarUDF>, &Vec<Expr>)> {
match expr {
Expr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(udf),
args,
}) => Some((udf, args)),
Expr::ScalarFunction(ScalarFunction { func: udf, args }) => Some((udf, args)),
_ => None,
}
}
2 changes: 1 addition & 1 deletion crates/framework-plan/src/extension/analyzer/wildcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) fn rewrite_wildcard(
Expr::Wildcard {
qualifier: Some(qualifier),
} => projected.extend(expand_qualified_wildcard(&qualifier, schema, None)?),
_ => projected.push(columnize_expr(normalize_col(e, &input)?, schema)),
_ => projected.push(columnize_expr(normalize_col(e, &input)?, &input)?),
}
}
Ok((input, projected))
Expand Down
7 changes: 5 additions & 2 deletions crates/framework-plan/src/extension/logical/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::catalog::function::FunctionMetadata;
use crate::catalog::table::TableMetadata;
use crate::catalog::{CatalogManager, EmptyMetadata, SingleValueMetadata};
use crate::config::PlanConfig;
use crate::utils::ItemTaker;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub(crate) struct CatalogCommandNode {
Expand Down Expand Up @@ -379,7 +380,9 @@ impl UserDefinedLogicalNodeCore for CatalogCommandNode {
write!(f, "{}", self.name)
}

fn from_template(&self, _: &[Expr], _: &[LogicalPlan]) -> Self {
self.clone()
fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
exprs.zero()?;
inputs.zero()?;
Ok(self.clone())
}
}
8 changes: 6 additions & 2 deletions crates/framework-plan/src/extension/logical/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use datafusion::common::{DFSchema, DFSchemaRef, Result};
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_common::plan_err;

use crate::utils::ItemTaker;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub(crate) struct Range {
pub start: i64,
Expand Down Expand Up @@ -127,7 +129,9 @@ impl UserDefinedLogicalNodeCore for RangeNode {
)
}

fn from_template(&self, _: &[Expr], _: &[LogicalPlan]) -> Self {
self.clone()
fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
exprs.zero()?;
inputs.zero()?;
Ok(self.clone())
}
}
12 changes: 7 additions & 5 deletions crates/framework-plan/src/extension/logical/show_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use comfy_table::{Cell, CellAlignment, ColumnConstraint, Table, Width};
use datafusion_common::{DFSchema, DFSchemaRef, Result};
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

use crate::utils::ItemTaker;

fn escape_meta_characters(s: &str) -> String {
s.replace('\n', "\\\\n")
.replace('\r', "\\\\r")
Expand Down Expand Up @@ -268,13 +270,13 @@ impl UserDefinedLogicalNodeCore for ShowStringNode {
write!(f, "ShowString")
}

fn from_template(&self, _: &[Expr], input: &[LogicalPlan]) -> Self {
assert_eq!(input.len(), 1);
Self {
input: Arc::new(input[0].clone()),
fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
exprs.zero()?;
Ok(Self {
input: Arc::new(inputs.one()?),
limit: self.limit,
format: self.format.clone(),
schema: self.schema.clone(),
}
})
}
}
15 changes: 8 additions & 7 deletions crates/framework-plan/src/extension/logical/sort.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::fmt::Formatter;
use std::sync::Arc;

use datafusion_common::DFSchemaRef;
use datafusion_common::{DFSchemaRef, Result};
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

use crate::utils::ItemTaker;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub(crate) struct SortWithinPartitionsNode {
pub input: Arc<LogicalPlan>,
Expand Down Expand Up @@ -48,12 +50,11 @@ impl UserDefinedLogicalNodeCore for SortWithinPartitionsNode {
Ok(())
}

fn from_template(&self, exprs: &[Expr], input: &[LogicalPlan]) -> Self {
assert_eq!(input.len(), 1);
Self {
input: Arc::new(input[0].clone()),
expr: exprs.to_vec(),
fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
Ok(Self {
input: Arc::new(inputs.one()?),
expr: exprs,
fetch: self.fetch,
}
})
}
}
2 changes: 1 addition & 1 deletion crates/framework-plan/src/extension/physical/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl ExecutionPlan for RangeExec {
&self.cache
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

Expand Down
4 changes: 2 additions & 2 deletions crates/framework-plan/src/extension/physical/show_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl ExecutionPlan for ShowStringExec {
vec![false]
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn with_new_children(
Expand Down
39 changes: 24 additions & 15 deletions crates/framework-plan/src/function/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::functions_aggregate::{covariance, first_last, median, sum, variance};
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::{expr, AggregateFunction};
use datafusion_expr::{expr, AggregateFunction, AggregateUDF};
use lazy_static::lazy_static;

use crate::error::{PlanError, PlanResult};
Expand All @@ -14,12 +16,16 @@ lazy_static! {
struct AggregateFunctionBuilder;

impl AggregateFunctionBuilder {
fn unknown(name: &str) -> AggregateFunctionDefinition {
AggregateFunctionDefinition::Name(name.into())
fn unknown(_name: &str) -> Option<AggregateFunctionDefinition> {
None
}

fn agg(f: AggregateFunction) -> AggregateFunctionDefinition {
AggregateFunctionDefinition::BuiltIn(f)
fn agg(f: AggregateFunction) -> Option<AggregateFunctionDefinition> {
Some(AggregateFunctionDefinition::BuiltIn(f))
}

fn udaf(f: Arc<AggregateUDF>) -> Option<AggregateFunctionDefinition> {
Some(AggregateFunctionDefinition::UDF(f))
}
}

Expand All @@ -46,23 +52,23 @@ fn list_built_in_aggregate_functions() -> Vec<(&'static str, AggregateFunctionDe
("count", F::agg(AggregateFunction::Count)),
("count_if", F::unknown("count_if")),
("count_min_sketch", F::unknown("count_min_sketch")),
("covar_pop", F::agg(AggregateFunction::CovariancePop)),
("covar_samp", F::unknown("covar_samp")),
("covar_pop", F::udaf(covariance::covar_pop_udaf())),
("covar_samp", F::udaf(covariance::covar_samp_udaf())),
("every", F::agg(AggregateFunction::BoolAnd)),
("first", F::agg(AggregateFunction::FirstValue)),
("first_value", F::agg(AggregateFunction::FirstValue)),
("first", F::udaf(first_last::first_value_udaf())),
("first_value", F::udaf(first_last::first_value_udaf())),
("grouping", F::agg(AggregateFunction::Grouping)),
("grouping_id", F::unknown("grouping_id")),
("histogram_numeric", F::unknown("histogram_numeric")),
("hll_sketch_agg", F::unknown("hll_sketch_agg")),
("hll_union_agg", F::unknown("hll_union_agg")),
("kurtosis", F::unknown("kurtosis")),
("last", F::agg(AggregateFunction::LastValue)),
("last_value", F::agg(AggregateFunction::LastValue)),
("last", F::udaf(first_last::last_value_udaf())),
("last_value", F::udaf(first_last::last_value_udaf())),
("max", F::agg(AggregateFunction::Max)),
("max_by", F::unknown("max_by")),
("mean", F::agg(AggregateFunction::Avg)),
("median", F::agg(AggregateFunction::Median)),
("median", F::udaf(median::median_udaf())),
("min", F::agg(AggregateFunction::Min)),
("min_by", F::unknown("min_by")),
("mode", F::unknown("mode")),
Expand All @@ -83,13 +89,16 @@ fn list_built_in_aggregate_functions() -> Vec<(&'static str, AggregateFunctionDe
("stddev", F::agg(AggregateFunction::Stddev)),
("stddev_pop", F::agg(AggregateFunction::StddevPop)),
("stddev_samp", F::agg(AggregateFunction::Stddev)),
("sum", F::agg(AggregateFunction::Sum)),
("sum", F::udaf(sum::sum_udaf())),
("try_avg", F::unknown("try_avg")),
("try_sum", F::unknown("try_sum")),
("var_pop", F::agg(AggregateFunction::VariancePop)),
("var_samp", F::agg(AggregateFunction::Variance)),
("variance", F::agg(AggregateFunction::Variance)),
("var_samp", F::udaf(variance::var_samp_udaf())),
("variance", F::udaf(variance::var_samp_udaf())),
]
.into_iter()
.filter_map(|(name, f)| f.map(|f| (name, f)))
.collect()
}

pub(crate) fn get_built_in_aggregate_function(
Expand Down
12 changes: 4 additions & 8 deletions crates/framework-plan/src/function/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion_expr::{
expr, BinaryExpr, Operator, ScalarFunctionDefinition, ScalarUDF, ScalarUDFImpl,
};
use datafusion_expr::{expr, BinaryExpr, Operator, ScalarUDF, ScalarUDFImpl};

use crate::error::{PlanError, PlanResult};
use crate::utils::ItemTaker;
Expand Down Expand Up @@ -81,10 +79,10 @@ impl FunctionBuilder {
where
F: ScalarUDFImpl + Send + Sync + 'static,
{
let func_def = ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from(f)));
let func = Arc::new(ScalarUDF::from(f));
Arc::new(move |args| {
Ok(expr::Expr::ScalarFunction(expr::ScalarFunction {
func_def: func_def.clone(),
func: func.clone(),
args,
}))
})
Expand All @@ -97,9 +95,7 @@ impl FunctionBuilder {
{
Arc::new(move |args| {
Ok(expr::Expr::ScalarFunction(expr::ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from(
f(args.clone())?,
))),
func: Arc::new(ScalarUDF::from(f(args.clone())?)),
args,
}))
})
Expand Down
6 changes: 3 additions & 3 deletions crates/framework-plan/src/function/scalar/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use datafusion::functions;
use datafusion::functions::expr_fn;
use datafusion_common::ScalarValue;
use datafusion_expr::{expr, ScalarFunctionDefinition, ScalarUDF};
use datafusion_expr::{expr, ScalarUDF};

use crate::error::{PlanError, PlanResult};
use crate::extension::function::contains::Contains;
Expand All @@ -18,9 +18,9 @@ fn regexp_replace(mut args: Vec<expr::Expr>) -> PlanResult<expr::Expr> {
"g".to_string(),
))));
Ok(expr::Expr::ScalarFunction(expr::ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from(
func: Arc::new(ScalarUDF::from(
functions::regex::regexpreplace::RegexpReplaceFunc::new(),
))),
)),
args,
}))
}
Expand Down
5 changes: 2 additions & 3 deletions crates/framework-plan/src/function/window.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use datafusion::functions_aggregate::sum;
use datafusion_expr::{expr, AggregateFunction, BuiltInWindowFunction};

use crate::error::{PlanError, PlanResult};
Expand All @@ -16,9 +17,7 @@ pub(crate) fn get_built_in_window_function(
"max" => Ok(expr::WindowFunctionDefinition::AggregateFunction(
AggregateFunction::Max,
)),
"sum" => Ok(expr::WindowFunctionDefinition::AggregateFunction(
AggregateFunction::Sum,
)),
"sum" => Ok(expr::WindowFunctionDefinition::AggregateUDF(sum::sum_udaf())),
"count" => Ok(expr::WindowFunctionDefinition::AggregateFunction(
AggregateFunction::Count,
)),
Expand Down
Loading

0 comments on commit 59b4400

Please sign in to comment.