Skip to content

Commit

Permalink
Spark: Refactor building write distribution and ordering (apache#3720)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Dec 13, 2021
1 parent 4c48df2 commit 53557ac
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 140 deletions.
1 change: 1 addition & 0 deletions .baseline/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
org.apache.commons.lang3.Validate.*,
org.apache.iceberg.expressions.Expressions.*,
org.apache.iceberg.expressions.Expression.Operation.*,
org.apache.iceberg.DistributionMode.*,
org.apache.iceberg.IsolationLevel.*,
org.apache.iceberg.NullOrder.*,
org.apache.iceberg.MetadataTableType.*,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.transforms.SortOrderVisitor;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortOrder;

class SortOrderToSpark implements SortOrderVisitor<OrderField> {
class SortOrderToSpark implements SortOrderVisitor<SortOrder> {

private final Map<Integer, String> quotedNameById;

Expand All @@ -34,42 +37,56 @@ class SortOrderToSpark implements SortOrderVisitor<OrderField> {
}

@Override
public OrderField field(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return OrderField.column(quotedName(id), direction, nullOrder);
public SortOrder field(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return Expressions.sort(Expressions.column(quotedName(id)), toSpark(direction), toSpark(nullOrder));
}

@Override
public OrderField bucket(String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return OrderField.bucket(quotedName(id), width, direction, nullOrder);
public SortOrder bucket(String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return Expressions.sort(Expressions.bucket(width, quotedName(id)), toSpark(direction), toSpark(nullOrder));
}

@Override
public OrderField truncate(String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return OrderField.truncate(quotedName(id), width, direction, nullOrder);
public SortOrder truncate(String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return Expressions.sort(Expressions.apply(
"truncate", Expressions.column(quotedName(id)), Expressions.literal(width)),
toSpark(direction), toSpark(nullOrder));
}

@Override
public OrderField year(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return OrderField.year(quotedName(id), direction, nullOrder);
public SortOrder year(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return Expressions.sort(Expressions.years(quotedName(id)), toSpark(direction), toSpark(nullOrder));
}

@Override
public OrderField month(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return OrderField.month(quotedName(id), direction, nullOrder);
public SortOrder month(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return Expressions.sort(Expressions.months(quotedName(id)), toSpark(direction), toSpark(nullOrder));
}

@Override
public OrderField day(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return OrderField.day(quotedName(id), direction, nullOrder);
public SortOrder day(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return Expressions.sort(Expressions.days(quotedName(id)), toSpark(direction), toSpark(nullOrder));
}

@Override
public OrderField hour(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return OrderField.hour(quotedName(id), direction, nullOrder);
public SortOrder hour(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return Expressions.sort(Expressions.hours(quotedName(id)), toSpark(direction), toSpark(nullOrder));
}

// Looks up the pre-quoted column name for a field id; may return null if the
// id is not present in the map — presumably ids always come from the visited
// sort order's schema, so the lookup should not miss (NOTE(review): confirm).
private String quotedName(int id) {
return quotedNameById.get(id);
}

// Translates an Iceberg sort direction into the equivalent Spark connector
// enum (ASC -> ASCENDING, anything else -> DESCENDING).
private org.apache.spark.sql.connector.expressions.SortDirection toSpark(SortDirection direction) {
  return direction == SortDirection.ASC
      ? org.apache.spark.sql.connector.expressions.SortDirection.ASCENDING
      : org.apache.spark.sql.connector.expressions.SortDirection.DESCENDING;
}

// Translates an Iceberg null ordering into the equivalent Spark connector enum.
private NullOrdering toSpark(NullOrder nullOrder) {
  if (nullOrder == NullOrder.NULLS_FIRST) {
    return NullOrdering.NULLS_FIRST;
  }
  return NullOrdering.NULLS_LAST;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,11 @@ public static Distribution buildRequiredDistribution(Table table, DistributionMo
return Distributions.unspecified();

case HASH:
if (table.spec().isUnpartitioned()) {
return Distributions.unspecified();
} else {
return Distributions.clustered(Spark3Util.toTransforms(table.spec()));
}
return Distributions.clustered(Spark3Util.toTransforms(table.spec()));

case RANGE:
if (table.spec().isUnpartitioned() && table.sortOrder().isUnsorted()) {
return Distributions.unspecified();
} else {
org.apache.iceberg.SortOrder requiredSortOrder = SortOrderUtil.buildSortOrder(table);
return Distributions.ordered(convert(requiredSortOrder));
}
org.apache.iceberg.SortOrder requiredSortOrder = SortOrderUtil.buildSortOrder(table);
return Distributions.ordered(convert(requiredSortOrder));

default:
throw new IllegalArgumentException("Unsupported distribution mode: " + distributionMode);
Expand All @@ -71,7 +63,7 @@ public static SortOrder[] buildRequiredOrdering(Table table, Distribution distri
}

/**
 * Converts an Iceberg sort order into the equivalent array of Spark connector
 * {@link SortOrder} expressions by visiting each sort field with
 * {@link SortOrderToSpark}.
 */
public static SortOrder[] convert(org.apache.iceberg.SortOrder sortOrder) {
  List<SortOrder> converted = SortOrderVisitor.visit(sortOrder, new SortOrderToSpark(sortOrder.schema()));
  return converted.toArray(new SortOrder[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;

import static org.apache.iceberg.DistributionMode.HASH;
import static org.apache.iceberg.DistributionMode.NONE;
import static org.apache.iceberg.DistributionMode.RANGE;

/**
* A class for common Iceberg configs for Spark writes.
* <p>
Expand Down Expand Up @@ -155,19 +159,23 @@ public String rewrittenFileSetId() {
}

public DistributionMode distributionMode() {
String defaultValue;
if (table.sortOrder().isSorted()) {
defaultValue = TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
} else {
defaultValue = TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
}

String modeName = confParser.stringConf()
.option(SparkWriteOptions.DISTRIBUTION_MODE)
.tableProperty(TableProperties.WRITE_DISTRIBUTION_MODE)
.defaultValue(defaultValue)
.parse();
return DistributionMode.fromName(modeName);
.parseOptional();

if (modeName != null) {
DistributionMode mode = DistributionMode.fromName(modeName);
if (mode == RANGE && table.spec().isUnpartitioned() && table.sortOrder().isUnsorted()) {
return NONE;
} else if (mode == HASH && table.spec().isUnpartitioned()) {
return NONE;
} else {
return mode;
}
} else {
return table.sortOrder().isSorted() ? RANGE : NONE;
}
}

public DistributionMode deleteDistributionMode() {
Expand Down
Loading

0 comments on commit 53557ac

Please sign in to comment.