Add testing of Parquet files from apache/parquet-testing [databricks] (#8646)

* Add testing of Parquet files from apache/parquet-testing

Signed-off-by: Jason Lowe <[email protected]>

* Fix typo in error string

* Only fail for missing data during precommit runs

* Add force_parquet_testing_tests cmdline option to force parquet-testing tests

* Fix test failures on Databricks

* Exclude parquet-testing submodule from RAT check

---------

Signed-off-by: Jason Lowe <[email protected]>
jlowe authored Jul 24, 2023
1 parent c144858 commit 3a9d757
Showing 8 changed files with 172 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
[submodule "thirdparty/parquet-testing"]
path = thirdparty/parquet-testing
url = https://github.com/apache/parquet-testing
7 changes: 7 additions & 0 deletions CONTRIBUTING.md
@@ -28,6 +28,13 @@ There are two types of branches in this repository:
is held here. `main` will change with new releases, but otherwise it should not change with
every pull request merged, making it a more stable branch.

## Git Submodules

This repository uses git submodules. The submodules may need to be updated after the repository
is cloned or after moving to a new commit via `git submodule update --init`. See the
[Git documentation on submodules](https://git-scm.com/book/en/v2/Git-Tools-Submodules) for more
information.

## Building From Source

We use [Maven](https://maven.apache.org) for most aspects of the build. We test the build with latest
4 changes: 4 additions & 0 deletions integration_tests/conftest.py
@@ -52,3 +52,7 @@ def pytest_addoption(parser):
        "--test_oom_injection_mode", action='store', default="random",
        help="in what way, if any, should the tests inject OOMs at test time. Valid options are: random, always, or never"
    )
    parser.addoption(
        "--force_parquet_testing_tests", action="store_true", default=False,
        help="if true forces parquet-testing tests to fail if input data cannot be found"
    )
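The new flag is a standard pytest boolean option. Below is a condensed sketch of how it flows from the command line to the tests, combining into one illustrative module the conftest.py changes shown later in this commit (the real code is split across integration_tests/conftest.py and integration_tests/src/main/python/conftest.py):

_is_parquet_testing_tests_forced = False

def pytest_addoption(parser):
    # store_true options default to False and flip to True when the flag is passed
    parser.addoption(
        "--force_parquet_testing_tests", action="store_true", default=False,
        help="if true forces parquet-testing tests to fail if input data cannot be found")

def pytest_configure(config):
    # Capture the parsed value once at session start
    global _is_parquet_testing_tests_forced
    _is_parquet_testing_tests_forced = config.getoption("force_parquet_testing_tests")

def is_parquet_testing_tests_forced():
    # Queried by parquet_testing_test.py to decide whether missing
    # parquet-testing data is a hard failure rather than a skip
    return _is_parquet_testing_tests_forced

Passing --force_parquet_testing_tests on the pytest command line therefore turns missing parquet-testing data from a warning-and-skip into a test failure.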
9 changes: 8 additions & 1 deletion integration_tests/src/assembly/bin.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020-2021, NVIDIA CORPORATION.
Copyright (c) 2020-2023, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -59,5 +59,12 @@
<directory>src/test/resources/</directory>
<outputDirectory>integration_tests/src/test/resources</outputDirectory>
</fileSet>
<fileSet>
<directory>${project.basedir}/../thirdparty/parquet-testing</directory>
<outputDirectory>integration_tests/src/test/resources/parquet-testing</outputDirectory>
<includes>
<include>**/*.parquet</include>
</includes>
</fileSet>
</fileSets>
</assembly>
16 changes: 16 additions & 0 deletions integration_tests/src/main/python/conftest.py
@@ -83,6 +83,9 @@ def is_dataproc_runtime():
def is_nightly_run():
    return _is_nightly_run

def is_precommit_run():
    return _is_precommit_run

def is_at_least_precommit_run():
    return _is_nightly_run or _is_precommit_run

@@ -100,6 +103,11 @@ def skip_unless_precommit_tests(description):
    else:
        pytest.skip(description)

_is_parquet_testing_tests_forced = False

def is_parquet_testing_tests_forced():
    return _is_parquet_testing_tests_forced

_limit = -1

_inject_oom = None
@@ -117,6 +125,10 @@ def _get_limit_from_mark(mark):
    else:
        return mark.kwargs.get('num_rows', 100000)

_std_input_path = None
def get_std_input_path():
    return _std_input_path

def pytest_runtest_setup(item):
    global _sort_on_spark
    global _sort_locally
@@ -209,6 +221,8 @@ def pytest_runtest_setup(item):
def pytest_configure(config):
    global _runtime_env
    _runtime_env = config.getoption('runtime_env')
    global _std_input_path
    _std_input_path = config.getoption("std_input_path")
    global _is_nightly_run
    global _is_precommit_run
    test_type = config.getoption('test_type').lower()
@@ -218,6 +232,8 @@
        _is_precommit_run = True
    elif "developer" != test_type:
        raise Exception("not supported test type {}".format(test_type))
    global _is_parquet_testing_tests_forced
    _is_parquet_testing_tests_forced = config.getoption("force_parquet_testing_tests")

# For OOM injection: we expect a seed to be provided by the environment, or default to 1.
# This is done such that any worker started by the xdist plugin for pytest will
132 changes: 132 additions & 0 deletions integration_tests/src/main/python/parquet_testing_test.py
@@ -0,0 +1,132 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Tests based on the Parquet dataset available at
# https://github.com/apache/parquet-testing

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error
from conftest import get_std_input_path, is_parquet_testing_tests_forced, is_precommit_run
from data_gen import copy_and_update
from pathlib import Path
import pytest
import warnings

_rebase_confs = {
    "spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
    "spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED"
}
_native_reader_confs = copy_and_update(
    _rebase_confs, {"spark.rapids.sql.format.parquet.reader.footer.type": "NATIVE"})
_java_reader_confs = copy_and_update(
    _rebase_confs, {"spark.rapids.sql.format.parquet.reader.footer.type": "JAVA"})

# Basenames of Parquet files that are expected to generate an error mapped to the
# error message. Many of these use "Exception" since the error from libcudf does not
# match the error message from Spark, but the important part is that both CPU and GPU
# agree that the file cannot be loaded.
# When the association is a pair rather than a string, it's a way to xfail the test
# by providing the error string and xfail reason.
_error_files = {
    "fixed_length_byte_array.parquet": "Exception",
    "large_string_map.brotli.parquet": "Exception",
    "lz4_raw_compressed.parquet": "Exception",
    "lz4_raw_compressed_larger.parquet": "Exception",
    "nation.dict-malformed.parquet": ("Exception", "https://github.com/NVIDIA/spark-rapids/issues/8644"),
    "non_hadoop_lz4_compressed.parquet": "Exception",
    "PARQUET-1481.parquet": "Exception",
}

# Basenames of Parquet files that are expected to fail due to known bugs mapped to the
# xfail reason message.
_xfail_files = {
    "byte_array_decimal.parquet": "https://github.com/NVIDIA/spark-rapids/issues/8629",
    "datapage_v2.snappy.parquet": "datapage v2 not supported by cudf",
    "delta_binary_packed.parquet": "https://github.com/rapidsai/cudf/issues/13501",
    "delta_byte_array.parquet": "https://github.com/rapidsai/cudf/issues/13501",
    "delta_encoding_optional_column.parquet": "https://github.com/rapidsai/cudf/issues/13501",
    "delta_encoding_required_column.parquet": "https://github.com/rapidsai/cudf/issues/13501",
    "delta_length_byte_array.parquet": "https://github.com/rapidsai/cudf/issues/13501",
    "hadoop_lz4_compressed.parquet": "cudf does not support Hadoop LZ4 format",
    "hadoop_lz4_compressed_larger.parquet": "cudf does not support Hadoop LZ4 format",
    "nested_structs.rust.parquet": "PySpark cannot handle year 52951",
    "repeated_no_annotation.parquet": "https://github.com/NVIDIA/spark-rapids/issues/8631",
    "rle_boolean_encoding.parquet": "https://github.com/NVIDIA/spark-rapids/issues/8630",
}

def locate_parquet_testing_files():
    """
    Finds the input files by first checking the standard input path,
    falling back to the parquet-testing submodule relative to this
    script's location.
    :return: list of input files or empty list if no files found
    """
    glob_patterns = ("parquet-testing/data/*.parquet", "parquet-testing/bad_data/*.parquet")
    places = []
    std_path = get_std_input_path()
    if std_path: places.append(Path(std_path))
    places.append(Path(__file__).parent.joinpath("../../../../thirdparty").resolve())
    for p in places:
        files = []
        for pattern in glob_patterns:
            files += p.glob(pattern)
        if files:
            return files
    locations = ", ".join([p.joinpath(g).as_posix() for p in places for g in glob_patterns])
    # TODO: Also fail for nightly tests when nightly scripts have been updated to initialize
    # the git submodules when pulling spark-rapids changes.
    # https://github.com/NVIDIA/spark-rapids/issues/8677
    if is_precommit_run() or is_parquet_testing_tests_forced():
        raise AssertionError("Cannot find parquet-testing data in any of: " + locations)
    warnings.warn("Skipping parquet-testing tests. Unable to locate data in any of: " + locations)
    return []

def gen_testing_params_for_errors():
    result = []
    for f in locate_parquet_testing_files():
        error_obj = _error_files.get(f.name, None)
        if error_obj is not None:
            result.append((f.as_posix(), error_obj))
    return result

def gen_testing_params_for_valid_files():
    files = []
    for f in locate_parquet_testing_files():
        if f.name in _error_files:
            continue
        path = f.as_posix()
        xfail_reason = _xfail_files.get(f.name, None)
        if xfail_reason:
            files.append(pytest.param(path, marks=pytest.mark.xfail(reason=xfail_reason)))
        else:
            files.append(path)
    return files

@pytest.mark.parametrize("path", gen_testing_params_for_valid_files())
@pytest.mark.parametrize("confs", [_native_reader_confs, _java_reader_confs])
def test_parquet_testing_valid_files(path, confs):
    assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.parquet(path), conf=confs)

@pytest.mark.parametrize(("path", "errobj"), gen_testing_params_for_errors())
@pytest.mark.parametrize("confs", [_native_reader_confs, _java_reader_confs])
def test_parquet_testing_error_files(path, errobj, confs):
    error_msg = errobj
    if not isinstance(error_msg, str):
        error_msg, xfail_reason = errobj
        pytest.xfail(xfail_reason)
    assert_gpu_and_cpu_error(
        lambda spark: spark.read.parquet(path).collect(),
        conf=confs,
        error_message=error_msg)
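A note on the reader confs defined at the top of this file: copy_and_update is imported from data_gen in this repository, and is assumed here to behave like a non-mutating dictionary merge, so each parametrized test runs with the two rebase settings plus exactly one footer reader type. A minimal sketch of that assumption:

def copy_and_update(base, updates):
    # Assumed behavior, for illustration only: copy first so the shared
    # rebase confs are never mutated, then layer the update on top
    merged = dict(base)
    merged.update(updates)
    return merged

rebase_confs = {
    "spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
    "spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
}
native_reader_confs = copy_and_update(
    rebase_confs, {"spark.rapids.sql.format.parquet.reader.footer.type": "NATIVE"})
# native_reader_confs now holds three entries and rebase_confs itself is unchanged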
1 change: 1 addition & 0 deletions pom.xml
@@ -1184,6 +1184,7 @@
default, but there are some projects that are conditionally included. -->
<exclude>**/target/**/*</exclude>
<exclude>**/cufile.log</exclude>
<exclude>thirdparty/parquet-testing/**</exclude>
</excludes>
</configuration>
</plugin>
1 change: 1 addition & 0 deletions thirdparty/parquet-testing
Submodule parquet-testing added at d79a01
