Skip to content

Commit

Permalink
Refactor Parquet visitor (apache#1001)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue authored May 13, 2020
1 parent 0c881c2 commit e9a9032
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,7 @@ public Optional<Type> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bson
}

private void addAlias(String name, int fieldId) {
String fullName = name;
if (!fieldNames.isEmpty()) {
fullName = DOT.join(DOT.join(fieldNames.descendingIterator()), name);
}
aliasToId.put(fullName, fieldId);
aliasToId.put(DOT.join(path(name)), fieldId);
}

protected int nextId() {
Expand Down
223 changes: 152 additions & 71 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
import org.apache.parquet.schema.Type;

public class ParquetTypeVisitor<T> {
@SuppressWarnings("checkstyle:VisibilityModifier")
protected Deque<String> fieldNames = Lists.newLinkedList();
private final Deque<String> fieldNames = Lists.newLinkedList();

public static <T> T visit(Type type, ParquetTypeVisitor<T> visitor) {
if (type instanceof MessageType) {
Expand All @@ -48,72 +47,10 @@ public static <T> T visit(Type type, ParquetTypeVisitor<T> visitor) {
if (annotation != null) {
switch (annotation) {
case LIST:
Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
"Invalid list: top-level group is repeated: %s", group);
Preconditions.checkArgument(group.getFieldCount() == 1,
"Invalid list: does not contain single repeated field: %s", group);

GroupType repeatedElement = group.getFields().get(0).asGroupType();
Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
"Invalid list: inner group is not repeated");
Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
"Invalid list: repeated group is not a single field: %s", group);

visitor.fieldNames.push(repeatedElement.getName());
try {
T elementResult = null;
if (repeatedElement.getFieldCount() > 0) {
elementResult = visitField(repeatedElement.getType(0), visitor);
}

return visitor.list(group, elementResult);

} finally {
visitor.fieldNames.pop();
}
return visitList(group, visitor);

case MAP:
Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
"Invalid map: top-level group is repeated: %s", group);
Preconditions.checkArgument(group.getFieldCount() == 1,
"Invalid map: does not contain single repeated field: %s", group);

GroupType repeatedKeyValue = group.getType(0).asGroupType();
Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
"Invalid map: inner group is not repeated");
Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
"Invalid map: repeated group does not have 2 fields");

visitor.fieldNames.push(repeatedKeyValue.getName());
try {
T keyResult = null;
T valueResult = null;
switch (repeatedKeyValue.getFieldCount()) {
case 2:
// if there are 2 fields, both key and value are projected
keyResult = visitField(repeatedKeyValue.getType(0), visitor);
valueResult = visitField(repeatedKeyValue.getType(1), visitor);
break;
case 1:
// if there is just one, use the name to determine what it is
Type keyOrValue = repeatedKeyValue.getType(0);
if (keyOrValue.getName().equalsIgnoreCase("key")) {
keyResult = visitField(keyOrValue, visitor);
// value result remains null
} else {
valueResult = visitField(keyOrValue, visitor);
// key result remains null
}
break;
default:
// both results will remain null
}

return visitor.map(group, keyResult, valueResult);

} finally {
visitor.fieldNames.pop();
}
return visitMap(group, visitor);

default:
}
Expand All @@ -123,19 +60,115 @@ public static <T> T visit(Type type, ParquetTypeVisitor<T> visitor) {
}
}

private static <T> T visitField(Type field, ParquetTypeVisitor<T> visitor) {
visitor.fieldNames.push(field.getName());
private static <T> T visitList(GroupType list, ParquetTypeVisitor<T> visitor) {
Preconditions.checkArgument(!list.isRepetition(Type.Repetition.REPEATED),
"Invalid list: top-level group is repeated: %s", list);
Preconditions.checkArgument(list.getFieldCount() == 1,
"Invalid list: does not contain single repeated field: %s", list);

GroupType repeatedElement = list.getFields().get(0).asGroupType();
Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
"Invalid list: inner group is not repeated");
Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
"Invalid list: repeated group is not a single field: %s", list);

visitor.beforeRepeatedElement(repeatedElement);
try {
return visit(field, visitor);
T elementResult = null;
if (repeatedElement.getFieldCount() > 0) {
Type elementField = repeatedElement.getType(0);
visitor.beforeElementField(elementField);
try {
elementResult = visit(elementField, visitor);
} finally {
visitor.afterElementField(elementField);
}
}

return visitor.list(list, elementResult);

} finally {
visitor.fieldNames.pop();
visitor.afterRepeatedElement(repeatedElement);
}
}

private static <T> T visitMap(GroupType map, ParquetTypeVisitor<T> visitor) {
Preconditions.checkArgument(!map.isRepetition(Type.Repetition.REPEATED),
"Invalid map: top-level group is repeated: %s", map);
Preconditions.checkArgument(map.getFieldCount() == 1,
"Invalid map: does not contain single repeated field: %s", map);

GroupType repeatedKeyValue = map.getType(0).asGroupType();
Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
"Invalid map: inner group is not repeated");
Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
"Invalid map: repeated group does not have 2 fields");

visitor.beforeRepeatedKeyValue(repeatedKeyValue);
try {
T keyResult = null;
T valueResult = null;
switch (repeatedKeyValue.getFieldCount()) {
case 2:
// if there are 2 fields, both key and value are projected
Type keyType = repeatedKeyValue.getType(0);
visitor.beforeKeyField(keyType);
try {
keyResult = visit(keyType, visitor);
} finally {
visitor.afterKeyField(keyType);
}
Type valueType = repeatedKeyValue.getType(1);
visitor.beforeValueField(valueType);
try {
valueResult = visit(valueType, visitor);
} finally {
visitor.afterValueField(valueType);
}
break;

case 1:
// if there is just one, use the name to determine what it is
Type keyOrValue = repeatedKeyValue.getType(0);
if (keyOrValue.getName().equalsIgnoreCase("key")) {
visitor.beforeKeyField(keyOrValue);
try {
keyResult = visit(keyOrValue, visitor);
} finally {
visitor.afterKeyField(keyOrValue);
}
// value result remains null
} else {
visitor.beforeValueField(keyOrValue);
try {
valueResult = visit(keyOrValue, visitor);
} finally {
visitor.afterValueField(keyOrValue);
}
// key result remains null
}
break;

default:
// both results will remain null
}

return visitor.map(map, keyResult, valueResult);

} finally {
visitor.afterRepeatedKeyValue(repeatedKeyValue);
}
}

private static <T> List<T> visitFields(GroupType group, ParquetTypeVisitor<T> visitor) {
List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
for (Type field : group.getFields()) {
results.add(visitField(field, visitor));
visitor.beforeField(field);
try {
results.add(visit(field, visitor));
} finally {
visitor.afterField(field);
}
}

return results;
Expand All @@ -161,6 +194,54 @@ public T primitive(PrimitiveType primitive) {
return null;
}

public void beforeField(Type type) {
fieldNames.push(type.getName());
}

public void afterField(Type type) {
fieldNames.pop();
}

public void beforeRepeatedElement(Type element) {
beforeField(element);
}

public void afterRepeatedElement(Type element) {
afterField(element);
}

public void beforeElementField(Type element) {
beforeField(element);
}

public void afterElementField(Type element) {
afterField(element);
}

public void beforeRepeatedKeyValue(Type keyValue) {
beforeField(keyValue);
}

public void afterRepeatedKeyValue(Type keyValue) {
afterField(keyValue);
}

public void beforeKeyField(Type key) {
beforeField(key);
}

public void afterKeyField(Type key) {
afterField(key);
}

public void beforeValueField(Type value) {
beforeField(value);
}

public void afterValueField(Type value) {
afterField(value);
}

protected String[] currentPath() {
return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]);
}
Expand Down

0 comments on commit e9a9032

Please sign in to comment.