Skip to content

Commit

Permalink
Core: Add serialization for MetadataUpdate (apache#4716)
Browse files Browse the repository at this point in the history
  • Loading branch information
kbendick authored May 23, 2022
1 parent 086a5db commit 6e9905d
Show file tree
Hide file tree
Showing 6 changed files with 696 additions and 18 deletions.
10 changes: 5 additions & 5 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,16 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}

class SetSnapshotRef implements MetadataUpdate {
private final String name;
private final String refName;
private final Long snapshotId;
private final SnapshotRefType type;
private Integer minSnapshotsToKeep;
private Long maxSnapshotAgeMs;
private Long maxRefAgeMs;

public SetSnapshotRef(String name, Long snapshotId, SnapshotRefType type, Integer minSnapshotsToKeep,
public SetSnapshotRef(String refName, Long snapshotId, SnapshotRefType type, Integer minSnapshotsToKeep,
Long maxSnapshotAgeMs, Long maxRefAgeMs) {
this.name = name;
this.refName = refName;
this.snapshotId = snapshotId;
this.type = type;
this.minSnapshotsToKeep = minSnapshotsToKeep;
Expand All @@ -251,7 +251,7 @@ public SetSnapshotRef(String name, Long snapshotId, SnapshotRefType type, Intege
}

public String name() {
return name;
return refName;
}

public String type() {
Expand Down Expand Up @@ -281,7 +281,7 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
.maxSnapshotAgeMs(maxSnapshotAgeMs)
.maxRefAgeMs(maxRefAgeMs)
.build();
metadataBuilder.setRef(name, ref);
metadataBuilder.setRef(refName, ref);
}
}

Expand Down
163 changes: 159 additions & 4 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.JsonUtil;

public class MetadataUpdateParser {
Expand All @@ -53,6 +56,9 @@ private MetadataUpdateParser() {
static final String REMOVE_PROPERTIES = "remove-properties";
static final String SET_LOCATION = "set-location";

// AssignUUID
private static final String UUID = "uuid";

// UpgradeFormatVersion
private static final String FORMAT_VERSION = "format-version";

Expand All @@ -72,6 +78,32 @@ private MetadataUpdateParser() {
// AddSortOrder
private static final String SORT_ORDER = "sort-order";

// SetDefaultSortOrder
private static final String SORT_ORDER_ID = "sort-order-id";

// AddSnapshot
private static final String SNAPSHOT = "snapshot";

// RemoveSnapshot
private static final String SNAPSHOT_IDS = "snapshot-ids";

// SetSnapshotRef
private static final String REF_NAME = "ref-name";
private static final String SNAPSHOT_ID = "snapshot-id";
private static final String TYPE = "type";
private static final String MIN_SNAPSHOTS_TO_KEEP = "min-snapshots-to-keep";
private static final String MAX_SNAPSHOT_AGE_MS = "max-snapshot-age-ms";
private static final String MAX_REF_AGE_MS = "max-ref-age-ms";

// SetProperties
private static final String UPDATED = "updated";

// RemoveProperties
private static final String REMOVED = "removed";

// SetLocation
private static final String LOCATION = "location";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS = ImmutableMap
.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
Expand Down Expand Up @@ -112,12 +144,18 @@ public static String toJson(MetadataUpdate metadataUpdate, boolean pretty) {
public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator) throws IOException {
String updateAction = ACTIONS.get(metadataUpdate.getClass());

// Provide better exception message than the NPE thrown by writing null for the update action, which is required
Preconditions.checkArgument(updateAction != null,
"Cannot convert metadata update to json. Unrecognized metadata update type: {}",
metadataUpdate.getClass().getName());

generator.writeStartObject();
generator.writeStringField(ACTION, updateAction);

switch (updateAction) {
case ASSIGN_UUID:
throw new UnsupportedOperationException("Not Implemented: MetadataUpdate#toJson for AssignUUID");
writeAssignUUID((MetadataUpdate.AssignUUID) metadataUpdate, generator);
break;
case UPGRADE_FORMAT_VERSION:
writeUpgradeFormatVersion((MetadataUpdate.UpgradeFormatVersion) metadataUpdate, generator);
break;
Expand All @@ -137,13 +175,26 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
writeAddSortOrder((MetadataUpdate.AddSortOrder) metadataUpdate, generator);
break;
case SET_DEFAULT_SORT_ORDER:
writeSetDefaultSortOrder((MetadataUpdate.SetDefaultSortOrder) metadataUpdate, generator);
break;
case ADD_SNAPSHOT:
writeAddSnapshot((MetadataUpdate.AddSnapshot) metadataUpdate, generator);
break;
case REMOVE_SNAPSHOTS:
writeRemoveSnapshots((MetadataUpdate.RemoveSnapshot) metadataUpdate, generator);
break;
case SET_SNAPSHOT_REF:
writeSetSnapshotRef((MetadataUpdate.SetSnapshotRef) metadataUpdate, generator);
break;
case SET_PROPERTIES:
writeSetProperties((MetadataUpdate.SetProperties) metadataUpdate, generator);
break;
case REMOVE_PROPERTIES:
writeRemoveProperties((MetadataUpdate.RemoveProperties) metadataUpdate, generator);
break;
case SET_LOCATION:
throw new UnsupportedOperationException("Not Implemented: MetadataUpdate#toJson for " + updateAction);
writeSetLocation((MetadataUpdate.SetLocation) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format("Cannot convert metadata update to json. Unrecognized action: %s", updateAction));
Expand Down Expand Up @@ -174,7 +225,7 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {

switch (action) {
case ASSIGN_UUID:
throw new UnsupportedOperationException("Not implemented: AssignUUID");
return readAssignUUID(jsonNode);
case UPGRADE_FORMAT_VERSION:
return readUpgradeFormatVersion(jsonNode);
case ADD_SCHEMA:
Expand All @@ -188,19 +239,29 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
case ADD_SORT_ORDER:
return readAddSortOrder(jsonNode);
case SET_DEFAULT_SORT_ORDER:
return readSetDefaultSortOrder(jsonNode);
case ADD_SNAPSHOT:
return readAddSnapshot(jsonNode);
case REMOVE_SNAPSHOTS:
return readRemoveSnapshots(jsonNode);
case SET_SNAPSHOT_REF:
return readSetSnapshotRef(jsonNode);
case SET_PROPERTIES:
return readSetProperties(jsonNode);
case REMOVE_PROPERTIES:
return readRemoveProperties(jsonNode);
case SET_LOCATION:
throw new UnsupportedOperationException("Not Implemented: MetadataUpdatefromJson for " + action);
return readSetLocation(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
}
}

private static void writeAssignUUID(MetadataUpdate.AssignUUID update, JsonGenerator gen) throws IOException {
gen.writeStringField(UUID, update.uuid());
}

private static void writeUpgradeFormatVersion(MetadataUpdate.UpgradeFormatVersion update, JsonGenerator gen)
throws IOException {
gen.writeNumberField(FORMAT_VERSION, update.formatVersion());
Expand Down Expand Up @@ -234,6 +295,55 @@ private static void writeAddSortOrder(MetadataUpdate.AddSortOrder update, JsonGe
SortOrderParser.toJson(update.sortOrder(), gen);
}

private static void writeSetDefaultSortOrder(MetadataUpdate.SetDefaultSortOrder update, JsonGenerator gen)
throws IOException {
gen.writeNumberField(SORT_ORDER_ID, update.sortOrderId());
}

private static void writeAddSnapshot(MetadataUpdate.AddSnapshot update, JsonGenerator gen) throws IOException {
gen.writeFieldName(SNAPSHOT);
SnapshotParser.toJson(update.snapshot(), gen);
}

// TODO - Reconcile the spec's set-based removal with the current class implementation that only handles one value.
private static void writeRemoveSnapshots(MetadataUpdate.RemoveSnapshot update, JsonGenerator gen) throws IOException {
gen.writeArrayFieldStart(SNAPSHOT_IDS);
for (long snapshotId : ImmutableSet.of(update.snapshotId())) {
gen.writeNumber(snapshotId);
}
gen.writeEndArray();
}

private static void writeSetSnapshotRef(MetadataUpdate.SetSnapshotRef update, JsonGenerator gen) throws IOException {
gen.writeStringField(REF_NAME, update.name());
gen.writeNumberField(SNAPSHOT_ID, update.snapshotId());
gen.writeStringField(TYPE, update.type());
JsonUtil.writeIntegerFieldIf(
update.minSnapshotsToKeep() != null, MIN_SNAPSHOTS_TO_KEEP, update.minSnapshotsToKeep(), gen);
JsonUtil.writeLongFieldIf(update.maxSnapshotAgeMs() != null, MAX_SNAPSHOT_AGE_MS, update.maxSnapshotAgeMs(), gen);
JsonUtil.writeLongFieldIf(update.maxRefAgeMs() != null, MAX_REF_AGE_MS, update.maxRefAgeMs(), gen);
}

private static void writeSetProperties(MetadataUpdate.SetProperties update, JsonGenerator gen) throws IOException {
gen.writeFieldName(UPDATED);
gen.writeObject(update.updated());
}

private static void writeRemoveProperties(MetadataUpdate.RemoveProperties update, JsonGenerator gen)
throws IOException {
gen.writeFieldName(REMOVED);
gen.writeObject(update.removed());
}

private static void writeSetLocation(MetadataUpdate.SetLocation update, JsonGenerator gen) throws IOException {
gen.writeStringField(LOCATION, update.location());
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
}

private static MetadataUpdate readUpgradeFormatVersion(JsonNode node) {
int formatVersion = JsonUtil.getInt(FORMAT_VERSION, node);
return new MetadataUpdate.UpgradeFormatVersion(formatVersion);
Expand Down Expand Up @@ -271,5 +381,50 @@ private static MetadataUpdate readAddSortOrder(JsonNode node) {
UnboundSortOrder sortOrder = SortOrderParser.fromJson(sortOrderNode);
return new MetadataUpdate.AddSortOrder(sortOrder);
}

private static MetadataUpdate readSetDefaultSortOrder(JsonNode node) {
int sortOrderId = JsonUtil.getInt(SORT_ORDER_ID, node);
return new MetadataUpdate.SetDefaultSortOrder(sortOrderId);
}

private static MetadataUpdate readAddSnapshot(JsonNode node) {
Preconditions.checkArgument(node.has(SNAPSHOT), "Cannot parse missing field: snapshot");
Snapshot snapshot = SnapshotParser.fromJson(null, node.get(SNAPSHOT));
return new MetadataUpdate.AddSnapshot(snapshot);
}

private static MetadataUpdate readRemoveSnapshots(JsonNode node) {
Set<Long> snapshotIds = JsonUtil.getLongSetOrNull(SNAPSHOT_IDS, node);
Preconditions.checkArgument(snapshotIds != null && snapshotIds.size() == 1,
"Invalid set of snapshot ids to remove. Expected one value but received: %s", snapshotIds);
Long snapshotId = Iterables.getOnlyElement(snapshotIds);
return new MetadataUpdate.RemoveSnapshot(snapshotId);
}

private static MetadataUpdate readSetSnapshotRef(JsonNode node) {
String refName = JsonUtil.getString(REF_NAME, node);
long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
SnapshotRefType type = SnapshotRefType.valueOf(JsonUtil.getString(TYPE, node).toUpperCase(Locale.ENGLISH));
Integer minSnapshotsToKeep = JsonUtil.getIntOrNull(MIN_SNAPSHOTS_TO_KEEP, node);
Long maxSnapshotAgeMs = JsonUtil.getLongOrNull(MAX_SNAPSHOT_AGE_MS, node);
Long maxRefAgeMs = JsonUtil.getLongOrNull(MAX_REF_AGE_MS, node);
return new MetadataUpdate.SetSnapshotRef(
refName, snapshotId, type, minSnapshotsToKeep, maxSnapshotAgeMs, maxRefAgeMs);
}

private static MetadataUpdate readSetProperties(JsonNode node) {
Map<String, String> updated = JsonUtil.getStringMap(UPDATED, node);
return new MetadataUpdate.SetProperties(updated);
}

private static MetadataUpdate readRemoveProperties(JsonNode node) {
Set<String> removed = JsonUtil.getStringSet(REMOVED, node);
return new MetadataUpdate.RemoveProperties(removed);
}

private static MetadataUpdate readSetLocation(JsonNode node) {
String location = JsonUtil.getString(LOCATION, node);
return new MetadataUpdate.SetLocation(location);
}
}

9 changes: 8 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,17 @@ static void toJson(Snapshot snapshot, JsonGenerator generator)
}

public static String toJson(Snapshot snapshot) {
// Use true as default value of pretty for backwards compatibility
return toJson(snapshot, true);
}

public static String toJson(Snapshot snapshot, boolean pretty) {
try {
StringWriter writer = new StringWriter();
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
if (pretty) {
generator.useDefaultPrettyPrinter();
}
toJson(snapshot, generator);
generator.flush();
return writer.toString();
Expand Down
36 changes: 36 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/JsonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ public static List<String> getStringList(String property, JsonNode node) {
.build();
}

public static Set<String> getStringSet(String property, JsonNode node) {
Preconditions.checkArgument(node.hasNonNull(property), "Cannot parse missing set %s", property);

return ImmutableSet.<String>builder()
.addAll(new JsonStringArrayIterator(property, node))
.build();
}

public static List<String> getStringListOrNull(String property, JsonNode node) {
if (!node.has(property) || node.get(property).isNull()) {
return null;
Expand All @@ -168,6 +176,16 @@ public static Set<Integer> getIntegerSetOrNull(String property, JsonNode node) {
.build();
}

public static Set<Long> getLongSetOrNull(String property, JsonNode node) {
if (!node.hasNonNull(property)) {
return null;
}

return ImmutableSet.<Long>builder()
.addAll(new JsonLongArrayIterator(property, node))
.build();
}

public static void writeIntegerFieldIf(boolean condition, String key, Integer value, JsonGenerator generator)
throws IOException {
if (condition) {
Expand Down Expand Up @@ -243,4 +261,22 @@ void validate(JsonNode element) {
Preconditions.checkArgument(element.isInt(), "Cannot parse integer from non-int value: %s", element);
}
}

static class JsonLongArrayIterator extends JsonArrayIterator<Long> {

JsonLongArrayIterator(String property, JsonNode node) {
super(property, node);
}

@Override
Long convert(JsonNode element) {
return element.asLong();
}

@Override
void validate(JsonNode element) {
Preconditions.checkArgument(element.isIntegralNumber() && element.canConvertToLong(),
"Cannot parse long from non-long value: %s", element);
}
}
}
Loading

0 comments on commit 6e9905d

Please sign in to comment.