Parquet: Add Bloom filter FPP config (apache#10149)
huaxingao authored May 13, 2024
1 parent d0dbc9c commit b623630
Showing 4 changed files with 67 additions and 1 deletion.
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -167,6 +167,10 @@ private TableProperties() {}
"write.parquet.bloom-filter-max-bytes";
public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;

public static final String PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX =
"write.parquet.bloom-filter-fpp.column.";
public static final double PARQUET_BLOOM_FILTER_COLUMN_FPP_DEFAULT = 0.01;

public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX =
"write.parquet.bloom-filter-enabled.column.";

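For orientation (not part of this commit's diff), a minimal sketch of how a caller might set the new per-column FPP property alongside the existing enabled flag through Iceberg's UpdateProperties API; the table handle and the column name "id" are illustrative assumptions:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public class BloomFilterFppExample {
  // Enable a bloom filter on the assumed column "id" and lower its false
  // positive probability to 5%; the column name is appended to the
  // property prefixes defined above.
  static void configureBloomFilter(Table table) {
    table
        .updateProperties()
        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX + "id", "0.05")
        .commit();
  }
}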
3 changes: 2 additions & 1 deletion docs/docs/configuration.md
@@ -49,8 +49,9 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: col1 |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
| write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must be > 0.0 and < 1.0) |
| write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed |
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
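As background on why both an FPP and a max-bytes cap exist (standard bloom-filter math, not specific to this change): by the classic sizing approximation, which Parquet's split-block filters follow in trend, a filter over n distinct values with target false positive probability p needs roughly

m \approx \frac{-n \ln p}{(\ln 2)^2} \text{ bits,}

so lowering the per-column FPP grows the filter until write.parquet.bloom-filter-max-bytes caps it.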
26 changes: 26 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -27,6 +27,7 @@
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
@@ -284,6 +285,7 @@ public <D> FileAppender<D> build() throws IOException {
int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
Map<String, String> columnBloomFilterFpp = context.columnBloomFilterFpp();
Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
boolean dictionaryEnabled = context.dictionaryEnabled();

@@ -347,6 +349,12 @@ public <D> FileAppender<D> build() throws IOException {
propsBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
}

for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
String colPath = entry.getKey();
String fpp = entry.getValue();
propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
}

ParquetProperties parquetProperties = propsBuilder.build();

return new org.apache.iceberg.parquet.ParquetWriter<>(
@@ -384,6 +392,12 @@ public <D> FileAppender<D> build() throws IOException {
parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
}

for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
String colPath = entry.getKey();
String fpp = entry.getValue();
parquetWriteBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
}

return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
}
}
@@ -398,6 +412,7 @@ private static class Context {
private final int rowGroupCheckMinRecordCount;
private final int rowGroupCheckMaxRecordCount;
private final int bloomFilterMaxBytes;
private final Map<String, String> columnBloomFilterFpp;
private final Map<String, String> columnBloomFilterEnabled;
private final boolean dictionaryEnabled;

@@ -411,6 +426,7 @@ private Context(
int rowGroupCheckMinRecordCount,
int rowGroupCheckMaxRecordCount,
int bloomFilterMaxBytes,
Map<String, String> columnBloomFilterFpp,
Map<String, String> columnBloomFilterEnabled,
boolean dictionaryEnabled) {
this.rowGroupSize = rowGroupSize;
@@ -422,6 +438,7 @@ private Context(
this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
this.bloomFilterMaxBytes = bloomFilterMaxBytes;
this.columnBloomFilterFpp = columnBloomFilterFpp;
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
this.dictionaryEnabled = dictionaryEnabled;
}
@@ -478,6 +495,9 @@ static Context dataContext(Map<String, String> config) {
config, PARQUET_BLOOM_FILTER_MAX_BYTES, PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0");

Map<String, String> columnBloomFilterFpp =
PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX);

Map<String, String> columnBloomFilterEnabled =
PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);

@@ -494,6 +514,7 @@ static Context dataContext(Map<String, String> config) {
rowGroupCheckMinRecordCount,
rowGroupCheckMaxRecordCount,
bloomFilterMaxBytes,
columnBloomFilterFpp,
columnBloomFilterEnabled,
dictionaryEnabled);
}
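As a side note, a self-contained sketch (with hypothetical property values) of what the propertiesWithPrefix call above yields; the prefix is stripped, so the surviving keys are the column paths later handed to withBloomFilterFPP:

import java.util.Map;
import org.apache.iceberg.util.PropertyUtil;

public class PropertiesWithPrefixSketch {
  public static void main(String[] args) {
    // Hypothetical table properties: per-column FPP overrides.
    Map<String, String> config =
        Map.of(
            "write.parquet.bloom-filter-fpp.column.id", "0.05",
            "write.parquet.bloom-filter-fpp.column.id_long", "0.1");

    // propertiesWithPrefix keeps entries whose key starts with the prefix and
    // strips the prefix, so the remaining keys are the column paths that the
    // writer passes to withBloomFilterFPP(colPath, fpp).
    Map<String, String> columnBloomFilterFpp =
        PropertyUtil.propertiesWithPrefix(config, "write.parquet.bloom-filter-fpp.column.");

    // Prints {id=0.05, id_long=0.1} (map order may vary).
    System.out.println(columnBloomFilterFpp);
  }
}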
@@ -562,6 +583,7 @@ static Context deleteContext(Map<String, String> config) {
rowGroupCheckMaxRecordCount,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT,
ImmutableMap.of(),
ImmutableMap.of(),
dictionaryEnabled);
}

@@ -609,6 +631,10 @@ int bloomFilterMaxBytes() {
return bloomFilterMaxBytes;
}

Map<String, String> columnBloomFilterFpp() {
return columnBloomFilterFpp;
}

Map<String, String> columnBloomFilterEnabled() {
return columnBloomFilterEnabled;
}
35 changes: 35 additions & 0 deletions .../org/apache/iceberg/spark/data/TestSparkParquetWriter.java
@@ -18,28 +18,40 @@
*/
package org.apache.iceberg.spark.data;

import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Path;
import java.util.Iterator;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestSparkParquetWriter {
@TempDir private Path temp;

public static final Schema SCHEMA =
new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "id_long", Types.LongType.get()));

private static final Schema COMPLEX_SCHEMA =
new Schema(
required(1, "roots", Types.LongType.get()),
@@ -116,4 +128,27 @@ public void testCorrectness() throws IOException {
assertThat(rows).as("Should not have extra rows").isExhausted();
}
}

@Test
public void testFpp() throws IOException, NoSuchFieldException, IllegalAccessException {
File testFile = File.createTempFile("junit", null, temp.toFile());
try (FileAppender<InternalRow> writer =
Parquet.write(Files.localOutput(testFile))
.schema(SCHEMA)
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX + "id", "0.05")
.createWriterFunc(
msgType ->
SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
.build()) {
// Using reflection to access the private 'props' field in ParquetWriter
Field propsField = writer.getClass().getDeclaredField("props");
propsField.setAccessible(true);
ParquetProperties props = (ParquetProperties) propsField.get(writer);
MessageType parquetSchema = ParquetSchemaUtil.convert(SCHEMA, "test");
ColumnDescriptor descriptor = parquetSchema.getColumnDescription(new String[] {"id"});
double fpp = props.getBloomFilterFPP(descriptor).getAsDouble();
assertThat(fpp).isEqualTo(0.05);
}
}
}
