forked from GreptimeTeam/greptimedb
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(timestamp): add trim for the input date string (GreptimeTeam#2078)
* fix(timestamp): add trim for the input date string * fix(timestamp): add analyzer rule to trim strings before conversion * fix: adjust according to CR
- Loading branch information
Showing
11 changed files
with
218 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,4 +13,5 @@ | |
// limitations under the License. | ||
|
||
// Analyzer/optimizer rule submodules. `string_normalization` trims string
// literals before timestamp casts; see each module for its rule.
pub mod order_hint;
pub mod string_normalization;
pub mod type_conversion;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
// Copyright 2023 Greptime Team | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use arrow_schema::DataType; | ||
use datafusion::config::ConfigOptions; | ||
use datafusion::logical_expr::expr::Cast; | ||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; | ||
use datafusion_common::{Result, ScalarValue}; | ||
use datafusion_expr::{Expr, LogicalPlan}; | ||
use datafusion_optimizer::analyzer::AnalyzerRule; | ||
|
||
/// Analyzer rule that normalizes string literals in a logical plan by
/// removing redundant whitespace. Its main purpose is to let padded input
/// such as `" 2017-07-23 13:10:11 "` cast cleanly to a timestamp.
pub struct StringNormalizationRule;
|
||
impl AnalyzerRule for StringNormalizationRule { | ||
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> { | ||
plan.transform(&|plan| { | ||
let mut converter = StringNormalizationConverter; | ||
let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>(); | ||
let expr = plan | ||
.expressions() | ||
.into_iter() | ||
.map(|e| e.rewrite(&mut converter)) | ||
.collect::<Result<Vec<_>>>()?; | ||
datafusion_expr::utils::from_plan(&plan, &expr, &inputs).map(Transformed::Yes) | ||
}) | ||
} | ||
|
||
fn name(&self) -> &str { | ||
"StringNormalizationRule" | ||
} | ||
} | ||
|
||
struct StringNormalizationConverter; | ||
|
||
impl TreeNodeRewriter for StringNormalizationConverter { | ||
type N = Expr; | ||
|
||
/// remove extra whitespaces from the String value when | ||
/// there is a CAST from a String to Timestamp. | ||
/// Otherwise - no modifications applied | ||
fn mutate(&mut self, expr: Expr) -> Result<Expr> { | ||
let new_expr = match expr { | ||
Expr::Cast(Cast { expr, data_type }) => { | ||
let expr = match data_type { | ||
DataType::Timestamp(_, _) => match *expr { | ||
Expr::Literal(value) => match value { | ||
ScalarValue::Utf8(Some(s)) => trim_utf_expr(s), | ||
_ => Expr::Literal(value), | ||
}, | ||
expr => expr, | ||
}, | ||
_ => *expr, | ||
}; | ||
Expr::Cast(Cast { | ||
expr: Box::new(expr), | ||
data_type, | ||
}) | ||
} | ||
expr => expr, | ||
}; | ||
Ok(new_expr) | ||
} | ||
} | ||
|
||
fn trim_utf_expr(s: String) -> Expr { | ||
let parts: Vec<_> = s.split_whitespace().collect(); | ||
let trimmed = parts.join(" "); | ||
Expr::Literal(ScalarValue::Utf8(Some(trimmed))) | ||
} | ||
|
||
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
    use arrow::datatypes::{DataType, SchemaRef};
    use arrow_schema::{Field, Schema, TimeUnit};
    use datafusion::datasource::{provider_as_source, MemTable};
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::{lit, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
    use datafusion_optimizer::analyzer::AnalyzerRule;

    use crate::optimizer::string_normalization::StringNormalizationRule;

    /// A whitespace-padded timestamp string must be normalized before the
    /// cast, for every timestamp precision.
    #[test]
    fn test_normalization_for_string_with_extra_whitespaces_to_timestamp_cast() {
        let timestamp_str_with_whitespaces = " 2017-07-23 13:10:11 ";
        let config = &ConfigOptions::default();
        for unit in [Nanosecond, Microsecond, Millisecond, Second] {
            let (time_unit, proj) =
                create_timestamp_cast_project(unit, timestamp_str_with_whitespaces);
            let plan = create_test_plan_with_project(proj);
            let result = StringNormalizationRule.analyze(plan, config).unwrap();
            let expected = format!("Projection: CAST(Utf8(\"2017-07-23 13:10:11\") AS Timestamp({:#?}, None))\n TableScan: t",
                time_unit
            );
            assert_eq!(expected, format!("{:?}", result));
        }
    }

    /// Casts whose operand is not a string literal, or whose target is not a
    /// timestamp, must be left byte-for-byte untouched.
    #[test]
    fn test_normalization_for_non_timestamp_casts() {
        let config = &ConfigOptions::default();

        // Integer literal cast to timestamp: no string to normalize.
        let proj_int_to_timestamp = vec![Expr::Cast(Cast::new(
            Box::new(lit(158412331400600000_i64)),
            DataType::Timestamp(Nanosecond, None),
        ))];
        let plan = create_test_plan_with_project(proj_int_to_timestamp);
        let result = StringNormalizationRule.analyze(plan, config).unwrap();
        assert_eq!(
            "Projection: CAST(Int64(158412331400600000) AS Timestamp(Nanosecond, None))\n TableScan: t",
            format!("{:?}", result)
        );

        // Padded string cast to Int32: target is not a timestamp, so the
        // padding must survive.
        let proj_string_to_int = vec![Expr::Cast(Cast::new(
            Box::new(lit(" 5 ")),
            DataType::Int32,
        ))];
        let plan = create_test_plan_with_project(proj_string_to_int);
        let result = StringNormalizationRule
            .analyze(plan, &ConfigOptions::default())
            .unwrap();
        assert_eq!(
            "Projection: CAST(Utf8(\" 5 \") AS Int32)\n TableScan: t",
            format!("{:?}", result)
        );
    }

    /// Wraps `proj` in a projection over the test table scan.
    fn create_test_plan_with_project(proj: Vec<Expr>) -> LogicalPlan {
        prepare_test_plan_builder()
            .project(proj)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Builds `CAST(timestamp_str AS Timestamp(unit, None))`, echoing the unit
    /// back so the caller can format the expected plan.
    fn create_timestamp_cast_project(unit: TimeUnit, timestamp_str: &str) -> (TimeUnit, Vec<Expr>) {
        let cast = Expr::Cast(Cast::new(
            Box::new(lit(timestamp_str)),
            DataType::Timestamp(unit.clone(), None),
        ));
        (unit, vec![cast])
    }

    /// Scan over an empty in-memory table `t` with a single Float64 column.
    fn prepare_test_plan_builder() -> LogicalPlanBuilder {
        let schema = Schema::new(vec![Field::new("f", DataType::Float64, false)]);
        let table = MemTable::try_new(SchemaRef::from(schema), vec![]).unwrap();
        LogicalPlanBuilder::scan("t", provider_as_source(Arc::new(table)), None).unwrap()
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters