Spark 3.1, 3.2, 3.3: Backport removal of snapshot-property in CommitMetadata properties (apache#7991)
nastra authored Jul 11, 2023
1 parent 62cb7b5 commit 9607a52
Showing 6 changed files with 103 additions and 33 deletions.
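
For context, the sketch below illustrates the behavior this backport targets: keys passed to CommitMetadata.withCommitProperties with the SnapshotSummary.EXTRA_METADATA_PREFIX prefix should end up in the snapshot summary without that prefix. This is an illustrative sketch only; the CommitMetadataExample class, the writeData helper, the table argument, and the choice of RuntimeException as the expected exception type are assumptions for the example and are not part of this commit.

// Minimal usage sketch (illustrative only; helper names below are assumptions, not from this commit).
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.CommitMetadata;

public class CommitMetadataExample {

  // Commits a snapshot while extra commit metadata is set for the current thread.
  public static void commitWithExtraMetadata(Table table) {
    Map<String, String> properties =
        ImmutableMap.of(
            "writer-thread", Thread.currentThread().getName(),
            SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", "someValue");

    CommitMetadata.withCommitProperties(
        properties,
        () -> {
          writeData(table); // hypothetical helper that writes rows and commits a snapshot
          return null;
        },
        RuntimeException.class);

    // After this change, the EXTRA_METADATA_PREFIX is stripped before the properties are
    // attached, so the summary is expected to expose "extra-key" rather than the prefixed key.
    Snapshot latest = table.currentSnapshot();
    System.out.println(latest.summary().get("extra-key")); // expected: "someValue"
  }

  private static void writeData(Table table) {
    // placeholder: a real implementation would append data files to the table and commit
  }
}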
=== Changed file 1 of 6 ===
@@ -20,7 +20,9 @@

import java.util.Map;
import java.util.concurrent.Callable;
+ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ExceptionUtil;

/** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ private CommitMetadata() {}
* running the code wrapped as a caller, and any snapshot committed within the callable object
* will be attached with the metadata defined in properties
*
- * @param properties extra commit metadata to attach to the snapshot committed within callable
+ * @param properties extra commit metadata to attach to the snapshot committed within callable.
+ *     The prefix will be removed for properties starting with {@link
+ *     SnapshotSummary#EXTRA_METADATA_PREFIX}
* @param callable the code to be executed
* @param exClass the expected type of exception which would be thrown from callable
*/
public static <R, E extends Exception> R withCommitProperties(
Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
- COMMIT_PROPERTIES.set(properties);
+ Map<String, String> props = Maps.newHashMap();
+ properties.forEach(
+     (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX, ""), v));
+
+ COMMIT_PROPERTIES.set(props);
try {
return callable.call();
} catch (Throwable e) {
=== Changed file 2 of 6 ===
@@ -24,7 +24,6 @@
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
- import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -34,13 +33,14 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
- import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -52,6 +52,7 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+ import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -427,8 +428,14 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+     ImmutableMap.of(
+         "writer-thread",
+         String.valueOf(Thread.currentThread().getName()),
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+         "someValue",
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+         "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -440,12 +447,13 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException
writerThread.setName("test-extra-commit-message-writer-thread");
writerThread.start();
writerThread.join();
- Set<String> threadNames = Sets.newHashSet();
- for (Snapshot snapshot : table.snapshots()) {
-   threadNames.add(snapshot.summary().get("writer-thread"));
- }
- Assert.assertEquals(2, threadNames.size());
- Assert.assertTrue(threadNames.contains(null));
- Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ Assert.assertEquals(2, snapshots.size());
+ Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+     .containsEntry("writer-thread", "test-extra-commit-message-writer-thread")
+     .containsEntry("extra-key", "someValue")
+     .containsEntry("another-key", "anotherValue");
}
}
=== Changed file 3 of 6 ===
@@ -20,7 +20,9 @@

import java.util.Map;
import java.util.concurrent.Callable;
+ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ExceptionUtil;

/** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ private CommitMetadata() {}
* running the code wrapped as a caller, and any snapshot committed within the callable object
* will be attached with the metadata defined in properties
*
- * @param properties extra commit metadata to attach to the snapshot committed within callable
+ * @param properties extra commit metadata to attach to the snapshot committed within callable.
+ *     The prefix will be removed for properties starting with {@link
+ *     SnapshotSummary#EXTRA_METADATA_PREFIX}
* @param callable the code to be executed
* @param exClass the expected type of exception which would be thrown from callable
*/
public static <R, E extends Exception> R withCommitProperties(
Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
- COMMIT_PROPERTIES.set(properties);
+ Map<String, String> props = Maps.newHashMap();
+ properties.forEach(
+     (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX, ""), v));
+
+ COMMIT_PROPERTIES.set(props);
try {
return callable.call();
} catch (Throwable e) {
=== Changed file 4 of 6 ===
@@ -33,10 +33,12 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
@@ -53,6 +55,7 @@
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+ import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -428,8 +431,14 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+     ImmutableMap.of(
+         "writer-thread",
+         String.valueOf(Thread.currentThread().getName()),
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+         "someValue",
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+         "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -445,8 +454,10 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
-     "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+     .containsEntry("writer-thread", "test-extra-commit-message-writer-thread")
+     .containsEntry("extra-key", "someValue")
+     .containsEntry("another-key", "anotherValue");
}

@Test
@@ -462,8 +473,14 @@ public void testExtraSnapshotMetadataWithDelete()
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+     ImmutableMap.of(
+         "writer-thread",
+         String.valueOf(Thread.currentThread().getName()),
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+         "someValue",
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+         "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -480,7 +497,9 @@ public void testExtraSnapshotMetadataWithDelete()
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
-     "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+     .containsEntry("writer-thread", "test-extra-commit-message-delete-thread")
+     .containsEntry("extra-key", "someValue")
+     .containsEntry("another-key", "anotherValue");
}
}
=== Changed file 5 of 6 ===
@@ -20,7 +20,9 @@

import java.util.Map;
import java.util.concurrent.Callable;
+ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ExceptionUtil;

/** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ private CommitMetadata() {}
* running the code wrapped as a caller, and any snapshot committed within the callable object
* will be attached with the metadata defined in properties
*
- * @param properties extra commit metadata to attach to the snapshot committed within callable
+ * @param properties extra commit metadata to attach to the snapshot committed within callable.
+ *     The prefix will be removed for properties starting with {@link
+ *     SnapshotSummary#EXTRA_METADATA_PREFIX}
* @param callable the code to be executed
* @param exClass the expected type of exception which would be thrown from callable
*/
public static <R, E extends Exception> R withCommitProperties(
Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
- COMMIT_PROPERTIES.set(properties);
+ Map<String, String> props = Maps.newHashMap();
+ properties.forEach(
+     (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX, ""), v));
+
+ COMMIT_PROPERTIES.set(props);
try {
return callable.call();
} catch (Throwable e) {
=== Changed file 6 of 6 ===
@@ -33,10 +33,12 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
@@ -53,6 +55,7 @@
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+ import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -428,8 +431,14 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+     ImmutableMap.of(
+         "writer-thread",
+         String.valueOf(Thread.currentThread().getName()),
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+         "someValue",
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+         "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -445,8 +454,10 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
-     "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+     .containsEntry("writer-thread", "test-extra-commit-message-writer-thread")
+     .containsEntry("extra-key", "someValue")
+     .containsEntry("another-key", "anotherValue");
}

@Test
@@ -462,8 +473,14 @@ public void testExtraSnapshotMetadataWithDelete()
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+     ImmutableMap.of(
+         "writer-thread",
+         String.valueOf(Thread.currentThread().getName()),
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+         "someValue",
+         SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+         "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -480,7 +497,9 @@ public void testExtraSnapshotMetadataWithDelete()
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
-     "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+     .containsEntry("writer-thread", "test-extra-commit-message-delete-thread")
+     .containsEntry("extra-key", "someValue")
+     .containsEntry("another-key", "anotherValue");
}
}
