Skip to content

Commit

Permalink
[multistage] Split MailboxReceiveOperator into sorted and non-sorted …
Browse files Browse the repository at this point in the history
…versions (#10570)

* Split MailboxReceiveOperator into sorted and non-sorted versions
  - sorted mailbox receiver will keep polling for data from all the mailboxes until it gets only 'null' blocks or gets EOS from all mailboxes.
  - non-sorted mailbox receiver will return immediately when data arrives (behavior identical to prior to #10408 
* Fix tests - ORDER BY with only limit should set isSortOnReceiver to false
  - when there's fetch/offset in SortNode but no Collation, skip sorting thus speed up performance
  • Loading branch information
somandal authored Apr 7, 2023
1 parent 0df38f1 commit 6283ee7
Show file tree
Hide file tree
Showing 12 changed files with 1,319 additions and 541 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void onMatch(RelOptRuleCall call) {
RelDistributions.hash(Collections.emptyList()),
sort.getCollation(),
false,
true);
!sort.getCollation().getKeys().isEmpty());
call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[dateTrunc('DAY', $4)])",
"\n LogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalTableScan(table=[[a]])",
"\n"
Expand All @@ -52,7 +52,7 @@
"Execution Plan",
"\nLogicalProject(day=[dateTrunc('DAY', $4)])",
"\n LogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalTableScan(table=[[a]])",
"\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1)])])",
Expand Down Expand Up @@ -300,7 +300,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[100])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[100])",
"\n LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
Expand Down Expand Up @@ -536,7 +536,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
Expand Down Expand Up @@ -785,7 +785,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
Expand Down Expand Up @@ -1097,7 +1097,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [SUM($2), COUNT($2)])])",
Expand Down Expand Up @@ -1265,7 +1265,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
Expand Down Expand Up @@ -1433,7 +1433,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
Expand Down Expand Up @@ -1640,7 +1640,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
Expand Down Expand Up @@ -1846,7 +1846,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [0] aggs [SUM($2), COUNT($2)])])",
Expand Down Expand Up @@ -2014,7 +2014,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
"\n LogicalWindow(window#0=[window(partition {0} order by [1] aggs [SUM($2), COUNT($2), MIN($2)])])",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.operator;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.service.QueryConfig;


/**
* Base class to be used by the various MailboxReceiveOperators such as the sorted and non-sorted versions. This
* class contains the common logic needed for MailboxReceive
*
* BaseMailboxReceiveOperator receives mailbox from mailboxService from sendingStageInstances.
* We use sendingStageInstance to deduce mailboxId and fetch the content from mailboxService.
* When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list.
* When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content.
*/
public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {

// TODO: Unify SUPPORTED_EXCHANGE_TYPES with MailboxSendOperator.
protected static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED);

protected final MailboxService<TransferableBlock> _mailboxService;
protected final RelDistribution.Type _exchangeType;
protected final List<MailboxIdentifier> _sendingMailbox;
protected final long _deadlineTimestampNano;
protected int _serverIdx;
protected TransferableBlock _upstreamErrorBlock;

protected static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
int receiverStageId, VirtualServerAddress receiver) {
return new JsonMailboxIdentifier(
String.format("%s_%s", jobId, senderStageId),
new VirtualServerAddress(sender),
receiver,
senderStageId,
receiverStageId);
}

public BaseMailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
RelDistribution.Type exchangeType, int senderStageId, int receiverStageId, Long timeoutMs) {
super(context);
_mailboxService = context.getMailboxService();
VirtualServerAddress receiver = context.getServer();
long jobId = context.getRequestId();
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
"Exchange/Distribution type: " + exchangeType + " is not supported!");
long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
_deadlineTimestampNano = timeoutNano + System.nanoTime();

_exchangeType = exchangeType;
if (_exchangeType == RelDistribution.Type.SINGLETON) {
VirtualServer singletonInstance = null;
for (VirtualServer serverInstance : sendingStageInstances) {
if (serverInstance.getHostname().equals(_mailboxService.getHostname())
&& serverInstance.getQueryMailboxPort() == _mailboxService.getMailboxPort()) {
Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
singletonInstance = serverInstance;
}
}

if (singletonInstance == null) {
// TODO: fix WorkerManager assignment, this should not happen if we properly assign workers.
// see: https://github.com/apache/pinot/issues/9611
_sendingMailbox = Collections.emptyList();
} else {
_sendingMailbox =
Collections.singletonList(toMailboxId(singletonInstance, jobId, senderStageId, receiverStageId, receiver));
}
} else {
_sendingMailbox = new ArrayList<>(sendingStageInstances.size());
for (VirtualServer instance : sendingStageInstances) {
_sendingMailbox.add(toMailboxId(instance, jobId, senderStageId, receiverStageId, receiver));
}
}
_upstreamErrorBlock = null;
_serverIdx = 0;
}

public List<MailboxIdentifier> getSendingMailbox() {
return _sendingMailbox;
}

@Override
public List<MultiStageOperator> getChildOperators() {
return ImmutableList.of();
}

@Override
public void close() {
super.close();
for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
_mailboxService.releaseReceivingMailbox(sendingMailbox);
}
}

@Override
public void cancel(Throwable t) {
super.cancel(t);
for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
_mailboxService.releaseReceivingMailbox(sendingMailbox);
}
}
}
Loading

0 comments on commit 6283ee7

Please sign in to comment.