Print biggest splits when running out of TaskDescriptorStorage space
losipiuk committed Oct 3, 2023
1 parent 5b1838d commit db51813
Showing 3 changed files with 24 additions and 8 deletions.
First changed file: TaskDescriptorStorage.java
@@ -18,6 +18,7 @@
 import com.google.common.math.Stats;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
+import io.airlift.json.JsonCodec;
 import io.airlift.log.Logger;
 import io.airlift.units.DataSize;
 import io.trino.annotation.NotThreadSafe;
@@ -32,6 +33,7 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -54,21 +56,25 @@ public class TaskDescriptorStorage
     private static final Logger log = Logger.get(TaskDescriptorStorage.class);
 
     private final long maxMemoryInBytes;
+    private final JsonCodec<Split> splitJsonCodec;
 
     @GuardedBy("this")
     private final Map<QueryId, TaskDescriptors> storages = new HashMap<>();
     @GuardedBy("this")
     private long reservedBytes;
 
     @Inject
-    public TaskDescriptorStorage(QueryManagerConfig config)
+    public TaskDescriptorStorage(
+            QueryManagerConfig config,
+            JsonCodec<Split> splitJsonCodec)
     {
-        this(config.getFaultTolerantExecutionTaskDescriptorStorageMaxMemory());
+        this(config.getFaultTolerantExecutionTaskDescriptorStorageMaxMemory(), splitJsonCodec);
     }
 
-    public TaskDescriptorStorage(DataSize maxMemory)
+    public TaskDescriptorStorage(DataSize maxMemory, JsonCodec<Split> splitJsonCodec)
     {
         this.maxMemoryInBytes = maxMemory.toBytes();
+        this.splitJsonCodec = requireNonNull(splitJsonCodec, "splitJsonCodec is null");
     }
 
     /**
@@ -189,7 +195,7 @@ public synchronized long getReservedBytes()
     }
 
     @NotThreadSafe
-    private static class TaskDescriptors
+    private class TaskDescriptors
     {
         private final Map<TaskDescriptorKey, TaskDescriptor> descriptors = new HashMap<>();
         private long reservedBytes;
@@ -242,7 +248,14 @@ private String getDebugInfo()
                            Map.Entry::getKey,
                            entry -> getDebugInfo(entry.getValue())));
 
-            return String.valueOf(debugInfoByStageId);
+            List<String> biggestSplits = descriptorsByStageId.entries().stream()
+                    .flatMap(entry -> entry.getValue().getSplits().entries().stream().map(splitEntry -> Map.entry("%s/%s".formatted(entry.getKey(), splitEntry.getKey()), splitEntry.getValue())))
+                    .sorted(Comparator.<Map.Entry<String, Split>>comparingLong(entry -> entry.getValue().getRetainedSizeInBytes()).reversed())
+                    .limit(3)
+                    .map(entry -> "{nodeId=%s, size=%s, split=%s}".formatted(entry.getKey(), entry.getValue().getRetainedSizeInBytes(), splitJsonCodec.toJson(entry.getValue())))
+                    .toList();
+
+            return "stagesInfo=%s; biggestSplits=%s".formatted(debugInfoByStageId, biggestSplits);
         }
 
         private String getDebugInfo(Collection<TaskDescriptor> taskDescriptors)
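The heart of the change is the new tail of getDebugInfo(): when TaskDescriptorStorage runs out of space, the error message now lists the three largest splits by retained size, serialized with the injected JsonCodec. The standalone sketch below (not part of this commit; the keys and sizes are made up) shows the same sort-descending / limit(3) / format pattern on a plain map, which may help when reading the stream chain above:

import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class BiggestSplitsSketch
{
    public static void main(String[] args)
    {
        // Hypothetical "stageId/key -> retained size in bytes" data standing in for
        // descriptorsByStageId and Split.getRetainedSizeInBytes() in the real class.
        Map<String, Long> splitSizes = Map.of(
                "1.0/0", 512L,
                "1.0/1", 8_192L,
                "2.0/0", 1_024L,
                "2.0/1", 65_536L);

        // Same shape as the new code: sort by size descending, keep the top three,
        // and render each entry as a short string for the error message.
        List<String> biggestSplits = splitSizes.entrySet().stream()
                .sorted(Comparator.<Map.Entry<String, Long>>comparingLong(Map.Entry::getValue).reversed())
                .limit(3)
                .map(entry -> "{nodeId=%s, size=%s}".formatted(entry.getKey(), entry.getValue()))
                .toList();

        System.out.println("biggestSplits=" + biggestSplits);
        // Prints the largest entries first, e.g.:
        // biggestSplits=[{nodeId=2.0/1, size=65536}, {nodeId=1.0/1, size=8192}, {nodeId=2.0/0, size=1024}]
    }
}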
Second changed file (Guice module bindings)
@@ -87,6 +87,7 @@
 import io.trino.memory.TotalReservationLowMemoryKiller;
 import io.trino.memory.TotalReservationOnBlockedNodesQueryLowMemoryKiller;
 import io.trino.memory.TotalReservationOnBlockedNodesTaskLowMemoryKiller;
+import io.trino.metadata.Split;
 import io.trino.operator.ForScheduler;
 import io.trino.operator.OperatorStats;
 import io.trino.server.protocol.ExecutingStatementResource;
@@ -307,6 +308,7 @@ protected void setup(Binder binder)
 
         binder.bind(EventDrivenTaskSourceFactory.class).in(Scopes.SINGLETON);
         binder.bind(TaskDescriptorStorage.class).in(Scopes.SINGLETON);
+        jsonCodecBinder(binder).bindJsonCodec(Split.class);
         newExporter(binder).export(TaskDescriptorStorage.class).withGeneratedName();
 
         binder.bind(TaskExecutionStats.class).in(Scopes.SINGLETON);
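The jsonCodecBinder line above is what makes a JsonCodec<Split> injectable into the new TaskDescriptorStorage constructor. Below is a minimal sketch of that wiring outside Trino, assuming airlift's JsonCodecBinder and JsonModule; the class and module names (FakeSplit, JsonCodecBindingSketch) are illustrative only:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonModule;

import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;

public class JsonCodecBindingSketch
{
    // Hypothetical stand-in for io.trino.metadata.Split, only to keep the sketch self-contained.
    public static class FakeSplit
    {
        private final String catalogName;
        private final int partition;

        @JsonCreator
        public FakeSplit(@JsonProperty("catalogName") String catalogName, @JsonProperty("partition") int partition)
        {
            this.catalogName = catalogName;
            this.partition = partition;
        }

        @JsonProperty
        public String getCatalogName()
        {
            return catalogName;
        }

        @JsonProperty
        public int getPartition()
        {
            return partition;
        }
    }

    public static void main(String[] args)
    {
        // Same binding pattern as the commit: register a codec for the type...
        Module codecModule = binder -> jsonCodecBinder(binder).bindJsonCodec(FakeSplit.class);

        // ...and let Guice hand it to whatever declares a JsonCodec<FakeSplit> dependency.
        Injector injector = Guice.createInjector(new JsonModule(), codecModule);
        JsonCodec<FakeSplit> codec = injector.getInstance(Key.get(new TypeLiteral<JsonCodec<FakeSplit>>() {}));

        System.out.println(codec.toJson(new FakeSplit("hive", 3)));
    }
}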
Third changed file: TestTaskDescriptorStorage.java
@@ -30,6 +30,7 @@
 
 import java.util.Optional;
 
+import static io.airlift.json.JsonCodec.jsonCodec;
 import static io.airlift.units.DataSize.Unit.KILOBYTE;
 import static io.trino.operator.ExchangeOperator.REMOTE_CATALOG_HANDLE;
 import static io.trino.spi.StandardErrorCode.EXCEEDED_TASK_DESCRIPTOR_STORAGE_CAPACITY;
@@ -51,7 +52,7 @@ public class TestTaskDescriptorStorage
     @Test
     public void testHappyPath()
     {
-        TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(15, KILOBYTE));
+        TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(15, KILOBYTE), jsonCodec(Split.class));
         manager.initialize(QUERY_1);
         manager.initialize(QUERY_2);
 
@@ -101,7 +102,7 @@ public void testHappyPath()
     @Test
     public void testDestroy()
     {
-        TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(5, KILOBYTE));
+        TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(5, KILOBYTE), jsonCodec(Split.class));
         manager.initialize(QUERY_1);
         manager.initialize(QUERY_2);
 
@@ -128,7 +129,7 @@ public void testDestroy()
     @Test
     public void testCapacityExceeded()
     {
-        TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(5, KILOBYTE));
+        TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(5, KILOBYTE), jsonCodec(Split.class));
         manager.initialize(QUERY_1);
         manager.initialize(QUERY_2);
 
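The tests need no injector: io.airlift.json.JsonCodec exposes static factories such as jsonCodec(Class) used above, so the storage can be constructed directly with jsonCodec(Split.class). A tiny usage sketch of the same factory family (the map codec and values here are illustrative, not Trino code):

import io.airlift.json.JsonCodec;

import java.util.Map;

import static io.airlift.json.JsonCodec.mapJsonCodec;

public class JsonCodecFactorySketch
{
    public static void main(String[] args)
    {
        // Like jsonCodec(Split.class) in the test, the static factories build a
        // ready-to-use codec with no dependency injection involved.
        JsonCodec<Map<String, Long>> codec = mapJsonCodec(String.class, Long.class);
        System.out.println(codec.toJson(Map.of("retainedSizeInBytes", 65_536L)));
    }
}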
