Skip to content

Commit

Permalink
Assign better names for projection expressions (lakehq#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
linhr authored Jun 18, 2024
1 parent ad2e65a commit 4902161
Show file tree
Hide file tree
Showing 35 changed files with 995 additions and 501 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ syn = "2.0.66"
quote = "1.0.36"
prettyplease = "0.2.20"
phf = { version = "0.11.2", features = ["macros"] }
ryu = "1.0.18"
#
# The versions of the following dependencies are managed manually.
#
Expand Down
20 changes: 12 additions & 8 deletions crates/framework-common/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ use std::any::Any;
use std::hash::{Hash, Hasher};

/// A trait that facilitates deriving `PartialEq`, `Eq`, and `Hash` for `dyn` trait objects.
/// Since `DynObject` has a blanket implementation, all method names are prefixed with `dyn_object_`
/// to avoid conflicts with similar methods defined by other traits.
/// Otherwise, for example, `x.as_any()` may unintentionally call the method defined by `DynObject`
/// when `DynObject` is in the scope.
pub trait DynObject: Any {
    /// Upcasts the object to `&dyn Any` so it can be downcast during comparison.
    fn dyn_object_as_any(&self) -> &dyn Any;
    /// Compares against another type-erased object; returns `false` when the
    /// concrete types differ.
    fn dyn_object_eq(&self, other: &dyn Any) -> bool;
    /// Feeds the object into a type-erased hasher.
    fn dyn_object_hash(&self, state: &mut dyn Hasher);
}

impl<T: PartialEq + Eq + Hash + 'static> DynObject for T {
    fn dyn_object_as_any(&self) -> &dyn Any {
        self
    }

    fn dyn_object_eq(&self, other: &dyn Any) -> bool {
        // A failed downcast means the concrete types differ, which is "not equal".
        other.downcast_ref::<Self>().map_or(false, |x| self == x)
    }

    fn dyn_object_hash(&self, mut state: &mut dyn Hasher) {
        // `&mut &mut dyn Hasher` itself implements `Hasher`, so `Hash::hash`
        // can be driven through the trait object.
        self.hash(&mut state)
    }
}
Expand All @@ -27,15 +31,15 @@ macro_rules! impl_dyn_object_traits {
($t:ident) => {
impl PartialEq<dyn $t> for dyn $t {
fn eq(&self, other: &dyn $t) -> bool {
self.dyn_eq(DynObject::as_any(other))
self.dyn_object_eq(DynObject::dyn_object_as_any(other))
}
}

impl Eq for dyn $t {}

impl Hash for dyn $t {
fn hash<H: Hasher>(&self, state: &mut H) {
self.dyn_hash(state)
self.dyn_object_hash(state)
}
}
};
Expand Down
4 changes: 2 additions & 2 deletions crates/framework-common/src/spec/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Into<Fields> for Vec<Field> {
}
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum DayTimeIntervalField {
Day = 0,
Expand All @@ -132,7 +132,7 @@ impl TryFrom<i32> for DayTimeIntervalField {
}
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum YearMonthIntervalField {
Year = 0,
Expand Down
2 changes: 2 additions & 0 deletions crates/framework-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ arrow-cast = { workspace = true }
futures = { workspace = true }
comfy-table = { workspace = true }
html-escape = { workspace = true }
chrono = { workspace = true }
ryu = { workspace = true }
4 changes: 2 additions & 2 deletions crates/framework-plan/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ impl<'a> CatalogManager<'a> {
let data_type = PlanResolver::unresolve_data_type(column.data_type().clone())?;
let data_type = self
.config
.data_type_formatter
.to_simple_string(data_type)?;
.plan_formatter
.data_type_to_simple_string(&data_type)?;
Ok(TableColumnMetadata::new(
column.name().clone(),
data_type,
Expand Down
38 changes: 6 additions & 32 deletions crates/framework-plan/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::error::{PlanError, PlanResult};
use crate::formatter::{DefaultPlanFormatter, PlanFormatter};
use framework_common::config::{ConfigKeyValue, SparkUdfConfig};
use framework_common::object::DynObject;
use framework_common::{impl_dyn_object_traits, spec};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::hash::Hash;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand All @@ -12,31 +10,16 @@ pub enum TimestampType {
TimestampNtz,
}

/// Formats data types as user-facing strings. The `DynObject` supertrait lets
/// `dyn DataTypeFormatter` values derive `PartialEq`, `Eq`, and `Hash`.
pub trait DataTypeFormatter: DynObject + Debug + Send + Sync {
/// Renders the data type in a compact, human-readable form
/// (e.g. for catalog/schema display).
fn to_simple_string(&self, data_type: spec::DataType) -> PlanResult<String>;
}

// Implements `PartialEq`, `Eq`, and `Hash` for `dyn DataTypeFormatter`
// by delegating to the `DynObject` blanket implementation.
impl_dyn_object_traits!(DataTypeFormatter);

/// Placeholder formatter used when no engine-specific formatter is configured.
#[derive(Debug, PartialEq, Eq, Hash)]
struct DefaultDataTypeFormatter;

impl DataTypeFormatter for DefaultDataTypeFormatter {
fn to_simple_string(&self, _data_type: spec::DataType) -> PlanResult<String> {
// Deliberately unsupported: callers are expected to install a concrete
// formatter before any data type needs to be rendered.
Err(PlanError::unsupported("default data type formatter"))
}
}

// The generic type parameter is used to work around the issue deriving `PartialEq` for `dyn` trait.
// See also: https://github.com/rust-lang/rust/issues/78808
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PlanConfig<F: ?Sized = dyn DataTypeFormatter> {
pub struct PlanConfig<F: ?Sized = dyn PlanFormatter> {
/// The time zone of the session.
pub time_zone: String,
/// The default timestamp type.
pub timestamp_type: TimestampType,
/// The data type formatter.
pub data_type_formatter: Arc<F>,
/// The plan formatter.
pub plan_formatter: Arc<F>,
// TODO: Revisit how to handle spark_udf_config
// https://github.com/lakehq/framework/pull/53#discussion_r1643683600
pub spark_udf_config: SparkUdfConfig,
Expand All @@ -47,7 +30,7 @@ impl Default for PlanConfig {
Self {
time_zone: "UTC".to_string(),
timestamp_type: TimestampType::TimestampLtz,
data_type_formatter: Arc::new(DefaultDataTypeFormatter),
plan_formatter: Arc::new(DefaultPlanFormatter),
spark_udf_config: SparkUdfConfig {
timezone: ConfigKeyValue {
key: "spark.sql.session.timeZone".to_string(),
Expand All @@ -74,12 +57,3 @@ impl Default for PlanConfig {
}
}
}

impl PlanConfig {
    /// Consumes the configuration and returns it with the data type formatter
    /// replaced; every other setting is preserved unchanged.
    pub fn with_data_type_formatter(mut self, data_type_formatter: Arc<dyn DataTypeFormatter>) -> Self {
        self.data_type_formatter = data_type_formatter;
        self
    }
}
21 changes: 5 additions & 16 deletions crates/framework-plan/src/extension/physical/planner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::extension::logical::{RangeNode, ShowStringNode, SortWithinPartitionsNode};
use crate::extension::physical::range::RangeExec;
use crate::extension::physical::show_string::ShowStringExec;
use crate::utils::ItemTaker;
use async_trait::async_trait;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::sorts::sort::SortExec;
Expand All @@ -10,22 +11,6 @@ use datafusion_common::{internal_err, Result};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use std::sync::Arc;

/// Extracts a single item from a collection of physical plan inputs.
trait NodeContainer {
    type Item;

    /// Returns the sole item, or an internal error when the input count is not
    /// exactly one.
    fn one(&self) -> Result<Self::Item>;
}

impl NodeContainer for &[Arc<dyn ExecutionPlan>] {
    type Item = Arc<dyn ExecutionPlan>;
    fn one(&self) -> Result<Arc<dyn ExecutionPlan>> {
        // A slice pattern both checks the length and binds the only element.
        if let [plan] = *self {
            Ok(plan.clone())
        } else {
            internal_err!("expecting one execution plan input: {:?}", self)
        }
    }
}

pub(crate) struct ExtensionPhysicalPlanner {}

#[async_trait]
Expand All @@ -38,6 +23,10 @@ impl ExtensionPlanner for ExtensionPhysicalPlanner {
physical_inputs: &[Arc<dyn ExecutionPlan>],
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let physical_inputs = physical_inputs
.iter()
.map(|x| x.clone())
.collect::<Vec<_>>();
let plan: Arc<dyn ExecutionPlan> =
if let Some(node) = node.as_any().downcast_ref::<RangeNode>() {
Arc::new(RangeExec::new(
Expand Down
Loading

0 comments on commit 4902161

Please sign in to comment.