Skip to content

Commit

Permalink
Add memory-time integral stat for queries
Browse files Browse the repository at this point in the history
We now track and report cumulative memory (i.e. the area under the
curve of the time x memory graph) for Presto queries. This is also
exposed in the UI as the number of GB seconds used.
  • Loading branch information
Raghav Sethi committed Jan 16, 2016
1 parent 15ce84d commit 2d62365
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public QueryInfo getQueryInfo(StageInfo rootStage)
int runningDrivers = 0;
int completedDrivers = 0;

long cumulativeMemory = 0;
long totalMemoryReservation = 0;
long peakMemoryReservation = 0;

Expand Down Expand Up @@ -263,6 +264,7 @@ public QueryInfo getQueryInfo(StageInfo rootStage)
runningDrivers += stageStats.getRunningDrivers();
completedDrivers += stageStats.getCompletedDrivers();

cumulativeMemory += stageStats.getCumulativeMemory();
totalMemoryReservation += stageStats.getTotalMemoryReservation().toBytes();
peakMemoryReservation = getPeakMemoryInBytes();

Expand Down Expand Up @@ -312,6 +314,7 @@ public QueryInfo getQueryInfo(StageInfo rootStage)
runningDrivers,
completedDrivers,

cumulativeMemory,
new DataSize(totalMemoryReservation, BYTE).convertToMostSuccinctDataSize(),
new DataSize(peakMemoryReservation, BYTE).convertToMostSuccinctDataSize(),
new Duration(totalScheduledTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class QueryStats
private final int runningDrivers;
private final int completedDrivers;

private final double cumulativeMemory;
private final DataSize totalMemoryReservation;
private final DataSize peakMemoryReservation;

Expand Down Expand Up @@ -90,6 +91,7 @@ public QueryStats()
this.queuedDrivers = 0;
this.runningDrivers = 0;
this.completedDrivers = 0;
this.cumulativeMemory = 0.0;
this.totalMemoryReservation = null;
this.peakMemoryReservation = null;
this.totalScheduledTime = null;
Expand Down Expand Up @@ -129,6 +131,7 @@ public QueryStats(
@JsonProperty("runningDrivers") int runningDrivers,
@JsonProperty("completedDrivers") int completedDrivers,

@JsonProperty("cumulativeMemory") double cumulativeMemory,
@JsonProperty("totalMemoryReservation") DataSize totalMemoryReservation,
@JsonProperty("peakMemoryReservation") DataSize peakMemoryReservation,

Expand Down Expand Up @@ -176,6 +179,7 @@ public QueryStats(
checkArgument(completedDrivers >= 0, "completedDrivers is negative");
this.completedDrivers = completedDrivers;

this.cumulativeMemory = requireNonNull(cumulativeMemory, "cumulativeMemory is null");
this.totalMemoryReservation = requireNonNull(totalMemoryReservation, "totalMemoryReservation is null");
this.peakMemoryReservation = requireNonNull(peakMemoryReservation, "peakMemoryReservation is null");
this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null");
Expand Down Expand Up @@ -300,6 +304,12 @@ public int getCompletedDrivers()
return completedDrivers;
}

@JsonProperty
public double getCumulativeMemory()
{
return cumulativeMemory;
}

@JsonProperty
public DataSize getTotalMemoryReservation()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier, Su
int runningDrivers = 0;
int completedDrivers = 0;

long cumulativeMemory = 0;
long totalMemoryReservation = 0;

long totalScheduledTime = 0;
Expand Down Expand Up @@ -215,6 +216,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier, Su
runningDrivers += taskStats.getRunningDrivers();
completedDrivers += taskStats.getCompletedDrivers();

cumulativeMemory += taskStats.getCumulativeMemory();
totalMemoryReservation += taskStats.getMemoryReservation().toBytes();

totalScheduledTime += taskStats.getTotalScheduledTime().roundTo(NANOSECONDS);
Expand Down Expand Up @@ -251,6 +253,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier, Su
runningDrivers,
completedDrivers,

cumulativeMemory,
succinctDataSize(totalMemoryReservation, BYTE),
succinctDuration(totalScheduledTime, NANOSECONDS),
succinctDuration(totalCpuTime, NANOSECONDS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class StageStats
private final int runningDrivers;
private final int completedDrivers;

private final double cumulativeMemory;
private final DataSize totalMemoryReservation;

private final Duration totalScheduledTime;
Expand Down Expand Up @@ -80,6 +81,7 @@ public StageStats()
this.queuedDrivers = 0;
this.runningDrivers = 0;
this.completedDrivers = 0;
this.cumulativeMemory = 0.0;
this.totalMemoryReservation = null;
this.totalScheduledTime = null;
this.totalCpuTime = null;
Expand Down Expand Up @@ -112,6 +114,7 @@ public StageStats(
@JsonProperty("runningDrivers") int runningDrivers,
@JsonProperty("completedDrivers") int completedDrivers,

@JsonProperty("cumulativeMemory") double cumulativeMemory,
@JsonProperty("totalMemoryReservation") DataSize totalMemoryReservation,

@JsonProperty("totalScheduledTime") Duration totalScheduledTime,
Expand Down Expand Up @@ -151,6 +154,7 @@ public StageStats(
checkArgument(completedDrivers >= 0, "completedDrivers is negative");
this.completedDrivers = completedDrivers;

this.cumulativeMemory = requireNonNull(cumulativeMemory, "cumulativeMemory is null");
this.totalMemoryReservation = requireNonNull(totalMemoryReservation, "totalMemoryReservation is null");

this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null");
Expand Down Expand Up @@ -239,6 +243,12 @@ public int getCompletedDrivers()
return completedDrivers;
}

@JsonProperty
public double getCumulativeMemory()
{
return cumulativeMemory;
}

@JsonProperty
public DataSize getTotalMemoryReservation()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import com.facebook.presto.util.ImmutableCollectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.joda.time.DateTime;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.List;
Expand All @@ -51,7 +53,6 @@ public class TaskContext
private final Session session;

private final DataSize operatorPreAllocatedMemory;

private final AtomicLong memoryReservation = new AtomicLong();
private final AtomicLong systemMemoryReservation = new AtomicLong();

Expand All @@ -69,6 +70,15 @@ public class TaskContext
private final boolean verboseStats;
private final boolean cpuTimerEnabled;

private final Object cumulativeMemoryLock = new Object();
private final AtomicDouble cumulativeMemory = new AtomicDouble(0.0);

@GuardedBy("cumulativeMemoryLock")
private long lastMemoryReservation = 0;

@GuardedBy("cumulativeMemoryLock")
private long lastTaskStatCallNanos = 0;

public TaskContext(QueryContext queryContext,
TaskStateMachine taskStateMachine,
Executor executor,
Expand All @@ -82,7 +92,6 @@ public TaskContext(QueryContext queryContext,
this.executor = requireNonNull(executor, "executor is null");
this.session = session;
this.operatorPreAllocatedMemory = requireNonNull(operatorPreAllocatedMemory, "operatorPreAllocatedMemory is null");

taskStateMachine.addStateChangeListener(new StateChangeListener<TaskState>()
{
@Override
Expand Down Expand Up @@ -325,6 +334,16 @@ public TaskStats getTaskStats()
elapsedTime = new Duration(0, NANOSECONDS);
}

synchronized (cumulativeMemoryLock) {
double sinceLastPeriodMillis = (System.nanoTime() - lastTaskStatCallNanos) / 1_000_000.0;
long currentSystemMemory = systemMemoryReservation.get();
long averageMemoryForLastPeriod = (currentSystemMemory + lastMemoryReservation) / 2;
cumulativeMemory.addAndGet(averageMemoryForLastPeriod * sinceLastPeriodMillis);

lastTaskStatCallNanos = System.nanoTime();
lastMemoryReservation = currentSystemMemory;
}

boolean fullyBlocked = pipelineStats.stream()
.filter(pipeline -> pipeline.getRunningDrivers() > 0 || pipeline.getRunningPartitionedDrivers() > 0)
.allMatch(PipelineStats::isFullyBlocked);
Expand All @@ -345,6 +364,7 @@ public TaskStats getTaskStats()
runningDrivers,
runningPartitionedDrivers,
completedDrivers,
cumulativeMemory.get(),
new DataSize(memoryReservation.get(), BYTE).convertToMostSuccinctDataSize(),
new DataSize(systemMemoryReservation.get(), BYTE).convertToMostSuccinctDataSize(),
new Duration(totalScheduledTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class TaskStats
private final int runningPartitionedDrivers;
private final int completedDrivers;

private final double cumulativeMemory;
private final DataSize memoryReservation;
private final DataSize systemMemoryReservation;

Expand Down Expand Up @@ -83,6 +84,7 @@ public TaskStats(DateTime createTime, DateTime endTime)
0,
0,
0,
0.0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
new Duration(0, MILLISECONDS),
Expand Down Expand Up @@ -116,6 +118,7 @@ public TaskStats(
@JsonProperty("runningPartitionedDrivers") int runningPartitionedDrivers,
@JsonProperty("completedDrivers") int completedDrivers,

@JsonProperty("cumulativeMemory") double cumulativeMemory,
@JsonProperty("memoryReservation") DataSize memoryReservation,
@JsonProperty("systemMemoryReservation") DataSize systemMemoryReservation,

Expand Down Expand Up @@ -159,6 +162,7 @@ public TaskStats(
checkArgument(completedDrivers >= 0, "completedDrivers is negative");
this.completedDrivers = completedDrivers;

this.cumulativeMemory = requireNonNull(cumulativeMemory, "cumulativeMemory is null");
this.memoryReservation = requireNonNull(memoryReservation, "memoryReservation is null");
this.systemMemoryReservation = requireNonNull(systemMemoryReservation, "systemMemoryReservation is null");

Expand Down Expand Up @@ -247,6 +251,12 @@ public int getCompletedDrivers()
return completedDrivers;
}

@JsonProperty
public double getCumulativeMemory()
{
return cumulativeMemory;
}

@JsonProperty
public DataSize getMemoryReservation()
{
Expand Down Expand Up @@ -364,6 +374,7 @@ public TaskStats summarize()
runningDrivers,
runningPartitionedDrivers,
completedDrivers,
cumulativeMemory,
memoryReservation,
systemMemoryReservation,
totalScheduledTime,
Expand Down
14 changes: 11 additions & 3 deletions presto-main/src/main/resources/webapp/query.html
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ <h2>Summary</h2>
<dt>Memory Pool</dt>
<dd id="memoryPool"></dd>

<dt>Memory</dt>
<dd id="memory"></dd>
<dt>Total Memory</dt>
<dd id="totalMemory"></dd>

<dt>CPU Time</dt>
<dd id="cpuTime"></dd>
Expand All @@ -122,6 +122,9 @@ <h2>Summary</h2>
<dt>Data Size</dt>
<dd id="dataSize"></dd>

<dt>Cumulative Memory</dt>
<dd id="cumulativeMemory"></dd>

<dt>Raw</dt>
<dd><a id="rawJson"></a></dd>

Expand Down Expand Up @@ -207,10 +210,11 @@ <h2>Tasks</h2>
d3.select('#endTime').text(formatter(new Date(query.queryStats.endTime)));
}
d3.select('#memoryPool').text(query.memoryPool);
d3.select('#memory').text(query.queryStats.totalMemoryReservation);
d3.select('#totalMemory').text(query.queryStats.totalMemoryReservation);
d3.select('#cpuTime').text(query.queryStats.totalCpuTime);
d3.select('#rows').text(formatCount(query.queryStats.rawInputPositions));
d3.select('#dataSize').text(query.queryStats.rawInputDataSize);
d3.select('#cumulativeMemory').text(formatCumulativeMemory(query.queryStats.cumulativeMemory));

var button = d3.select('#killQuery')
.append('button')
Expand Down Expand Up @@ -285,6 +289,10 @@ <h2>Tasks</h2>
$('#tasks').stickyTableHeaders();
});

function formatCumulativeMemory(cumulativeMemory) {
return (cumulativeMemory / Math.pow(1000.0, 4)).toLocaleString()) + 'GB seconds'
}

function formatStackTrace(info) {
return doFormatStackTrace(info, [], "", "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,25 @@ public class TestQueryStats
15,
16,

new DataSize(17, BYTE),
17.0,
new DataSize(18, BYTE),
new DataSize(19, BYTE),

new Duration(19, NANOSECONDS),
new Duration(20, NANOSECONDS),
new Duration(21, NANOSECONDS),
new Duration(22, NANOSECONDS),
new Duration(23, NANOSECONDS),
false,
ImmutableSet.of(),

new DataSize(23, BYTE),
24,
new DataSize(24, BYTE),
25,

new DataSize(25, BYTE),
26,
new DataSize(26, BYTE),
27,

new DataSize(27, BYTE),
28);
new DataSize(28, BYTE),
29);

@Test
public void testJson()
Expand Down Expand Up @@ -103,21 +104,22 @@ public static void assertExpectedQueryStats(QueryStats actual)
assertEquals(actual.getRunningDrivers(), 15);
assertEquals(actual.getCompletedDrivers(), 16);

assertEquals(actual.getTotalMemoryReservation(), new DataSize(17, BYTE));
assertEquals(actual.getPeakMemoryReservation(), new DataSize(18, BYTE));
assertEquals(actual.getCumulativeMemory(), 17.0);
assertEquals(actual.getTotalMemoryReservation(), new DataSize(18, BYTE));
assertEquals(actual.getPeakMemoryReservation(), new DataSize(19, BYTE));

assertEquals(actual.getTotalScheduledTime(), new Duration(19, NANOSECONDS));
assertEquals(actual.getTotalCpuTime(), new Duration(20, NANOSECONDS));
assertEquals(actual.getTotalUserTime(), new Duration(21, NANOSECONDS));
assertEquals(actual.getTotalBlockedTime(), new Duration(22, NANOSECONDS));
assertEquals(actual.getTotalScheduledTime(), new Duration(20, NANOSECONDS));
assertEquals(actual.getTotalCpuTime(), new Duration(21, NANOSECONDS));
assertEquals(actual.getTotalUserTime(), new Duration(22, NANOSECONDS));
assertEquals(actual.getTotalBlockedTime(), new Duration(23, NANOSECONDS));

assertEquals(actual.getRawInputDataSize(), new DataSize(23, BYTE));
assertEquals(actual.getRawInputPositions(), 24);
assertEquals(actual.getRawInputDataSize(), new DataSize(24, BYTE));
assertEquals(actual.getRawInputPositions(), 25);

assertEquals(actual.getProcessedInputDataSize(), new DataSize(25, BYTE));
assertEquals(actual.getProcessedInputPositions(), 26);
assertEquals(actual.getProcessedInputDataSize(), new DataSize(26, BYTE));
assertEquals(actual.getProcessedInputPositions(), 27);

assertEquals(actual.getOutputDataSize(), new DataSize(27, BYTE));
assertEquals(actual.getOutputPositions(), 28);
assertEquals(actual.getOutputDataSize(), new DataSize(28, BYTE));
assertEquals(actual.getOutputPositions(), 29);
}
}
Loading

0 comments on commit 2d62365

Please sign in to comment.