Skip to content

Commit

Permalink
Expose more stats for TaskDescriptorStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Oct 16, 2023
1 parent 008374a commit ae88f44
Showing 1 changed file with 186 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
*/
package io.trino.execution.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.base.VerifyException;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table;
import com.google.common.math.Quantiles;
import com.google.common.math.Stats;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
Expand All @@ -31,6 +34,7 @@
import io.trino.spi.TrinoException;
import io.trino.sql.planner.plan.PlanNodeId;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.util.Collection;
import java.util.Comparator;
Expand All @@ -40,12 +44,18 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap;
import static com.google.common.math.Quantiles.percentiles;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.spi.StandardErrorCode.EXCEEDED_TASK_DESCRIPTOR_STORAGE_CAPACITY;
import static java.lang.String.format;
Expand All @@ -57,6 +67,7 @@ public class TaskDescriptorStorage

private final long maxMemoryInBytes;
private final JsonCodec<Split> splitJsonCodec;
private final StorageStats storageStats;

@GuardedBy("this")
private final Map<QueryId, TaskDescriptors> storages = new HashMap<>();
Expand All @@ -75,6 +86,7 @@ public TaskDescriptorStorage(DataSize maxMemory, JsonCodec<Split> splitJsonCodec
{
this.maxMemoryInBytes = maxMemory.toBytes();
this.splitJsonCodec = requireNonNull(splitJsonCodec, "splitJsonCodec is null");
this.storageStats = new StorageStats(Suppliers.memoizeWithExpiration(this::computeStats, 1, TimeUnit.SECONDS));
}

/**
Expand Down Expand Up @@ -188,26 +200,94 @@ private synchronized void updateMemoryReservation(long delta)
}
}

@Managed
public synchronized long getReservedBytes()
@VisibleForTesting
synchronized long getReservedBytes()
{
return reservedBytes;
}

@Managed
@Nested
public StorageStats getStats()
{
// This should not contain materialized values. GuiceMBeanExporter calls it only once during application startup
// and then only @Managed methods all called on that instance.
return storageStats;
}

private synchronized StorageStatsValue computeStats()
{
int queriesCount = storages.size();
long stagesCount = storages.values().stream().mapToLong(TaskDescriptors::getStagesCount).sum();

Quantiles.ScaleAndIndexes percentiles = percentiles().indexes(50, 90, 95);

long queryReservedBytesP50 = 0;
long queryReservedBytesP90 = 0;
long queryReservedBytesP95 = 0;
long queryReservedBytesAvg = 0;
long stageReservedBytesP50 = 0;
long stageReservedBytesP90 = 0;
long stageReservedBytesP95 = 0;
long stageReservedBytesAvg = 0;

if (queriesCount > 0) { // we cannot compute percentiles for empty set

Map<Integer, Double> queryReservedBytesPercentiles = percentiles.compute(
storages.values().stream()
.map(TaskDescriptors::getReservedBytes)
.collect(toImmutableList()));

queryReservedBytesP50 = queryReservedBytesPercentiles.get(50).longValue();
queryReservedBytesP90 = queryReservedBytesPercentiles.get(90).longValue();
queryReservedBytesP95 = queryReservedBytesPercentiles.get(95).longValue();
queryReservedBytesAvg = reservedBytes / queriesCount;

List<Long> storagesReservedBytes = storages.values().stream()
.flatMap(TaskDescriptors::getStagesReservedBytes)
.collect(toImmutableList());

if (!storagesReservedBytes.isEmpty()) {
Map<Integer, Double> stagesReservedBytesPercentiles = percentiles.compute(
storagesReservedBytes);
stageReservedBytesP50 = stagesReservedBytesPercentiles.get(50).longValue();
stageReservedBytesP90 = stagesReservedBytesPercentiles.get(90).longValue();
stageReservedBytesP95 = stagesReservedBytesPercentiles.get(95).longValue();
stageReservedBytesAvg = reservedBytes / stagesCount;
}
}

return new StorageStatsValue(
queriesCount,
stagesCount,
reservedBytes,
queryReservedBytesAvg,
queryReservedBytesP50,
queryReservedBytesP90,
queryReservedBytesP95,
stageReservedBytesAvg,
stageReservedBytesP50,
stageReservedBytesP90,
stageReservedBytesP95);
}

@NotThreadSafe
private class TaskDescriptors
{
private final Table<StageId, Integer /* partitionId */, TaskDescriptor> descriptors = HashBasedTable.create();

private long reservedBytes;
private final Map<StageId, AtomicLong> stagesReservedBytes = new HashMap<>();
private RuntimeException failure;

public void put(StageId stageId, int partitionId, TaskDescriptor descriptor)
{
throwIfFailed();
checkState(!descriptors.contains(stageId, partitionId), "task descriptor is already present for key %s/%s ", stageId, partitionId);
descriptors.put(stageId, partitionId, descriptor);
reservedBytes += descriptor.getRetainedSizeInBytes();
long descriptorRetainedBytes = descriptor.getRetainedSizeInBytes();
reservedBytes += descriptorRetainedBytes;
stagesReservedBytes.computeIfAbsent(stageId, ignored -> new AtomicLong()).addAndGet(descriptorRetainedBytes);
}

public TaskDescriptor get(StageId stageId, int partitionId)
Expand All @@ -227,7 +307,9 @@ public void remove(StageId stageId, int partitionId)
if (descriptor == null) {
throw new NoSuchElementException(format("descriptor not found for key %s/%s", stageId, partitionId));
}
reservedBytes -= descriptor.getRetainedSizeInBytes();
long descriptorRetainedBytes = descriptor.getRetainedSizeInBytes();
reservedBytes -= descriptorRetainedBytes;
requireNonNull(stagesReservedBytes.get(stageId), () -> format("no entry for stage %s", stageId)).addAndGet(-descriptorRetainedBytes);
}

public long getReservedBytes()
Expand Down Expand Up @@ -298,5 +380,105 @@ private void throwIfFailed()
throw failure;
}
}

public int getStagesCount()
{
return descriptors.rowMap().size();
}

public Stream<Long> getStagesReservedBytes()
{
return stagesReservedBytes.values().stream()
.map(AtomicLong::get);
}
}

private record StorageStatsValue(
long queriesCount,
long stagesCount,
long reservedBytes,
long queryReservedBytesAvg,
long queryReservedBytesP50,
long queryReservedBytesP90,
long queryReservedBytesP95,
long stageReservedBytesAvg,
long stageReservedBytesP50,
long stageReservedBytesP90,
long stageReservedBytesP95) {}

public static class StorageStats
{
private final Supplier<StorageStatsValue> statsSupplier;

StorageStats(Supplier<StorageStatsValue> statsSupplier)
{
this.statsSupplier = requireNonNull(statsSupplier, "statsSupplier is null");
}

@Managed
public long getQueriesCount()
{
return statsSupplier.get().queriesCount();
}

@Managed
public long getStagesCount()
{
return statsSupplier.get().stagesCount();
}

@Managed
public long getReservedBytes()
{
return statsSupplier.get().reservedBytes();
}

@Managed
public long getQueryReservedBytesAvg()
{
return statsSupplier.get().queryReservedBytesAvg();
}

@Managed
public long getQueryReservedBytesP50()
{
return statsSupplier.get().queryReservedBytesP50();
}

@Managed
public long getQueryReservedBytesP90()
{
return statsSupplier.get().queryReservedBytesP90();
}

@Managed
public long getQueryReservedBytesP95()
{
return statsSupplier.get().queryReservedBytesP95();
}

@Managed
public long getStageReservedBytesAvg()
{
return statsSupplier.get().stageReservedBytesP50();
}

@Managed
public long getStageReservedBytesP50()
{
return statsSupplier.get().stageReservedBytesP50();
}

@Managed
public long getStageReservedBytesP90()
{
return statsSupplier.get().stageReservedBytesP90();
}

@Managed
public long getStageReservedBytesP95()
{
return statsSupplier.get().stageReservedBytesP95();
}
}
}

0 comments on commit ae88f44

Please sign in to comment.