Skip to content

Commit

Permalink
ARROW-8658: [C++][Dataset] Implement subtree pruning for FileSystemDa…
Browse files Browse the repository at this point in the history
…taset

TODO:
- [ ] Add benchmarks to be sure this provides an advantage
- [ ] Move Forest to dataset_internal.h or so, since it is not used anywhere else
- [ ] Unit test SubtreeImpl, add more comments

Closes apache#9670 from bkietz/8658-Implement-subtree-pruning

Lead-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: David Li <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
bkietz and lidavidm committed Mar 12, 2021
1 parent ff331f2 commit 26fc751
Show file tree
Hide file tree
Showing 14 changed files with 721 additions and 564 deletions.
1 change: 0 additions & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ if(ARROW_FILESYSTEM)
filesystem/filesystem.cc
filesystem/localfs.cc
filesystem/mockfs.cc
filesystem/path_forest.cc
filesystem/path_util.cc
filesystem/util_internal.cc)

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,13 @@ endif()

if(ARROW_BUILD_BENCHMARKS)
add_arrow_benchmark(expression_benchmark PREFIX "arrow-dataset")
add_arrow_benchmark(file_benchmark PREFIX "arrow-dataset")

if(ARROW_BUILD_STATIC)
target_link_libraries(arrow-dataset-expression-benchmark PUBLIC arrow_dataset_static)
target_link_libraries(arrow-dataset-file-benchmark PUBLIC arrow_dataset_static)
else()
target_link_libraries(arrow-dataset-expression-benchmark PUBLIC arrow_dataset_shared)
target_link_libraries(arrow-dataset-file-benchmark PUBLIC arrow_dataset_shared)
endif()
endif()
115 changes: 115 additions & 0 deletions cpp/src/arrow/dataset/dataset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

Expand All @@ -29,6 +30,7 @@
#include "arrow/scalar.h"
#include "arrow/type.h"
#include "arrow/util/iterator.h"
#include "arrow/util/optional.h"

namespace arrow {
namespace dataset {
Expand Down Expand Up @@ -70,5 +72,118 @@ inline std::shared_ptr<Schema> SchemaFromColumnNames(
return schema(std::move(columns))->WithMetadata(input->metadata());
}

// Helper class for efficiently detecting subtrees given fragment partition expressions.
// Partition expressions are broken into conjunction members, and each member is
// dictionary-encoded to impose a sortable ordering. In addition, subtrees are generated
// which span groups of fragments and nested subtrees. After encoding, each fragment is
// guaranteed to be a descendant of at least one subtree. For example, given fragments in
// a HivePartitioning with paths:
//
// /num=0/al=eh/dat.par
// /num=0/al=be/dat.par
// /num=1/al=eh/dat.par
// /num=1/al=be/dat.par
//
// The following subtrees will be introduced:
//
// /num=0/
// /num=0/al=eh/
// /num=0/al=eh/dat.par
// /num=0/al=be/
// /num=0/al=be/dat.par
// /num=1/
// /num=1/al=eh/
// /num=1/al=eh/dat.par
// /num=1/al=be/
// /num=1/al=be/dat.par
struct SubtreeImpl {
// Each unique conjunction member is mapped to an integer.
using expression_code = char32_t;
// Partition expressions are mapped to strings of codes; strings give us lexicographic
// ordering (and potentially useful optimizations).
using expression_codes = std::basic_string<expression_code>;
// An encoded fragment (if fragment_index is set) or subtree.
struct Encoded {
util::optional<int> fragment_index;
expression_codes partition_expression;
};

std::unordered_map<Expression, expression_code, Expression::Hash> expr_to_code_;
std::vector<Expression> code_to_expr_;
std::unordered_set<expression_codes> subtree_exprs_;

// Encode a subexpression (returning the existing code if possible).
expression_code GetOrInsert(const Expression& expr) {
auto next_code = static_cast<int>(expr_to_code_.size());
auto it_success = expr_to_code_.emplace(expr, next_code);

if (it_success.second) {
code_to_expr_.push_back(expr);
}
return it_success.first->second;
}

// Encode an expression (recursively breaking up conjunction members if possible).
void EncodeConjunctionMembers(const Expression& expr, expression_codes* codes) {
if (auto call = expr.call()) {
if (call->function_name == "and_kleene") {
// expr is a conjunction, encode its arguments
EncodeConjunctionMembers(call->arguments[0], codes);
EncodeConjunctionMembers(call->arguments[1], codes);
return;
}
}
// expr is not a conjunction, encode it whole
codes->push_back(GetOrInsert(expr));
}

// Convert an encoded subtree or fragment back into an expression.
Expression GetSubtreeExpression(const Encoded& encoded_subtree) {
// Filters will already be simplified by all of a subtree's ancestors, so
// we only need to simplify the filter by the trailing conjunction member
// of each subtree.
return code_to_expr_[encoded_subtree.partition_expression.back()];
}

// Insert subtrees for each component of an encoded partition expression.
void GenerateSubtrees(expression_codes partition_expression,
std::vector<Encoded>* encoded) {
while (!partition_expression.empty()) {
if (subtree_exprs_.insert(partition_expression).second) {
Encoded encoded_subtree{/*fragment_index=*/util::nullopt, partition_expression};
encoded->push_back(std::move(encoded_subtree));
}
partition_expression.resize(partition_expression.size() - 1);
}
}

// Encode the fragment's partition expression and generate subtrees for it as well.
void EncodeOneFragment(int fragment_index, const Fragment& fragment,
std::vector<Encoded>* encoded) {
Encoded encoded_fragment{fragment_index, {}};

EncodeConjunctionMembers(fragment.partition_expression(),
&encoded_fragment.partition_expression);

GenerateSubtrees(encoded_fragment.partition_expression, encoded);

encoded->push_back(std::move(encoded_fragment));
}

template <typename Fragments>
std::vector<Encoded> EncodeFragments(const Fragments& fragments) {
std::vector<Encoded> encoded;
for (size_t i = 0; i < fragments.size(); ++i) {
EncodeOneFragment(static_cast<int>(i), *fragments[i], &encoded);
}
return encoded;
}
};

// Two encodings compare equal when they agree on both the (optional) fragment index and
// the encoded partition expression.
inline bool operator==(const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded& r) {
  if (!(l.fragment_index == r.fragment_index)) {
    return false;
  }
  return l.partition_expression == r.partition_expression;
}

} // namespace dataset
} // namespace arrow
1 change: 0 additions & 1 deletion cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/filesystem/path_forest.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/util/logging.h"

Expand Down
2 changes: 0 additions & 2 deletions cpp/src/arrow/dataset/expression_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include <iostream>

#include "benchmark/benchmark.h"

#include "arrow/compute/cast.h"
Expand Down
112 changes: 90 additions & 22 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <vector>

#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/forest_internal.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/filesystem/filesystem.h"
Expand All @@ -38,6 +39,7 @@
#include "arrow/util/mutex.h"
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/variant.h"

namespace arrow {
namespace dataset {
Expand Down Expand Up @@ -82,30 +84,30 @@ Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options
return format_->ScanFile(std::move(options), std::move(context), self);
}

// Constructs a dataset from already-discovered fragments: `schema` and `root_partition`
// are forwarded to the Dataset base class, while `format`, `filesystem` and `fragments`
// are moved into the corresponding members.
FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema,
Expression root_partition,
std::shared_ptr<FileFormat> format,
std::shared_ptr<fs::FileSystem> filesystem,
std::vector<std::shared_ptr<FileFragment>> fragments)
: Dataset(std::move(schema), std::move(root_partition)),
format_(std::move(format)),
filesystem_(std::move(filesystem)),
fragments_(std::move(fragments)) {}
// State used by GetFragmentsImpl to prune fragments by subtree; populated by
// SetupSubtreePruning.
struct FileSystemDataset::FragmentSubtrees {
// Forest for skipping fragments based on extracted subtree expressions
Forest forest;
// fragment indices and subtree expressions in forest order: each entry is either an int
// (an index into fragments_) or the Expression of a subtree. Entries are looked up by
// Forest::Ref::i during traversal.
std::vector<util::Variant<int, Expression>> fragments_and_subtrees;
};

Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
std::shared_ptr<Schema> schema, Expression root_partition,
std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem,
std::vector<std::shared_ptr<FileFragment>> fragments) {
return std::shared_ptr<FileSystemDataset>(new FileSystemDataset(
std::move(schema), std::move(root_partition), std::move(format),
std::move(filesystem), std::move(fragments)));
std::shared_ptr<FileSystemDataset> out(
new FileSystemDataset(std::move(schema), std::move(root_partition)));
out->format_ = std::move(format);
out->filesystem_ = std::move(filesystem);
out->fragments_ = std::move(fragments);
out->SetupSubtreePruning();
return out;
}

Result<std::shared_ptr<Dataset>> FileSystemDataset::ReplaceSchema(
std::shared_ptr<Schema> schema) const {
RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
return std::shared_ptr<Dataset>(new FileSystemDataset(
std::move(schema), partition_expression_, format_, filesystem_, fragments_));
return Make(std::move(schema), partition_expression_, format_, filesystem_, fragments_);
}

std::vector<std::string> FileSystemDataset::files() const {
Expand Down Expand Up @@ -137,18 +139,84 @@ std::string FileSystemDataset::ToString() const {
return repr;
}

Result<FragmentIterator> FileSystemDataset::GetFragmentsImpl(Expression predicate) {
FragmentVector fragments;
void FileSystemDataset::SetupSubtreePruning() {
subtrees_ = std::make_shared<FragmentSubtrees>();
SubtreeImpl impl;

auto encoded = impl.EncodeFragments(fragments_);

std::sort(encoded.begin(), encoded.end(),
[](const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded& r) {
const auto cmp = l.partition_expression.compare(r.partition_expression);
if (cmp != 0) {
return cmp < 0;
}
// Equal partition expressions; sort encodings with fragment indices after
// encodings without
return (l.fragment_index ? 1 : 0) < (r.fragment_index ? 1 : 0);
});

for (const auto& e : encoded) {
if (e.fragment_index) {
subtrees_->fragments_and_subtrees.emplace_back(*e.fragment_index);
} else {
subtrees_->fragments_and_subtrees.emplace_back(impl.GetSubtreeExpression(e));
}
}

for (const auto& fragment : fragments_) {
ARROW_ASSIGN_OR_RAISE(
auto simplified,
SimplifyWithGuarantee(predicate, fragment->partition_expression()));
if (simplified.IsSatisfiable()) {
fragments.push_back(fragment);
subtrees_->forest = Forest(static_cast<int>(encoded.size()), [&](int l, int r) {
if (encoded[l].fragment_index) {
// Fragment: not an ancestor.
return false;
}

const auto& ancestor = encoded[l].partition_expression;
const auto& descendant = encoded[r].partition_expression;

if (descendant.size() >= ancestor.size()) {
return std::equal(ancestor.begin(), ancestor.end(), descendant.begin());
}
return false;
});
}

// Returns an iterator over the fragments collected while walking the subtree forest
// with `predicate`, in the order in which they appear in fragments_. A trivially true
// predicate yields every fragment without consulting the forest.
Result<FragmentIterator> FileSystemDataset::GetFragmentsImpl(Expression predicate) {
  if (predicate == literal(true)) {
    // trivial predicate; skip subtree pruning
    return MakeVectorIterator(FragmentVector(fragments_.begin(), fragments_.end()));
  }

  auto& entries = subtrees_->fragments_and_subtrees;

  // Indices (into fragments_) of the fragments reached by the traversal.
  std::vector<int> selected;
  // Stack of progressively simplified predicates; the top is the one which applies to
  // whatever is currently being visited.
  std::vector<Expression> predicate_stack{predicate};

  // Called for each visited entry. Fragments are recorded; subtrees are entered only if
  // the predicate is still satisfiable given the subtree's expression.
  auto on_enter = [&](Forest::Ref ref) -> Result<bool> {
    auto& entry = entries[ref.i];

    if (auto index = util::get_if<int>(&entry)) {
      selected.push_back(*index);
      return false;
    }

    ARROW_ASSIGN_OR_RAISE(
        auto narrowed,
        SimplifyWithGuarantee(predicate_stack.back(), util::get<Expression>(entry)));

    if (narrowed.IsSatisfiable()) {
      predicate_stack.push_back(std::move(narrowed));
      return true;
    }
    return false;
  };

  // Drops the predicate pushed by on_enter (presumably invoked by Forest::Visit once an
  // entered subtree has been fully traversed -- confirm against Forest).
  auto on_leave = [&](Forest::Ref) { predicate_stack.pop_back(); };

  RETURN_NOT_OK(subtrees_->forest.Visit(std::move(on_enter), std::move(on_leave)));

  // Forest order differs from the order of fragments_; restore the latter.
  std::sort(selected.begin(), selected.end());

  FragmentVector fragments;
  fragments.reserve(selected.size());
  for (int index : selected) {
    fragments.push_back(fragments_[index]);
  }

  return MakeVectorIterator(std::move(fragments));
}

Expand Down
16 changes: 11 additions & 5 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_forest.h"
#include "arrow/io/file.h"
#include "arrow/util/compression.h"

Expand Down Expand Up @@ -234,16 +233,23 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
std::string ToString() const;

protected:
struct FragmentSubtrees;

explicit FileSystemDataset(std::shared_ptr<Schema> schema)
: Dataset(std::move(schema)) {}

FileSystemDataset(std::shared_ptr<Schema> schema, Expression partition_expression)
: Dataset(std::move(schema), partition_expression) {}

Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;

FileSystemDataset(std::shared_ptr<Schema> schema, Expression root_partition,
std::shared_ptr<FileFormat> format,
std::shared_ptr<fs::FileSystem> filesystem,
std::vector<std::shared_ptr<FileFragment>> fragments);
void SetupSubtreePruning();

std::shared_ptr<FileFormat> format_;
std::shared_ptr<fs::FileSystem> filesystem_;
std::vector<std::shared_ptr<FileFragment>> fragments_;

std::shared_ptr<FragmentSubtrees> subtrees_;
};

class ARROW_DS_EXPORT FileWriteOptions {
Expand Down
Loading

0 comments on commit 26fc751

Please sign in to comment.