Skip to content

Commit

Permalink
ARROW-8543: [C++] Single pass coalescing algorithm + Rebase
Browse files Browse the repository at this point in the history
Simplify the read range coalescing algorithm by applying more heuristics at once, instead of dividing up the work between two functions. This algorithm is shorter in lines of code and hence (hopefully) more maintainable in the long term.

Closes apache#7002 from mayuropensource/ARROW-8543__single_pass_coalescing

Lead-authored-by: mayuropensource <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
mayuropensource and pitrou committed Apr 21, 2020
1 parent e80e243 commit a2d0fb3
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 51 deletions.
89 changes: 38 additions & 51 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ namespace {

struct ReadRangeCombiner {
std::vector<ReadRange> Coalesce(std::vector<ReadRange> ranges) {
if (ranges.size() == 0) {
if (ranges.empty()) {
return ranges;
}

Expand All @@ -339,6 +339,11 @@ struct ReadRangeCombiner {
std::sort(ranges.begin(), ranges.end(),
[](const ReadRange& a, const ReadRange& b) { return a.offset < b.offset; });

// Skip further processing if ranges is empty after removing zero-sized ranges.
if (ranges.empty()) {
return ranges;
}

#ifndef NDEBUG
for (size_t i = 0; i < ranges.size() - 1; ++i) {
const auto& left = ranges[i];
Expand All @@ -350,27 +355,41 @@ struct ReadRangeCombiner {

std::vector<ReadRange> coalesced;

// Find some subsets of ranges that we may want to coalesce
auto start = ranges.begin(), prev = start, next = prev;

while (++next != ranges.end()) {
if (next->offset - prev->offset - prev->length > hole_size_limit_) {
// Distance between consecutive ranges is too large, coalesce this subset
// and start a new one
if (next - start > 1) {
CoalesceUntilLargeEnough(start, next, &coalesced);
} else {
coalesced.push_back(*start);
auto itr = ranges.begin();
// Ensure ranges is not empty.
DCHECK_LE(itr, ranges.end());
// Start of the current coalesced range and end (exclusive) of previous range.
// Both are initialized with the start of first range which is a placeholder value.
int64_t coalesced_start = itr->offset;
int64_t prev_range_end = coalesced_start;

for (; itr < ranges.end(); ++itr) {
const int64_t current_range_start = itr->offset;
const int64_t current_range_end = current_range_start + itr->length;
// We don't expect to have 0 sized ranges.
DCHECK_LT(current_range_start, current_range_end);

// At this point, the coalesced range is [coalesced_start, prev_range_end).
// Stop coalescing if:
// - coalesced range is too large, or
// - distance (hole/gap) between consecutive ranges is too large.
if (current_range_end - coalesced_start > range_size_limit_ ||
current_range_start - prev_range_end > hole_size_limit_) {
DCHECK_LE(coalesced_start, prev_range_end);
// Append the coalesced range only if coalesced range size > 0.
if (prev_range_end > coalesced_start) {
coalesced.push_back({coalesced_start, prev_range_end - coalesced_start});
}
start = next;
// Start a new coalesced range.
coalesced_start = current_range_start;
}
prev = next;

// Update the prev_range_end with the current range.
prev_range_end = current_range_end;
}
// Coalesce last subset
if (next - start > 1) {
CoalesceUntilLargeEnough(start, next, &coalesced);
} else {
coalesced.push_back(*start);
// Append the coalesced range only if coalesced range size > 0.
if (prev_range_end > coalesced_start) {
coalesced.push_back({coalesced_start, prev_range_end - coalesced_start});
}

DCHECK_EQ(coalesced.front().offset, ranges.front().offset);
Expand All @@ -379,38 +398,6 @@ struct ReadRangeCombiner {
return coalesced;
}

// Coalesce consecutive pairs of ranges, but only if the resulting range size
// would not exceed range_size_limit.
template <typename ReadRangeIterator>
void CoalesceUntilLargeEnough(ReadRangeIterator begin, ReadRangeIterator end,
std::vector<ReadRange>* out) {
std::list<ReadRange> todo;
std::copy(begin, end, std::back_inserter(todo));

// Iterate over consecutive pairs
auto prev = todo.begin(), next = prev;
while (++next != todo.end()) {
DCHECK_GE(next->offset, prev->offset);
if (CanCoalesce(*prev, *next)) {
next->length = (next->offset - prev->offset) + next->length;
next->offset = prev->offset;
todo.erase(prev); // Keep `next` valid
}
prev = next;
}

const auto out_size = out->size();
out->resize(out_size + todo.size());
std::copy(todo.begin(), todo.end(), &(*out)[out_size]);
}

bool CanCoalesce(const ReadRange& left, const ReadRange& right) {
DCHECK_LE(left.offset, right.offset);
// Ensured by the subset-finding in Coalesce()
DCHECK_LE(right.offset - left.offset - left.length, hole_size_limit_);
return left.length + right.length <= range_size_limit_;
}

const int64_t hole_size_limit_;
const int64_t range_size_limit_;
};
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/io/memory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,14 @@ TEST(CoalesceReadRanges, Basics) {
};

check({}, {});
// Zero sized range that ends up in empty list
check({{110, 0}}, {});
// Combination of 1 zero-sized range and 1 non-zero-sized range
check({{110, 10}, {120, 0}}, {{110, 10}});
// 1 non-zero sized range
check({{110, 10}}, {{110, 10}});
// No holes + unordered ranges
check({{130, 10}, {110, 10}, {120, 10}}, {{110, 30}});
// No holes
check({{110, 10}, {120, 10}, {130, 10}}, {{110, 30}});
// Small holes only
Expand Down

0 comments on commit a2d0fb3

Please sign in to comment.