diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java index 0c8e20827167d4..cca896614d53c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -45,6 +46,7 @@ import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; @@ -316,7 +318,7 @@ protected void unprotectedExecuteRetry(FailMsg failMsg) { for (TUniqueId loadId : loadIds) { Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId); if (coordinator != null) { - coordinator.cancel(); + coordinator.cancel(new Status(TStatusCode.CANCELLED, "load job failed")); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java index 0623932bc9dad2..e5f19f90a1a9fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -22,6 +22,7 @@ import org.apache.doris.common.AuthenticationException; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; +import org.apache.doris.common.Status; import org.apache.doris.common.proc.CurrentQueryStatementsProcNode; import org.apache.doris.common.proc.ProcResult; import org.apache.doris.common.profile.ProfileTreeNode; @@ -38,6 +39,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -576,7 +578,7 @@ public Object killQuery(HttpServletRequest request, HttpServletResponse response } ExecuteEnv env = ExecuteEnv.getInstance(); - env.getScheduler().cancelQuery(queryId); + env.getScheduler().cancelQuery(queryId, new Status(TStatusCode.CANCELLED, "cancel query by rest api")); return ResponseEntityBuilder.ok(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 1c41c243f7d7d7..d1a425aeaf7838 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.Status; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; @@ -33,6 +34,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableList; @@ -219,7 +221,7 @@ protected void executeCancelLogic() { } isCanceled.getAndSet(true); if (null != stmtExecutor) { - stmtExecutor.cancel(); + stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "insert task cancelled")); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 59a421509d9c75..a2ec9fb03b00dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; @@ -50,6 +51,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableList; @@ -258,7 +260,7 @@ public synchronized void onSuccess() throws JobException { protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); if (executor != null) { - executor.cancel(); + executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task cancelled")); } after(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index d18d69bd90bbe3..1cfdc0c174c645 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Status; import org.apache.doris.load.ExportFailMsg.CancelType; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.glue.LogicalPlanAdapter; @@ -35,6 +36,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.TransientTaskExecutor; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -156,7 +158,7 @@ public void cancel() throws JobException { } isCanceled.getAndSet(true); if (stmtExecutor != null) { - stmtExecutor.cancel(); + stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "export task cancelled")); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 3268fb3c464836..f450a1dca7dff6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -36,6 +36,7 @@ import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.QuotaExceedException; +import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -57,6 +58,7 @@ import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TPipelineWorkloadGroup; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.BeginTransactionException; @@ -607,7 +609,7 @@ protected void unprotectedExecuteRetry(FailMsg failMsg) { for (TUniqueId loadId : loadIds) { Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId); if (coordinator != null) { - coordinator.cancel(); + coordinator.cancel(new Status(TStatusCode.CANCELLED, failMsg.getMsg())); } } @@ -671,7 +673,7 @@ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) { for (TUniqueId loadId : loadIds) { Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId); if (coordinator != null) { - coordinator.cancel(); + coordinator.cancel(new Status(TStatusCode.CANCELLED, failMsg.getMsg())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index de3fc5eb9530bb..0dc5922794ea8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -24,6 +24,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.nereids.NereidsPlanner; @@ -37,6 +38,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TQueryType; +import org.apache.doris.thrift.TStatusCode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -146,7 +148,7 @@ protected final void execImpl(StmtExecutor executor, long jobId) throws Exceptio } boolean notTimeout = coordinator.join(execTimeout); if (!coordinator.isDone()) { - coordinator.cancel(); + coordinator.cancel(new Status(TStatusCode.CANCELLED, "insert timeout")); if (notTimeout) { errMsg = coordinator.getExecStatus().getErrorMsg(); ErrorReport.reportDdlException("there exists unhealthy backend. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 155c4a769546d5..dd00944c64f6c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -938,7 +938,7 @@ public void kill(boolean killConnection) { closeChannel(); } // Now, cancel running query. - cancelQuery(); + cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user")); } // kill operation with no protect by timeout. @@ -960,10 +960,10 @@ private void killByTimeout(boolean killConnection) { } } - public void cancelQuery() { + public void cancelQuery(Status cancelReason) { StmtExecutor executorRef = executor; if (executorRef != null) { - executorRef.cancel(); + executorRef.cancel(cancelReason); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index cafe9edd3a18f0..43fa4dddca7844 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Status; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -145,11 +146,11 @@ public ConnectContext getContext(String flightToken) { return null; } - public void cancelQuery(String queryId) { + public void cancelQuery(String queryId, Status cancelReason) { for (ConnectContext ctx : connectionMap.values()) { TUniqueId qid = ctx.queryId(); if (qid != null && DebugUtil.printId(qid).equals(queryId)) { - ctx.cancelQuery(); + ctx.cancelQuery(cancelReason); break; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 39cdb051378c90..4753436196ddf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1278,18 +1278,11 @@ public Status shouldCancel(List currentBackends) { } } - // Cancel execution of query. This includes the execution of the local plan - // fragment, - // if any, as well as all plan fragments on remote nodes. - public void cancel() { - cancel(new Status(TStatusCode.CANCELLED, "query is cancelled by user")); + @Override + public void cancel(Status cancelReason) { if (queueToken != null) { queueToken.cancel(); } - } - - @Override - public void cancel(Status cancelReason) { for (ScanNode scanNode : scanNodes) { scanNode.stop(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 89dc1634b84ac6..4f7b676709cdfd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1528,7 +1528,7 @@ private void resetAnalyzerAndStmt() { } // Because this is called by other thread - public void cancel() { + public void cancel(Status cancelReason) { if (masterOpExecutor != null) { try { masterOpExecutor.cancel(); @@ -1544,7 +1544,7 @@ public void cancel() { } Coordinator coordRef = coord; if (coordRef != null) { - coordRef.cancel(); + coordRef.cancel(cancelReason); } if (mysqlLoadId != null) { Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId); @@ -1570,20 +1570,6 @@ private Optional getInsertOverwriteTableCommand() { return Optional.empty(); } - // Because this is called by other thread - public void cancel(Status cancelReason) { - Coordinator coordRef = coord; - if (coordRef != null) { - coordRef.cancel(cancelReason); - } - if (mysqlLoadId != null) { - Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId); - } - if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) { - Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context); - } - } - // Handle kill statement. private void handleKill() throws UserException { KillStmt killStmt = (KillStmt) parsedStmt; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java index 2dcff6075f4d74..268ccc8a5f6b50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java @@ -17,7 +17,9 @@ package org.apache.doris.resource.workloadschedpolicy; +import org.apache.doris.common.Status; import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.thrift.TStatusCode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -32,7 +34,7 @@ public void exec(WorkloadQueryInfo queryInfo) { && queryInfo.tUniqueId != null && QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) { LOG.info("cancel query {} triggered by query schedule policy.", queryInfo.queryId); - queryInfo.context.cancelQuery(); + queryInfo.context.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by workload policy")); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 09ceaad8cde771..54d52cd499bd16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -61,6 +61,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherException; +import org.apache.doris.common.Status; import org.apache.doris.common.ThriftServerContext; import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; @@ -1065,7 +1066,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { TUniqueId queryId = params.getQueryId(); ConnectContext ctx = proxyQueryIdToConnCtx.get(queryId); if (ctx != null) { - ctx.cancelQuery(); + ctx.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by forward request.")); } final TMasterOpResult result = new TMasterOpResult(); result.setStatusCode(0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java index 9f703dff92b1a9..4badae03b3141e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java @@ -17,11 +17,13 @@ package org.apache.doris.service.arrowflight.sessions; +import org.apache.doris.common.Status; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.service.arrowflight.results.FlightSqlChannel; import org.apache.doris.thrift.TResultSinkType; +import org.apache.doris.thrift.TStatusCode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -74,7 +76,7 @@ public void kill(boolean killConnection) { connectScheduler.unregisterConnection(this); } // Now, cancel running query. - cancelQuery(); + cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query killed by user")); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 329231f360487c..e0708ea99fdfdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.DdlException; +import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.qe.AuditLogHelper; @@ -36,6 +37,7 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.util.DBObjects; import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.thrift.TStatusCode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -267,7 +269,7 @@ protected void setTaskStateToRunning() { public void cancel() { killed = true; if (stmtExecutor != null) { - stmtExecutor.cancel(); + stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "analysis task cancelled")); } Env.getCurrentEnv().getAnalysisManager() .updateTaskStatus(info, AnalysisState.FAILED,