Skip to content

Commit

Permalink
API: add StructTransform base class for PartitionKey and SortKey. add…
Browse files Browse the repository at this point in the history
… SortOrderComparators (apache#7798)
  • Loading branch information
stevenzwu authored Nov 28, 2023
1 parent 4e62b58 commit b21a8ce
Show file tree
Hide file tree
Showing 5 changed files with 814 additions and 82 deletions.
97 changes: 15 additions & 82 deletions api/src/main/java/org/apache/iceberg/PartitionKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,72 +18,32 @@
*/
package org.apache.iceberg;

import java.io.Serializable;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SerializableFunction;
import java.util.stream.Collectors;

/**
* A struct of partition values.
*
* <p>Instances of this class can produce partition values from a data row passed to {@link
* #partition(StructLike)}.
*/
public class PartitionKey implements StructLike, Serializable {
public class PartitionKey extends StructTransform {

private final PartitionSpec spec;
private final int size;
private final Object[] partitionTuple;
private final SerializableFunction[] transforms;
private final Accessor<StructLike>[] accessors;
private final Schema inputSchema;

@SuppressWarnings("unchecked")
public PartitionKey(PartitionSpec spec, Schema inputSchema) {
super(inputSchema, fieldTransform(spec));
this.spec = spec;

List<PartitionField> fields = spec.fields();
this.size = fields.size();
this.partitionTuple = new Object[size];
this.transforms = new SerializableFunction[size];
this.accessors = (Accessor<StructLike>[]) Array.newInstance(Accessor.class, size);

Schema schema = spec.schema();
for (int i = 0; i < size; i += 1) {
PartitionField field = fields.get(i);
Accessor<StructLike> accessor = inputSchema.accessorForField(field.sourceId());
Preconditions.checkArgument(
accessor != null,
"Cannot build accessor for field: " + schema.findField(field.sourceId()));
this.accessors[i] = accessor;
this.transforms[i] = field.transform().bind(accessor.type());
}
this.inputSchema = inputSchema;
}

private PartitionKey(PartitionKey toCopy) {
// only need deep copy inside StructTransform
super(toCopy);
this.spec = toCopy.spec;
this.size = toCopy.size;
this.partitionTuple = new Object[toCopy.partitionTuple.length];
this.transforms = toCopy.transforms;
this.accessors = toCopy.accessors;

System.arraycopy(toCopy.partitionTuple, 0, this.partitionTuple, 0, partitionTuple.length);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[");
for (int i = 0; i < partitionTuple.length; i += 1) {
if (i > 0) {
sb.append(", ");
}
sb.append(partitionTuple[i]);
}
sb.append("]");
return sb.toString();
this.inputSchema = toCopy.inputSchema;
}

public PartitionKey copy() {
Expand All @@ -101,41 +61,14 @@ public String toPath() {
*/
@SuppressWarnings("unchecked")
public void partition(StructLike row) {
for (int i = 0; i < partitionTuple.length; i += 1) {
Function<Object, Object> transform = transforms[i];
partitionTuple[i] = transform.apply(accessors[i].get(row));
}
}

@Override
public int size() {
return size;
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
return javaClass.cast(partitionTuple[pos]);
}

@Override
public <T> void set(int pos, T value) {
partitionTuple[pos] = value;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
} else if (!(o instanceof PartitionKey)) {
return false;
}

PartitionKey that = (PartitionKey) o;
return Arrays.equals(partitionTuple, that.partitionTuple);
wrap(row);
}

@Override
public int hashCode() {
return Arrays.hashCode(partitionTuple);
/**
 * Builds the list of {@link FieldTransform}s handed to the {@code StructTransform} constructor:
 * one entry per partition field of the spec, pairing the field's source id with its transform.
 *
 * @param spec partition spec whose fields are converted
 * @return field transforms in the same order as {@code spec.fields()}
 */
private static List<FieldTransform> fieldTransform(PartitionSpec spec) {
  List<PartitionField> partitionFields = spec.fields();
  return partitionFields.stream()
      .map(field -> new FieldTransform(field.sourceId(), field.transform()))
      .collect(Collectors.toList());
}
}
56 changes: 56 additions & 0 deletions api/src/main/java/org/apache/iceberg/SortKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import java.util.stream.Collectors;

/**
 * A struct of flattened sort field values.
 *
 * <p>A {@code SortKey} is a {@link StructTransform} configured with one field transform per sort
 * field of a {@link SortOrder}. Instances of this class can produce sort values from a row passed
 * to {@link #wrap(StructLike)}.
 */
public class SortKey extends StructTransform {
  private final Schema schema;
  private final SortOrder sortOrder;

  /**
   * Creates a sort key for rows of the given schema.
   *
   * @param schema schema of the rows that will be wrapped
   * @param sortOrder sort order whose fields define the positions of this key
   */
  public SortKey(Schema schema, SortOrder sortOrder) {
    super(schema, fieldTransform(sortOrder));
    this.schema = schema;
    this.sortOrder = sortOrder;
  }

  /** Copy constructor backing {@link #copy()}. */
  private SortKey(SortKey toCopy) {
    // only need deep copy inside StructTransform
    super(toCopy);
    this.schema = toCopy.schema;
    this.sortOrder = toCopy.sortOrder;
  }

  /**
   * Returns a new key created through the copy constructor; the schema and sort order references
   * are shared with this instance.
   */
  public SortKey copy() {
    return new SortKey(this);
  }

  /** Converts every sort field of the order into a field transform, preserving field order. */
  private static List<FieldTransform> fieldTransform(SortOrder sortOrder) {
    return sortOrder.fields().stream()
        .map(SortKey::toFieldTransform)
        .collect(Collectors.toList());
  }

  /** Pairs a sort field's source id with its transform. */
  private static FieldTransform toFieldTransform(SortField sortField) {
    return new FieldTransform(sortField.sourceId(), sortField.transform());
  }
}
105 changes: 105 additions & 0 deletions api/src/main/java/org/apache/iceberg/SortOrderComparators.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.lang.reflect.Array;
import java.util.Comparator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
 * Factory for comparators that order {@link StructLike} rows by the fields of a {@link
 * SortOrder}.
 */
public class SortOrderComparators {
  private SortOrderComparators() {}

  /**
   * Compare structs with the specified sort order projection.
   *
   * <p>NOTE(review): the returned comparator wraps both inputs in two {@link SortKey} instances
   * that it keeps as fields and reuses on every call, so a single comparator instance is
   * presumably not safe for concurrent use; create one per thread.
   *
   * @param schema schema of the structs that will be compared
   * @param sortOrder sort order to apply; must not be unsorted and must pass {@link
   *     SortOrder#checkCompatibility(SortOrder, Schema)}
   * @return a comparator over structs of the given schema
   * @throws IllegalArgumentException if the sort order is unsorted
   */
  public static Comparator<StructLike> forSchema(Schema schema, SortOrder sortOrder) {
    Preconditions.checkArgument(sortOrder.isSorted(), "Invalid sort order: unsorted");
    SortOrder.checkCompatibility(sortOrder, schema);
    return new SortOrderComparator(schema, sortOrder);
  }

  /**
   * Util method to chain sort direction and null order to the original comparator.
   *
   * @param original comparator for non-null values of the field
   * @param sortField sort field supplying direction and null order; when {@code null}, nulls are
   *     ordered first and the original comparator is used unchanged for non-null values
   * @return the decorated comparator
   */
  private static Comparator<Object> sortFieldComparator(
      Comparator<Object> original, SortField sortField) {
    Comparator<Object> comparator = original;
    if (sortField == null) {
      return Comparators.nullsFirst().thenComparing(comparator);
    }

    // apply the direction first so that the null ordering below is not reversed with it
    if (sortField.direction() == SortDirection.DESC) {
      comparator = comparator.reversed();
    }

    if (sortField.nullOrder() == NullOrder.NULLS_FIRST) {
      comparator = Comparators.nullsFirst().thenComparing(comparator);
    } else if (sortField.nullOrder() == NullOrder.NULLS_LAST) {
      comparator = Comparators.nullsLast().thenComparing(comparator);
    }

    return comparator;
  }

  /**
   * Comparator that projects both inputs through a {@link SortKey} and compares the resulting
   * values field by field, returning the first non-zero result.
   */
  private static class SortOrderComparator implements Comparator<StructLike> {
    // scratch keys re-wrapped on every compare call
    private final SortKey leftKey;
    private final SortKey rightKey;
    private final int size;
    private final Comparator<Object>[] comparators;
    // Java class of each transformed sort value, resolved once instead of on every comparison
    private final Class<?>[] valueClasses;

    private SortOrderComparator(Schema schema, SortOrder sortOrder) {
      this.leftKey = new SortKey(schema, sortOrder);
      this.rightKey = new SortKey(schema, sortOrder);
      this.size = sortOrder.fields().size();

      // generic arrays cannot be created directly; every slot is filled with a
      // Comparator<Object> below, so the cast is safe
      @SuppressWarnings("unchecked")
      Comparator<Object>[] fieldComparators =
          (Comparator<Object>[]) Array.newInstance(Comparator.class, size);
      this.comparators = fieldComparators;
      this.valueClasses = new Class<?>[size];

      for (int i = 0; i < size; i += 1) {
        SortField sortField = sortOrder.fields().get(i);
        Types.NestedField field = schema.findField(sortField.sourceId());
        Type transformResultType = sortField.transform().getResultType(field.type());
        Preconditions.checkArgument(
            transformResultType.isPrimitiveType(), "Invalid transform result type: non-primitive");
        valueClasses[i] = transformResultType.typeId().javaClass();
        Comparator<Object> comparator = Comparators.forType(transformResultType.asPrimitiveType());
        comparators[i] = sortFieldComparator(comparator, sortField);
      }
    }

    @Override
    public int compare(StructLike left, StructLike right) {
      if (left == right) {
        return 0;
      }

      leftKey.wrap(left);
      rightKey.wrap(right);

      for (int i = 0; i < size; i += 1) {
        Class<?> valueClass = valueClasses[i];
        int cmp = comparators[i].compare(leftKey.get(i, valueClass), rightKey.get(i, valueClass));
        if (cmp != 0) {
          return cmp;
        }
      }

      return 0;
    }
  }
}
Loading

0 comments on commit b21a8ce

Please sign in to comment.