Skip to content

Commit

Permalink
fix: refactor and fix to_unixtime (GreptimeTeam#2695)
Browse files Browse the repository at this point in the history
* fix: refactor and fix to_unixtime

* fix: sqlness tests

* feat: supports date type

* fix: test

* feat: supports datetime type

* refactor: convert_to_seconds
  • Loading branch information
killme2008 authored Nov 6, 2023
1 parent cf94d32 commit f387a09
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 149 deletions.
265 changes: 139 additions & 126 deletions src/common/function/src/scalars/timestamp/to_unixtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,50 @@ use std::sync::Arc;

use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::{Signature, Volatility};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use common_time::{Date, DateTime, Timestamp};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::vectors::{
Int64Vector, StringVector, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, Vector, VectorRef,
};
use datatypes::vectors::{Int64Vector, VectorRef};
use snafu::ensure;

use crate::scalars::function::{Function, FunctionContext};

/// A function to convert the column into the unix timestamp in seconds.
#[derive(Clone, Debug, Default)]
pub struct ToUnixtimeFunction;

const NAME: &str = "to_unixtime";

fn convert_to_seconds(arg: &str) -> Option<i64> {
match Timestamp::from_str(arg) {
Ok(ts) => {
let sec_mul = (TimeUnit::Second.factor() / ts.unit().factor()) as i64;
Some(ts.value().div_euclid(sec_mul))
}
Err(_err) => None,
if let Ok(dt) = DateTime::from_str(arg) {
return Some(dt.val() / 1000);
}

if let Ok(ts) = Timestamp::from_str(arg) {
return Some(ts.split().0);
}

if let Ok(date) = Date::from_str(arg) {
return Some(date.to_secs());
}

None
}

fn process_vector(vector: &dyn Vector) -> Vec<Option<i64>> {
fn convert_timestamps_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
(0..vector.len())
.map(|i| paste::expr!((vector.get(i)).as_timestamp().map(|ts| ts.value())))
.map(|i| vector.get(i).as_timestamp().map(|ts| ts.split().0))
.collect::<Vec<Option<i64>>>()
}

fn convert_dates_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
(0..vector.len())
.map(|i| vector.get(i).as_date().map(|dt| dt.to_secs()))
.collect::<Vec<Option<i64>>>()
}

fn convert_datetimes_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
(0..vector.len())
.map(|i| vector.get(i).as_datetime().map(|dt| dt.val() / 1000))
.collect::<Vec<Option<i64>>>()
}

Expand All @@ -67,6 +81,8 @@ impl Function for ToUnixtimeFunction {
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
Expand All @@ -87,51 +103,29 @@ impl Function for ToUnixtimeFunction {
}
);

let vector = &columns[0];

match columns[0].data_type() {
ConcreteDataType::String(_) => {
let array = columns[0].to_arrow_array();
let vector = StringVector::try_from_arrow_array(&array).unwrap();
Ok(Arc::new(Int64Vector::from(
(0..vector.len())
.map(|i| convert_to_seconds(&vector.get(i).to_string()))
.collect::<Vec<_>>(),
)))
}
ConcreteDataType::String(_) => Ok(Arc::new(Int64Vector::from(
(0..vector.len())
.map(|i| convert_to_seconds(&vector.get(i).to_string()))
.collect::<Vec<_>>(),
))),
ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => {
let array = columns[0].to_arrow_array();
Ok(Arc::new(Int64Vector::try_from_arrow_array(&array).unwrap()))
// Safety: cast always successfully at here
Ok(vector.cast(&ConcreteDataType::int64_datatype()).unwrap())
}
ConcreteDataType::Date(_) => {
let seconds = convert_dates_to_seconds(vector);
Ok(Arc::new(Int64Vector::from(seconds)))
}
ConcreteDataType::Timestamp(ts) => {
let array = columns[0].to_arrow_array();
let value = match ts {
TimestampType::Second(_) => {
let vector = paste::expr!(TimestampSecondVector::try_from_arrow_array(
array
)
.unwrap());
process_vector(&vector)
}
TimestampType::Millisecond(_) => {
let vector = paste::expr!(
TimestampMillisecondVector::try_from_arrow_array(array).unwrap()
);
process_vector(&vector)
}
TimestampType::Microsecond(_) => {
let vector = paste::expr!(
TimestampMicrosecondVector::try_from_arrow_array(array).unwrap()
);
process_vector(&vector)
}
TimestampType::Nanosecond(_) => {
let vector = paste::expr!(TimestampNanosecondVector::try_from_arrow_array(
array
)
.unwrap());
process_vector(&vector)
}
};
Ok(Arc::new(Int64Vector::from(value)))
ConcreteDataType::DateTime(_) => {
let seconds = convert_datetimes_to_seconds(vector);
Ok(Arc::new(Int64Vector::from(seconds)))
}
ConcreteDataType::Timestamp(_) => {
let seconds = convert_timestamps_to_seconds(vector);
Ok(Arc::new(Int64Vector::from(seconds)))
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
Expand All @@ -151,11 +145,11 @@ impl fmt::Display for ToUnixtimeFunction {
#[cfg(test)]
mod tests {
use common_query::prelude::TypeSignature;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder};
use datatypes::scalars::ScalarVector;
use datatypes::timestamp::TimestampSecond;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::{StringVector, TimestampSecondVector};
use datatypes::vectors::{
DateTimeVector, DateVector, StringVector, TimestampMillisecondVector, TimestampSecondVector,
};

use super::{ToUnixtimeFunction, *};
use crate::scalars::Function;
Expand All @@ -170,18 +164,20 @@ mod tests {
);

assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
]
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
]
));

let times = vec![
Expand Down Expand Up @@ -212,26 +208,6 @@ mod tests {
#[test]
fn test_int_to_unixtime() {
let f = ToUnixtimeFunction;
assert_eq!("to_unixtime", f.name());
assert_eq!(
ConcreteDataType::int64_datatype(),
f.return_type(&[]).unwrap()
);

assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
]
));

let times = vec![Some(3_i64), None, Some(5_i64), None];
let results = [Some(3), None, Some(5), None];
Expand All @@ -254,38 +230,38 @@ mod tests {
}

#[test]
fn test_timestamp_to_unixtime() {
fn test_date_to_unixtime() {
let f = ToUnixtimeFunction;
assert_eq!("to_unixtime", f.name());
assert_eq!(
ConcreteDataType::int64_datatype(),
f.return_type(&[]).unwrap()
);

assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
]
));
let times = vec![Some(123), None, Some(42), None];
let results = [Some(10627200), None, Some(3628800), None];
let date_vector = DateVector::from(times.clone());
let args: Vec<VectorRef> = vec![Arc::new(date_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
let v = vector.get(i);
if i == 1 || i == 3 {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::Int64(ts) => {
assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
}
_ => unreachable!(),
}
}
}

let times: Vec<Option<TimestampSecond>> = vec![
Some(TimestampSecond::new(123)),
None,
Some(TimestampSecond::new(42)),
None,
];
#[test]
fn test_datetime_to_unixtime() {
let f = ToUnixtimeFunction;

let times = vec![Some(123000), None, Some(42000), None];
let results = [Some(123), None, Some(42), None];
let ts_vector: TimestampSecondVector = build_vector_from_slice(&times);
let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
let date_vector = DateTimeVector::from(times.clone());
let args: Vec<VectorRef> = vec![Arc::new(date_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
Expand All @@ -303,11 +279,48 @@ mod tests {
}
}

fn build_vector_from_slice<T: ScalarVector>(items: &[Option<T::RefItem<'_>>]) -> T {
let mut builder = T::Builder::with_capacity(items.len());
for item in items {
builder.push(*item);
#[test]
fn test_timestamp_to_unixtime() {
let f = ToUnixtimeFunction;

let times = vec![Some(123), None, Some(42), None];
let results = [Some(123), None, Some(42), None];
let ts_vector = TimestampSecondVector::from(times.clone());
let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
let v = vector.get(i);
if i == 1 || i == 3 {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::Int64(ts) => {
assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
}
_ => unreachable!(),
}
}

let times = vec![Some(123000), None, Some(42000), None];
let results = [Some(123), None, Some(42), None];
let ts_vector = TimestampMillisecondVector::from(times.clone());
let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
let v = vector.get(i);
if i == 1 || i == 3 {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::Int64(ts) => {
assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
}
_ => unreachable!(),
}
}
builder.finish()
}
}
2 changes: 1 addition & 1 deletion src/common/time/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl Timestamp {

/// Split a [Timestamp] into seconds part and nanoseconds part.
/// Notice the seconds part of split result is always rounded down to floor.
fn split(&self) -> (i64, u32) {
pub fn split(&self) -> (i64, u32) {
let sec_mul = (TimeUnit::Second.factor() / self.unit.factor()) as i64;
let nsec_mul = (self.unit.factor() / TimeUnit::Nanosecond.factor()) as i64;

Expand Down
16 changes: 16 additions & 0 deletions src/datatypes/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,22 @@ impl Value {
}
}

/// Cast Value to Date. Return None if value is not a valid date data type.
pub fn as_date(&self) -> Option<Date> {
match self {
Value::Date(t) => Some(*t),
_ => None,
}
}

/// Cast Value to DateTime. Return None if value is not a valid datetime data type.
pub fn as_datetime(&self) -> Option<DateTime> {
match self {
Value::DateTime(t) => Some(*t),
_ => None,
}
}

/// Cast Value to [Time]. Return None if value is not a valid time data type.
pub fn as_time(&self) -> Option<Time> {
match self {
Expand Down
Loading

0 comments on commit f387a09

Please sign in to comment.