Skip to content

Commit

Permalink
[fix](arrow-flight-sql) Arrow Flight support multiple endpoints (apac…
Browse files Browse the repository at this point in the history
…he#44286)

### What problem does this PR solve?

Problem Summary:

By default, the query results of all BE nodes will be aggregated to one
BE node. ADBC Client will only receive one endpoint and pull data from
the BE node corresponding to this endpoint.

`set global enable_parallel_result_sink=true;` to allow each BE to
return query results separately. ADBC Client will receive multiple
endpoints and pull data from each endpoint.
  • Loading branch information
xinyiZzz authored Nov 25, 2024
1 parent 11379b5 commit 05b48d6
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 169 deletions.
40 changes: 8 additions & 32 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
Expand Down Expand Up @@ -63,11 +62,11 @@
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -134,10 +133,7 @@ public enum ConnectType {
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList();
private boolean returnResultFromLocal = true;
// mysql net
protected volatile MysqlChannel mysqlChannel;
Expand Down Expand Up @@ -730,36 +726,16 @@ public String getRunningQuery() {
return runningQuery;
}

public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) {
this.resultFlightServerAddr = resultFlightServerAddr;
public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) {
this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
}

public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
return flightSqlEndpointsLocations;
}

public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) {
this.resultInternalServiceAddr = resultInternalServiceAddr;
}

public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}

public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
this.resultOutputExprs = resultOutputExprs;
}

public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}

public void setFinstId(TUniqueId finstId) {
this.finstId = finstId;
}

public TUniqueId getFinstId() {
return finstId;
public void clearFlightSqlEndpointsLocations() {
flightSqlEndpointsLocations.clear();
}

public void setReturnResultFromLocal(boolean returnResultFromLocal) {
Expand Down
32 changes: 16 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
Expand Down Expand Up @@ -733,29 +734,27 @@ protected void execInternal() throws Exception {
enableParallelResultSink = queryOptions.isEnableParallelOutfile();
}

TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
Set<TNetworkAddress> addrs = new HashSet<>();
for (FInstanceExecParam param : topParams.instanceExecParams) {
if (addrs.contains(param.host)) {
continue;
}
addrs.add(param.host);
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
}

if (!context.isReturnResultFromLocal()) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
if (enableParallelResultSink) {
context.setFinstId(queryId);
if (context.isReturnResultFromLocal()) {
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
} else {
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
TUniqueId finstId;
if (enableParallelResultSink) {
finstId = queryId;
} else {
finstId = topParams.instanceExecParams.get(0).instanceId;
}
context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
}
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
}

LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
Expand All @@ -766,7 +765,8 @@ protected void execInternal() throws Exception {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
.getBroker(topResultFileSink.getBrokerName(),
topParams.instanceExecParams.get(0).host.getHostname());
topResultFileSink.setBrokerAddr(broker.host, broker.port);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.qe.runtime.ThriftPlansBuilder;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TNetworkAddress;
Expand Down Expand Up @@ -90,7 +91,7 @@ public NereidsCoordinator(ConnectContext context, Analyzer analyzer,
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));

Preconditions.checkState(!planner.getFragments().isEmpty()
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
}

// broker load
Expand Down Expand Up @@ -431,18 +432,22 @@ private void setForArrowFlight(CoordinatorContext coordinatorContext, PipelineDi
if (dataSink instanceof ResultSink || dataSink instanceof ResultFileSink) {
if (connectContext != null && !connectContext.isReturnResultFromLocal()) {
Preconditions.checkState(connectContext.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));

AssignedJob firstInstance = topPlan.getInstanceJobs().get(0);
BackendWorker worker = (BackendWorker) firstInstance.getAssignedWorker();
Backend backend = worker.getBackend();

connectContext.setFinstId(firstInstance.instanceId());
if (backend.getArrowFlightSqlPort() < 0) {
throw new IllegalStateException("be arrow_flight_sql_port cannot be empty.");
for (AssignedJob instance : topPlan.getInstanceJobs()) {
BackendWorker worker = (BackendWorker) instance.getAssignedWorker();
Backend backend = worker.getBackend();
if (backend.getArrowFlightSqlPort() < 0) {
throw new IllegalStateException("be arrow_flight_sql_port cannot be empty.");
}
TUniqueId finstId;
if (connectContext.getSessionVariable().enableParallelResultSink()) {
finstId = getQueryId();
} else {
finstId = instance.instanceId();
}
connectContext.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
backend.getArrowFlightAddress(), backend.getBrpcAddress(),
topPlan.getFragmentJob().getFragment().getOutputExprs()));
}
connectContext.setResultFlightServerAddr(backend.getArrowFlightAddress());
connectContext.setResultInternalServiceAddr(backend.getBrpcAddress());
connectContext.setResultOutputExprs(topPlan.getFragmentJob().getFragment().getOutputExprs());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
Expand Down Expand Up @@ -187,6 +189,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
Preconditions.checkState(!query.isEmpty());
// After the previous query was executed, there was no getStreamStatement to take away the result.
connectContext.getFlightSqlChannel().reset();
connectContext.clearFlightSqlEndpointsLocations();
try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) {
flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
Expand Down Expand Up @@ -225,50 +228,52 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
}
} else {
// Now only query stmt will pull results from BE.
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (flightSQLConnectProcessor.getArrowSchema() == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null")
.toRuntimeException();
}

TUniqueId queryId = connectContext.queryId();
if (!connectContext.getSessionVariable().enableParallelResultSink()) {
// only one instance
queryId = connectContext.getFinstId();
}
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname
+ "&" + connectContext.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location;
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
List<FlightEndpoint> endpoints = Lists.newArrayList();
for (FlightSqlEndpointsLocation endpointLoc : connectContext.getFlightSqlEndpointsLocations()) {
TUniqueId tid = endpointLoc.getFinstId();
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(tid) + "&" + endpointLoc.getResultInternalServiceAddr().hostname + "&"
+ endpointLoc.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(handle).build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
Location location;
if (endpointLoc.getResultPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting
// to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
if (endpointLoc.getResultPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
endpointLoc.getResultPublicAccessAddr().port);
} else {
location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
connectContext.getResultFlightServerAddr().port);
location = Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
// By default, the query results of all BE nodes will be aggregated to one BE node.
// ADBC Client will only receive one endpoint and pull data from the BE node
// corresponding to this endpoint.
// `set global enable_parallel_result_sink=true;` to allow each BE to return query results
// separately. ADBC Client will receive multiple endpoints and pull data from each endpoint.
endpoints.add(new FlightEndpoint(ticket, location));
}
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
return new FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, -1, -1);
}
}
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 05b48d6

Please sign in to comment.