Skip to content

Commit

Permalink
Avro: Fix BuildAvroProjection list and map handling to preserve field…
Browse files Browse the repository at this point in the history
… IDs (apache#4120)

Co-authored-by: Haizhou Zhao <[email protected]>
  • Loading branch information
haizhou-zhao and Haizhou Zhao authored Feb 16, 2022
1 parent 9a403d6 commit 013d4ad
Show file tree
Hide file tree
Showing 6 changed files with 469 additions and 14 deletions.
39 changes: 39 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,25 @@ static boolean hasIds(Schema schema) {
return AvroCustomOrderSchemaVisitor.visit(schema, new HasIds());
}

/**
 * Checks whether any node in the given Avro schema is missing an ID.
 * <p>
 * A node is considered to have an ID when:
 * <ul>
 * <li>a field node under a struct (record) schema has the {@link #FIELD_ID_PROP} property
 * <li>an element node under a list (array) schema has the {@link #ELEMENT_ID_PROP} property
 * <li>the key and value nodes under a map schema have the {@link #KEY_ID_PROP} and
 * {@link #VALUE_ID_PROP} properties respectively
 * <li>a primitive node is never assigned an ID property, so it is never counted as missing one
 * </ul>
 *
 * @param schema an Avro Schema
 * @return true if any node of the given Avro Schema is missing an ID property, false otherwise
 */
static boolean missingIds(Schema schema) {
  // the visitor reports true as soon as a node without its ID property is found
  MissingIds visitor = new MissingIds();
  return AvroCustomOrderSchemaVisitor.visit(schema, visitor);
}

public static Map<Type, Schema> convertTypes(Types.StructType type, String name) {
TypeToSchema converter = new TypeToSchema(ImmutableMap.of(type, name));
TypeUtil.visit(type, converter);
Expand Down Expand Up @@ -357,6 +376,26 @@ static Schema.Field copyField(Schema.Field field, Schema newSchema, String newNa
return copy;
}

/**
 * Builds a new array schema with a different element schema, keeping the properties of the original array.
 *
 * @param array an Avro array schema whose properties are copied
 * @param elementSchema the element schema of the returned array
 * @return a new array schema of {@code elementSchema} carrying every object property of {@code array}
 * @throws IllegalArgumentException if {@code array} is not an array schema
 */
static Schema replaceElement(Schema array, Schema elementSchema) {
  boolean isArray = array.getType() == Schema.Type.ARRAY;
  Preconditions.checkArgument(isArray, "Cannot invoke replaceElement on non array schema: %s", array);

  Schema replaced = Schema.createArray(elementSchema);
  // Schema.createArray starts with no properties, so copy them over from the original array node
  array.getObjectProps().forEach((key, value) -> replaced.addProp(key, value));
  return replaced;
}

/**
 * Builds a new map schema with a different value schema, keeping the properties of the original map.
 *
 * @param map an Avro map schema whose properties are copied
 * @param valueSchema the value schema of the returned map
 * @return a new map schema of {@code valueSchema} carrying every object property of {@code map}
 * @throws IllegalArgumentException if {@code map} is not a map schema
 */
static Schema replaceValue(Schema map, Schema valueSchema) {
  boolean isMap = map.getType() == Schema.Type.MAP;
  Preconditions.checkArgument(isMap, "Cannot invoke replaceValue on non map schema: %s", map);

  Schema replaced = Schema.createMap(valueSchema);
  // Schema.createMap starts with no properties, so copy them over from the original map node
  map.getObjectProps().forEach((key, value) -> replaced.addProp(key, value));
  return replaced;
}

public static String makeCompatibleName(String name) {
if (!validAvroName(name)) {
return sanitize(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class BuildAvroProjection extends AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
this.current = expectedSchema.asStruct();
}

/**
 * Creates a projection visitor that starts from an arbitrary expected Iceberg type.
 *
 * @param expectedType the Iceberg type used as the initial current type of the visitor
 * @param renames stored as given; presumably a mapping of renamed Avro names (confirm against callers)
 */
BuildAvroProjection(org.apache.iceberg.types.Type expectedType, Map<String, String> renames) {
  this.current = expectedType;
  this.renames = renames;
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Schema record(Schema record, List<String> names, Iterable<Schema.Field> schemaIterable) {
Expand Down Expand Up @@ -201,7 +206,7 @@ public Schema array(Schema array, Supplier<Schema> element) {

// element was changed, create a new array
if (!Objects.equals(elementSchema, array.getElementType())) {
return Schema.createArray(elementSchema);
return AvroSchemaUtil.replaceElement(array, elementSchema);
}

return array;
Expand All @@ -225,7 +230,7 @@ public Schema map(Schema map, Supplier<Schema> value) {

// value was changed, create a new map
if (!Objects.equals(valueSchema, map.getValueType())) {
return Schema.createMap(valueSchema);
return AvroSchemaUtil.replaceValue(map, valueSchema);
}

return map;
Expand Down
15 changes: 3 additions & 12 deletions core/src/main/java/org/apache/iceberg/avro/HasIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

/**
* Lazily evaluates the schema to see if any field ids are set.
Expand All @@ -30,12 +31,7 @@
class HasIds extends AvroCustomOrderSchemaVisitor<Boolean, Boolean> {
@Override
public Boolean record(Schema record, List<String> names, Iterable<Boolean> fields) {
for (Boolean field : fields) {
if (field) {
return true;
}
}
return false;
return Iterables.any(fields, Boolean.TRUE::equals);
}

@Override
Expand All @@ -58,12 +54,7 @@ public Boolean array(Schema array, Supplier<Boolean> element) {

@Override
public Boolean union(Schema union, Iterable<Boolean> options) {
for (Boolean option : options) {
if (option) {
return true;
}
}
return false;
return Iterables.any(options, Boolean.TRUE::equals);
}

@Override
Expand Down
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/MissingIds.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.avro;

import java.util.List;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

/**
 * Returns true as soon as the first node with a missing ID property is found. Counterpart of {@link HasIds}.
 * <p>
 * Note: to use {@link AvroSchemaUtil#toIceberg(Schema)} on an Avro schema, the schema needs to either have IDs
 * on every node or have no IDs at all. Invoking {@link AvroSchemaUtil#hasIds(Schema)} only proves that the
 * schema has at least one ID, which is not a sufficient condition for invoking
 * {@link AvroSchemaUtil#toIceberg(Schema)} on the schema.
 */
class MissingIds extends AvroCustomOrderSchemaVisitor<Boolean, Boolean> {
  @Override
  public Boolean record(Schema record, List<String> names, Iterable<Boolean> fields) {
    // a record is missing IDs when any of its fields reports a missing ID
    return anyMissing(fields);
  }

  @Override
  public Boolean field(Schema.Field field, Supplier<Boolean> fieldResult) {
    // a field without its own ID is already a miss, so its subtree is not evaluated
    if (!AvroSchemaUtil.hasFieldId(field)) {
      return true;
    }

    // otherwise the answer depends on whether the subtree is missing an ID somewhere
    boolean subtreeMissing = fieldResult.get();
    return subtreeMissing;
  }

  @Override
  public Boolean map(Schema map, Supplier<Boolean> value) {
    // the map node itself needs both the key ID and the value ID
    boolean hasKeyAndValueIds = AvroSchemaUtil.hasProperty(map, AvroSchemaUtil.KEY_ID_PROP) &&
        AvroSchemaUtil.hasProperty(map, AvroSchemaUtil.VALUE_ID_PROP);

    // the value subtree is only evaluated when the map node itself has both IDs
    return !hasKeyAndValueIds || value.get();
  }

  @Override
  public Boolean array(Schema array, Supplier<Boolean> element) {
    boolean hasElementId = AvroSchemaUtil.hasProperty(array, AvroSchemaUtil.ELEMENT_ID_PROP);

    // the element subtree is only evaluated when the list node itself has its element ID
    return !hasElementId || element.get();
  }

  @Override
  public Boolean union(Schema union, Iterable<Boolean> options) {
    // a union is missing IDs when any of its options reports a missing ID
    return anyMissing(options);
  }

  @Override
  public Boolean primitive(Schema primitive) {
    // a primitive node cannot be missing an ID because Iceberg does not assign IDs to primitive nodes
    return false;
  }

  /**
   * Returns true if any of the given child results is {@code true}.
   */
  private static boolean anyMissing(Iterable<Boolean> results) {
    return Iterables.any(results, Boolean.TRUE::equals);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.avro;

import java.util.Collections;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.Schema;
import org.junit.Test;

import static org.junit.Assert.assertFalse;

public class TestAvroSchemaProjection {

@Test
public void projectWithListSchemaChanged() {
final org.apache.avro.Schema currentAvroSchema = SchemaBuilder.record("myrecord").namespace("unit.test").fields()
.name("f1").type().nullable().array().items(
SchemaBuilder.record("elem").fields()
.name("f11").type().nullable().intType().noDefault().endRecord())
.noDefault().endRecord();

final org.apache.avro.Schema updatedAvroSchema = SchemaBuilder.record("myrecord").namespace("unit.test").fields()
.name("f1").type().nullable().array().items(
SchemaBuilder.record("elem").fields()
.name("f11").type().nullable().intType().noDefault()
.name("f12").type().nullable().stringType().noDefault()
.endRecord())
.noDefault().endRecord();

final Schema currentIcebergSchema = AvroSchemaUtil.toIceberg(currentAvroSchema);

// Getting the node ID in updatedAvroSchema allocated by converting into iceberg schema and back
final org.apache.avro.Schema idAllocatedUpdatedAvroSchema =
AvroSchemaUtil.convert(AvroSchemaUtil.toIceberg(updatedAvroSchema).asStruct());

final org.apache.avro.Schema projectedAvroSchema =
AvroSchemaUtil.buildAvroProjection(idAllocatedUpdatedAvroSchema, currentIcebergSchema, Collections.emptyMap());

assertFalse("Result of buildAvroProjection is missing some IDs",
AvroSchemaUtil.missingIds(projectedAvroSchema));
}


@Test
public void projectWithMapSchemaChanged() {
final org.apache.avro.Schema currentAvroSchema = SchemaBuilder.record("myrecord").namespace("unit.test").fields()
.name("f1").type().nullable().map().values(
SchemaBuilder.record("elem").fields()
.name("f11").type().nullable().intType().noDefault().endRecord())
.noDefault().endRecord();

final org.apache.avro.Schema updatedAvroSchema = SchemaBuilder.record("myrecord").namespace("unit.test").fields()
.name("f1").type().nullable().map().values(
SchemaBuilder.record("elem").fields()
.name("f11").type().nullable().intType().noDefault()
.name("f12").type().nullable().stringType().noDefault()
.endRecord())
.noDefault().endRecord();

final Schema currentIcebergSchema = AvroSchemaUtil.toIceberg(currentAvroSchema);

// Getting the node ID in updatedAvroSchema allocated by converting into iceberg schema and back
final org.apache.avro.Schema idAllocatedUpdatedAvroSchema =
AvroSchemaUtil.convert(AvroSchemaUtil.toIceberg(updatedAvroSchema).asStruct());

final org.apache.avro.Schema projectedAvroSchema =
AvroSchemaUtil.buildAvroProjection(idAllocatedUpdatedAvroSchema, currentIcebergSchema, Collections.emptyMap());

assertFalse("Result of buildAvroProjection is missing some IDs",
AvroSchemaUtil.missingIds(projectedAvroSchema));
}

}
Loading

0 comments on commit 013d4ad

Please sign in to comment.