feat(query): load ndjson support ts with diff units. (databendlabs#17203)

* feat(query): load ndjson/csv/tsv support ts with diff units.

* fix

* refactor

* refactor

* fix clippy
youngsofun authored Jan 8, 2025
1 parent fa209f4 commit 069bdbb
Showing 13 changed files with 121 additions and 82 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/query/formats/Cargo.toml
@@ -19,6 +19,7 @@ databend-common-expression = { workspace = true }
databend-common-io = { workspace = true }
databend-common-meta-app = { workspace = true }
databend-common-settings = { workspace = true }
databend-functions-scalar-datetime = { workspace = true }
databend-storages-common-blocks = { workspace = true }
databend-storages-common-table-meta = { workspace = true }

59 changes: 59 additions & 0 deletions src/query/formats/src/field_decoder/common.rs
@@ -0,0 +1,59 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Cursor;

use bstr::ByteSlice;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::timestamp::clamp_timestamp;
use databend_common_io::cursor_ext::read_num_text_exact;
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
use databend_common_io::cursor_ext::DateTimeResType;
use databend_common_io::cursor_ext::ReadBytesExt;
use databend_functions_scalar_datetime::datetime::int64_to_timestamp;

use crate::InputCommonSettings;

pub(crate) fn read_timestamp(
    column: &mut Vec<i64>,
    data: &[u8],
    settings: &InputCommonSettings,
) -> Result<()> {
    let ts = if !data.contains(&b'-') {
        int64_to_timestamp(read_num_text_exact(data)?)
    } else {
        let mut buffer_readr = Cursor::new(&data);
        let t = buffer_readr.read_timestamp_text(&settings.jiff_timezone)?;
        match t {
            DateTimeResType::Datetime(t) => {
                if !buffer_readr.eof() {
                    let data = data.to_str().unwrap_or("not utf8");
                    let msg = format!(
                        "fail to deserialize timestamp, unexpected end at pos {} of {}",
                        buffer_readr.position(),
                        data
                    );
                    return Err(ErrorCode::BadBytes(msg));
                }
                let mut ts = t.timestamp().as_microsecond();
                clamp_timestamp(&mut ts);
                ts
            }
            _ => unreachable!(),
        }
    };
    column.push(ts);
    Ok(())
}
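
This new helper centralizes the timestamp parsing that fast_values.rs, nested.rs, and separated_text.rs below now delegate to. Its dispatch is driven purely by whether the field contains a '-' byte; below is a minimal standalone sketch of that rule, not part of this commit (the datetime branch itself, handled by jiff via read_timestamp_text, is out of scope here):

// Standalone sketch of the dispatch rule in read_timestamp above: fields
// without a '-' byte are treated as bare integers whose unit is inferred from
// magnitude (int64_to_timestamp); anything containing '-' is handed to the
// datetime-text parser instead.
fn is_integer_form(field: &[u8]) -> bool {
    !field.contains(&b'-')
}

fn main() {
    assert!(is_integer_form(b"1736305864")); // seconds since the epoch
    assert!(is_integer_form(b"1736305866000000")); // microseconds
    assert!(!is_integer_form(b"1970-01-01 00:00:01")); // datetime literal
    // A side effect of the rule: a negative bare integer also contains '-',
    // so it is routed to the datetime-text path.
    assert!(!is_integer_form(b"-5"));
}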
25 changes: 2 additions & 23 deletions src/query/formats/src/field_decoder/fast_values.rs
@@ -37,7 +37,6 @@ use databend_common_expression::types::decimal::DecimalSize
use databend_common_expression::types::nullable::NullableColumnBuilder;
use databend_common_expression::types::number::Number;
use databend_common_expression::types::string::StringColumnBuilder;
use databend_common_expression::types::timestamp::clamp_timestamp;
use databend_common_expression::types::AnyType;
use databend_common_expression::types::MutableBitmap;
use databend_common_expression::types::NumberColumnBuilder;
@@ -51,7 +50,6 @@ use databend_common_io::constants::NULL_BYTES_UPPER;
use databend_common_io::constants::TRUE_BYTES_LOWER;
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
use databend_common_io::cursor_ext::BufferReadStringExt;
use databend_common_io::cursor_ext::DateTimeResType;
use databend_common_io::cursor_ext::ReadBytesExt;
use databend_common_io::cursor_ext::ReadCheckPointExt;
use databend_common_io::cursor_ext::ReadNumberExt;
@@ -62,9 +60,9 @@ use databend_common_io::prelude::FormatSettings;
use databend_common_io::Interval;
use jsonb::parse_value;
use lexical_core::FromLexical;
use num::cast::AsPrimitive;
use num_traits::NumCast;

use crate::field_decoder::common::read_timestamp;
use crate::FieldDecoder;
use crate::InputCommonSettings;

@@ -323,26 +321,7 @@ impl FastFieldDecoderValues {
) -> Result<()> {
let mut buf = Vec::new();
self.read_string_inner(reader, &mut buf, positions)?;
let mut buffer_readr = Cursor::new(&buf);
let ts = buffer_readr.read_timestamp_text(&self.common_settings().jiff_timezone)?;
match ts {
DateTimeResType::Datetime(ts) => {
if !buffer_readr.eof() {
let data = buf.to_str().unwrap_or("not utf8");
let msg = format!(
"fail to deserialize timestamp, unexpected end at pos {} of {}",
buffer_readr.position(),
data
);
return Err(ErrorCode::BadBytes(msg));
}
let mut micros = ts.timestamp().as_microsecond();
clamp_timestamp(&mut micros);
column.push(micros.as_());
}
_ => unreachable!(),
}
Ok(())
read_timestamp(column, &buf, self.common_settings())
}

fn read_array<R: AsRef<[u8]>>(
5 changes: 3 additions & 2 deletions src/query/formats/src/field_decoder/json_ast.rs
@@ -40,6 +40,7 @@ use databend_common_io::cursor_ext::DateTimeResType;
use databend_common_io::geography::geography_from_ewkt;
use databend_common_io::geometry_from_ewkt;
use databend_common_io::parse_bitmap;
use databend_functions_scalar_datetime::datetime::int64_to_timestamp;
use jiff::tz::TimeZone;
use lexical_core::FromLexical;
use num::cast::AsPrimitive;
@@ -296,8 +297,8 @@ impl FieldJsonAstDecoder {
Ok(())
}
Value::Number(number) => match number.as_i64() {
Some(mut n) => {
clamp_timestamp(&mut n);
Some(n) => {
let n = int64_to_timestamp(n);
column.push(n);
Ok(())
}
1 change: 1 addition & 0 deletions src/query/formats/src/field_decoder/mod.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;
mod fast_values;
mod json_ast;
mod nested;
34 changes: 5 additions & 29 deletions src/query/formats/src/field_decoder/nested.rs
@@ -32,7 +32,6 @@ use databend_common_expression::types::decimal::DecimalSize;
use databend_common_expression::types::nullable::NullableColumnBuilder;
use databend_common_expression::types::number::Number;
use databend_common_expression::types::string::StringColumnBuilder;
use databend_common_expression::types::timestamp::clamp_timestamp;
use databend_common_expression::types::AnyType;
use databend_common_expression::types::MutableBitmap;
use databend_common_expression::types::NumberColumnBuilder;
@@ -45,7 +44,6 @@ use databend_common_io::constants::NULL_BYTES_UPPER;
use databend_common_io::constants::TRUE_BYTES_LOWER;
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
use databend_common_io::cursor_ext::BufferReadStringExt;
use databend_common_io::cursor_ext::DateTimeResType;
use databend_common_io::cursor_ext::ReadBytesExt;
use databend_common_io::cursor_ext::ReadCheckPointExt;
use databend_common_io::cursor_ext::ReadNumberExt;
@@ -57,6 +55,7 @@ use jsonb::parse_value;
use lexical_core::FromLexical;

use crate::binary::decode_binary;
use crate::field_decoder::common::read_timestamp;
use crate::FileFormatOptionsExt;
use crate::InputCommonSettings;

@@ -67,9 +66,9 @@ pub struct NestedValues {

impl NestedValues {
/// Consider map/tuple/array as a private object format like JSON.
/// Currently we assume it as a fixed format, embed it in "strings" of other formats.
/// So we can used the same code to encode/decode in clients.
/// It maybe need to be configurable in future,
/// Currently, we assume it as a fixed format, embed it in "strings" of other formats.
/// So we can use the same code to encode/decode in clients.
/// It maybe needs to be configurable in the future,
/// to read data from other DB which also support map/tuple/array.
pub fn create(options_ext: &FileFormatOptionsExt) -> Self {
NestedValues {
@@ -281,30 +280,7 @@ impl NestedValues {
) -> Result<()> {
let mut buf = Vec::new();
self.read_string_inner(reader, &mut buf)?;
let mut buffer_readr = Cursor::new(&buf);
let mut ts = if !buf.contains(&b'-') {
buffer_readr.read_num_text_exact()?
} else {
let t = buffer_readr.read_timestamp_text(&self.common_settings().jiff_timezone)?;
match t {
DateTimeResType::Datetime(t) => {
if !buffer_readr.eof() {
let data = buf.to_str().unwrap_or("not utf8");
let msg = format!(
"fail to deserialize timestamp, unexpected end at pos {} of {}",
buffer_readr.position(),
data
);
return Err(ErrorCode::BadBytes(msg));
}
t.timestamp().as_microsecond()
}
_ => unreachable!(),
}
};
clamp_timestamp(&mut ts);
column.push(ts);
Ok(())
read_timestamp(column, &buf, self.common_settings())
}

fn read_bitmap<R: AsRef<[u8]>>(
29 changes: 2 additions & 27 deletions src/query/formats/src/field_decoder/separated_text.rs
@@ -29,7 +29,6 @@ use databend_common_expression::types::decimal::Decimal;
use databend_common_expression::types::decimal::DecimalColumnBuilder;
use databend_common_expression::types::decimal::DecimalSize;
use databend_common_expression::types::nullable::NullableColumnBuilder;
use databend_common_expression::types::timestamp::clamp_timestamp;
use databend_common_expression::types::AnyType;
use databend_common_expression::types::MutableBitmap;
use databend_common_expression::types::Number;
@@ -45,8 +44,6 @@ use databend_common_io::constants::TRUE_BYTES_NUM;
use databend_common_io::cursor_ext::collect_number;
use databend_common_io::cursor_ext::read_num_text_exact;
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
use databend_common_io::cursor_ext::DateTimeResType;
use databend_common_io::cursor_ext::ReadBytesExt;
use databend_common_io::geography::geography_from_ewkt_bytes;
use databend_common_io::parse_bitmap;
use databend_common_io::parse_bytes_to_ewkb;
@@ -58,6 +55,7 @@ use lexical_core::FromLexical;
use num_traits::NumCast;

use crate::binary::decode_binary;
use crate::field_decoder::common::read_timestamp;
use crate::field_decoder::FieldDecoder;
use crate::FileFormatOptionsExt;
use crate::InputCommonSettings;
@@ -276,30 +274,7 @@ impl SeparatedTextDecoder {
}

fn read_timestamp(&self, column: &mut Vec<i64>, data: &[u8]) -> Result<()> {
let mut ts = if !data.contains(&b'-') {
read_num_text_exact(data)?
} else {
let mut buffer_readr = Cursor::new(&data);
let t = buffer_readr.read_timestamp_text(&self.common_settings().jiff_timezone)?;
match t {
DateTimeResType::Datetime(t) => {
if !buffer_readr.eof() {
let data = data.to_str().unwrap_or("not utf8");
let msg = format!(
"fail to deserialize timestamp, unexpected end at pos {} of {}",
buffer_readr.position(),
data
);
return Err(ErrorCode::BadBytes(msg));
}
t.timestamp().as_microsecond()
}
_ => unreachable!(),
}
};
clamp_timestamp(&mut ts);
column.push(ts);
Ok(())
read_timestamp(column, data, self.common_settings())
}

fn read_bitmap(&self, column: &mut BinaryColumnBuilder, data: &[u8]) -> Result<()> {
2 changes: 1 addition & 1 deletion src/query/functions/src/scalars/timestamp/src/datetime.rs
@@ -119,7 +119,7 @@ pub fn register(registry: &mut FunctionRegistry) {

/// Check if timestamp is within range, and return the timestamp in micros.
#[inline]
fn int64_to_timestamp(mut n: i64) -> i64 {
pub fn int64_to_timestamp(mut n: i64) -> i64 {
if -31536000000 < n && n < 31536000000 {
n * MICROS_PER_SEC
} else if -31536000000000 < n && n < 31536000000000 {
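
Making int64_to_timestamp public is what lets the decoders above accept integers in different units. The heuristic buckets the value by magnitude; the snippet below is a standalone sketch rather than the crate's code: the seconds and milliseconds branches mirror the hunk above, while the final microseconds fallback is an assumption (and clamping is ignored), checked only against the ts.csv / ts.ndjson fixtures added below.

const MICROS_PER_SEC: i64 = 1_000_000;

// Standalone sketch of the magnitude-based unit inference. The bounds below
// are roughly +/- 1000 years expressed in seconds and in milliseconds; the
// final branch assumes the value is already in microseconds.
fn infer_micros(n: i64) -> i64 {
    if -31_536_000_000 < n && n < 31_536_000_000 {
        n * MICROS_PER_SEC // interpreted as seconds
    } else if -31_536_000_000_000 < n && n < 31_536_000_000_000 {
        n * 1_000 // interpreted as milliseconds
    } else {
        n // assumed to already be microseconds
    }
}

fn main() {
    // The three integer rows from tests/data/csv/ts.csv, which the new tests
    // expect to load as 2025-01-08 03:11:04 / :05 / :06.
    assert_eq!(infer_micros(1_736_305_864), 1_736_305_864_000_000);
    assert_eq!(infer_micros(1_736_305_865_000), 1_736_305_865_000_000);
    assert_eq!(infer_micros(1_736_305_866_000_000), 1_736_305_866_000_000);
}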
8 changes: 8 additions & 0 deletions tests/data/csv/ts.csv
@@ -0,0 +1,8 @@
1736305864,('1736305864')
1736305865000,('1736305865000')
1736305866000000,('1736305866000000')
1,('1')
1970-01-01 00:00:01,('1970-01-01 00:00:01')
1970-01-01 00:00:00.001,('1970-01-01 00:00:00.001')
1970-01-01 00:00:00.000001,('1970-01-01 00:00:00.000001')
1970-01-01 00:00:00.000000,('1970-01-01 00:00:00.000000')
3 changes: 3 additions & 0 deletions tests/data/ndjson/ts.ndjson
@@ -0,0 +1,3 @@
{"t":1736305864}
{"t":1736305865000}
{"t":1736305866000000}
20 changes: 20 additions & 0 deletions tests/sqllogictests/suites/stage/formats/csv/csv_types.test
@@ -121,3 +121,23 @@ query
select $1 from @csv_types(file_format => 'csv_raw');
----
'((''c d'',''{"a":1}''),{1:''["a",0]''},[({''x'':2},3)])'|1

statement ok
CREATE OR REPLACE TABLE cast_ts_csv (t timestamp, tt tuple(a timestamp))

query
copy into cast_ts_csv from @data/csv/ts.csv file_format = (type = CSV) ON_ERROR=continue
----
csv/ts.csv 8 0 NULL NULL

query
select t, tt.1 from cast_ts_csv
----
2025-01-08 03:11:04.000000 2025-01-08 03:11:04.000000
2025-01-08 03:11:05.000000 2025-01-08 03:11:05.000000
2025-01-08 03:11:06.000000 2025-01-08 03:11:06.000000
1970-01-01 00:00:01.000000 1970-01-01 00:00:01.000000
1970-01-01 00:00:01.000000 1970-01-01 00:00:01.000000
1970-01-01 00:00:00.001000 1970-01-01 00:00:00.001000
1970-01-01 00:00:00.000001 1970-01-01 00:00:00.000001
1970-01-01 00:00:00.000000 1970-01-01 00:00:00.000000
15 changes: 15 additions & 0 deletions tests/sqllogictests/suites/stage/formats/ndjson/ndjson_cast.test
@@ -15,3 +15,18 @@ select * from cast_ndjson
data1 {'env':'test1','length':'ok'}
data2 {'env':'test2','length':'true'}
data3 {'env':'test3','length':'10'}

statement ok
CREATE OR REPLACE TABLE cast_ts_ndjson (t timestamp)

query
copy into cast_ts_ndjson from @data/ndjson/ts.ndjson file_format = (type = NDJSON) ON_ERROR=continue
----
ndjson/ts.ndjson 3 0 NULL NULL

query
select * from cast_ts_ndjson order by t
----
2025-01-08 03:11:04.000000
2025-01-08 03:11:05.000000
2025-01-08 03:11:06.000000
