Skip to content

Commit

Permalink
Merge pull request databendlabs#6873 from youngsofun/fmt
Browse files Browse the repository at this point in the history
feat(input format): pass FileSplit instead of Vec<u8>.
  • Loading branch information
BohuTANG authored Jul 28, 2022
2 parents 3ad86bc + 8cbafb5 commit 522853e
Show file tree
Hide file tree
Showing 18 changed files with 312 additions and 191 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions common/datavalues/src/data_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use common_arrow::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::types::data_type::DataType;
use crate::DataField;
use crate::TypeDeserializerImpl;

/// memory layout.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Default)]
Expand Down Expand Up @@ -154,6 +156,15 @@ impl DataSchema {

ArrowSchema::from(fields).with_metadata(self.metadata.clone())
}

/// Builds one deserializer per field of this schema, in field order.
///
/// `capacity` is forwarded unchanged to every field type's `create_deserializer` call.
pub fn create_deserializers(&self, capacity: usize) -> Vec<TypeDeserializerImpl> {
    self.fields()
        .iter()
        .map(|field| field.data_type().create_deserializer(capacity))
        .collect()
}
}

pub type DataSchemaRef = Arc<DataSchema>;
Expand Down
9 changes: 6 additions & 3 deletions common/formats/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_datablocks::DataBlock;
use common_datavalues::TypeDeserializerImpl;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::FileSplit;
use common_io::prelude::MemoryReader;
use common_io::prelude::NestedCheckpointReader;

Expand All @@ -43,9 +44,9 @@ pub trait InputFormat: Send + Sync {

fn deserialize_data(&self, state: &mut Box<dyn InputState>) -> Result<Vec<DataBlock>>;

fn read_buf(&self, buf: &[u8], state: &mut Box<dyn InputState>) -> Result<usize>;
fn deserialize_complete_split(&self, split: FileSplit) -> Result<Vec<DataBlock>>;

fn set_buf(&self, buf: Vec<u8>, state: &mut Box<dyn InputState>);
fn read_buf(&self, buf: &[u8], state: &mut Box<dyn InputState>) -> Result<(usize, bool)>;

fn take_buf(&self, state: &mut Box<dyn InputState>) -> Vec<u8>;

Expand All @@ -66,6 +67,8 @@ pub trait InputFormat: Send + Sync {
}

fn read_row_num(&self, _state: &mut Box<dyn InputState>) -> Result<usize> {
Err(ErrorCode::UnImplement("Unimplement error"))
Ok(0)
}
}

trait RowBasedInputFormat: InputFormat {}
34 changes: 17 additions & 17 deletions common/formats/src/format_csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataType;
use common_datavalues::TypeDeserializer;
use common_datavalues::TypeDeserializerImpl;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::position4;
use common_io::prelude::BufferReadExt;
use common_io::prelude::FileSplit;
use common_io::prelude::FormatSettings;
use common_io::prelude::MemoryReader;
use common_io::prelude::NestedCheckpointReader;
Expand Down Expand Up @@ -229,17 +229,21 @@ impl InputFormat for CsvInputFormat {
}

fn deserialize_data(&self, state: &mut Box<dyn InputState>) -> Result<Vec<DataBlock>> {
let mut deserializers = Vec::with_capacity(self.schema.num_fields());
for field in self.schema.fields() {
let data_type = field.data_type();
deserializers.push(data_type.create_deserializer(self.min_accepted_rows));
}

let mut state = std::mem::replace(state, self.create_state());
let state = state.as_any().downcast_mut::<CsvInputState>().unwrap();
let memory = std::mem::take(&mut state.memory);
self.deserialize_complete_split(FileSplit {
path: state.file_name.clone(),
start_offset: 0,
start_row: state.start_row_index,
buf: memory,
})
}

let memory_reader = MemoryReader::new(memory);
fn deserialize_complete_split(&self, split: FileSplit) -> Result<Vec<DataBlock>> {
let mut deserializers = self.schema.create_deserializers(self.min_accepted_rows);

let memory_reader = MemoryReader::new(split.buf);
let mut checkpoint_reader = NestedCheckpointReader::new(memory_reader);

let mut row_index = 0;
Expand All @@ -249,8 +253,8 @@ impl InputFormat for CsvInputFormat {
let checkpoint_buffer = checkpoint_reader.get_checkpoint_buffer_end();
let msg = self.get_diagnostic_info(
checkpoint_buffer,
&state.file_name,
row_index + state.start_row_index,
&split.path,
row_index + split.start_row,
self.schema.clone(),
self.min_accepted_rows,
self.settings.clone(),
Expand Down Expand Up @@ -317,7 +321,7 @@ impl InputFormat for CsvInputFormat {
Ok(())
}

fn read_buf(&self, buf: &[u8], state: &mut Box<dyn InputState>) -> Result<usize> {
fn read_buf(&self, buf: &[u8], state: &mut Box<dyn InputState>) -> Result<(usize, bool)> {
let mut index = 0;
let state = state.as_any().downcast_mut::<CsvInputState>().unwrap();

Expand All @@ -336,12 +340,8 @@ impl InputFormat for CsvInputFormat {
}

state.memory.extend_from_slice(&buf[0..index]);
Ok(index)
}

fn set_buf(&self, buf: Vec<u8>, state: &mut Box<dyn InputState>) {
let state = state.as_any().downcast_mut::<CsvInputState>().unwrap();
state.memory = buf;
let finished = !state.need_more_data && state.ignore_if_first.is_none();
Ok((index, finished))
}

fn take_buf(&self, state: &mut Box<dyn InputState>) -> Vec<u8> {
Expand Down
25 changes: 15 additions & 10 deletions common/formats/src/format_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use common_datavalues::DataSchemaRef;
use common_datavalues::DataTypeImpl;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::FileSplit;
use common_io::prelude::FormatSettings;
use similar_asserts::Diff;

Expand Down Expand Up @@ -114,11 +115,20 @@ impl InputFormat for ParquetInputFormat {
fn deserialize_data(&self, state: &mut Box<dyn InputState>) -> Result<Vec<DataBlock>> {
let mut state = std::mem::replace(state, self.create_state());
let state = state.as_any().downcast_mut::<ParquetInputState>().unwrap();

if state.memory.is_empty() {
let memory = std::mem::take(&mut state.memory);
if memory.is_empty() {
return Ok(vec![]);
}
let mut cursor = Cursor::new(&state.memory);
self.deserialize_complete_split(FileSplit {
path: None,
start_offset: 0,
start_row: 0,
buf: memory,
})
}

fn deserialize_complete_split(&self, split: FileSplit) -> Result<Vec<DataBlock>> {
let mut cursor = Cursor::new(&split.buf);
let parquet_metadata = Self::read_meta_data(&mut cursor)?;
let infer_schema = read::infer_schema(&parquet_metadata)?;
let actually_schema = DataSchema::from(&infer_schema);
Expand Down Expand Up @@ -170,15 +180,10 @@ impl InputFormat for ParquetInputFormat {
Ok(data_blocks)
}

fn read_buf(&self, buf: &[u8], state: &mut Box<dyn InputState>) -> Result<usize> {
fn read_buf(&self, buf: &[u8], state: &mut Box<dyn InputState>) -> Result<(usize, bool)> {
let state = state.as_any().downcast_mut::<ParquetInputState>().unwrap();
state.memory.extend_from_slice(buf);
Ok(buf.len())
}

fn set_buf(&self, buf: Vec<u8>, state: &mut Box<dyn InputState>) {
let state = state.as_any().downcast_mut::<ParquetInputState>().unwrap();
state.memory = buf;
Ok((buf.len(), false))
}

fn take_buf(&self, state: &mut Box<dyn InputState>) -> Vec<u8> {
Expand Down
87 changes: 44 additions & 43 deletions common/formats/src/format_tsv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataType;
use common_datavalues::TypeDeserializer;
use common_datavalues::TypeDeserializerImpl;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::position2;
use common_io::prelude::BufferReadExt;
use common_io::prelude::FileSplit;
use common_io::prelude::FormatSettings;
use common_io::prelude::MemoryReader;
use common_io::prelude::NestedCheckpointReader;
Expand Down Expand Up @@ -182,44 +182,15 @@ impl InputFormat for TsvInputFormat {
}

fn deserialize_data(&self, state: &mut Box<dyn InputState>) -> Result<Vec<DataBlock>> {
let mut deserializers = Vec::with_capacity(self.schema.num_fields());
for field in self.schema.fields() {
let data_type = field.data_type();
deserializers.push(data_type.create_deserializer(self.min_accepted_rows));
}

let mut state = std::mem::replace(state, self.create_state());
let state = state.as_any().downcast_mut::<TsvInputState>().unwrap();
let memory = std::mem::take(&mut state.memory);
let memory_reader = MemoryReader::new(memory);
let mut checkpoint_reader = NestedCheckpointReader::new(memory_reader);

let mut row_index = 0;
while !checkpoint_reader.eof()? {
checkpoint_reader.push_checkpoint();
if let Err(err) = self.read_row(&mut checkpoint_reader, &mut deserializers, row_index) {
let checkpoint_buffer = checkpoint_reader.get_checkpoint_buffer_end();
let msg = self.get_diagnostic_info(
checkpoint_buffer,
&state.file_name,
row_index + state.start_row_index,
self.schema.clone(),
self.min_accepted_rows,
self.settings.clone(),
)?;
let err = err.add_message_back(msg);
return Err(err);
}
checkpoint_reader.pop_checkpoint();
row_index += 1;
}

let mut columns = Vec::with_capacity(deserializers.len());
for deserializer in &mut deserializers {
columns.push(deserializer.finish_to_column());
}

Ok(vec![DataBlock::create(self.schema.clone(), columns)])
self.deserialize_complete_split(FileSplit {
path: state.file_name.clone(),
start_offset: 0,
start_row: state.start_row_index,
buf: memory,
})
}

fn read_row(
Expand Down Expand Up @@ -258,7 +229,7 @@ impl InputFormat for TsvInputFormat {
Ok(())
}

fn read_buf(&self, buf: &[u8], state: &mut Box<dyn InputState>) -> Result<usize> {
fn read_buf(&self, buf: &[u8], state: &mut Box<dyn InputState>) -> Result<(usize, bool)> {
let mut index = 0;
let state = state.as_any().downcast_mut::<TsvInputState>().unwrap();

Expand All @@ -274,12 +245,8 @@ impl InputFormat for TsvInputFormat {
}

state.memory.extend_from_slice(&buf[0..index]);
Ok(index)
}

fn set_buf(&self, buf: Vec<u8>, state: &mut Box<dyn InputState>) {
let state = state.as_any().downcast_mut::<TsvInputState>().unwrap();
state.memory = buf;
let finished = !state.need_more_data && state.ignore_if_first.is_none();
Ok((index, finished))
}

fn take_buf(&self, state: &mut Box<dyn InputState>) -> Vec<u8> {
Expand Down Expand Up @@ -312,6 +279,40 @@ impl InputFormat for TsvInputFormat {
let state = state.as_any().downcast_mut::<TsvInputState>().unwrap();
Ok(state.accepted_rows)
}

fn deserialize_complete_split(&self, split: FileSplit) -> Result<Vec<DataBlock>> {
let mut deserializers = self.schema.create_deserializers(self.min_accepted_rows);

let memory_reader = MemoryReader::new(split.buf);
let mut checkpoint_reader = NestedCheckpointReader::new(memory_reader);

let mut row_index = 0;
while !checkpoint_reader.eof()? {
checkpoint_reader.push_checkpoint();
if let Err(err) = self.read_row(&mut checkpoint_reader, &mut deserializers, row_index) {
let checkpoint_buffer = checkpoint_reader.get_checkpoint_buffer_end();
let msg = self.get_diagnostic_info(
checkpoint_buffer,
&split.path,
row_index + split.start_row,
self.schema.clone(),
self.min_accepted_rows,
self.settings.clone(),
)?;
let err = err.add_message_back(msg);
return Err(err);
}
checkpoint_reader.pop_checkpoint();
row_index += 1;
}

let mut columns = Vec::with_capacity(deserializers.len());
for deserializer in &mut deserializers {
columns.push(deserializer.finish_to_column());
}

Ok(vec![DataBlock::create(self.schema.clone(), columns)])
}
}

#[allow(clippy::format_push_string)]
Expand Down
8 changes: 4 additions & 4 deletions common/formats/tests/it/format_csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn test_accepted_multi_lines() -> Result<()> {
let bytes = "first,second\nfirst,".as_bytes();
assert_eq!(
bytes.len(),
csv_input_format.read_buf(bytes, &mut csv_input_state)?
csv_input_format.read_buf(bytes, &mut csv_input_state)?.0
);
assert_eq!(
bytes,
Expand All @@ -67,7 +67,7 @@ fn test_accepted_multi_lines() -> Result<()> {
);

let bytes = "second\nfirst,".as_bytes();
assert_eq!(7, csv_input_format.read_buf(bytes, &mut csv_input_state)?);
assert_eq!(7, csv_input_format.read_buf(bytes, &mut csv_input_state)?.0);
assert_eq!(
"first,second\nfirst,second\n".as_bytes(),
csv_input_state
Expand Down Expand Up @@ -153,7 +153,7 @@ fn assert_complete_line(content: &str) -> Result<()> {
let bytes = content.as_bytes();
assert_eq!(
bytes.len(),
csv_input_format.read_buf(bytes, &mut csv_input_state)?
csv_input_format.read_buf(bytes, &mut csv_input_state)?.0
);
assert_eq!(
bytes,
Expand Down Expand Up @@ -181,7 +181,7 @@ fn assert_broken_line(content: &str, assert_size: usize) -> Result<()> {
let bytes = content.as_bytes();
assert_eq!(
assert_size,
csv_input_format.read_buf(bytes, &mut csv_input_state)?
csv_input_format.read_buf(bytes, &mut csv_input_state)?.0
);
assert_eq!(
&bytes[0..assert_size],
Expand Down
21 changes: 21 additions & 0 deletions common/io/src/file_split.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// A chunk of input data handed to an input format for deserialization, together with the
/// bookkeeping needed to report where the chunk came from.
///
/// `Default` yields an empty split (`path: None`, zero offsets, empty buffer), so callers can
/// write `FileSplit { buf, ..Default::default() }` instead of spelling out every field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FileSplit {
    /// Name of the file the data was read from, if known.
    pub path: Option<String>,
    /// NOTE(review): presumably the byte offset of `buf` within the file; every caller
    /// visible in this change passes 0 — confirm before relying on it.
    pub start_offset: usize,
    /// Row number of the first row in `buf`; added to the in-split row index when building
    /// diagnostic messages.
    pub start_row: usize,
    /// The raw bytes of this split.
    pub buf: Vec<u8>,
}
1 change: 1 addition & 0 deletions common/io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod binary_read;
mod binary_write;

mod buffer;
mod file_split;
mod format_settings;
mod options_deserializer;
mod position;
Expand Down
1 change: 1 addition & 0 deletions common/io/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use crate::buffer::BufferReader;
pub use crate::buffer::CheckpointRead;
pub use crate::buffer::MemoryReader;
pub use crate::buffer::NestedCheckpointReader;
pub use crate::file_split::*;
pub use crate::format_settings::Compression;
pub use crate::format_settings::FormatSettings;
pub use crate::options_deserializer::OptionsDeserializer;
Expand Down
Loading

0 comments on commit 522853e

Please sign in to comment.