Skip to content

Commit

Permalink
apacheGH-35579: [C++] Support non-named FieldRefs in Parquet scanner (a…
Browse files Browse the repository at this point in the history
…pache#35798)

### Rationale for this change

When setting projections/filters for the file system scanner, the Parquet implementation requires that all materialized `FieldRef`s be position-independent (containing only names). However, it may be useful to support index-based field lookups as well - assuming the dataset schema is known.

### What changes are included in this PR?

Adds a translation step for field refs prior to looking them up in the fragment schema. A known dataset schema is required to do this reliably, however (since the fragment schema may be a sub/superset of the dataset schema) - so in the absence of one, we fall back to the existing behavior.

### Are these changes tested?

Yes (tests are included)

### Are there any user-facing changes?

Yes

* Closes: apache#35579

Authored-by: benibus <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
  • Loading branch information
benibus authored Jun 24, 2023
1 parent 10708b3 commit 10eedbe
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 5 deletions.
30 changes: 29 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,28 @@ Status ResolveOneFieldRef(
return Status::OK();
}

// Converts a field ref into a position-independent ref (containing only a sequence of
// names) based on the dataset schema. Returns `false` if no conversion was needed.
Result<FieldRef> MaybeConvertFieldRef(FieldRef ref, const Schema& dataset_schema) {
if (ARROW_PREDICT_TRUE(ref.IsNameSequence())) {
return std::move(ref);
}

ARROW_ASSIGN_OR_RAISE(auto path, ref.FindOne(dataset_schema));
std::vector<FieldRef> named_refs;
named_refs.reserve(path.indices().size());

const FieldVector* child_fields = &dataset_schema.fields();
for (auto index : path) {
const auto& child_field = *(*child_fields)[index];
named_refs.emplace_back(child_field.name());
child_fields = &child_field.type()->fields();
}

return named_refs.size() == 1 ? std::move(named_refs[0])
: FieldRef(std::move(named_refs));
}

// Compute the column projection based on the scan options
Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
const ScanOptions& options) {
Expand All @@ -248,7 +270,13 @@ Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader&
}

std::vector<int> columns_selection;
for (const auto& ref : field_refs) {
for (auto& ref : field_refs) {
// In the (unlikely) absence of a known dataset schema, we require that all
// materialized refs are named.
if (options.dataset_schema) {
ARROW_ASSIGN_OR_RAISE(
ref, MaybeConvertFieldRef(std::move(ref), *options.dataset_schema));
}
RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
&columns_selection));
}
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,42 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingStringC
CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a")));
}

// Tests projection with nested/indexed FieldRefs.
// https://github.com/apache/arrow/issues/35579
TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {
auto table_schema = schema(
{field("info", struct_({field("name", utf8()),
field("data", struct_({field("amount", float64()),
field("percent", float32())}))}))});
auto table = TableFromJSON(table_schema, {R"([
{"info": {"name": "a", "data": {"amount": 10.3, "percent": 0.1}}},
{"info": {"name": "b", "data": {"amount": 11.6, "percent": 0.2}}},
{"info": {"name": "c", "data": {"amount": 12.9, "percent": 0.3}}},
{"info": {"name": "d", "data": {"amount": 14.2, "percent": 0.4}}},
{"info": {"name": "e", "data": {"amount": 15.5, "percent": 0.5}}},
{"info": {"name": "f", "data": {"amount": 16.8, "percent": 0.6}}}])"});
ASSERT_OK_AND_ASSIGN(auto expected_batch, table->CombineChunksToBatch());

TableBatchReader reader(*table);
SetSchema(reader.schema()->fields());

auto source = GetFileSource(&reader);
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

std::vector<FieldRef> equivalent_refs = {
FieldRef("info", "data", "percent"), FieldRef("info", 1, 1),
FieldRef(0, 1, "percent"), FieldRef(0, 1, 1),
FieldRef(0, FieldRef("data", 1)), FieldRef(FieldRef(0), FieldRef(1, 1)),
};
for (const auto& ref : equivalent_refs) {
ARROW_SCOPED_TRACE("ref = ", ref.ToString());

Project({field_ref(ref)}, {"value"});
auto batch = SingleBatch(fragment);
AssertBatchesEqual(*expected_batch, *batch);
}
}

INSTANTIATE_TEST_SUITE_P(TestScan, TestParquetFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
TestFormatParams::ToTestNameString);
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/dataset/test_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,16 +516,20 @@ class FileFormatFixtureMixin : public ::testing::Test {
SetProjection(opts_.get(), std::move(projection));
}

void Project(std::vector<compute::Expression> exprs, std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection,
ProjectionDescr::FromExpressions(
std::move(exprs), std::move(names), *opts_->dataset_schema));
SetProjection(opts_.get(), std::move(projection));
}

void ProjectNested(std::vector<std::string> names) {
std::vector<compute::Expression> exprs;
for (const auto& name : names) {
ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(name));
exprs.push_back(field_ref(ref));
}
ASSERT_OK_AND_ASSIGN(
auto descr, ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
*opts_->dataset_schema));
SetProjection(opts_.get(), std::move(descr));
Project(std::move(exprs), std::move(names));
}

// Shared test cases
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,20 @@ class ARROW_EXPORT FieldRef : public util::EqualityComparable<FieldRef> {
return true;
}

/// \brief Return true if this ref is a name or a nested sequence of only names
///
/// Useful for determining if iteration is possible without recursion or inner loops
bool IsNameSequence() const {
if (IsName()) return true;
if (const auto* nested = nested_refs()) {
for (const auto& ref : *nested) {
if (!ref.IsName()) return false;
}
return !nested->empty();
}
return false;
}

const FieldPath* field_path() const {
return IsFieldPath() ? &std::get<FieldPath>(impl_) : NULLPTR;
}
Expand Down

0 comments on commit 10eedbe

Please sign in to comment.