Skip to content

Commit

Permalink
Add per-node revocable memory limit
Browse files Browse the repository at this point in the history
This config allows spilling queries to fail after allocating a
sufficiently large amount of revocable memory.
  • Loading branch information
Saksham Sachdev authored and highker committed Nov 16, 2020
1 parent 32e32af commit 2499b40
Show file tree
Hide file tree
Showing 18 changed files with 84 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ protected Map<String, Long> runOnce()
new DataSize(256, MEGABYTE),
new DataSize(512, MEGABYTE),
new DataSize(256, MEGABYTE),
new DataSize(1, GIGABYTE),
memoryPool,
new TestingGcMonitor(),
localQueryRunner.getExecutor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public List<Page> execute(@Language("SQL") String query)
new DataSize(1, GIGABYTE),
new DataSize(2, GIGABYTE),
new DataSize(1, GIGABYTE),
new DataSize(2, GIGABYTE),
memoryPool,
new TestingGcMonitor(),
localQueryRunner.getExecutor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_REVOCABLE_MEMORY_LIMIT;
import static java.lang.String.format;

public class ExceededMemoryLimitException
Expand Down Expand Up @@ -52,6 +53,13 @@ public static ExceededMemoryLimitException exceededLocalTotalMemoryLimit(DataSiz
format("Query exceeded per-node total memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

public static ExceededMemoryLimitException exceededLocalRevocableMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
{
return new ExceededMemoryLimitException(
EXCEEDED_REVOCABLE_MEMORY_LIMIT,
format("Query exceeded per-node revocable memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

private ExceededMemoryLimitException(StandardErrorCode errorCode, String message)
{
super(errorCode, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ public SqlTaskManager(
DataSize maxQueryUserMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode();
DataSize maxQueryTotalMemoryPerNode = nodeMemoryConfig.getMaxQueryTotalMemoryPerNode();
DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode();
DataSize maxRevocableMemoryPerNode = nodeSpillConfig.getMaxRevocableMemoryPerNode();
DataSize maxQueryBroadcastMemory = nodeMemoryConfig.getMaxQueryBroadcastMemory();

queryContexts = CacheBuilder.newBuilder().weakValues().build(CacheLoader.from(
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryUserMemoryPerNode, maxQueryTotalMemoryPerNode, maxQuerySpillPerNode, maxQueryBroadcastMemory)));
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryUserMemoryPerNode, maxQueryTotalMemoryPerNode, maxRevocableMemoryPerNode, maxQuerySpillPerNode, maxQueryBroadcastMemory)));

tasks = CacheBuilder.newBuilder().build(CacheLoader.from(
taskId -> createSqlTask(
Expand All @@ -192,6 +193,7 @@ private QueryContext createQueryContext(
GcMonitor gcMonitor,
DataSize maxQueryUserMemoryPerNode,
DataSize maxQueryTotalMemoryPerNode,
DataSize maxRevocableMemoryPerNode,
DataSize maxQuerySpillPerNode,
DataSize maxQueryBroadcastMemory)
{
Expand All @@ -200,6 +202,7 @@ private QueryContext createQueryContext(
maxQueryUserMemoryPerNode,
maxQueryTotalMemoryPerNode,
maxQueryBroadcastMemory,
maxRevocableMemoryPerNode,
localMemoryManager.getGeneralPool(),
gcMonitor,
taskNotificationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.function.Predicate;

import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalBroadcastMemoryLimit;
import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalRevocableMemoryLimit;
import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalTotalMemoryLimit;
import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalUserMemoryLimit;
import static com.facebook.presto.ExceededSpillLimitException.exceededPerQueryLocalLimit;
Expand Down Expand Up @@ -81,6 +82,10 @@ public class QueryContext
@GuardedBy("this")
private long peakNodeTotalMemory;

// TODO: Make max revocable memory be configurable by session property.
@GuardedBy("this")
private final long maxRevocableMemory;

@GuardedBy("this")
private long broadcastUsed;
@GuardedBy("this")
Expand All @@ -99,6 +104,7 @@ public QueryContext(
DataSize maxUserMemory,
DataSize maxTotalMemory,
DataSize maxBroadcastUsedMemory,
DataSize maxRevocableMemory,
MemoryPool memoryPool,
GcMonitor gcMonitor,
Executor notificationExecutor,
Expand All @@ -110,6 +116,7 @@ public QueryContext(
this.maxUserMemory = requireNonNull(maxUserMemory, "maxUserMemory is null").toBytes();
this.maxTotalMemory = requireNonNull(maxTotalMemory, "maxTotalMemory is null").toBytes();
this.maxBroadcastUsedMemory = requireNonNull(maxBroadcastUsedMemory, "maxBroadcastUsedMemory is null").toBytes();
this.maxRevocableMemory = requireNonNull(maxRevocableMemory, "maxRevocableMemory is null").toBytes();
this.memoryPool = requireNonNull(memoryPool, "memoryPool is null");
this.gcMonitor = requireNonNull(gcMonitor, "gcMonitor is null");
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
Expand Down Expand Up @@ -164,7 +171,9 @@ private synchronized ListenableFuture<?> updateUserMemory(String allocationTag,
//TODO Add tagging support for revocable memory reservations if needed
private synchronized ListenableFuture<?> updateRevocableMemory(String allocationTag, long delta)
{
long totalRevocableMemory = memoryPool.getQueryRevocableMemoryReservation(queryId);
if (delta >= 0) {
enforceRevocableMemoryLimit(totalRevocableMemory, delta, maxRevocableMemory);
return memoryPool.reserveRevocable(queryId, delta);
}
memoryPool.freeRevocable(queryId, -delta);
Expand Down Expand Up @@ -427,6 +436,14 @@ private void enforceTotalMemoryLimit(long allocated, long delta, long maxMemory)
}
}

@GuardedBy("this")
private void enforceRevocableMemoryLimit(long allocated, long delta, long maxMemory)
{
if (allocated + delta > maxMemory) {
throw exceededLocalRevocableMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta));
}
}

@GuardedBy("this")
private String getAdditionalFailureInfo(long allocated, long delta)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class NodeSpillConfig
{
private DataSize maxSpillPerNode = new DataSize(100, DataSize.Unit.GIGABYTE);
private DataSize maxRevocableMemoryPerNode = new DataSize(16, DataSize.Unit.GIGABYTE);
private DataSize queryMaxSpillPerNode = new DataSize(100, DataSize.Unit.GIGABYTE);
private DataSize tempStorageBufferSize = new DataSize(4, DataSize.Unit.KILOBYTE);

Expand Down Expand Up @@ -53,6 +54,19 @@ public NodeSpillConfig setQueryMaxSpillPerNode(DataSize queryMaxSpillPerNode)
return this;
}

@NotNull
public DataSize getMaxRevocableMemoryPerNode()
{
return maxRevocableMemoryPerNode;
}

@Config("experimental.max-revocable-memory-per-node")
public NodeSpillConfig setMaxRevocableMemoryPerNode(DataSize maxRevocableMemoryPerNode)
{
this.maxRevocableMemoryPerNode = maxRevocableMemoryPerNode;
return this;
}

public boolean isSpillCompressionEnabled()
{
return spillCompressionEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public static class Builder
private DataSize queryMaxTotalMemory = new DataSize(512, MEGABYTE);
private DataSize memoryPoolSize = new DataSize(1, GIGABYTE);
private DataSize maxSpillSize = new DataSize(1, GIGABYTE);
private DataSize maxRevocableMemory = new DataSize(1, GIGABYTE);
private DataSize queryMaxSpillSize = new DataSize(1, GIGABYTE);
private Optional<FragmentResultCacheContext> fragmentResultCacheContext = Optional.empty();

Expand Down Expand Up @@ -174,6 +175,7 @@ public TaskContext build()
queryMaxMemory,
queryMaxTotalMemory,
queryMaxMemory,
maxRevocableMemory,
memoryPool,
GC_MONITOR,
notificationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public MockRemoteTask(TaskId taskId,
new DataSize(1, MEGABYTE),
new DataSize(2, MEGABYTE),
new DataSize(1, MEGABYTE),
new DataSize(1, GIGABYTE),
memoryPool,
new TestingGcMonitor(),
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ private SqlTask newSqlTask()
new DataSize(1, MEGABYTE),
new DataSize(2, MEGABYTE),
new DataSize(1, MEGABYTE),
new DataSize(1, GIGABYTE),
memoryPool,
new TestingGcMonitor(),
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ public SqlTask createInitialTask()
new DataSize(1, MEGABYTE),
new DataSize(2, MEGABYTE),
new DataSize(1, MEGABYTE),
new DataSize(1, GIGABYTE),
new MemoryPool(new MemoryPoolId("test"), new DataSize(1, GIGABYTE)),
new TestingGcMonitor(),
taskNotificationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ private TaskContext newTestingTaskContext(ScheduledExecutorService taskNotificat
new DataSize(1, MEGABYTE),
new DataSize(2, MEGABYTE),
new DataSize(1, MEGABYTE),
new DataSize(1, GIGABYTE),
new MemoryPool(new MemoryPoolId("test"), new DataSize(1, GIGABYTE)),
new TestingGcMonitor(),
taskNotificationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ private void setUp(Supplier<List<Driver>> driversSupplier)
TEN_MEGABYTES,
new DataSize(20, MEGABYTE),
TEN_MEGABYTES,
new DataSize(1, GIGABYTE),
userPool,
new TestingGcMonitor(),
localQueryRunner.getExecutor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class TestMemoryTracking
{
private static final DataSize queryMaxMemory = new DataSize(1, GIGABYTE);
private static final DataSize queryMaxTotalMemory = new DataSize(1, GIGABYTE);
private static final DataSize queryMaxRevocableMemory = new DataSize(2, GIGABYTE);
private static final DataSize memoryPoolSize = new DataSize(1, GIGABYTE);
private static final DataSize maxSpillSize = new DataSize(1, GIGABYTE);
private static final DataSize queryMaxSpillSize = new DataSize(1, GIGABYTE);
Expand Down Expand Up @@ -102,6 +103,7 @@ public void setUpTest()
queryMaxMemory,
queryMaxTotalMemory,
queryMaxMemory,
queryMaxRevocableMemory,
memoryPool,
new TestingGcMonitor(),
notificationExecutor,
Expand Down Expand Up @@ -163,6 +165,23 @@ public void testLocalTotalMemoryLimitExceeded()
}
}

@Test
public void testLocalRevocableMemoryLimitExceeded()
{
LocalMemoryContext revocableMemoryContext = operatorContext.localRevocableMemoryContext();
revocableMemoryContext.setBytes(100);
assertOperatorMemoryAllocations(operatorContext.getOperatorMemoryContext(), 0, 0, 100);
revocableMemoryContext.setBytes(queryMaxRevocableMemory.toBytes());
assertOperatorMemoryAllocations(operatorContext.getOperatorMemoryContext(), 0, 0, queryMaxRevocableMemory.toBytes());
try {
revocableMemoryContext.setBytes(queryMaxRevocableMemory.toBytes() + 1);
fail("allocation should hit the per-node revocable memory limit");
}
catch (ExceededMemoryLimitException e) {
assertEquals(e.getMessage(), format("Query exceeded per-node revocable memory limit of %1$s [Allocated: %1$s, Delta: 1B]", queryMaxRevocableMemory));
}
}

@Test
public void testLocalSystemAllocations()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
Expand Down Expand Up @@ -79,6 +80,7 @@ public void testSetMemoryPool(boolean useReservedPool)
new DataSize(10, BYTE),
new DataSize(20, BYTE),
new DataSize(10, BYTE),
new DataSize(1, GIGABYTE),
new MemoryPool(GENERAL_POOL, new DataSize(10, BYTE)),
new TestingGcMonitor(),
localQueryRunner.getExecutor(),
Expand Down Expand Up @@ -145,6 +147,7 @@ private static QueryContext createQueryContext(QueryId queryId, MemoryPool gener
new DataSize(10_000, BYTE),
new DataSize(10_000, BYTE),
new DataSize(10_000, BYTE),
new DataSize(1, GIGABYTE),
generalPool,
new TestingGcMonitor(),
TEST_EXECUTOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public static GroupByHashYieldResult finishOperatorWithYieldingGroupByHash(List<
new DataSize(512, MEGABYTE),
new DataSize(1024, MEGABYTE),
new DataSize(512, MEGABYTE),
new DataSize(1, GIGABYTE),
memoryPool,
new TestingGcMonitor(),
EXECUTOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public void testDefaults()
{
assertRecordedDefaults(ConfigAssertions.recordDefaults(NodeSpillConfig.class)
.setMaxSpillPerNode(new DataSize(100, GIGABYTE))
.setMaxRevocableMemoryPerNode(new DataSize(16, GIGABYTE))
.setQueryMaxSpillPerNode(new DataSize(100, GIGABYTE))
.setSpillCompressionEnabled(false)
.setSpillEncryptionEnabled(false)
Expand All @@ -44,6 +45,7 @@ public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("experimental.max-spill-per-node", "10MB")
.put("experimental.max-revocable-memory-per-node", "24MB")
.put("experimental.query-max-spill-per-node", "15 MB")
.put("experimental.spill-compression-enabled", "true")
.put("experimental.spill-encryption-enabled", "true")
Expand All @@ -52,6 +54,7 @@ public void testExplicitPropertyMappings()

NodeSpillConfig expected = new NodeSpillConfig()
.setMaxSpillPerNode(new DataSize(10, MEGABYTE))
.setMaxRevocableMemoryPerNode(new DataSize(24, MEGABYTE))
.setQueryMaxSpillPerNode(new DataSize(15, MEGABYTE))
.setSpillCompressionEnabled(true)
.setSpillEncryptionEnabled(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public class PrestoSparkTaskExecutorFactory

private final DataSize maxUserMemory;
private final DataSize maxTotalMemory;
private final DataSize maxRevocableMemory;
private final DataSize maxSpillMemory;
private final DataSize sinkMaxBufferSize;

Expand Down Expand Up @@ -194,6 +195,7 @@ public PrestoSparkTaskExecutorFactory(
fragmentResultCacheManager,
requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null").getMaxQueryMemoryPerNode(),
requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null").getMaxQueryTotalMemoryPerNode(),
requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").getMaxRevocableMemoryPerNode(),
requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").getMaxSpillPerNode(),
requireNonNull(taskManagerConfig, "taskManagerConfig is null").getSinkMaxBufferSize(),
requireNonNull(taskManagerConfig, "taskManagerConfig is null").isPerOperatorCpuTimerEnabled(),
Expand All @@ -220,6 +222,7 @@ public PrestoSparkTaskExecutorFactory(
FragmentResultCacheManager fragmentResultCacheManager,
DataSize maxUserMemory,
DataSize maxTotalMemory,
DataSize maxRevocableMemory,
DataSize maxSpillMemory,
DataSize sinkMaxBufferSize,
boolean perOperatorCpuTimerEnabled,
Expand All @@ -244,6 +247,7 @@ public PrestoSparkTaskExecutorFactory(
this.fragmentResultCacheManager = requireNonNull(fragmentResultCacheManager, "fragmentResultCacheManager is null");
this.maxUserMemory = requireNonNull(maxUserMemory, "maxUserMemory is null");
this.maxTotalMemory = requireNonNull(maxTotalMemory, "maxTotalMemory is null");
this.maxRevocableMemory = requireNonNull(maxRevocableMemory, "maxRevocableMemory is null");
this.maxSpillMemory = requireNonNull(maxSpillMemory, "maxSpillMemory is null");
this.sinkMaxBufferSize = requireNonNull(sinkMaxBufferSize, "sinkMaxBufferSize is null");
this.perOperatorCpuTimerEnabled = perOperatorCpuTimerEnabled;
Expand Down Expand Up @@ -327,6 +331,7 @@ public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(
maxUserMemory,
maxTotalMemory,
maxUserMemory,
maxRevocableMemory,
memoryPool,
new TestingGcMonitor(),
notificationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public enum StandardErrorCode
ADMINISTRATIVELY_PREEMPTED(0x0002_0008, INSUFFICIENT_RESOURCES),
EXCEEDED_SCAN_RAW_BYTES_READ_LIMIT(0x0002_0009, INSUFFICIENT_RESOURCES),
EXCEEDED_OUTPUT_SIZE_LIMIT(0x0002_000A, INSUFFICIENT_RESOURCES),
EXCEEDED_REVOCABLE_MEMORY_LIMIT(0x0002_000B, INSUFFICIENT_RESOURCES),
/**/;

// Error code range 0x0003 is reserved for Presto-on-Spark
Expand Down

0 comments on commit 2499b40

Please sign in to comment.