
Set write compression codec in Iceberg #24851

Open · wants to merge 1 commit into master from semionpar/iceberg-metadata-compression-codec

Conversation

@SemionPar (Contributor) commented Jan 30, 2025

Description

Set currently used write compression codec in Iceberg table properties.
Compression codec values are written as spelled in the Iceberg spec; details in #24851 (comment)

Additional context and related issues

https://github.com/apache/iceberg/pull/8593/files#diff-c540a31e66b157a8f080433c82a29a070096d0e08c6578a0099153f1229bdb7aR93-R96 sets a default write.parquet.compression-codec when no property is provided, even if the file format is not Parquet: apache/iceberg#9490

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot added the cla-signed label on Jan 30, 2025
@github-actions added the iceberg (Iceberg connector) label on Jan 30, 2025
@SemionPar force-pushed the semionpar/iceberg-metadata-compression-codec branch from bab1d84 to b743d37 on February 3, 2025 14:28
@SemionPar requested a review from ebyhr on February 3, 2025 14:29
propertiesBuilder.put(COMMIT_NUM_RETRIES, Integer.toString(IcebergTableProperties.getMaxCommitRetry(tableMetadata.getProperties())));

switch (fileFormat) {
case PARQUET -> getCompressionCodec(session).getParquetCompressionCodec()
@SemionPar (author):
Hi @ebyhr, do you think we should introduce three matching properties for write.[parquet|avro|orc].compression-codec in IcebergTableProperties instead? That seems to be the more idiomatic way of doing this, now that I've looked at it again.

@ebyhr (Member):
I think so.

@SemionPar (author):
Spent some time on this with @pajaks today. A better-informed remark on this:

  • Since we already have compression-codec as a session property, it would be difficult to support it in IcebergTableProperties as a full-fledged table property (meaning: adding it to ALTER TABLE table_name SET PROPERTIES compression-codec = 'xxx';). No other Iceberg table property has a corresponding session property, and that is for a reason, although I don't think I grasp it in its entirety just yet.
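
For illustration, here is a minimal, self-contained sketch of the codec-name mapping this change boils down to. The enum and method names are stand-ins, not the PR's actual code, and the spec spellings are taken from the outputs further down in this thread:

import java.util.Map;

public class CodecPropertySketch
{
    enum FileFormat { PARQUET, ORC, AVRO }

    enum CompressionCodec { NONE, SNAPPY, LZ4, ZSTD, GZIP }

    // Returns the spec-spelled table property entry for a format/codec pair.
    static Map.Entry<String, String> codecProperty(FileFormat format, CompressionCodec codec)
    {
        return switch (format) {
            case PARQUET -> Map.entry("write.parquet.compression-codec", switch (codec) {
                case NONE -> "uncompressed";
                case SNAPPY -> "snappy";
                case ZSTD -> "zstd";
                case GZIP -> "gzip";
                case LZ4 -> throw new IllegalArgumentException("LZ4 is not supported for Parquet writes");
            });
            case ORC -> Map.entry("write.orc.compression-codec", switch (codec) {
                case NONE -> "none";
                case SNAPPY -> "snappy";
                case LZ4 -> "lz4";
                case ZSTD -> "zstd";
                case GZIP -> "zlib"; // the spec spells ORC's gzip-equivalent codec "zlib"
            });
            case AVRO -> Map.entry("write.avro.compression-codec", switch (codec) {
                case NONE -> "uncompressed";
                case SNAPPY -> "snappy";
                case ZSTD -> "zstd"; // "zstd", not "zstandard", per the spec table quoted below
                case GZIP -> "gzip";
                case LZ4 -> throw new IllegalArgumentException("LZ4 is not supported for Avro writes");
            });
        };
    }
}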

@ebyhr mentioned this pull request on Feb 4, 2025
@ebyhr force-pushed the semionpar/iceberg-metadata-compression-codec branch from b743d37 to a64427d on February 4, 2025 03:03
@SemionPar force-pushed the semionpar/iceberg-metadata-compression-codec branch 2 times, most recently from e9d8deb to e7b1327 on February 18, 2025 17:57
@SemionPar marked this pull request as ready for review on February 18, 2025 17:58
@SemionPar changed the title from "[wip] Set parquet compression in Iceberg metadata" to "Set parquet compression in Iceberg metadata" on Feb 18, 2025
@SemionPar commented Feb 20, 2025

Looks like Spark has trouble inserting rows into a table for the 'AVRO - ZSTD' pair when the write.avro.compression-codec=zstd table property is set:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (spark executor driver): java.lang.IllegalArgumentException: Unsupported compression codec: zstandard
at org.apache.iceberg.avro.Avro$WriteBuilder$Context.toCodec(Avro.java:259)
at org.apache.iceberg.avro.Avro$WriteBuilder$Context.dataContext(Avro.java:217)
at org.apache.iceberg.avro.Avro$WriteBuilder.build(Avro.java:192)
at org.apache.iceberg.avro.Avro$DataWriteBuilder.build(Avro.java:371)
at org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:115)
at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:32)
at org.apache.iceberg.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:108)
at org.apache.iceberg.io.RollingDataWriter.<init>(RollingDataWriter.java:47)
at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.<init>(SparkWrite.java:701)
at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:675)
at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:652)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:459)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)

This is with the ghcr.io/trinodb/testing/spark3-iceberg:108 image; image history:

➜  git docker history ghcr.io/trinodb/testing/spark3-iceberg:108
IMAGE          CREATED        CREATED BY                                      SIZE      COMMENT
<missing>      6 weeks ago    RUN |6 SPARK_VERSION=3.4.2 HADOOP_VERSION=3 …   430MB     buildkit.dockerfile.v0
<missing>      6 weeks ago    ARG SPARK_ARTIFACT=spark-3.4.2-bin-hadoop3      0B        buildkit.dockerfile.v0
<missing>      6 weeks ago    ARG POSTGRESQL_JAR_VERSION=42.3.5               0B        buildkit.dockerfile.v0
<missing>      6 weeks ago    ARG ICEBERG_JAR_VERSION=3.4_2.12                0B        buildkit.dockerfile.v0
<missing>      6 weeks ago    ARG ICEBERG_VERSION=1.7.0                       0B        buildkit.dockerfile.v0
<missing>      6 weeks ago    ARG HADOOP_VERSION=3                            0B        buildkit.dockerfile.v0
<missing>      6 weeks ago    ARG SPARK_VERSION=3.4.2                         0B        buildkit.dockerfile.v0
(...)

Codec name should be zstd according to the spec:

write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed

This goes to show that Spark does read the write.*.compression-codec table property when writing; Trino currently does not.
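
For reference, a hedged sketch of how a test could read the recorded value back through the Iceberg connector's $properties metadata table; the table name and the assertQuery helper here are assumptions, in the style of the test excerpt below:

// Sketch only (assumed table name and test-framework helper): verify the
// spec-spelled codec value that Trino recorded in the table metadata.
assertQuery(
        "SELECT value FROM \"test_avro_zstd$properties\" WHERE key = 'write.avro.compression-codec'",
        "VALUES 'zstd'");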

(VARCHAR 'write.parquet.compression-codec', VARCHAR 'zstd')""");
}
String tableName = "test_table_codec" + randomNameSuffix();
Session snappySession = Session.builder(getSession())
A reviewer (Contributor):
You can do ALTER TABLE .... SET PROPERTIES... - maybe it is relevant for your test.
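
A rough sketch of what that test case might look like, assuming the query-runner helpers (assertUpdate/assertQuery), the existing format table property, and the snappySession and tableName from the excerpt above; the expected spelling follows the outputs below:

// Sketch only: switch the file format in place, write a row, and check
// whether the matching spec-spelled codec property is recorded.
assertUpdate(snappySession, "ALTER TABLE " + tableName + " SET PROPERTIES format = 'ORC'");
assertUpdate(snappySession, "INSERT INTO " + tableName + " VALUES 1", 1);
assertQuery(
        "SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'write.orc.compression-codec'",
        "VALUES 'snappy'");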

@SemionPar (author):
Thank you - adding a test case

@SemionPar commented:

Write codec values per the Iceberg spec:

Property | Default | Values
write.avro.compression-codec | gzip | Avro compression codec: gzip (deflate with 9 level), zstd, snappy, uncompressed
write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none
write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed

What is set after a table is created with Trino with this commit (looks good now, apart from the excess "write.parquet.compression-codec" = "zstd", which is a result of apache/iceberg#9490):

storageFormat: PARQUET
compressionCodec: NONE
properties: [write.format.default|PARQUET|, write.parquet.compression-codec|uncompressed|, commit.retry.num-retries|4|]

storageFormat: PARQUET
compressionCodec: SNAPPY
properties: [write.format.default|PARQUET|, write.parquet.compression-codec|snappy|, commit.retry.num-retries|4|]

storageFormat: PARQUET
compressionCodec: ZSTD
properties: [write.format.default|PARQUET|, write.parquet.compression-codec|zstd|, commit.retry.num-retries|4|]

storageFormat: PARQUET
compressionCodec: GZIP
properties: [write.format.default|PARQUET|, write.parquet.compression-codec|gzip|, commit.retry.num-retries|4|]

storageFormat: ORC
compressionCodec: NONE
properties: [write.orc.compression-codec|none|, commit.retry.num-retries|4|, write.format.default|ORC|, write.parquet.compression-codec|zstd|]

storageFormat: ORC
compressionCodec: SNAPPY
properties: [write.orc.compression-codec|snappy|, commit.retry.num-retries|4|, write.format.default|ORC|, write.parquet.compression-codec|zstd|]

storageFormat: ORC
compressionCodec: LZ4
properties: [write.orc.compression-codec|lz4|, commit.retry.num-retries|4|, write.format.default|ORC|, write.parquet.compression-codec|zstd|]

storageFormat: ORC
compressionCodec: ZSTD
properties: [write.orc.compression-codec|zstd|, commit.retry.num-retries|4|, write.format.default|ORC|, write.parquet.compression-codec|zstd|]

storageFormat: ORC
compressionCodec: GZIP
properties: [write.orc.compression-codec|zlib|, commit.retry.num-retries|4|, write.format.default|ORC|, write.parquet.compression-codec|zstd|]

storageFormat: AVRO
compressionCodec: NONE
properties: [commit.retry.num-retries|4|, write.avro.compression-codec|uncompressed|, write.format.default|AVRO|, write.parquet.compression-codec|zstd|]

storageFormat: AVRO
compressionCodec: SNAPPY
properties: [commit.retry.num-retries|4|, write.avro.compression-codec|snappy|, write.format.default|AVRO|, write.parquet.compression-codec|zstd|]

storageFormat: AVRO
compressionCodec: ZSTD
properties: [commit.retry.num-retries|4|, write.avro.compression-codec|zstd|, write.format.default|AVRO|, write.parquet.compression-codec|zstd|]

storageFormat: AVRO
compressionCodec: GZIP
properties: [commit.retry.num-retries|4|, write.avro.compression-codec|gzip|, write.format.default|AVRO|, write.parquet.compression-codec|zstd|]

@SemionPar changed the title from "Set parquet compression in Iceberg metadata" to "Set write compression codec in Iceberg metadata" on Feb 21, 2025
@SemionPar changed the title from "Set write compression codec in Iceberg metadata" to "Set write compression codec in Iceberg" on Feb 21, 2025
@SemionPar force-pushed the semionpar/iceberg-metadata-compression-codec branch from 6b83fd8 to 76e3ce4 on February 21, 2025 10:42
@SemionPar commented:

Fixed CI and rebased the PR - @ebyhr PTAL.

@SemionPar force-pushed the semionpar/iceberg-metadata-compression-codec branch 3 times, most recently from 7b11fd3 to 971fa18 on February 24, 2025 15:44
@SemionPar force-pushed the semionpar/iceberg-metadata-compression-codec branch from 971fa18 to bf5da75 on February 25, 2025 08:59
Labels: cla-signed, iceberg (Iceberg connector)