Skip to content

Commit

Permalink
[Refactor]split column reader make it more readable (StarRocks#45776)
Browse files Browse the repository at this point in the history
Signed-off-by: zombee0 <[email protected]>
  • Loading branch information
zombee0 authored May 20, 2024
1 parent 2e45589 commit 9ed56be
Show file tree
Hide file tree
Showing 7 changed files with 958 additions and 862 deletions.
2 changes: 2 additions & 0 deletions be/src/formats/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ add_library(Formats STATIC
parquet/column_chunk_reader.cpp
parquet/column_converter.cpp
parquet/column_reader.cpp
parquet/scalar_column_reader.cpp
parquet/complex_column_reader.cpp
parquet/encoding.cpp
parquet/level_codec.cpp
parquet/page_reader.cpp
Expand Down
858 changes: 2 additions & 856 deletions be/src/formats/parquet/column_reader.cpp

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions be/src/formats/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,11 @@

#include "column/column.h"
#include "column/vectorized_fwd.h"
#include "common/global_types.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "common/statusor.h"
#include "exprs/function_context.h"
#include "formats/parquet/column_converter.h"
#include "formats/parquet/types.h"
#include "formats/parquet/utils.h"
#include "gen_cpp/PlanNodes_types.h"
#include "io/shared_buffered_input_stream.h"
#include "storage/column_predicate.h"
#include "storage/range.h"
Expand Down Expand Up @@ -157,8 +153,6 @@ class ColumnReader {
}

virtual void select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) = 0;

std::unique_ptr<ColumnConverter> converter;
};

} // namespace starrocks::parquet
354 changes: 354 additions & 0 deletions be/src/formats/parquet/complex_column_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,354 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "formats/parquet/complex_column_reader.h"

#include "column/array_column.h"
#include "column/map_column.h"
#include "column/struct_column.h"
#include "formats/parquet/schema.h"
#include "gutil/casts.h"
#include "gutil/strings/substitute.h"

namespace starrocks::parquet {

template <typename TOffset, typename TIsNull>
static void def_rep_to_offset(const LevelInfo& level_info, const level_t* def_levels, const level_t* rep_levels,
size_t num_levels, TOffset* offsets, TIsNull* is_nulls, size_t* num_offsets,
bool* has_null) {
size_t offset_pos = 0;
for (int i = 0; i < num_levels; ++i) {
// when def_level is less than immediate_repeated_ancestor_def_level, it means that level
// will affect its ancestor.
// when rep_level is greater than max_rep_level, this means that level affects its
// descendants.
// So we can skip this levels
if (def_levels[i] < level_info.immediate_repeated_ancestor_def_level ||
rep_levels[i] > level_info.max_rep_level) {
continue;
}
if (rep_levels[i] == level_info.max_rep_level) {
offsets[offset_pos]++;
continue;
}

// Start for a new row
offset_pos++;
offsets[offset_pos] = offsets[offset_pos - 1];
if (def_levels[i] >= level_info.max_def_level) {
offsets[offset_pos]++;
}

// when def_level equals with max_def_level, this is a non null element or a required element
// when def_level equals with (max_def_level - 1), this indicates an empty array
// when def_level less than (max_def_level - 1) it means this array is null
if (def_levels[i] >= level_info.max_def_level - 1) {
is_nulls[offset_pos - 1] = 0;
} else {
is_nulls[offset_pos - 1] = 1;
*has_null = true;
}
}
*num_offsets = offset_pos;
}

Status ListColumnReader::read_range(const Range<uint64_t>& range, const Filter* filter, ColumnPtr& dst) {
NullableColumn* nullable_column = nullptr;
ArrayColumn* array_column = nullptr;
if (dst->is_nullable()) {
nullable_column = down_cast<NullableColumn*>(dst.get());
DCHECK(nullable_column->mutable_data_column()->is_array());
array_column = down_cast<ArrayColumn*>(nullable_column->mutable_data_column());
} else {
DCHECK(dst->is_array());
DCHECK(!_field->is_nullable);
array_column = down_cast<ArrayColumn*>(dst.get());
}
auto& child_column = array_column->elements_column();
RETURN_IF_ERROR(_element_reader->read_range(range, filter, child_column));

level_t* def_levels = nullptr;
level_t* rep_levels = nullptr;
size_t num_levels = 0;
_element_reader->get_levels(&def_levels, &rep_levels, &num_levels);

auto& offsets = array_column->offsets_column()->get_data();
offsets.resize(num_levels + 1);
NullColumn null_column(num_levels);
auto& is_nulls = null_column.get_data();
size_t num_offsets = 0;
bool has_null = false;
def_rep_to_offset(_field->level_info, def_levels, rep_levels, num_levels, &offsets[0], &is_nulls[0], &num_offsets,
&has_null);
offsets.resize(num_offsets + 1);
is_nulls.resize(num_offsets);

if (dst->is_nullable()) {
DCHECK(nullable_column != nullptr);
nullable_column->mutable_null_column()->swap_column(null_column);
nullable_column->set_has_null(has_null);
}

return Status::OK();
}

Status MapColumnReader::read_range(const Range<uint64_t>& range, const Filter* filter, ColumnPtr& dst) {
NullableColumn* nullable_column = nullptr;
MapColumn* map_column = nullptr;
if (dst->is_nullable()) {
nullable_column = down_cast<NullableColumn*>(dst.get());
DCHECK(nullable_column->mutable_data_column()->is_map());
map_column = down_cast<MapColumn*>(nullable_column->mutable_data_column());
} else {
DCHECK(dst->is_map());
DCHECK(!_field->is_nullable);
map_column = down_cast<MapColumn*>(dst.get());
}
auto& key_column = map_column->keys_column();
auto& value_column = map_column->values_column();
if (_key_reader != nullptr) {
RETURN_IF_ERROR(_key_reader->read_range(range, filter, key_column));
}

if (_value_reader != nullptr) {
RETURN_IF_ERROR(_value_reader->read_range(range, filter, value_column));
}

// if neither key_reader not value_reader is nullptr , check the value_column size is the same with key_column
DCHECK((_key_reader == nullptr) || (_value_reader == nullptr) || (value_column->size() == key_column->size()));

level_t* def_levels = nullptr;
level_t* rep_levels = nullptr;
size_t num_levels = 0;

if (_key_reader != nullptr) {
_key_reader->get_levels(&def_levels, &rep_levels, &num_levels);
} else if (_value_reader != nullptr) {
_value_reader->get_levels(&def_levels, &rep_levels, &num_levels);
} else {
DCHECK(false) << "Unreachable!";
}

auto& offsets = map_column->offsets_column()->get_data();
offsets.resize(num_levels + 1);
NullColumn null_column(num_levels);
auto& is_nulls = null_column.get_data();
size_t num_offsets = 0;
bool has_null = false;

// ParquetFiled Map -> Map<Struct<key,value>>
def_rep_to_offset(_field->level_info, def_levels, rep_levels, num_levels, &offsets[0], &is_nulls[0], &num_offsets,
&has_null);
offsets.resize(num_offsets + 1);
is_nulls.resize(num_offsets);

// fill with default
if (_key_reader == nullptr) {
key_column->append_default(offsets.back());
}
if (_value_reader == nullptr) {
value_column->append_default(offsets.back());
}

if (dst->is_nullable()) {
DCHECK(nullable_column != nullptr);
nullable_column->mutable_null_column()->swap_column(null_column);
nullable_column->set_has_null(has_null);
}

return Status::OK();
}

Status StructColumnReader::read_range(const Range<uint64_t>& range, const Filter* filter, ColumnPtr& dst) {
NullableColumn* nullable_column = nullptr;
StructColumn* struct_column = nullptr;
if (dst->is_nullable()) {
nullable_column = down_cast<NullableColumn*>(dst.get());
DCHECK(nullable_column->mutable_data_column()->is_struct());
struct_column = down_cast<StructColumn*>(nullable_column->mutable_data_column());
} else {
DCHECK(dst->is_struct());
DCHECK(!_field->is_nullable);
struct_column = down_cast<StructColumn*>(dst.get());
}

const auto& field_names = struct_column->field_names();

DCHECK_EQ(field_names.size(), _child_readers.size());

// Fill data for subfield column reader
size_t real_read = 0;
bool first_read = true;
for (size_t i = 0; i < field_names.size(); i++) {
const auto& field_name = field_names[i];
if (LIKELY(_child_readers.find(field_name) != _child_readers.end())) {
if (_child_readers[field_name] != nullptr) {
auto& child_column = struct_column->field_column(field_name);
RETURN_IF_ERROR(_child_readers[field_name]->read_range(range, filter, child_column));
real_read = child_column->size();
first_read = false;
}
} else {
return Status::InternalError(strings::Substitute("there is no match subfield reader for $1", field_name));
}
}

if (UNLIKELY(first_read)) {
return Status::InternalError(
strings::Substitute("All used subfield of struct type $1 is not exist", _field->name));
}

for (size_t i = 0; i < field_names.size(); i++) {
const auto& field_name = field_names[i];
if (_child_readers[field_name] == nullptr) {
Column* child_column = struct_column->field_column(field_name).get();
child_column->append_default(real_read);
}
}

if (dst->is_nullable()) {
DCHECK(nullable_column != nullptr);
size_t row_nums = struct_column->fields_column()[0]->size();
NullColumn null_column(row_nums, 0);
auto& is_nulls = null_column.get_data();
bool has_null = false;
_handle_null_rows(is_nulls.data(), &has_null, row_nums);

nullable_column->mutable_null_column()->swap_column(null_column);
nullable_column->set_has_null(has_null);
}
return Status::OK();
}

bool StructColumnReader::try_to_use_dict_filter(ExprContext* ctx, bool is_decode_needed, const SlotId slotId,
const std::vector<std::string>& sub_field_path, const size_t& layer) {
if (sub_field_path.size() <= layer) {
return false;
}
const std::string& sub_field = sub_field_path[layer];
if (_child_readers.find(sub_field) == _child_readers.end()) {
return false;
}

if (_child_readers[sub_field] == nullptr) {
return false;
}
return _child_readers[sub_field]->try_to_use_dict_filter(ctx, is_decode_needed, slotId, sub_field_path, layer + 1);
}

Status StructColumnReader::filter_dict_column(const ColumnPtr& column, Filter* filter,
const std::vector<std::string>& sub_field_path, const size_t& layer) {
const std::string& sub_field = sub_field_path[layer];
StructColumn* struct_column = nullptr;
if (column->is_nullable()) {
NullableColumn* nullable_column = down_cast<NullableColumn*>(column.get());
DCHECK(nullable_column->mutable_data_column()->is_struct());
struct_column = down_cast<StructColumn*>(nullable_column->mutable_data_column());
} else {
DCHECK(column->is_struct());
DCHECK(!_field->is_nullable);
struct_column = down_cast<StructColumn*>(column.get());
}
return _child_readers[sub_field]->filter_dict_column(struct_column->field_column(sub_field), filter, sub_field_path,
layer + 1);
}

Status StructColumnReader::fill_dst_column(ColumnPtr& dst, const ColumnPtr& src) {
StructColumn* struct_column_src = nullptr;
StructColumn* struct_column_dst = nullptr;
if (src->is_nullable()) {
NullableColumn* nullable_column_src = down_cast<NullableColumn*>(src.get());
DCHECK(nullable_column_src->mutable_data_column()->is_struct());
struct_column_src = down_cast<StructColumn*>(nullable_column_src->mutable_data_column());
NullColumn* null_column_src = nullable_column_src->mutable_null_column();
NullableColumn* nullable_column_dst = down_cast<NullableColumn*>(dst.get());
DCHECK(nullable_column_dst->mutable_data_column()->is_struct());
struct_column_dst = down_cast<StructColumn*>(nullable_column_dst->mutable_data_column());
NullColumn* null_column_dst = nullable_column_dst->mutable_null_column();
null_column_dst->swap_column(*null_column_src);
nullable_column_src->update_has_null();
nullable_column_dst->update_has_null();
} else {
DCHECK(src->is_struct());
DCHECK(dst->is_struct());
DCHECK(!_field->is_nullable);
struct_column_src = down_cast<StructColumn*>(src.get());
struct_column_dst = down_cast<StructColumn*>(dst.get());
}
const auto& field_names = struct_column_dst->field_names();
for (size_t i = 0; i < field_names.size(); i++) {
const auto& field_name = field_names[i];
if (LIKELY(_child_readers.find(field_name) != _child_readers.end())) {
if (_child_readers[field_name] == nullptr) {
struct_column_dst->field_column(field_name)
->swap_column(*(struct_column_src->field_column(field_name)));
} else {
RETURN_IF_ERROR(_child_readers[field_name]->fill_dst_column(
struct_column_dst->field_column(field_name), struct_column_src->field_column(field_name)));
}
} else {
return Status::InternalError(strings::Substitute("there is no match subfield reader for $1", field_name));
}
}
return Status::OK();
}

void StructColumnReader::_handle_null_rows(uint8_t* is_nulls, bool* has_null, size_t num_rows) {
level_t* def_levels = nullptr;
level_t* rep_levels = nullptr;
size_t num_levels = 0;
(*_def_rep_level_child_reader)->get_levels(&def_levels, &rep_levels, &num_levels);

if (def_levels == nullptr) {
// If subfields are required, def_levels is nullptr
*has_null = false;
return;
}

LevelInfo level_info = _field->level_info;

if (rep_levels != nullptr) {
// It's a RepeatedStoredColumnReader
size_t rows = 0;
for (size_t i = 0; i < num_levels; i++) {
if (def_levels[i] < level_info.immediate_repeated_ancestor_def_level ||
rep_levels[i] > level_info.max_rep_level) {
continue;
}

// Start for a new row
if (def_levels[i] >= level_info.max_def_level) {
is_nulls[rows] = 0;
} else {
is_nulls[rows] = 1;
*has_null = true;
}
rows++;
}
DCHECK_EQ(num_rows, rows);
} else {
// For OptionalStoredColumnReader, num_levels is equal to num_rows
DCHECK(num_rows == num_levels);
for (size_t i = 0; i < num_levels; i++) {
if (def_levels[i] >= level_info.max_def_level) {
is_nulls[i] = 0;
} else {
is_nulls[i] = 1;
*has_null = true;
}
}
}
}

} // namespace starrocks::parquet
Loading

0 comments on commit 9ed56be

Please sign in to comment.