diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java index ee543f0b3946..d00036326ac7 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java @@ -18,6 +18,7 @@ import com.google.common.math.Stats; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; +import io.airlift.json.JsonCodec; import io.airlift.log.Logger; import io.airlift.units.DataSize; import io.trino.annotation.NotThreadSafe; @@ -32,6 +33,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -54,6 +56,7 @@ public class TaskDescriptorStorage private static final Logger log = Logger.get(TaskDescriptorStorage.class); private final long maxMemoryInBytes; + private final JsonCodec<Split> splitJsonCodec; @GuardedBy("this") private final Map storages = new HashMap<>(); @@ -61,14 +64,17 @@ public class TaskDescriptorStorage private long reservedBytes; @Inject - public TaskDescriptorStorage(QueryManagerConfig config) + public TaskDescriptorStorage( + QueryManagerConfig config, + JsonCodec<Split> splitJsonCodec) { - this(config.getFaultTolerantExecutionTaskDescriptorStorageMaxMemory()); + this(config.getFaultTolerantExecutionTaskDescriptorStorageMaxMemory(), splitJsonCodec); } - public TaskDescriptorStorage(DataSize maxMemory) + public TaskDescriptorStorage(DataSize maxMemory, JsonCodec<Split> splitJsonCodec) { this.maxMemoryInBytes = maxMemory.toBytes(); + this.splitJsonCodec = requireNonNull(splitJsonCodec, "splitJsonCodec is null"); } /** @@ -189,7 +195,7 @@ public synchronized long getReservedBytes() } @NotThreadSafe - private static class TaskDescriptors + private class 
TaskDescriptors { private final Map descriptors = new HashMap<>(); private long reservedBytes; @@ -242,7 +248,14 @@ private String getDebugInfo() Map.Entry::getKey, entry -> getDebugInfo(entry.getValue()))); - return String.valueOf(debugInfoByStageId); + List<String> biggestSplits = descriptorsByStageId.entries().stream() + .flatMap(entry -> entry.getValue().getSplits().entries().stream().map(splitEntry -> Map.entry("%s/%s".formatted(entry.getKey(), splitEntry.getKey()), splitEntry.getValue()))) + .sorted(Comparator.<Map.Entry<String, Split>>comparingLong(entry -> entry.getValue().getRetainedSizeInBytes()).reversed()) + .limit(3) + .map(entry -> "{nodeId=%s, size=%s, split=%s}".formatted(entry.getKey(), entry.getValue().getRetainedSizeInBytes(), splitJsonCodec.toJson(entry.getValue()))) + .toList(); + + return "stagesInfo=%s; biggestSplits=%s".formatted(debugInfoByStageId, biggestSplits); } private String getDebugInfo(Collection taskDescriptors) diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java index e705934c8b8b..4016c6fdf6af 100644 --- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java +++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java @@ -87,6 +87,7 @@ import io.trino.memory.TotalReservationLowMemoryKiller; import io.trino.memory.TotalReservationOnBlockedNodesQueryLowMemoryKiller; import io.trino.memory.TotalReservationOnBlockedNodesTaskLowMemoryKiller; +import io.trino.metadata.Split; import io.trino.operator.ForScheduler; import io.trino.operator.OperatorStats; import io.trino.server.protocol.ExecutingStatementResource; @@ -307,6 +308,7 @@ protected void setup(Binder binder) binder.bind(EventDrivenTaskSourceFactory.class).in(Scopes.SINGLETON); binder.bind(TaskDescriptorStorage.class).in(Scopes.SINGLETON); + jsonCodecBinder(binder).bindJsonCodec(Split.class); newExporter(binder).export(TaskDescriptorStorage.class).withGeneratedName(); 
binder.bind(TaskExecutionStats.class).in(Scopes.SINGLETON); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestTaskDescriptorStorage.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestTaskDescriptorStorage.java index 5d5018bfa6c7..93658b076537 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestTaskDescriptorStorage.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestTaskDescriptorStorage.java @@ -30,6 +30,7 @@ import java.util.Optional; +import static io.airlift.json.JsonCodec.jsonCodec; import static io.airlift.units.DataSize.Unit.KILOBYTE; import static io.trino.operator.ExchangeOperator.REMOTE_CATALOG_HANDLE; import static io.trino.spi.StandardErrorCode.EXCEEDED_TASK_DESCRIPTOR_STORAGE_CAPACITY; @@ -51,7 +52,7 @@ public class TestTaskDescriptorStorage @Test public void testHappyPath() { - TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(15, KILOBYTE)); + TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(15, KILOBYTE), jsonCodec(Split.class)); manager.initialize(QUERY_1); manager.initialize(QUERY_2); @@ -101,7 +102,7 @@ public void testHappyPath() @Test public void testDestroy() { - TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(5, KILOBYTE)); + TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(5, KILOBYTE), jsonCodec(Split.class)); manager.initialize(QUERY_1); manager.initialize(QUERY_2); @@ -128,7 +129,7 @@ public void testDestroy() @Test public void testCapacityExceeded() { - TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(5, KILOBYTE)); + TaskDescriptorStorage manager = new TaskDescriptorStorage(DataSize.of(5, KILOBYTE), jsonCodec(Split.class)); manager.initialize(QUERY_1); manager.initialize(QUERY_2);