Skip to content

Commit

Permalink
Enforce written intermediate bytes limit for cte materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
jaystarshot committed Mar 1, 2024
1 parent d05abfb commit 245c93f
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.facebook.presto.SystemSessionProperties.CTE_FILTER_AND_PROJECTION_PUSHDOWN_ENABLED;
import static com.facebook.presto.SystemSessionProperties.CTE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_WRITTEN_INTERMEDIATE_BYTES;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static io.airlift.tpch.TpchTable.CUSTOMER;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
Expand Down Expand Up @@ -1100,6 +1101,18 @@ public void testCteNoFilterPushDown()
queryRunner.execute(getSession(), query));
}

@Test
public void testWrittenIntemediateByteLimit()
throws Exception
{
String testQuery = "WITH cte1 AS (SELECT * FROM ORDERS JOIN ORDERS ON TRUE) " +
"SELECT * FROM cte1";
Session session = Session.builder(getMaterializedSession())
.setSystemProperty(QUERY_MAX_WRITTEN_INTERMEDIATE_BYTES, "0MB")
.build();
assertQueryFails(session, testQuery, "Query has exceeded WrittenIntermediate Limit of 0MB.*");
}

private void compareResults(MaterializedResult actual, MaterializedResult expected)
{
compareResults(actual, expected, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto;

import com.facebook.presto.spi.PrestoException;
import io.airlift.units.DataSize;

import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_WRITTEN_INTERMEDIATE_BYTES_LIMIT;

public class ExceededIntermediateWrittenBytesException
extends PrestoException
{
public ExceededIntermediateWrittenBytesException(DataSize limit)
{
super(EXCEEDED_WRITTEN_INTERMEDIATE_BYTES_LIMIT, "Query has exceeded WrittenIntermediate Limit of " + limit.toString() +
", Please retry with set SESSION cte_materialization_strategy=none;");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public final class SystemSessionProperties
public static final String RESOURCE_OVERCOMMIT = "resource_overcommit";
public static final String QUERY_MAX_CPU_TIME = "query_max_cpu_time";
public static final String QUERY_MAX_SCAN_RAW_INPUT_BYTES = "query_max_scan_raw_input_bytes";

public static final String QUERY_MAX_WRITTEN_INTERMEDIATE_BYTES = "query_max_written_intermediate_bytes";
public static final String QUERY_MAX_OUTPUT_POSITIONS = "query_max_output_positions";
public static final String QUERY_MAX_OUTPUT_SIZE = "query_max_output_size";
public static final String QUERY_MAX_STAGE_COUNT = "query_max_stage_count";
Expand Down Expand Up @@ -621,6 +623,11 @@ public SystemSessionProperties(
"Maximum scan raw input bytes of a query",
queryManagerConfig.getQueryMaxScanRawInputBytes(),
false),
dataSizeProperty(
QUERY_MAX_WRITTEN_INTERMEDIATE_BYTES,
"Maximum written intermediate bytes of a query",
queryManagerConfig.getQueryMaxWrittenIntermediateBytes(),
false),
longProperty(
QUERY_MAX_OUTPUT_POSITIONS,
"Maximum number of output rows that can be fetched by a query",
Expand Down Expand Up @@ -2253,6 +2260,11 @@ public static Duration getQueryMaxCpuTime(Session session)
return session.getSystemProperty(QUERY_MAX_CPU_TIME, Duration.class);
}

public static DataSize getQueryMaxWrittenIntermediateBytesLimit(Session session)
{
return session.getSystemProperty(QUERY_MAX_WRITTEN_INTERMEDIATE_BYTES, DataSize.class);
}

public static DataSize getQueryMaxScanRawInputBytes(Session session)
{
return session.getSystemProperty(QUERY_MAX_SCAN_RAW_INPUT_BYTES, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ public DataSize getRawInputDataSize()
return DataSize.succinctBytes(0);
}

@Override
public DataSize getWrittenIntermediateDataSize()
{
return DataSize.succinctBytes(0);
}

@Override
public long getOutputPositions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ public DataSize getRawInputDataSize()
return DataSize.succinctBytes(0);
}

@Override
public DataSize getWrittenIntermediateDataSize()
{
return DataSize.succinctBytes(0);
}
@Override
public long getOutputPositions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public interface QueryExecution

DataSize getRawInputDataSize();

DataSize getWrittenIntermediateDataSize();

long getOutputPositions();

DataSize getOutputDataSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;

import static io.airlift.units.DataSize.Unit.PETABYTE;
import static io.airlift.units.DataSize.Unit.TERABYTE;
import static java.util.concurrent.TimeUnit.MINUTES;

@DefunctConfig({
Expand Down Expand Up @@ -79,6 +80,7 @@ public class QueryManagerConfig
private Duration queryMaxCpuTime = new Duration(1_000_000_000, TimeUnit.DAYS);

private DataSize queryMaxScanRawInputBytes = DataSize.succinctDataSize(1000, PETABYTE);
private DataSize queryMaxWrittenIntermediateBytes = DataSize.succinctDataSize(2, TERABYTE);
private long queryMaxOutputPositions = Long.MAX_VALUE;
private DataSize queryMaxOutputSize = DataSize.succinctDataSize(1000, PETABYTE);

Expand Down Expand Up @@ -481,6 +483,18 @@ public QueryManagerConfig setQueryMaxScanRawInputBytes(DataSize queryMaxRawInput
return this;
}

public DataSize getQueryMaxWrittenIntermediateBytes()
{
return this.queryMaxWrittenIntermediateBytes;
}

@Config("query.max-written-intermediate-bytes")
public QueryManagerConfig setQueryMaxWrittenIntermediateBytes(DataSize queryMaxWrittenIntermediateBytes)
{
this.queryMaxWrittenIntermediateBytes = queryMaxWrittenIntermediateBytes;
return this;
}

@Min(1)
public long getQueryMaxOutputPositions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,20 @@ public DataSize getRawInputDataSize()
return scheduler.getRawInputDataSize();
}

@Override
public DataSize getWrittenIntermediateDataSize()
{
SqlQuerySchedulerInterface scheduler = queryScheduler.get();
Optional<QueryInfo> finalQueryInfo = stateMachine.getFinalQueryInfo();
if (finalQueryInfo.isPresent()) {
return finalQueryInfo.get().getQueryStats().getWrittenIntermediatePhysicalDataSize();
}
if (scheduler == null) {
return new DataSize(0, BYTE);
}
return scheduler.getWrittenIntermediateDataSize();
}

@Override
public long getOutputPositions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.ExceededCpuLimitException;
import com.facebook.presto.ExceededIntermediateWrittenBytesException;
import com.facebook.presto.ExceededOutputSizeLimitException;
import com.facebook.presto.ExceededScanLimitException;
import com.facebook.presto.Session;
Expand All @@ -30,6 +31,7 @@
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.eventlistener.CTEInformation;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.version.EmbedVersion;
Expand Down Expand Up @@ -62,6 +64,7 @@
import static com.facebook.presto.SystemSessionProperties.getQueryMaxOutputPositions;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxOutputSize;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxScanRawInputBytes;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxWrittenIntermediateBytesLimit;
import static com.facebook.presto.execution.QueryLimit.Source.QUERY;
import static com.facebook.presto.execution.QueryLimit.Source.RESOURCE_GROUP;
import static com.facebook.presto.execution.QueryLimit.Source.SYSTEM;
Expand All @@ -88,6 +91,8 @@ public class SqlQueryManager

private final Duration maxQueryCpuTime;
private final DataSize maxQueryScanPhysicalBytes;

private final DataSize maxWrittenIntermediatePhysicalBytes;
private final long maxQueryOutputPositions;
private final DataSize maxQueryOutputSize;

Expand All @@ -114,6 +119,7 @@ public SqlQueryManager(

this.maxQueryCpuTime = queryManagerConfig.getQueryMaxCpuTime();
this.maxQueryScanPhysicalBytes = queryManagerConfig.getQueryMaxScanRawInputBytes();
this.maxWrittenIntermediatePhysicalBytes = queryManagerConfig.getQueryMaxWrittenIntermediateBytes();
this.maxQueryOutputPositions = queryManagerConfig.getQueryMaxOutputPositions();
this.maxQueryOutputSize = queryManagerConfig.getQueryMaxOutputSize();

Expand Down Expand Up @@ -158,6 +164,13 @@ public void start()
log.error(e, "Error enforcing query output rows limits");
}

try {
enforceWrittenIntermediateBytesLimit();
}
catch (Throwable e) {
log.error(e, "Error enforcing written intermediate limits");
}

try {
enforceOutputSizeLimits();
}
Expand Down Expand Up @@ -419,6 +432,26 @@ private void enforceScanLimits()
}
}

/**
* Enforce WrittenIntermediateDataSize bytes limits
*/
private void enforceWrittenIntermediateBytesLimit()
{
for (QueryExecution query : queryTracker.getAllQueries()) {
if (query.getSession().getCteInformationCollector()
.getCTEInformationList().stream().noneMatch(CTEInformation::isMaterialized)) {
// No Ctes Materialized
continue;
}
DataSize writtenIntermediateDataSize = query.getWrittenIntermediateDataSize();
DataSize sessionlimit = getQueryMaxWrittenIntermediateBytesLimit(query.getSession());
DataSize limit = Ordering.natural().min(maxWrittenIntermediatePhysicalBytes, sessionlimit);
if (writtenIntermediateDataSize.compareTo(limit) >= 0) {
query.fail(new ExceededIntermediateWrittenBytesException(limit));
}
}
}

/**
* Enforce query output row limits
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.RemoteTransactionHandle;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.server.remotetask.HttpRemoteTask;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.split.RemoteSplit;
Expand Down Expand Up @@ -350,6 +351,17 @@ public synchronized DataSize getRawInputDataSize()
return DataSize.succinctBytes(datasize);
}

public synchronized DataSize getWrittenIntermediateDataSize()
{
long datasize = getAllTasks().stream()
.filter(remoteTask -> remoteTask instanceof HttpRemoteTask)
.map(remoteTask -> (HttpRemoteTask) remoteTask)
.filter(httpRemoteTask -> !httpRemoteTask.getPlanFragment().isOutputTableWriterFragment())
.mapToLong(task -> task.getTaskInfo().getStats().getPhysicalWrittenDataSizeInBytes())
.sum();
return DataSize.succinctBytes(datasize);
}

public BasicStageExecutionStats getBasicStageStats()
{
return stateMachine.getBasicStageStats(this::getAllTaskInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,15 @@ public DataSize getRawInputDataSize()
return DataSize.succinctBytes(datasize);
}

@Override
public DataSize getWrittenIntermediateDataSize()
{
long datasize = stageExecutions.values().stream()
.mapToLong(stage -> stage.getStageExecution().getWrittenIntermediateDataSize().toBytes())
.sum();
return DataSize.succinctBytes(datasize);
}

@Override
public long getOutputPositions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface SqlQuerySchedulerInterface

DataSize getRawInputDataSize();

DataSize getWrittenIntermediateDataSize();

long getOutputPositions();

DataSize getOutputDataSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@ public HttpRemoteTask(
}
}

public PlanFragment getPlanFragment()
{
return planFragment;
}

@Override
public TaskId getTaskId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public DataSize getRawInputDataSize()
return null;
}

@Override
public DataSize getWrittenIntermediateDataSize()
{
return null;
}

@Override
public long getOutputPositions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.airlift.units.DataSize.Unit.PETABYTE;
import static io.airlift.units.DataSize.Unit.TERABYTE;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
Expand Down Expand Up @@ -64,6 +65,7 @@ public void testDefaults()
.setQueryMaxExecutionTime(new Duration(100, TimeUnit.DAYS))
.setQueryMaxCpuTime(new Duration(1_000_000_000, TimeUnit.DAYS))
.setQueryMaxScanRawInputBytes(new DataSize(1000, PETABYTE))
.setQueryMaxWrittenIntermediateBytes(new DataSize(2, TERABYTE))
.setQueryMaxOutputPositions(Long.MAX_VALUE)
.setQueryMaxOutputSize(new DataSize(1000, PETABYTE))
.setRequiredWorkers(1)
Expand Down Expand Up @@ -115,6 +117,7 @@ public void testExplicitPropertyMappings()
.put("query.max-execution-time", "3h")
.put("query.max-cpu-time", "2d")
.put("query.max-scan-raw-input-bytes", "1MB")
.put("query.max-written-intermediate-bytes", "100MB")
.put("query.max-output-positions", "259")
.put("query.max-output-size", "100MB")
.put("query.use-streaming-exchange-for-mark-distinct", "true")
Expand All @@ -134,6 +137,7 @@ public void testExplicitPropertyMappings()
.put("query.cte-hash-partition-count", "128")
.put("query.cte-partitioning-provider-catalog", "hive")
.put("query-manager.enable-worker-isolation", "true")

.build();

QueryManagerConfig expected = new QueryManagerConfig()
Expand Down Expand Up @@ -167,6 +171,7 @@ public void testExplicitPropertyMappings()
.setQueryMaxScanRawInputBytes(new DataSize(1, MEGABYTE))
.setQueryMaxOutputPositions(259)
.setQueryMaxOutputSize(new DataSize(100, MEGABYTE))
.setQueryMaxWrittenIntermediateBytes(new DataSize(100, MEGABYTE))
.setRequiredWorkers(333)
.setRequiredWorkersMaxWait(new Duration(33, TimeUnit.MINUTES))
.setRequiredCoordinators(999)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public enum StandardErrorCode
NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR(0x0002_000F, INTERNAL_ERROR),
MISSING_RESOURCE_GROUP_SELECTOR(0x0002_0010, INTERNAL_ERROR),
EXCEEDED_HEAP_MEMORY_LIMIT(0x0002_0011, INSUFFICIENT_RESOURCES),
EXCEEDED_WRITTEN_INTERMEDIATE_BYTES_LIMIT(0x0002_0012, INSUFFICIENT_RESOURCES),
/**/;

// Error code range 0x0003 is reserved for Presto-on-Spark
Expand Down

0 comments on commit 245c93f

Please sign in to comment.