ARROW-7951: [Python] Expose BYTE_STREAM_SPLIT in pyarrow
This patch exposes the new Parquet encoding in pyarrow.
The decision is to always prefer the dictionary encoding when it is
enabled and not to use the BYTE_STREAM_SPLIT encoding by default.
The reason for this is that the new encoding only improves
compressibility for high-entropy data when combined with a compressor,
so it is useful only in certain cases.
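
A minimal usage sketch of the option this patch adds (the file name and data below are illustrative, not part of the patch):

import pyarrow as pa
import pyarrow.parquet as pq

# BYTE_STREAM_SPLIT scatters the bytes of each floating-point value into
# separate streams so that a general-purpose compressor can find more
# redundancy in high-entropy data.
arr = pa.array([i * 0.1 for i in range(10000)])
table = pa.Table.from_arrays([arr], names=['x'])

# Disable the dictionary encoding explicitly: when both are enabled,
# the dictionary encoding is preferred and BYTE_STREAM_SPLIT is not used.
pq.write_table(table, 'data.parquet',
               compression='gzip',
               use_dictionary=False,
               use_byte_stream_split=True)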

Closes apache#6499 from martinradev/byte_stream_split_submit

Authored-by: Martin Radev <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
martinradev authored and pitrou committed Mar 12, 2020
1 parent b314276 commit 1270034
Showing 4 changed files with 68 additions and 2 deletions.
5 changes: 5 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -118,6 +118,8 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
" parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY"
ParquetEncoding_DELTA_BYTE_ARRAY" parquet::Encoding::DELTA_BYTE_ARRAY"
ParquetEncoding_RLE_DICTIONARY" parquet::Encoding::RLE_DICTIONARY"
ParquetEncoding_BYTE_STREAM_SPLIT \
" parquet::Encoding::BYTE_STREAM_SPLIT"

enum ParquetCompression" parquet::Compression::type":
ParquetCompression_UNCOMPRESSED" parquet::Compression::UNCOMPRESSED"
@@ -359,6 +361,9 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* enable_statistics()
Builder* enable_statistics(const c_string& path)
Builder* data_pagesize(int64_t size)
Builder* encoding(ParquetEncoding encoding)
Builder* encoding(const c_string& path,
ParquetEncoding encoding)
Builder* write_batch_size(int64_t batch_size)
shared_ptr[WriterProperties] build()

16 changes: 15 additions & 1 deletion python/pyarrow/_parquet.pyx
@@ -938,6 +938,7 @@ cdef encoding_name_from_enum(ParquetEncoding encoding_):
ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: 'DELTA_LENGTH_BYTE_ARRAY',
ParquetEncoding_DELTA_BYTE_ARRAY: 'DELTA_BYTE_ARRAY',
ParquetEncoding_RLE_DICTIONARY: 'RLE_DICTIONARY',
ParquetEncoding_BYTE_STREAM_SPLIT: 'BYTE_STREAM_SPLIT',
}.get(encoding_, 'UNKNOWN')


@@ -1201,6 +1202,7 @@ cdef class ParquetWriter:
cdef readonly:
object use_dictionary
object use_deprecated_int96_timestamps
object use_byte_stream_split
object coerce_timestamps
object allow_truncated_timestamps
object compression
@@ -1218,7 +1220,8 @@
coerce_timestamps=None,
data_page_size=None,
allow_truncated_timestamps=False,
compression_level=None):
compression_level=None,
use_byte_stream_split=False):
cdef:
shared_ptr[WriterProperties] properties
c_string c_where
@@ -1243,12 +1246,14 @@
self.use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
self.coerce_timestamps = coerce_timestamps
self.allow_truncated_timestamps = allow_truncated_timestamps
self.use_byte_stream_split = use_byte_stream_split

cdef WriterProperties.Builder properties_builder
self._set_version(&properties_builder)
self._set_compression_props(&properties_builder)
self._set_dictionary_props(&properties_builder)
self._set_statistics_props(&properties_builder)
self._set_byte_stream_split_props(&properties_builder)

if data_page_size is not None:
properties_builder.data_pagesize(data_page_size)
@@ -1334,6 +1339,15 @@ cdef class ParquetWriter:
for column in self.use_dictionary:
props.enable_dictionary(column)

cdef void _set_byte_stream_split_props(
self, WriterProperties.Builder* props):
if isinstance(self.use_byte_stream_split, bool):
if self.use_byte_stream_split:
props.encoding(ParquetEncoding_BYTE_STREAM_SPLIT)
elif self.use_byte_stream_split is not None:
for column in self.use_byte_stream_split:
props.encoding(column, ParquetEncoding_BYTE_STREAM_SPLIT)

cdef void _set_statistics_props(self, WriterProperties.Builder* props):
if isinstance(self.write_statistics, bool):
if self.write_statistics:
12 changes: 11 additions & 1 deletion python/pyarrow/parquet.py
@@ -391,7 +391,13 @@ def _sanitize_table(table, new_schema, flavor):
meaning for each codec, so you have to read the documentation of the
codec you are using.
An exception is thrown if the compression codec does not allow specifying
a compression level."""
a compression level.
use_byte_stream_split: bool or list, default False
Specify whether the byte_stream_split encoding should be used in general or
only for some columns. If both dictionary and byte_stream_split are
enabled, then dictionary is preferred.
The byte_stream_split encoding is valid only for floating-point data types
and should be combined with a compression codec."""


class ParquetWriter:
@@ -419,6 +425,7 @@ def __init__(self, where, schema, filesystem=None,
write_statistics=True,
use_deprecated_int96_timestamps=None,
compression_level=None,
use_byte_stream_split=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -454,6 +461,7 @@ def __init__(self, where, schema, filesystem=None,
write_statistics=write_statistics,
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
**options)
self.is_open = True

@@ -1320,6 +1328,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
data_page_size=None, flavor=None,
filesystem=None,
compression_level=None,
use_byte_stream_split=False,
**kwargs):
row_group_size = kwargs.pop('chunk_size', row_group_size)
use_int96 = use_deprecated_int96_timestamps
@@ -1337,6 +1346,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
compression=compression,
use_deprecated_int96_timestamps=use_int96,
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
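
The use_byte_stream_split docstring above also allows per-column control; a sketch of that mixed configuration, with illustrative column names and data:

import pyarrow as pa
import pyarrow.parquet as pq

arr_float = pa.array([i * 0.5 for i in range(1000)])
arr_str = pa.array(['label-%d' % (i % 10) for i in range(1000)])
table = pa.Table.from_arrays([arr_float, arr_str], names=['value', 'label'])

# Dictionary-encode the low-cardinality string column and apply the
# BYTE_STREAM_SPLIT encoding to the floating-point column; both options
# accept a list of column names.
pq.write_table(table, 'mixed.parquet',
               compression='gzip',
               use_dictionary=['label'],
               use_byte_stream_split=['value'])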
37 changes: 37 additions & 0 deletions python/pyarrow/tests/test_parquet.py
@@ -637,6 +637,43 @@ def make_sample_file(table_or_df):
return pq.ParquetFile(buf)


def test_byte_stream_split():
# This is only a smoke test.
arr_float = pa.array(list(map(float, range(100))))
arr_int = pa.array(list(map(int, range(100))))
data_float = [arr_float, arr_float]
table = pa.Table.from_arrays(data_float, names=['a', 'b'])

# Check with byte_stream_split for both columns.
_check_roundtrip(table, expected=table, compression="gzip",
use_dictionary=False, use_byte_stream_split=True)

# Check with byte_stream_split for column 'b' and dictionary
# for column 'a'.
_check_roundtrip(table, expected=table, compression="gzip",
use_dictionary=['a'],
use_byte_stream_split=['b'])

# Check with a collision for both columns.
_check_roundtrip(table, expected=table, compression="gzip",
use_dictionary=['a', 'b'],
use_byte_stream_split=['a', 'b'])

# Check with mixed column types.
mixed_table = pa.Table.from_arrays([arr_float, arr_int],
names=['a', 'b'])
_check_roundtrip(mixed_table, expected=mixed_table,
use_dictionary=['b'],
use_byte_stream_split=['a'])

# Try to use the wrong data type with the byte_stream_split encoding.
# This should throw an exception.
table = pa.Table.from_arrays([arr_int], names=['tmp'])
with pytest.raises(IOError):
_check_roundtrip(table, expected=table, use_byte_stream_split=True,
use_dictionary=False)


def test_compression_level():
arr = pa.array(list(map(int, range(1000))))
data = [arr, arr]
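
The test_byte_stream_split smoke test above does not inspect which encoding actually landed in the file; a sketch of one way to check it through the column-chunk metadata encodings (writing to an in-memory buffer and the assertion below are illustrative, not part of the patch):

import io

import pyarrow as pa
import pyarrow.parquet as pq

arr = pa.array([float(i) for i in range(100)])
table = pa.Table.from_arrays([arr], names=['a'])

buf = io.BytesIO()
pq.write_table(table, buf, compression='gzip',
               use_dictionary=False, use_byte_stream_split=True)
buf.seek(0)

# encoding_name_from_enum now maps the new enum value, so the column-chunk
# metadata reports 'BYTE_STREAM_SPLIT' among the encodings.
encodings = pq.ParquetFile(buf).metadata.row_group(0).column(0).encodings
assert 'BYTE_STREAM_SPLIT' in encodings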
