Set write compression codec in Iceberg #24851

Open · wants to merge 1 commit into master
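This change makes table creation in the Iceberg connector persist the session's write compression codec as the matching Iceberg table property (write.parquet.compression-codec, write.orc.compression-codec, or write.avro.compression-codec, chosen by the table's file format). A minimal standalone sketch of that mapping, using simplified stand-in enums rather than Trino's actual HiveCompressionCodec and session plumbing (the real Avro path additionally renames codecs; see convertToIceberg in the diff below):

// Sketch only: the property keys are real Iceberg TableProperties names, but the
// format enum and codec handling are simplified stand-ins for Trino's types.
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class WriteCompressionSketch
{
    enum FileFormat { PARQUET, ORC, AVRO }

    static Optional<Map.Entry<String, String>> writeCompressionProperty(FileFormat format, String codecName)
    {
        String key = switch (format) {
            case PARQUET -> "write.parquet.compression-codec";
            case ORC -> "write.orc.compression-codec";
            case AVRO -> "write.avro.compression-codec";
        };
        return Optional.of(Map.entry(key, codecName.toLowerCase(Locale.ENGLISH)));
    }

    public static void main(String[] args)
    {
        // Prints: write.parquet.compression-codec=zstd
        writeCompressionProperty(FileFormat.PARQUET, "ZSTD")
                .ifPresent(property -> System.out.println(property.getKey() + "=" + property.getValue()));
    }
}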
@@ -27,6 +27,7 @@
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.hive.formats.avro.AvroCompressionKind;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
@@ -121,6 +122,7 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getCompressionCodec;
import static io.trino.plugin.iceberg.IcebergTableProperties.DATA_LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
@@ -173,7 +175,9 @@
import static java.lang.String.format;
import static java.math.RoundingMode.UNNECESSARY;
import static java.util.Comparator.comparing;
import static java.util.Map.entry;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
@@ -182,7 +186,9 @@
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
@@ -514,7 +520,7 @@ private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(NestedF
{
org.apache.iceberg.types.Type fieldType = nestedField.type();
if (fieldType.isPrimitiveType()) {
return Stream.of(Map.entry(nestedField.fieldId(), fieldType.asPrimitiveType()));
return Stream.of(entry(nestedField.fieldId(), fieldType.asPrimitiveType()));
}

if (fieldType.isNestedType()) {
@@ -859,19 +865,29 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec
SortOrder sortOrder = parseSortFields(schema, getSortOrder(tableMetadata.getProperties()));

if (replace) {
return catalog.newCreateOrReplaceTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, tableLocation, createTableProperties(tableMetadata, allowedExtraProperties));
return catalog.newCreateOrReplaceTableTransaction(
session,
schemaTableName,
schema,
partitionSpec,
sortOrder,
tableLocation,
createTableProperties(session, tableMetadata, allowedExtraProperties));
}
return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, Optional.ofNullable(tableLocation), createTableProperties(tableMetadata, allowedExtraProperties));
return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, Optional.ofNullable(tableLocation), createTableProperties(session, tableMetadata, allowedExtraProperties));
}

public static Map<String, String> createTableProperties(ConnectorTableMetadata tableMetadata, Predicate<String> allowedExtraProperties)
public static Map<String, String> createTableProperties(ConnectorSession session, ConnectorTableMetadata tableMetadata, Predicate<String> allowedExtraProperties)
{
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString());
propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));
propertiesBuilder.put(COMMIT_NUM_RETRIES, Integer.toString(IcebergTableProperties.getMaxCommitRetry(tableMetadata.getProperties())));

resolveWriteCompressionCodecProperty(session, fileFormat)
.ifPresent(writeCodecEntry -> propertiesBuilder.put(writeCodecEntry.getKey(), writeCodecEntry.getValue()));

boolean objectStoreLayoutEnabled = IcebergTableProperties.getObjectStoreLayoutEnabled(tableMetadata.getProperties());
if (objectStoreLayoutEnabled) {
propertiesBuilder.put(OBJECT_STORE_ENABLED, "true");
@@ -918,6 +934,27 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
.buildOrThrow();
}

private static Optional<Entry<String, String>> resolveWriteCompressionCodecProperty(ConnectorSession session, IcebergFileFormat fileFormat)
{
return switch (fileFormat) {
case PARQUET -> getCompressionCodec(session).getParquetCompressionCodec()
.map(compressionCodec -> entry(PARQUET_COMPRESSION, compressionCodec.name().toLowerCase(Locale.ENGLISH)));
case ORC -> Optional.of(entry(ORC_COMPRESSION, getCompressionCodec(session).getOrcCompressionKind().name().toLowerCase(Locale.ENGLISH)));
case AVRO -> getCompressionCodec(session).getAvroCompressionKind()
.map(compressionCodec -> entry(AVRO_COMPRESSION, convertToIceberg(compressionCodec)));
};
}

public static String convertToIceberg(AvroCompressionKind compressionCodec)
{
return switch (compressionCodec) {
case NULL -> "uncompressed";
case DEFLATE -> "gzip";
case SNAPPY -> "snappy";
case ZSTANDARD -> "zstd";
};
}

public static void verifyExtraProperties(Set<String> basePropertyKeys, Map<String, String> extraProperties, Predicate<String> allowedExtraProperties)
{
Set<String> illegalExtraProperties = ImmutableSet.<String>builder()
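A subtlety in the hunk above: Trino's AvroCompressionKind constants (NULL, DEFLATE, ZSTANDARD) do not match the values Iceberg accepts for write.avro.compression-codec, hence the convertToIceberg translation. A hypothetical unit-test sketch pinning down that mapping (assumes Trino's usual test dependencies on the classpath):

import static io.trino.plugin.iceberg.IcebergUtil.convertToIceberg;
import static org.assertj.core.api.Assertions.assertThat;

import io.trino.hive.formats.avro.AvroCompressionKind;
import org.junit.jupiter.api.Test;

class TestAvroCompressionNameMapping
{
    @Test
    void translatesTrinoAvroCodecNamesToIcebergValues()
    {
        // NULL means "no compression" in Trino's Avro support; Iceberg spells it out.
        assertThat(convertToIceberg(AvroCompressionKind.NULL)).isEqualTo("uncompressed");
        // DEFLATE maps to Iceberg's "gzip" setting, per the switch above.
        assertThat(convertToIceberg(AvroCompressionKind.DEFLATE)).isEqualTo("gzip");
        assertThat(convertToIceberg(AvroCompressionKind.SNAPPY)).isEqualTo("snappy");
        assertThat(convertToIceberg(AvroCompressionKind.ZSTANDARD)).isEqualTo("zstd");
    }
}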
@@ -309,7 +309,10 @@ protected Location createMaterializedViewStorage(
Schema schema = schemaFromMetadata(columns);
PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(materializedViewProperties));
SortOrder sortOrder = parseSortFields(schema, getSortOrder(materializedViewProperties));
Map<String, String> properties = createTableProperties(new ConnectorTableMetadata(storageTableName, columns, materializedViewProperties, Optional.empty()), _ -> false);
Map<String, String> properties = createTableProperties(
session,
new ConnectorTableMetadata(storageTableName, columns, materializedViewProperties, Optional.empty()),
_ -> false);

TableMetadata metadata = newTableMetadata(schema, partitionSpec, sortOrder, tableLocation, properties);

@@ -5057,19 +5057,27 @@ public void testSplitPruningForFilterOnNonPartitionColumn()
public void testGetIcebergTableProperties()
{
assertUpdate("CREATE TABLE test_iceberg_get_table_props (x BIGINT)");
verifyIcebergTableProperties(computeActual("SELECT * FROM \"test_iceberg_get_table_props$properties\""));
verifyIcebergTableProperties(computeActual("SELECT * FROM \"test_iceberg_get_table_props$properties\""), HiveCompressionCodec.ZSTD);
assertUpdate("DROP TABLE test_iceberg_get_table_props");
}

protected void verifyIcebergTableProperties(MaterializedResult actual)
protected void verifyIcebergTableProperties(MaterializedResult actual, HiveCompressionCodec codec)
{
assertThat(actual).isNotNull();
MaterializedResult expected = resultBuilder(getSession())
String codecName = switch (format) {
case AVRO -> codec.getAvroCompressionKind().map(IcebergUtil::convertToIceberg).orElseThrow();
case ORC -> codec.getOrcCompressionKind().name();
case PARQUET -> codec.getParquetCompressionCodec().orElseThrow().name();
};
MaterializedResult.Builder expected = resultBuilder(getSession())
.row("write.format.default", format.name())
.row("write.parquet.compression-codec", "zstd")
.row("commit.retry.num-retries", "4")
.build();
assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows());
.row("write.%s.compression-codec".formatted(format.name().toLowerCase(ENGLISH)), codecName.toLowerCase(ENGLISH))
.row("commit.retry.num-retries", "4");
if (format != PARQUET) {
// this is incorrectly persisted in Iceberg: https://github.com/trinodb/trino/issues/20401
expected.row("write.parquet.compression-codec", "zstd");
}
assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.build().getMaterializedRows());
}

@Test
@@ -8523,6 +8531,9 @@ private void testCreateTableWithCompressionCodec(HiveCompressionCodec compressio
assertUpdate(session, createTableSql, 25);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 25");
verifyIcebergTableProperties(computeActual("SELECT * FROM \"%s$properties\"".formatted(tableName)), compressionCodec);
assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES \"write.parquet.compression-codec\" = 'zstd'"))
.failure().hasMessageContaining("Catalog 'iceberg' table property 'write.parquet.compression-codec' does not exist");
assertUpdate("DROP TABLE " + tableName);
}

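The new negative assertion in testCreateTableWithCompressionCodec also captures a boundary raised in the review thread further down: raw Iceberg property keys are not part of Trino's table-property surface. A hedged fragment contrasting the two (hypothetical table name; format_version is one of the properties this connector does expose):

// Trino-exposed properties work with ALTER TABLE ... SET PROPERTIES;
// raw Iceberg keys such as write.parquet.compression-codec are rejected.
assertUpdate("ALTER TABLE test_codec_table SET PROPERTIES format_version = 2");
assertThat(query("ALTER TABLE test_codec_table SET PROPERTIES \"write.parquet.compression-codec\" = 'zstd'"))
        .failure().hasMessageContaining("table property 'write.parquet.compression-codec' does not exist");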
@@ -13,7 +13,11 @@
*/
package io.trino.plugin.iceberg;

import io.trino.testing.sql.TestTable;
import org.junit.jupiter.api.Test;

import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
import static org.assertj.core.api.Assertions.assertThat;

public class TestIcebergOrcSystemTables
extends BaseIcebergSystemTables
@@ -22,4 +26,17 @@ public TestIcebergOrcSystemTables()
{
super(ORC);
}

@Test
public void testPropertiesTable()
{
try (TestTable table = newTrinoTable("test_properties_table", "AS SELECT 1 x")) {
assertThat(query("SELECT * FROM \"%s$properties\"".formatted(table.getName())))
.matches("""
VALUES (VARCHAR 'write.format.default', VARCHAR 'ORC'),
(VARCHAR 'commit.retry.num-retries', VARCHAR '4'),
(VARCHAR 'write.orc.compression-codec', VARCHAR 'zstd'),
(VARCHAR 'write.parquet.compression-codec', VARCHAR 'zstd')""");
}
}
}
@@ -13,7 +13,14 @@
*/
package io.trino.plugin.iceberg;

import io.trino.Session;
import io.trino.testing.sql.TestTable;
import org.apache.parquet.format.CompressionCodec;
import org.junit.jupiter.api.Test;

import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static org.assertj.core.api.Assertions.assertThat;

public class TestIcebergParquetSystemTables
extends BaseIcebergSystemTables
@@ -22,4 +29,31 @@ public TestIcebergParquetSystemTables()
{
super(PARQUET);
}

@Test
public void testPropertiesTable()
{
try (TestTable table = newTrinoTable("test_properties_table", "AS SELECT 1 x")) {
assertThat(query("SELECT * FROM \"%s$properties\"".formatted(table.getName())))
.matches("""
VALUES (VARCHAR 'write.format.default', VARCHAR 'PARQUET'),
(VARCHAR 'commit.retry.num-retries', VARCHAR '4'),
(VARCHAR 'write.parquet.compression-codec', VARCHAR 'zstd')""");
}
String tableName = "test_table_codec" + randomNameSuffix();
Session snappySession = Session.builder(getSession())
Review thread:
Contributor: you can do ALTER TABLE .... SET PROPERTIES... - maybe it is relevant for your test
Author: Thank you - adding a test case

.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "compression_codec", CompressionCodec.SNAPPY.name())
.build();
try {
assertUpdate(snappySession, "CREATE TABLE test_schema.%s (_varchar VARCHAR, _date DATE) WITH (partitioning = ARRAY['_date'])".formatted(tableName));
assertThat(query("SELECT * FROM test_schema.\"%s$properties\"".formatted(tableName)))
.matches("""
VALUES (VARCHAR 'write.format.default', VARCHAR 'PARQUET'),
(VARCHAR 'commit.retry.num-retries', VARCHAR '4'),
(VARCHAR 'write.parquet.compression-codec', VARCHAR 'snappy')""");
}
finally {
assertUpdate("DROP TABLE IF EXISTS test_schema.%s".formatted(tableName));
}
}
}
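For orientation, a hypothetical end-to-end version of what the snappy test verifies, driven through the Trino JDBC driver. The URL, credentials, catalog, and schema are placeholders; this assumes an unauthenticated local server whose iceberg catalog defaults to Parquet:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SnappyCodecWalkthrough
{
    public static void main(String[] args) throws Exception
    {
        try (Connection connection = DriverManager.getConnection(
                        "jdbc:trino://localhost:8080/iceberg/tiny", "admin", null);
                Statement statement = connection.createStatement()) {
            // Catalog session property from the test above, applied to this session's writes
            statement.execute("SET SESSION iceberg.compression_codec = 'SNAPPY'");
            statement.execute("CREATE TABLE codec_demo AS SELECT 1 AS x");
            try (ResultSet rs = statement.executeQuery("SELECT * FROM \"codec_demo$properties\"")) {
                while (rs.next()) {
                    // Expect write.parquet.compression-codec = snappy among the rows
                    System.out.println(rs.getString("key") + " = " + rs.getString("value"));
                }
            }
            statement.execute("DROP TABLE codec_demo");
        }
    }
}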