Skip to content

Commit

Permalink
Use OrderingCompiler in OrderBy spilling
Browse files Browse the repository at this point in the history
Cherry-pick of trinodb/trino@472fb5a

Co-authored-by: Karol Sobczak <[email protected]>
  • Loading branch information
2 people authored and highker committed Jul 24, 2020
1 parent 9d11bdb commit 1a56a5e
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.operator.OrderByOperator.OrderByOperatorFactory;
import com.facebook.presto.operator.PagesIndex;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.sql.gen.OrderingCompiler;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -56,7 +57,8 @@ protected List<? extends OperatorFactory> createOperatorFactories()
ImmutableList.of(ASC_NULLS_LAST),
new PagesIndex.TestingFactory(false),
false,
Optional.empty());
Optional.empty(),
new OrderingCompiler());

return ImmutableList.of(tableScanOperator, limitOperator, orderByOperator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spiller.Spiller;
import com.facebook.presto.spiller.SpillerFactory;
import com.facebook.presto.sql.gen.OrderingCompiler;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -52,10 +53,12 @@ public static class OrderByOperatorFactory
private final int expectedPositions;
private final List<Integer> sortChannels;
private final List<SortOrder> sortOrder;
private boolean closed;
private final PagesIndex.Factory pagesIndexFactory;
private final boolean spillEnabled;
private final Optional<SpillerFactory> spillerFactory;
private final OrderingCompiler orderingCompiler;

private boolean closed;

public OrderByOperatorFactory(
int operatorId,
Expand All @@ -67,7 +70,8 @@ public OrderByOperatorFactory(
List<SortOrder> sortOrder,
PagesIndex.Factory pagesIndexFactory,
boolean spillEnabled,
Optional<SpillerFactory> spillerFactory)
Optional<SpillerFactory> spillerFactory,
OrderingCompiler orderingCompiler)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand All @@ -80,6 +84,7 @@ public OrderByOperatorFactory(
this.pagesIndexFactory = requireNonNull(pagesIndexFactory, "pagesIndexFactory is null");
this.spillEnabled = spillEnabled;
this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null");
this.orderingCompiler = requireNonNull(orderingCompiler, "orderingCompiler is null");
checkArgument(!spillEnabled || spillerFactory.isPresent(), "Spiller Factory is not present when spill is enabled");
}

Expand All @@ -98,7 +103,8 @@ public Operator createOperator(DriverContext driverContext)
sortOrder,
pagesIndexFactory,
spillEnabled,
spillerFactory);
spillerFactory,
orderingCompiler);
}

@Override
Expand All @@ -120,7 +126,8 @@ public OperatorFactory duplicate()
sortOrder,
pagesIndexFactory,
spillEnabled,
spillerFactory);
spillerFactory,
orderingCompiler);
}
}

Expand All @@ -144,6 +151,7 @@ private enum State

private final boolean spillEnabled;
private final Optional<SpillerFactory> spillerFactory;
private final OrderingCompiler orderingCompiler;

private Optional<Spiller> spiller = Optional.empty();
private ListenableFuture<?> spillInProgress = immediateFuture(null);
Expand All @@ -162,7 +170,8 @@ public OrderByOperator(
List<SortOrder> sortOrder,
PagesIndex.Factory pagesIndexFactory,
boolean spillEnabled,
Optional<SpillerFactory> spillerFactory)
Optional<SpillerFactory> spillerFactory,
OrderingCompiler orderingCompiler)
{
requireNonNull(pagesIndexFactory, "pagesIndexFactory is null");

Expand All @@ -177,6 +186,7 @@ public OrderByOperator(
this.pageIndex = pagesIndexFactory.newPagesIndex(sourceTypes, expectedPositions);
this.spillEnabled = spillEnabled;
this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null");
this.orderingCompiler = requireNonNull(orderingCompiler, "orderingCompiler is null");
checkArgument(!spillEnabled || spillerFactory.isPresent(), "Spiller Factory is not present when spill is enabled");
}

Expand Down Expand Up @@ -324,8 +334,7 @@ private WorkProcessor<Page> mergeSpilledAndMemoryPages(List<WorkProcessor<Page>>

return mergeSortedPages(
sortedStreams,
// TODO use compiled comparator, like PagesIndex's OrderingCompiler
new SimplePageWithPositionComparator(sourceTypes, sortChannels, sortOrder),
orderingCompiler.compilePageWithPositionComparator(sourceTypes, sortChannels, sortOrder),
sourceTypes,
operatorContext.aggregateUserMemoryContext(),
operatorContext.getDriverContext().getYieldSignal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private PageWithPositionComparator internalCompilePageWithPositionComparator(Lis
comparator = pageWithPositionsComparatorClass.getConstructor().newInstance();
}
catch (Throwable t) {
log.error(t, "Error compiling merge sort comparator for channels %s with order %s", sortChannels, sortChannels);
log.error(t, "Error compiling comparator for channels %s with order %s", sortChannels, sortChannels);
comparator = new SimplePageWithPositionComparator(types, sortChannels, sortOrders);
}
return comparator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,8 @@ public PhysicalOperation visitSort(SortNode node, LocalExecutionPlanContext cont
sortOrder.build(),
pagesIndexFactory,
spillEnabled,
Optional.of(spillerFactory));
Optional.of(spillerFactory),
orderingCompiler);

return new PhysicalOperation(operator, source.getLayout(), context, source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.common.Page;
import com.facebook.presto.operator.OrderByOperator.OrderByOperatorFactory;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.sql.gen.OrderingCompiler;
import com.facebook.presto.testing.MaterializedResult;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -97,7 +98,8 @@ public void testSingleFieldKey(boolean spillEnabled)
ImmutableList.of(ASC_NULLS_LAST),
new PagesIndex.TestingFactory(false),
spillEnabled,
Optional.of(new DummySpillerFactory()));
Optional.of(new DummySpillerFactory()),
new OrderingCompiler());

MaterializedResult expected = resultBuilder(driverContext.getSession(), DOUBLE)
.row(-0.1)
Expand Down Expand Up @@ -130,7 +132,8 @@ public void testMultiFieldKey(boolean spillEnabled)
ImmutableList.of(ASC_NULLS_LAST, DESC_NULLS_LAST),
new PagesIndex.TestingFactory(false),
spillEnabled,
Optional.of(new DummySpillerFactory()));
Optional.of(new DummySpillerFactory()),
new OrderingCompiler());

MaterializedResult expected = MaterializedResult.resultBuilder(driverContext.getSession(), VARCHAR, BIGINT)
.row("a", 4L)
Expand Down Expand Up @@ -163,7 +166,8 @@ public void testReverseOrder(boolean spillEnabled)
ImmutableList.of(DESC_NULLS_LAST),
new PagesIndex.TestingFactory(false),
spillEnabled,
Optional.of(new DummySpillerFactory()));
Optional.of(new DummySpillerFactory()),
new OrderingCompiler());

MaterializedResult expected = resultBuilder(driverContext.getSession(), BIGINT)
.row(4L)
Expand Down Expand Up @@ -200,7 +204,8 @@ public void testMemoryLimit()
ImmutableList.of(ASC_NULLS_LAST),
new PagesIndex.TestingFactory(false),
false,
Optional.of(new DummySpillerFactory()));
Optional.of(new DummySpillerFactory()),
new OrderingCompiler());

toPages(operatorFactory, driverContext, input);
}
Expand Down

0 comments on commit 1a56a5e

Please sign in to comment.