diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 929fb5f1414f..b6b4ffffafac 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -90,6 +90,25 @@ static boolean hasIds(Schema schema) { return AvroCustomOrderSchemaVisitor.visit(schema, new HasIds()); } + /** + * Check if any of the nodes in a given avro schema is missing an ID + *

+ * To have an ID for a node: + *

+ *

+ * @param schema an Avro Schema + * @return true if any of the nodes of the given Avro Schema is missing an ID property, false otherwise + */ + static boolean missingIds(Schema schema) { + return AvroCustomOrderSchemaVisitor.visit(schema, new MissingIds()); + } + public static Map convertTypes(Types.StructType type, String name) { TypeToSchema converter = new TypeToSchema(ImmutableMap.of(type, name)); TypeUtil.visit(type, converter); @@ -357,6 +376,26 @@ static Schema.Field copyField(Schema.Field field, Schema newSchema, String newNa return copy; } + static Schema replaceElement(Schema array, Schema elementSchema) { + Preconditions.checkArgument(array.getType() == org.apache.avro.Schema.Type.ARRAY, + "Cannot invoke replaceElement on non array schema: %s", array); + Schema copy = Schema.createArray(elementSchema); + for (Map.Entry prop : array.getObjectProps().entrySet()) { + copy.addProp(prop.getKey(), prop.getValue()); + } + return copy; + } + + static Schema replaceValue(Schema map, Schema valueSchema) { + Preconditions.checkArgument(map.getType() == org.apache.avro.Schema.Type.MAP, + "Cannot invoke replaceValue on non map schema: %s", map); + Schema copy = Schema.createMap(valueSchema); + for (Map.Entry prop : map.getObjectProps().entrySet()) { + copy.addProp(prop.getKey(), prop.getValue()); + } + return copy; + } + public static String makeCompatibleName(String name) { if (!validAvroName(name)) { return sanitize(name); diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index cd934dc4b89d..815e6bc5db85 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -49,6 +49,11 @@ class BuildAvroProjection extends AvroCustomOrderSchemaVisitor renames) { + this.renames = renames; + this.current = expectedType; + } + @Override @SuppressWarnings("checkstyle:CyclomaticComplexity") public Schema record(Schema record, List names, Iterable schemaIterable) { @@ -201,7 +206,7 @@ public Schema array(Schema array, Supplier element) { // element was changed, create a new array if (!Objects.equals(elementSchema, array.getElementType())) { - return Schema.createArray(elementSchema); + return AvroSchemaUtil.replaceElement(array, elementSchema); } return array; @@ -225,7 +230,7 @@ public Schema map(Schema map, Supplier value) { // element was changed, create a new map if (!Objects.equals(valueSchema, map.getValueType())) { - return Schema.createMap(valueSchema); + return AvroSchemaUtil.replaceValue(map, valueSchema); } return map; diff --git a/core/src/main/java/org/apache/iceberg/avro/HasIds.java b/core/src/main/java/org/apache/iceberg/avro/HasIds.java index 69efcaa4a4fe..85ce3809a974 100644 --- a/core/src/main/java/org/apache/iceberg/avro/HasIds.java +++ b/core/src/main/java/org/apache/iceberg/avro/HasIds.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.function.Supplier; import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; /** * Lazily evaluates the schema to see if any field ids are set. @@ -30,12 +31,7 @@ class HasIds extends AvroCustomOrderSchemaVisitor { @Override public Boolean record(Schema record, List names, Iterable fields) { - for (Boolean field : fields) { - if (field) { - return true; - } - } - return false; + return Iterables.any(fields, Boolean.TRUE::equals); } @Override @@ -58,12 +54,7 @@ public Boolean array(Schema array, Supplier element) { @Override public Boolean union(Schema union, Iterable options) { - for (Boolean option : options) { - if (option) { - return true; - } - } - return false; + return Iterables.any(options, Boolean.TRUE::equals); } @Override diff --git a/core/src/main/java/org/apache/iceberg/avro/MissingIds.java b/core/src/main/java/org/apache/iceberg/avro/MissingIds.java new file mode 100644 index 000000000000..6b7a2d2161ed --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/MissingIds.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.util.List; +import java.util.function.Supplier; +import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +/** + * Returns true once the first node is found with ID property missing. Reverse of {@link HasIds} + *

+ * Note: To use {@link AvroSchemaUtil#toIceberg(Schema)} on an avro schema, the avro schema need to be either + * have IDs on every node or not have IDs at all. Invoke {@link AvroSchemaUtil#hasIds(Schema)} only proves + * that the schema has at least one ID, and not sufficient condition for invoking + * {@link AvroSchemaUtil#toIceberg(Schema)} on the schema. + */ +class MissingIds extends AvroCustomOrderSchemaVisitor { + @Override + public Boolean record(Schema record, List names, Iterable fields) { + return Iterables.any(fields, Boolean.TRUE::equals); + } + + @Override + public Boolean field(Schema.Field field, Supplier fieldResult) { + // either this field is missing ID, or the subtree is missing ID somewhere + return !AvroSchemaUtil.hasFieldId(field) || fieldResult.get(); + } + + @Override + public Boolean map(Schema map, Supplier value) { + // either this map node is missing (key/value) ID, or the subtree is missing ID somewhere + return !AvroSchemaUtil.hasProperty(map, AvroSchemaUtil.KEY_ID_PROP) || + !AvroSchemaUtil.hasProperty(map, AvroSchemaUtil.VALUE_ID_PROP) || + value.get(); + } + + @Override + public Boolean array(Schema array, Supplier element) { + // either this list node is missing (elem) ID, or the subtree is missing ID somewhere + return !AvroSchemaUtil.hasProperty(array, AvroSchemaUtil.ELEMENT_ID_PROP) || element.get(); + } + + @Override + public Boolean union(Schema union, Iterable options) { + return Iterables.any(options, Boolean.TRUE::equals); + } + + @Override + public Boolean primitive(Schema primitive) { + // primitive node cannot be missing ID as Iceberg do not assign primitive node IDs in the first place + return false; + } +} diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroSchemaProjection.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroSchemaProjection.java new file mode 100644 index 000000000000..71dc9a6cb17e --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroSchemaProjection.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.util.Collections; +import org.apache.avro.SchemaBuilder; +import org.apache.iceberg.Schema; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; + +public class TestAvroSchemaProjection { + + @Test + public void projectWithListSchemaChanged() { + final org.apache.avro.Schema currentAvroSchema = SchemaBuilder.record("myrecord").namespace("unit.test").fields() + .name("f1").type().nullable().array().items( + SchemaBuilder.record("elem").fields() + .name("f11").type().nullable().intType().noDefault().endRecord()) + .noDefault().endRecord(); + + final org.apache.avro.Schema updatedAvroSchema = SchemaBuilder.record("myrecord").namespace("unit.test").fields() + .name("f1").type().nullable().array().items( + SchemaBuilder.record("elem").fields() + .name("f11").type().nullable().intType().noDefault() + .name("f12").type().nullable().stringType().noDefault() + .endRecord()) + .noDefault().endRecord(); + + final Schema currentIcebergSchema = AvroSchemaUtil.toIceberg(currentAvroSchema); + + // Getting the node ID in updatedAvroSchema allocated by converting into iceberg schema and back + final org.apache.avro.Schema idAllocatedUpdatedAvroSchema = + AvroSchemaUtil.convert(AvroSchemaUtil.toIceberg(updatedAvroSchema).asStruct()); + + final org.apache.avro.Schema projectedAvroSchema = + AvroSchemaUtil.buildAvroProjection(idAllocatedUpdatedAvroSchema, currentIcebergSchema, Collections.emptyMap()); + + assertFalse("Result of buildAvroProjection is missing some IDs", + AvroSchemaUtil.missingIds(projectedAvroSchema)); + } + + + @Test + public void projectWithMapSchemaChanged() { + final org.apache.avro.Schema currentAvroSchema = SchemaBuilder.record("myrecord").namespace("unit.test").fields() + .name("f1").type().nullable().map().values( + SchemaBuilder.record("elem").fields() + .name("f11").type().nullable().intType().noDefault().endRecord()) + .noDefault().endRecord(); + + final org.apache.avro.Schema updatedAvroSchema = SchemaBuilder.record("myrecord").namespace("unit.test").fields() + .name("f1").type().nullable().map().values( + SchemaBuilder.record("elem").fields() + .name("f11").type().nullable().intType().noDefault() + .name("f12").type().nullable().stringType().noDefault() + .endRecord()) + .noDefault().endRecord(); + + final Schema currentIcebergSchema = AvroSchemaUtil.toIceberg(currentAvroSchema); + + // Getting the node ID in updatedAvroSchema allocated by converting into iceberg schema and back + final org.apache.avro.Schema idAllocatedUpdatedAvroSchema = + AvroSchemaUtil.convert(AvroSchemaUtil.toIceberg(updatedAvroSchema).asStruct()); + + final org.apache.avro.Schema projectedAvroSchema = + AvroSchemaUtil.buildAvroProjection(idAllocatedUpdatedAvroSchema, currentIcebergSchema, Collections.emptyMap()); + + assertFalse("Result of buildAvroProjection is missing some IDs", + AvroSchemaUtil.missingIds(projectedAvroSchema)); + } + +} diff --git a/core/src/test/java/org/apache/iceberg/avro/TestBuildAvroProjection.java b/core/src/test/java/org/apache/iceberg/avro/TestBuildAvroProjection.java new file mode 100644 index 000000000000..79a54f886c1d --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestBuildAvroProjection.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.util.Collections; +import java.util.function.Supplier; +import org.apache.avro.SchemaBuilder; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.junit.Assert.assertEquals; + +public class TestBuildAvroProjection { + + @Test + public void projectArrayWithElementSchemaUnchanged() { + + final Type icebergType = Types.ListType.ofRequired(0, + Types.StructType.of( + optional(1, "int1", Types.IntegerType.get()), + optional(2, "string1", Types.StringType.get()) + ) + ); + + final org.apache.avro.Schema expected = SchemaBuilder.array().prop(AvroSchemaUtil.ELEMENT_ID_PROP, "0") + .items( + SchemaBuilder.record("elem").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .name("string1").prop(AvroSchemaUtil.FIELD_ID_PROP, "2").type().nullable().stringType().noDefault() + .endRecord()); + + final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap()); + + final Supplier supplier = expected::getElementType; + + final org.apache.avro.Schema actual = testSubject.array(expected, supplier); + + assertEquals("Array projection produced undesired array schema", + expected, actual); + assertEquals("Unexpected element ID discovered on the projected array schema", + 0, Integer.valueOf(actual.getProp(AvroSchemaUtil.ELEMENT_ID_PROP)).intValue()); + } + + @Test + public void projectArrayWithExtraFieldInElementSchema() { + + final Type icebergType = Types.ListType.ofRequired(0, + Types.StructType.of( + optional(1, "int1", Types.IntegerType.get()), + optional(2, "string1", Types.StringType.get()) + ) + ); + + final org.apache.avro.Schema extraField = SchemaBuilder.array().prop(AvroSchemaUtil.ELEMENT_ID_PROP, "0") + .items( + SchemaBuilder.record("elem").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .name("string1").prop(AvroSchemaUtil.FIELD_ID_PROP, "2").type().nullable().stringType().noDefault() + .name("float1").prop(AvroSchemaUtil.FIELD_ID_PROP, "3").type().nullable().floatType().noDefault() + .endRecord()); + + // once projected onto iceberg schema, the avro schema will lose the extra float field + final org.apache.avro.Schema expected = SchemaBuilder.array().prop(AvroSchemaUtil.ELEMENT_ID_PROP, "0") + .items( + SchemaBuilder.record("elem").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .name("string1").prop(AvroSchemaUtil.FIELD_ID_PROP, "2").type().nullable().stringType().noDefault() + .endRecord()); + + final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap()); + + final Supplier supplier = expected::getElementType; + + final org.apache.avro.Schema actual = testSubject.array(extraField, supplier); + + assertEquals("Array projection produced undesired array schema", + expected, actual); + assertEquals("Unexpected element ID discovered on the projected array schema", + 0, Integer.valueOf(actual.getProp(AvroSchemaUtil.ELEMENT_ID_PROP)).intValue()); + } + + @Test + public void projectArrayWithLessFieldInElementSchema() { + + final Type icebergType = Types.ListType.ofRequired(0, + Types.StructType.of( + optional(1, "int1", Types.IntegerType.get()), + optional(2, "string1", Types.StringType.get()) + ) + ); + + final org.apache.avro.Schema lessField = SchemaBuilder.array().prop(AvroSchemaUtil.ELEMENT_ID_PROP, "0") + .items( + SchemaBuilder.record("elem").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .endRecord()); + + // once projected onto iceberg schema, the avro schema will have an extra string column + final org.apache.avro.Schema expected = SchemaBuilder.array().prop(AvroSchemaUtil.ELEMENT_ID_PROP, "0") + .items( + SchemaBuilder.record("elem").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .name("string1_r").prop(AvroSchemaUtil.FIELD_ID_PROP, "2").type().nullable().stringType().noDefault() + .endRecord()); + + final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap()); + + final Supplier supplier = expected::getElementType; + + final org.apache.avro.Schema actual = testSubject.array(lessField, supplier); + + assertEquals("Array projection produced undesired array schema", + expected, actual); + assertEquals("Unexpected element ID discovered on the projected array schema", + 0, Integer.valueOf(actual.getProp(AvroSchemaUtil.ELEMENT_ID_PROP)).intValue()); + } + + @Test + public void projectMapWithValueSchemaUnchanged() { + + final Type icebergType = Types.MapType.ofRequired(0, 1, + Types.StringType.get(), + Types.StructType.of( + optional(2, "int1", Types.IntegerType.get()), + optional(3, "string1", Types.StringType.get()) + ) + ); + + final org.apache.avro.Schema expected = SchemaBuilder.map() + .prop(AvroSchemaUtil.KEY_ID_PROP, "0") + .prop(AvroSchemaUtil.VALUE_ID_PROP, "1") + .values( + SchemaBuilder.record("value").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .name("string1").prop(AvroSchemaUtil.FIELD_ID_PROP, "2").type().nullable().stringType().noDefault() + .endRecord()); + + final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap()); + + final Supplier supplier = expected::getValueType; + + final org.apache.avro.Schema actual = testSubject.map(expected, supplier); + + assertEquals("Map projection produced undesired map schema", + expected, actual); + assertEquals("Unexpected key ID discovered on the projected map schema", + 0, Integer.valueOf(actual.getProp(AvroSchemaUtil.KEY_ID_PROP)).intValue()); + assertEquals("Unexpected value ID discovered on the projected map schema", + 1, Integer.valueOf(actual.getProp(AvroSchemaUtil.VALUE_ID_PROP)).intValue()); + } + + @Test + public void projectMapWithExtraFieldInValueSchema() { + + final Type icebergType = Types.MapType.ofRequired(0, 1, + Types.StringType.get(), + Types.StructType.of( + optional(2, "int1", Types.IntegerType.get()), + optional(3, "string1", Types.StringType.get()) + ) + ); + + final org.apache.avro.Schema extraField = SchemaBuilder.map() + .prop(AvroSchemaUtil.KEY_ID_PROP, "0") + .prop(AvroSchemaUtil.VALUE_ID_PROP, "1") + .values( + SchemaBuilder.record("value").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .name("string1").prop(AvroSchemaUtil.FIELD_ID_PROP, "2").type().nullable().stringType().noDefault() + .name("float1").prop(AvroSchemaUtil.FIELD_ID_PROP, "3").type().nullable().floatType().noDefault() + .endRecord()); + + // once projected onto iceberg schema, the avro schema will lose the extra float field + final org.apache.avro.Schema expected = SchemaBuilder.map() + .prop(AvroSchemaUtil.KEY_ID_PROP, "0") + .prop(AvroSchemaUtil.VALUE_ID_PROP, "1") + .values( + SchemaBuilder.record("value").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .name("string1").prop(AvroSchemaUtil.FIELD_ID_PROP, "2").type().nullable().stringType().noDefault() + .endRecord()); + + final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap()); + + final Supplier supplier = expected::getValueType; + + final org.apache.avro.Schema actual = testSubject.map(extraField, supplier); + + assertEquals("Map projection produced undesired map schema", + expected, actual); + assertEquals("Unexpected key ID discovered on the projected map schema", + 0, Integer.valueOf(actual.getProp(AvroSchemaUtil.KEY_ID_PROP)).intValue()); + assertEquals("Unexpected value ID discovered on the projected map schema", + 1, Integer.valueOf(actual.getProp(AvroSchemaUtil.VALUE_ID_PROP)).intValue()); + } + + + @Test + public void projectMapWithLessFieldInValueSchema() { + + final Type icebergType = Types.MapType.ofRequired(0, 1, + Types.StringType.get(), + Types.StructType.of( + optional(2, "int1", Types.IntegerType.get()), + optional(3, "string1", Types.StringType.get()) + ) + ); + + final org.apache.avro.Schema lessField = SchemaBuilder.map() + .prop(AvroSchemaUtil.KEY_ID_PROP, "0") + .prop(AvroSchemaUtil.VALUE_ID_PROP, "1") + .values( + SchemaBuilder.record("value").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .endRecord()); + + // once projected onto iceberg schema, the avro schema will have an extra string column + final org.apache.avro.Schema expected = SchemaBuilder.map() + .prop(AvroSchemaUtil.KEY_ID_PROP, "0") + .prop(AvroSchemaUtil.VALUE_ID_PROP, "1") + .values( + SchemaBuilder.record("value").namespace("unit.test").fields() + .name("int1").prop(AvroSchemaUtil.FIELD_ID_PROP, "1").type().nullable().intType().noDefault() + .name("string1_r2").prop(AvroSchemaUtil.FIELD_ID_PROP, "2").type().nullable().stringType().noDefault() + .endRecord()); + + final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap()); + + final Supplier supplier = expected::getValueType; + + final org.apache.avro.Schema actual = testSubject.map(lessField, supplier); + + assertEquals("Map projection produced undesired map schema", + expected, actual); + assertEquals("Unexpected key ID discovered on the projected map schema", + 0, Integer.valueOf(actual.getProp(AvroSchemaUtil.KEY_ID_PROP)).intValue()); + assertEquals("Unexpected value ID discovered on the projected map schema", + 1, Integer.valueOf(actual.getProp(AvroSchemaUtil.VALUE_ID_PROP)).intValue()); + } +}