Skip to content

Commit

Permalink
Short-circuit Join when probe side empty
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Apr 10, 2018
1 parent 16e0823 commit 832a0ec
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder;

final class EmptyLookupSource
implements LookupSource
{
@Override
public int getChannelCount()
{
throw new UnsupportedOperationException();
}

@Override
public long getInMemorySizeInBytes()
{
return 0;
}

@Override
public long getJoinPositionCount()
{
return 0;
}

@Override
public long joinPositionWithinPartition(long joinPosition)
{
throw new UnsupportedOperationException();
}

@Override
public long getJoinPosition(int position, Page hashChannelsPage, Page allChannelsPage, long rawHash)
{
return -1;
}

@Override
public long getJoinPosition(int position, Page hashChannelsPage, Page allChannelsPage)
{
return -1;
}

@Override
public long getNextJoinPosition(long currentJoinPosition, int probePosition, Page allProbeChannelsPage)
{
throw new UnsupportedOperationException();
}

@Override
public void appendTo(long position, PageBuilder pageBuilder, int outputChannelOffset)
{
throw new UnsupportedOperationException();
}

@Override
public boolean isJoinPositionEligible(long currentJoinPosition, int probePosition, Page allProbeChannelsPage)
{
return false;
}

@Override
public boolean isEmpty()
{
return true;
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static com.facebook.presto.operator.LookupJoinOperators.JoinType.PROBE_OUTER;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.airlift.concurrent.MoreFutures.addSuccessCallback;
import static io.airlift.concurrent.MoreFutures.checkSuccess;
import static io.airlift.concurrent.MoreFutures.getDone;
import static java.lang.String.format;
Expand Down Expand Up @@ -182,6 +183,10 @@ public ListenableFuture<?> isBlocked()
return unspilledLookupSource.get();
}

if (finishing) {
return NOT_BLOCKED;
}

return lookupSourceProviderFuture;
}

Expand Down Expand Up @@ -290,7 +295,14 @@ public Page getOutput()
}

if (!tryFetchLookupSourceProvider()) {
return null;
if (!finishing) {
return null;
}

verify(finishing);
// We are no longer interested in the build side (the lookupSourceProviderFuture's value).
addSuccessCallback(lookupSourceProviderFuture, LookupSourceProvider::close);
lookupSourceProvider = new StaticLookupSourceProvider(new EmptyLookupSource());
}

if (probe == null && finishing && !unspilling) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8040,6 +8040,22 @@ public void testFullJoinWithEmptyBuildSide()
"WITH small_part AS (SELECT * FROM part WHERE name = 'a') SELECT lineitem.orderkey FROM lineitem LEFT JOIN small_part ON lineitem.partkey = small_part.partkey");
}

@Test
public void testInnerJoinWithEmptyProbeSide()
{
assertQuery(
noJoinReordering(),
"WITH small_part AS (SELECT * FROM part WHERE name = 'a') SELECT lineitem.orderkey FROM small_part INNER JOIN lineitem ON small_part.partkey = lineitem.partkey");
}

@Test
public void testRightJoinWithEmptyProbeSide()
{
assertQuery(
noJoinReordering(),
"WITH small_part AS (SELECT * FROM part WHERE name = 'a') SELECT lineitem.orderkey FROM small_part RIGHT JOIN lineitem ON small_part.partkey = lineitem.partkey");
}

protected Session noJoinReordering()
{
return Session.builder(getSession())
Expand Down

0 comments on commit 832a0ec

Please sign in to comment.