Skip to content

Commit

Permalink
[fix](block-reader) Make rowsets union iterating work (apache#40877)
Browse files Browse the repository at this point in the history
## Proposed changes

issue: apache#41995

### Fix 5 problems to make union iterating work.

1. Fix the always-true logic when judging whether rowsets overlap.

2. Fix collect iterator corruption caused by not refreshing the ref block,
which resulted in a nullptr.

3. Fix the exception raised on row id conversion during union iteration,
caused by the block reader's lazy init policy. Initialize rowid conversion
when the init is actually performed.

4. Fix the rowid_conversion test failure caused by constructing a non-empty
[0-1] rowset. Just make the overlap judgement compatible with that case
because it does no harm (**_please review this conclusion_**).

5. Make the rowid conversion test produce correct non-overlapping input data
and cover the union reading code path.

### Rename some vars and func names.
### Add vertical compaction config to fuzzy.
  • Loading branch information
TangSiyang2001 authored Oct 22, 2024
1 parent 8653616 commit e5ebbf1
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 61 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,9 @@ Status set_fuzzy_configs() {
((distribution(*generator) % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["enable_shrink_memory"] =
((distribution(*generator) % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["enable_vertical_compaction"] =
((distribution(*generator) % 2) == 0) ? "true" : "false";

fuzzy_field_and_value["string_overflow_size"] =
((distribution(*generator) % 2) == 0) ? "10" : "4294967295";

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) {
}
}
std::string min_key;
auto ret = rhs->min_key(&min_key);
auto ret = rhs->first_key(&min_key);
if (!ret) {
return false;
}
if (min_key <= pre_max_key) {
return false;
}
CHECK(rhs->max_key(&pre_max_key));
CHECK(rhs->last_key(&pre_max_key));

return true;
}
Expand Down
27 changes: 5 additions & 22 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/types.pb.h>
#include <stddef.h>
#include <unistd.h>

#include <algorithm>
#include <iterator>
Expand Down Expand Up @@ -91,24 +92,15 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,

if (stats_output && stats_output->rowid_conversion) {
reader_params.record_rowids = true;
reader_params.rowid_conversion = stats_output->rowid_conversion;
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
}

reader_params.return_columns.resize(cur_tablet_schema.num_columns());
std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_IF_ERROR(reader.init(reader_params));

if (reader_params.record_rowids) {
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
for (auto& rs_split : reader_params.rs_splits) {
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
stats_output->rowid_conversion->init_segment_map(
rs_split.rs_reader->rowset()->rowset_id(), segment_num_rows);
}
}

vectorized::Block block = cur_tablet_schema.create_block(reader_params.return_columns);
size_t output_rows = 0;
bool eof = false;
Expand Down Expand Up @@ -274,24 +266,15 @@ Status Merger::vertical_compact_one_group(

if (is_key && stats_output && stats_output->rowid_conversion) {
reader_params.record_rowids = true;
reader_params.rowid_conversion = stats_output->rowid_conversion;
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
}

reader_params.return_columns = column_group;
reader_params.origin_return_columns = &reader_params.return_columns;
reader_params.batch_size = batch_size;
RETURN_IF_ERROR(reader.init(reader_params, sample_info));

if (reader_params.record_rowids) {
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
for (auto& rs_split : reader_params.rs_splits) {
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
stats_output->rowid_conversion->init_segment_map(
rs_split.rs_reader->rowset()->rowset_id(), segment_num_rows);
}
}

vectorized::Block block = tablet_schema.create_block(reader_params.return_columns);
size_t output_rows = 0;
bool eof = false;
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
for (size_t i = 0; i < segments.size(); i++) {
_segments_rows[i] = segments[i]->num_rows();
}
if (_read_context->record_rowids) {
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), segment_num_rows);
}

auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
_rowset_meta->get_segments_key_bounds(segments_key_bounds);
return Status::OK();
}
bool min_key(std::string* min_key) {

// min key of the first segment
bool first_key(std::string* min_key) {
KeyBoundsPB key_bounds;
bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds);
if (!ret) {
Expand All @@ -278,7 +280,9 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
*min_key = key_bounds.min_key();
return true;
}
bool max_key(std::string* max_key) {

// max key of the last segment
bool last_key(std::string* max_key) {
KeyBoundsPB key_bounds;
bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds);
if (!ret) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "io/io_common.h"
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
#include "olap/rowid_conversion.h"
#include "runtime/runtime_state.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
Expand Down Expand Up @@ -75,6 +76,7 @@ struct RowsetReaderContext {
bool enable_unique_key_merge_on_write = false;
const DeleteBitmap* delete_bitmap = nullptr;
bool record_rowids = false;
RowIdConversion* rowid_conversion;
bool is_vertical_compaction = false;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
_reader_context.delete_bitmap = read_params.delete_bitmap;
_reader_context.enable_unique_key_merge_on_write = tablet()->enable_unique_key_merge_on_write();
_reader_context.record_rowids = read_params.record_rowids;
_reader_context.rowid_conversion = read_params.rowid_conversion;
_reader_context.is_key_column_group = read_params.is_key_column_group;
_reader_context.remaining_conjunct_roots = read_params.remaining_conjunct_roots;
_reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/olap_common.h"
#include "olap/olap_tuple.h"
#include "olap/row_cursor.h"
#include "olap/rowid_conversion.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader.h"
Expand Down Expand Up @@ -166,6 +167,7 @@ class TabletReader {

// used for compaction to record row ids
bool record_rowids = false;
RowIdConversion* rowid_conversion;
std::vector<int> topn_filter_source_node_ids;
int topn_filter_target_node_id = -1;
// used for special optimization for query : ORDER BY key LIMIT n
Expand Down
43 changes: 10 additions & 33 deletions be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,54 +72,31 @@ Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
return res;
}

bool BlockReader::_rowsets_overlapping(const ReaderParams& read_params) {
std::string cur_max_key;
bool BlockReader::_rowsets_mono_asc_disjoint(const ReaderParams& read_params) {
std::string cur_rs_last_key;
const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
for (const auto& rs_split : rs_splits) {
// version 0-1 of every tablet is empty, just skip this rowset
if (rs_split.rs_reader->rowset()->version().second == 1) {
continue;
}
if (rs_split.rs_reader->rowset()->num_rows() == 0) {
continue;
}
if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
return true;
}
std::string min_key;
bool has_min_key = rs_split.rs_reader->rowset()->min_key(&min_key);
if (!has_min_key) {
std::string rs_first_key;
bool has_first_key = rs_split.rs_reader->rowset()->first_key(&rs_first_key);
if (!has_first_key) {
return true;
}
if (min_key <= cur_max_key) {
if (rs_first_key <= cur_rs_last_key) {
return true;
}
CHECK(rs_split.rs_reader->rowset()->max_key(&cur_max_key));
bool has_last_key = rs_split.rs_reader->rowset()->last_key(&cur_rs_last_key);
CHECK(has_last_key);
}

for (const auto& rs_reader : rs_splits) {
// version 0-1 of every tablet is empty, just skip this rowset
if (rs_reader.rs_reader->rowset()->version().second == 1) {
continue;
}
if (rs_reader.rs_reader->rowset()->num_rows() == 0) {
continue;
}
if (rs_reader.rs_reader->rowset()->is_segments_overlapping()) {
return true;
}
std::string min_key;
bool has_min_key = rs_reader.rs_reader->rowset()->min_key(&min_key);
if (!has_min_key) {
return true;
}
if (min_key <= cur_max_key) {
return true;
}
CHECK(rs_reader.rs_reader->rowset()->max_key(&cur_max_key));
}
return false;
}

Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
auto res = _capture_rs_readers(read_params);
if (!res.ok()) {
Expand All @@ -131,7 +108,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
return res;
}
// check if rowsets are noneoverlapping
_is_rowsets_overlapping = _rowsets_overlapping(read_params);
_is_rowsets_overlapping = _rowsets_mono_asc_disjoint(read_params);
_vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
read_params.read_orderby_key_reverse);

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/olap/block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class BlockReader final : public TabletReader {

bool _get_next_row_same();

bool _rowsets_overlapping(const ReaderParams& read_params);
// return true if keys of rowsets are mono ascending and disjoint
bool _rowsets_mono_asc_disjoint(const ReaderParams& read_params);

VCollectIterator _vcollect_iter;
IteratorRowRef _next_row {{}, -1, false};
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/olap/vcollect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ Status VCollectIterator::Level0Iterator::refresh_current_row() {
if (_block == nullptr && !_get_data_by_ref) {
_block = std::make_shared<Block>(_schema.create_block(
_reader->_return_columns, _reader->_tablet_columns_convert_to_null_set));
_ref.block = _block;
}

if (!_is_empty() && _current_valid()) {
Expand Down
7 changes: 6 additions & 1 deletion be/test/olap/rowid_conversion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,12 @@ class TestRowIdConversion : public testing::TestWithParam<std::tuple<KeysType, b
int64_t c1 = j * rows_per_segment + n;
// There are 500 rows of data overlap between rowsets
if (i > 0) {
c1 += i * num_segments * rows_per_segment - 500;
if (is_overlap) {
// There are 500 rows of data overlap between rowsets
c1 -= 500;
} else {
++c1;
}
}
if (is_overlap && j > 0) {
// There are 10 rows of data overlap between segments
Expand Down

0 comments on commit e5ebbf1

Please sign in to comment.