Skip to content

Commit

Permalink
Revert "Merge pull request apache#32757: Schema inference parameteriz…
Browse files Browse the repository at this point in the history
…ed types"

This reverts commit a50f91c.
  • Loading branch information
reuvenlax committed Oct 21, 2024
1 parent 1e27978 commit b8a2b9a
Show file tree
Hide file tree
Showing 27 changed files with 197 additions and 961 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.beam.sdk.schemas.utils.AutoValueUtils;
Expand Down Expand Up @@ -63,9 +61,8 @@ public List<FieldValueTypeInformation> get(TypeDescriptor<?> typeDescriptor) {
.filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
.collect(Collectors.toList());
List<FieldValueTypeInformation> types = Lists.newArrayListWithCapacity(methods.size());
Map<Type, Type> boundTypes = ReflectUtils.getAllBoundTypes(typeDescriptor);
for (int i = 0; i < methods.size(); ++i) {
types.add(FieldValueTypeInformation.forGetter(methods.get(i), i, boundTypes));
types.add(FieldValueTypeInformation.forGetter(methods.get(i), i));
}
types.sort(Comparator.comparing(FieldValueTypeInformation::getNumber));
validateFieldNumbers(types);
Expand Down Expand Up @@ -146,8 +143,7 @@ public SchemaUserTypeCreator schemaTypeCreator(

@Override
public <T> @Nullable Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
Map<Type, Type> boundTypes = ReflectUtils.getAllBoundTypes(typeDescriptor);
return JavaBeanUtils.schemaFromJavaBeanClass(
typeDescriptor, AbstractGetterTypeSupplier.INSTANCE, boundTypes);
typeDescriptor, AbstractGetterTypeSupplier.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@
import java.lang.reflect.Field;
import java.lang.reflect.Member;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
Expand All @@ -46,7 +44,6 @@
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
"rawtypes"
})
@Internal
public abstract class FieldValueTypeInformation implements Serializable {
/** Optionally returns the field index. */
public abstract @Nullable Integer getNumber();
Expand Down Expand Up @@ -128,20 +125,18 @@ public static FieldValueTypeInformation forOneOf(
.build();
}

public static FieldValueTypeInformation forField(
Field field, int index, Map<Type, Type> boundTypes) {
TypeDescriptor<?> type =
TypeDescriptor.of(ReflectUtils.resolveType(field.getGenericType(), boundTypes));
public static FieldValueTypeInformation forField(Field field, int index) {
TypeDescriptor<?> type = TypeDescriptor.of(field.getGenericType());
return new AutoValue_FieldValueTypeInformation.Builder()
.setName(getNameOverride(field.getName(), field))
.setNumber(getNumberOverride(index, field))
.setNullable(hasNullableAnnotation(field))
.setType(type)
.setRawType(type.getRawType())
.setField(field)
.setElementType(getIterableComponentType(field, boundTypes))
.setMapKeyType(getMapKeyType(field, boundTypes))
.setMapValueType(getMapValueType(field, boundTypes))
.setElementType(getIterableComponentType(field))
.setMapKeyType(getMapKeyType(field))
.setMapValueType(getMapValueType(field))
.setOneOfTypes(Collections.emptyMap())
.setDescription(getFieldDescription(field))
.build();
Expand Down Expand Up @@ -189,8 +184,7 @@ public static <T extends AnnotatedElement & Member> String getNameOverride(
return fieldDescription.value();
}

public static FieldValueTypeInformation forGetter(
Method method, int index, Map<Type, Type> boundTypes) {
public static FieldValueTypeInformation forGetter(Method method, int index) {
String name;
if (method.getName().startsWith("get")) {
name = ReflectUtils.stripPrefix(method.getName(), "get");
Expand All @@ -200,8 +194,7 @@ public static FieldValueTypeInformation forGetter(
throw new RuntimeException("Getter has wrong prefix " + method.getName());
}

TypeDescriptor<?> type =
TypeDescriptor.of(ReflectUtils.resolveType(method.getGenericReturnType(), boundTypes));
TypeDescriptor<?> type = TypeDescriptor.of(method.getGenericReturnType());
boolean nullable = hasNullableReturnType(method);
return new AutoValue_FieldValueTypeInformation.Builder()
.setName(getNameOverride(name, method))
Expand All @@ -210,9 +203,9 @@ public static FieldValueTypeInformation forGetter(
.setType(type)
.setRawType(type.getRawType())
.setMethod(method)
.setElementType(getIterableComponentType(type, boundTypes))
.setMapKeyType(getMapKeyType(type, boundTypes))
.setMapValueType(getMapValueType(type, boundTypes))
.setElementType(getIterableComponentType(type))
.setMapKeyType(getMapKeyType(type))
.setMapValueType(getMapValueType(type))
.setOneOfTypes(Collections.emptyMap())
.setDescription(getFieldDescription(method))
.build();
Expand Down Expand Up @@ -259,33 +252,29 @@ private static boolean isNullableAnnotation(Annotation annotation) {
return annotation.annotationType().getSimpleName().equals("Nullable");
}

public static FieldValueTypeInformation forSetter(
Method method, Map<Type, Type> boundParameters) {
return forSetter(method, "set", boundParameters);
public static FieldValueTypeInformation forSetter(Method method) {
return forSetter(method, "set");
}

public static FieldValueTypeInformation forSetter(
Method method, String setterPrefix, Map<Type, Type> boundTypes) {
public static FieldValueTypeInformation forSetter(Method method, String setterPrefix) {
String name;
if (method.getName().startsWith(setterPrefix)) {
name = ReflectUtils.stripPrefix(method.getName(), setterPrefix);
} else {
throw new RuntimeException("Setter has wrong prefix " + method.getName());
}

TypeDescriptor<?> type =
TypeDescriptor.of(
ReflectUtils.resolveType(method.getGenericParameterTypes()[0], boundTypes));
TypeDescriptor<?> type = TypeDescriptor.of(method.getGenericParameterTypes()[0]);
boolean nullable = hasSingleNullableParameter(method);
return new AutoValue_FieldValueTypeInformation.Builder()
.setName(name)
.setNullable(nullable)
.setType(type)
.setRawType(type.getRawType())
.setMethod(method)
.setElementType(getIterableComponentType(type, boundTypes))
.setMapKeyType(getMapKeyType(type, boundTypes))
.setMapValueType(getMapValueType(type, boundTypes))
.setElementType(getIterableComponentType(type))
.setMapKeyType(getMapKeyType(type))
.setMapValueType(getMapValueType(type))
.setOneOfTypes(Collections.emptyMap())
.build();
}
Expand All @@ -294,15 +283,13 @@ public FieldValueTypeInformation withName(String name) {
return toBuilder().setName(name).build();
}

private static FieldValueTypeInformation getIterableComponentType(
Field field, Map<Type, Type> boundTypes) {
return getIterableComponentType(TypeDescriptor.of(field.getGenericType()), boundTypes);
private static FieldValueTypeInformation getIterableComponentType(Field field) {
return getIterableComponentType(TypeDescriptor.of(field.getGenericType()));
}

static @Nullable FieldValueTypeInformation getIterableComponentType(
TypeDescriptor<?> valueType, Map<Type, Type> boundTypes) {
static @Nullable FieldValueTypeInformation getIterableComponentType(TypeDescriptor<?> valueType) {
// TODO: Figure out nullable elements.
TypeDescriptor<?> componentType = ReflectUtils.getIterableComponentType(valueType, boundTypes);
TypeDescriptor<?> componentType = ReflectUtils.getIterableComponentType(valueType);
if (componentType == null) {
return null;
}
Expand All @@ -312,43 +299,41 @@ private static FieldValueTypeInformation getIterableComponentType(
.setNullable(false)
.setType(componentType)
.setRawType(componentType.getRawType())
.setElementType(getIterableComponentType(componentType, boundTypes))
.setMapKeyType(getMapKeyType(componentType, boundTypes))
.setMapValueType(getMapValueType(componentType, boundTypes))
.setElementType(getIterableComponentType(componentType))
.setMapKeyType(getMapKeyType(componentType))
.setMapValueType(getMapValueType(componentType))
.setOneOfTypes(Collections.emptyMap())
.build();
}

// If the Field is a map type, returns the key type, otherwise returns a null reference.

private static @Nullable FieldValueTypeInformation getMapKeyType(
Field field, Map<Type, Type> boundTypes) {
return getMapKeyType(TypeDescriptor.of(field.getGenericType()), boundTypes);
private static @Nullable FieldValueTypeInformation getMapKeyType(Field field) {
return getMapKeyType(TypeDescriptor.of(field.getGenericType()));
}

private static @Nullable FieldValueTypeInformation getMapKeyType(
TypeDescriptor<?> typeDescriptor, Map<Type, Type> boundTypes) {
return getMapType(typeDescriptor, 0, boundTypes);
TypeDescriptor<?> typeDescriptor) {
return getMapType(typeDescriptor, 0);
}

// If the Field is a map type, returns the value type, otherwise returns a null reference.

private static @Nullable FieldValueTypeInformation getMapValueType(
Field field, Map<Type, Type> boundTypes) {
return getMapType(TypeDescriptor.of(field.getGenericType()), 1, boundTypes);
private static @Nullable FieldValueTypeInformation getMapValueType(Field field) {
return getMapType(TypeDescriptor.of(field.getGenericType()), 1);
}

private static @Nullable FieldValueTypeInformation getMapValueType(
TypeDescriptor<?> typeDescriptor, Map<Type, Type> boundTypes) {
return getMapType(typeDescriptor, 1, boundTypes);
TypeDescriptor<?> typeDescriptor) {
return getMapType(typeDescriptor, 1);
}

// If the Field is a map type, returns the key or value type (0 is key type, 1 is value).
// Otherwise returns a null reference.
@SuppressWarnings("unchecked")
private static @Nullable FieldValueTypeInformation getMapType(
TypeDescriptor<?> valueType, int index, Map<Type, Type> boundTypes) {
TypeDescriptor mapType = ReflectUtils.getMapType(valueType, index, boundTypes);
TypeDescriptor<?> valueType, int index) {
TypeDescriptor mapType = ReflectUtils.getMapType(valueType, index);
if (mapType == null) {
return null;
}
Expand All @@ -357,9 +342,9 @@ private static FieldValueTypeInformation getIterableComponentType(
.setNullable(false)
.setType(mapType)
.setRawType(mapType.getRawType())
.setElementType(getIterableComponentType(mapType, boundTypes))
.setMapKeyType(getMapKeyType(mapType, boundTypes))
.setMapValueType(getMapValueType(mapType, boundTypes))
.setElementType(getIterableComponentType(mapType))
.setMapKeyType(getMapKeyType(mapType))
.setMapValueType(getMapValueType(mapType))
.setOneOfTypes(Collections.emptyMap())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
Expand Down Expand Up @@ -69,9 +67,8 @@ public List<FieldValueTypeInformation> get(TypeDescriptor<?> typeDescriptor) {
.filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
.collect(Collectors.toList());
List<FieldValueTypeInformation> types = Lists.newArrayListWithCapacity(methods.size());
Map<Type, Type> boundTypes = ReflectUtils.getAllBoundTypes(typeDescriptor);
for (int i = 0; i < methods.size(); ++i) {
types.add(FieldValueTypeInformation.forGetter(methods.get(i), i, boundTypes));
types.add(FieldValueTypeInformation.forGetter(methods.get(i), i));
}
types.sort(Comparator.comparing(FieldValueTypeInformation::getNumber));
validateFieldNumbers(types);
Expand Down Expand Up @@ -114,11 +111,10 @@ public static class SetterTypeSupplier implements FieldValueTypeSupplier {

@Override
public List<FieldValueTypeInformation> get(TypeDescriptor<?> typeDescriptor) {
Map<Type, Type> boundTypes = ReflectUtils.getAllBoundTypes(typeDescriptor);
return ReflectUtils.getMethods(typeDescriptor.getRawType()).stream()
.filter(ReflectUtils::isSetter)
.filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
.map(m -> FieldValueTypeInformation.forSetter(m, boundTypes))
.map(FieldValueTypeInformation::forSetter)
.map(
t -> {
if (t.getMethod().getAnnotation(SchemaFieldNumber.class) != null) {
Expand Down Expand Up @@ -160,10 +156,8 @@ public boolean equals(@Nullable Object obj) {

@Override
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
Map<Type, Type> boundTypes = ReflectUtils.getAllBoundTypes(typeDescriptor);
Schema schema =
JavaBeanUtils.schemaFromJavaBeanClass(
typeDescriptor, GetterTypeSupplier.INSTANCE, boundTypes);
JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor, GetterTypeSupplier.INSTANCE);

// If there are no creator methods, then validate that we have setters for every field.
// Otherwise, we will have no way of creating instances of the class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -64,11 +62,9 @@ public List<FieldValueTypeInformation> get(TypeDescriptor<?> typeDescriptor) {
ReflectUtils.getFields(typeDescriptor.getRawType()).stream()
.filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
.collect(Collectors.toList());

List<FieldValueTypeInformation> types = Lists.newArrayListWithCapacity(fields.size());
Map<Type, Type> boundTypes = ReflectUtils.getAllBoundTypes(typeDescriptor);
for (int i = 0; i < fields.size(); ++i) {
types.add(FieldValueTypeInformation.forField(fields.get(i), i, boundTypes));
types.add(FieldValueTypeInformation.forField(fields.get(i), i));
}
types.sort(Comparator.comparing(FieldValueTypeInformation::getNumber));
validateFieldNumbers(types);
Expand Down Expand Up @@ -115,9 +111,7 @@ private static void validateFieldNumbers(List<FieldValueTypeInformation> types)

@Override
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
Map<Type, Type> boundTypes = ReflectUtils.getAllBoundTypes(typeDescriptor);
return POJOUtils.schemaFromPojoClass(
typeDescriptor, JavaFieldTypeSupplier.INSTANCE, boundTypes);
return POJOUtils.schemaFromPojoClass(typeDescriptor, JavaFieldTypeSupplier.INSTANCE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public interface SchemaProvider extends Serializable {
* Given a type, return a function that converts that type to a {@link Row} object If no schema
* exists, returns null.
*/
<T> @Nullable SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor);
@Nullable
<T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor);

/**
* Given a type, returns a function that converts from a {@link Row} object to that type. If no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ void registerProvider(TypeDescriptor typeDescriptor, SchemaProvider schemaProvid
providers.put(typeDescriptor, schemaProvider);
}

private <T> @Nullable SchemaProvider schemaProviderFor(TypeDescriptor<T> typeDescriptor) {
@Override
public <T> @Nullable Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
TypeDescriptor<?> type = typeDescriptor;
do {
SchemaProvider schemaProvider = providers.get(type);
if (schemaProvider != null) {
return schemaProvider;
return schemaProvider.schemaFor(type);
}
Class<?> superClass = type.getRawType().getSuperclass();
if (superClass == null || superClass.equals(Object.class)) {
Expand All @@ -91,24 +92,38 @@ void registerProvider(TypeDescriptor typeDescriptor, SchemaProvider schemaProvid
} while (true);
}

@Override
public <T> @Nullable Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
@Nullable SchemaProvider schemaProvider = schemaProviderFor(typeDescriptor);
return schemaProvider != null ? schemaProvider.schemaFor(typeDescriptor) : null;
}

@Override
public <T> @Nullable SerializableFunction<T, Row> toRowFunction(
TypeDescriptor<T> typeDescriptor) {
@Nullable SchemaProvider schemaProvider = schemaProviderFor(typeDescriptor);
return schemaProvider != null ? schemaProvider.toRowFunction(typeDescriptor) : null;
TypeDescriptor<?> type = typeDescriptor;
do {
SchemaProvider schemaProvider = providers.get(type);
if (schemaProvider != null) {
return (SerializableFunction<T, Row>) schemaProvider.toRowFunction(type);
}
Class<?> superClass = type.getRawType().getSuperclass();
if (superClass == null || superClass.equals(Object.class)) {
return null;
}
type = TypeDescriptor.of(superClass);
} while (true);
}

@Override
public <T> @Nullable SerializableFunction<Row, T> fromRowFunction(
TypeDescriptor<T> typeDescriptor) {
@Nullable SchemaProvider schemaProvider = schemaProviderFor(typeDescriptor);
return schemaProvider != null ? schemaProvider.fromRowFunction(typeDescriptor) : null;
TypeDescriptor<?> type = typeDescriptor;
do {
SchemaProvider schemaProvider = providers.get(type);
if (schemaProvider != null) {
return (SerializableFunction<Row, T>) schemaProvider.fromRowFunction(type);
}
Class<?> superClass = type.getRawType().getSuperclass();
if (superClass == null || superClass.equals(Object.class)) {
return null;
}
type = TypeDescriptor.of(superClass);
} while (true);
}
}

Expand Down
Loading

0 comments on commit b8a2b9a

Please sign in to comment.