Skip to content

Commit

Permalink
ARROW-9424: [C++][Parquet] Disable writing files with LZ4 codec
Browse files Browse the repository at this point in the history
Due to ongoing LZ4 problems with Parquet files, this patch disables writing files with the LZ4 codec by throwing a `ParquetException`.

In progress: raising exceptions in pyarrow when LZ4 is used to write files, and updating the relevant pytest tests.

Mailing list discussion: https://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3CCAJPUwMCM4ZaJB720%2BuoM1aSA2oD9jSEnzuwWjJiw6vwXxHk7nw%40mail.gmail.com%3E

Jira ticket: https://issues.apache.org/jira/browse/ARROW-9424

Closes apache#7757 from patrickpai/ARROW-9424

Lead-authored-by: Wes McKinney <[email protected]>
Co-authored-by: Patrick Pai <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
  • Loading branch information
wesm and patrickpai committed Jul 15, 2020
1 parent a0b7f2a commit 3586292
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class SerializedPageReader : public PageReader {
InitDecryption();
}
max_page_header_size_ = kDefaultMaxPageHeaderSize;
decompressor_ = GetCodec(codec);
decompressor_ = internal::GetReadCodec(codec);
}

// Implement the PageReader interface
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class SerializedPageWriter : public PageWriter {
if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
InitEncryption();
}
compressor_ = GetCodec(codec, compression_level);
compressor_ = internal::GetWriteCodec(codec, compression_level);
thrift_serializer_.reset(new ThriftSerializer);
}

Expand Down
10 changes: 6 additions & 4 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,15 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {

#ifdef ARROW_WITH_LZ4
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) {
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false,
LARGE_SIZE);
ASSERT_THROW(this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false,
false, LARGE_SIZE),
ParquetException);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) {
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true,
LARGE_SIZE);
ASSERT_THROW(this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false,
true, LARGE_SIZE),
ParquetException);
}
#endif

Expand Down
5 changes: 2 additions & 3 deletions cpp/src/parquet/file_deserialize_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,8 @@ TEST_F(TestPageSerde, Compression) {
codec_types.push_back(Compression::GZIP);
#endif

#ifdef ARROW_WITH_LZ4
codec_types.push_back(Compression::LZ4);
#endif
// TODO: Add LZ4 compression type after PARQUET-1878 is complete.
// Testing for deserializing LZ4 is hard without writing enabled, so it is not included.

#ifdef ARROW_WITH_ZSTD
codec_types.push_back(Compression::ZSTD);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/file_serialize_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ TYPED_TEST(TestSerialize, SmallFileGzip) {

#ifdef ARROW_WITH_LZ4
TYPED_TEST(TestSerialize, SmallFileLz4) {
ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::LZ4));
ASSERT_THROW(this->FileSerializeTest(Compression::LZ4), ParquetException);
}
#endif

Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/thrift_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ static inline Compression::type FromThriftUnsafe(format::CompressionCodec::type
case format::CompressionCodec::BROTLI:
return Compression::BROTLI;
case format::CompressionCodec::LZ4:
// ARROW-9424: Existing files use LZ4_RAW but this may need to change
return Compression::LZ4;
case format::CompressionCodec::ZSTD:
return Compression::ZSTD;
Expand Down
33 changes: 29 additions & 4 deletions cpp/src/parquet/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ bool IsCodecSupported(Compression::type codec) {
}
}

std::unique_ptr<Codec> GetCodec(Compression::type codec) {
return GetCodec(codec, Codec::UseDefaultCompressionLevel());
}
namespace internal {

std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level) {
std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level,
bool for_writing) {
std::unique_ptr<Codec> result;
if (for_writing && (codec == Compression::LZ4 || codec == Compression::LZ4_FRAME)) {
throw ParquetException(
"Per ARROW-9424, writing files with LZ4 compression has been "
"disabled until implementation issues have been resolved. "
"It is recommended to read any existing files and rewrite them "
"using a different compression.");
}

if (!IsCodecSupported(codec)) {
std::stringstream ss;
ss << "Codec type " << Codec::GetCodecAsString(codec)
Expand All @@ -66,6 +73,24 @@ std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level)
return result;
}

// Creates the codec used on the read path. The codec is requested with the
// default compression level and with for_writing disabled.
std::unique_ptr<Codec> GetReadCodec(Compression::type codec) {
  const auto default_level = Codec::UseDefaultCompressionLevel();
  return GetCodec(codec, default_level, /*for_writing=*/false);
}

// Creates the codec used on the write path at the requested compression
// level. The request is flagged as for_writing when forwarded to GetCodec.
std::unique_ptr<Codec> GetWriteCodec(Compression::type codec, int compression_level) {
  constexpr bool kForWriting = true;
  return GetCodec(codec, compression_level, kForWriting);
}

} // namespace internal

// Public entry point: builds a codec for `codec` at `compression_level`.
// Delegates to internal::GetCodec with for_writing disabled.
std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level) {
  constexpr bool kForWriting = false;
  return internal::GetCodec(codec, compression_level, kForWriting);
}

// Convenience overload: builds a codec for `codec` using the default
// compression level reported by Codec::UseDefaultCompressionLevel().
std::unique_ptr<Codec> GetCodec(Compression::type codec) {
  const auto default_level = Codec::UseDefaultCompressionLevel();
  return GetCodec(codec, default_level);
}

std::string FormatStatValue(Type::type parquet_type, ::arrow::util::string_view val) {
std::stringstream result;

Expand Down
9 changes: 9 additions & 0 deletions cpp/src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,15 @@ struct Encoding {
PARQUET_EXPORT
bool IsCodecSupported(Compression::type codec);

namespace internal {

// ARROW-9424: Separate functions for reading and writing so we can disable LZ4
// on writing
std::unique_ptr<Codec> GetReadCodec(Compression::type codec);
std::unique_ptr<Codec> GetWriteCodec(Compression::type codec, int compression_level);

} // namespace internal

PARQUET_EXPORT
std::unique_ptr<Codec> GetCodec(Compression::type codec);

Expand Down
16 changes: 14 additions & 2 deletions python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,10 @@ def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset):
tm.assert_frame_equal(df, df_read)


# ARROW-9424: LZ4 support is currently disabled
SUPPORTED_COMPRESSIONS = ['NONE', 'SNAPPY', 'GZIP', 'ZSTD']


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
Expand Down Expand Up @@ -735,7 +739,7 @@ def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)

for compression in ['NONE', 'SNAPPY', 'GZIP', 'LZ4', 'ZSTD']:
for compression in SUPPORTED_COMPRESSIONS:
if (compression != 'NONE' and
not pa.lib.Codec.is_available(compression)):
continue
Expand All @@ -747,6 +751,13 @@ def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
tm.assert_frame_equal(df, df_read)


# ARROW-9424: LZ4 support is currently disabled
def test_lz4_compression_disabled():
    """Check that writing a table with compression='lz4' raises IOError."""
    column = pa.array([1, 2, 3, 4, 5])
    table = pa.table([column], names=['f0'])
    with pytest.raises(IOError):
        # The sink is created inside the context manager, as in the original.
        sink = pa.BufferOutputStream()
        pq.write_table(table, sink, compression='lz4')


def make_sample_file(table_or_df):
if isinstance(table_or_df, pa.Table):
a_table = table_or_df
Expand Down Expand Up @@ -828,8 +839,9 @@ def test_compression_level(use_legacy_dataset):
# level.
# GZIP (zlib) allows for specifying a compression level but as of up
# to version 1.2.11 the valid range is [-1, 9].
invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
invalid_combinations = [("snappy", 4), ("gzip", -1337),
("None", 444), ("lzo", 14)]
# ARROW-9424: the ("lz4", 5) combination is omitted because lz4 is disabled for now
buf = io.BytesIO()
for (codec, level) in invalid_combinations:
with pytest.raises((ValueError, OSError)):
Expand Down

0 comments on commit 3586292

Please sign in to comment.