Skip to content

Commit

Permalink
Change PlanNodeStats from a union class to hierarchy
Browse files Browse the repository at this point in the history
  • Loading branch information
raghavsethi committed Feb 5, 2019
1 parent 1630408 commit de2a775
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.planPrinter;

import com.facebook.presto.sql.planner.plan.PlanNodeId;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.max;
import static java.lang.Math.sqrt;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;

public class HashCollisionPlanNodeStats
extends PlanNodeStats
{
private final Map<String, OperatorHashCollisionsStats> operatorHashCollisionsStats;

public HashCollisionPlanNodeStats(
PlanNodeId planNodeId,
Duration planNodeScheduledTime,
Duration planNodeCpuTime,
long planNodeInputPositions,
DataSize planNodeInputDataSize,
long planNodeOutputPositions,
DataSize planNodeOutputDataSize,
Map<String, OperatorInputStats> operatorInputStats,
Map<String, OperatorHashCollisionsStats> operatorHashCollisionsStats)
{
super(planNodeId, planNodeScheduledTime, planNodeCpuTime, planNodeInputPositions, planNodeInputDataSize, planNodeOutputPositions, planNodeOutputDataSize, operatorInputStats);
this.operatorHashCollisionsStats = requireNonNull(operatorHashCollisionsStats, "operatorHashCollisionsStats is null");
}

public Map<String, Double> getOperatorHashCollisionsAverages()
{
return operatorHashCollisionsStats.entrySet().stream()
.collect(toMap(
Map.Entry::getKey,
entry -> entry.getValue().getWeightedHashCollisions() / operatorInputStats.get(entry.getKey()).getInputPositions()));
}

public Map<String, Double> getOperatorHashCollisionsStdDevs()
{
return operatorHashCollisionsStats.entrySet().stream()
.collect(toMap(
Map.Entry::getKey,
entry -> computedWeightedStdDev(
entry.getValue().getWeightedSumSquaredHashCollisions(),
entry.getValue().getWeightedHashCollisions(),
operatorInputStats.get(entry.getKey()).getInputPositions())));
}

private static double computedWeightedStdDev(double sumSquared, double sum, double totalWeight)
{
double average = sum / totalWeight;
double variance = (sumSquared - 2 * sum * average) / totalWeight + average * average;
// variance might be negative because of numeric inaccuracy, therefore we need to use max
return sqrt(max(variance, 0d));
}

public Map<String, Double> getOperatorExpectedCollisionsAverages()
{
return operatorHashCollisionsStats.entrySet().stream()
.collect(toMap(
Map.Entry::getKey,
entry -> entry.getValue().getWeightedExpectedHashCollisions() / operatorInputStats.get(entry.getKey()).getInputPositions()));
}

@Override
public PlanNodeStats mergeWith(PlanNodeStats other)
{
checkArgument(other instanceof HashCollisionPlanNodeStats, "other is not an instanceof HashCollisionPlanNodeStats");
PlanNodeStats merged = super.mergeWith(other);

return new HashCollisionPlanNodeStats(
merged.getPlanNodeId(),
merged.getPlanNodeScheduledTime(),
merged.getPlanNodeCpuTime(),
merged.getPlanNodeInputPositions(),
merged.getPlanNodeInputDataSize(),
merged.getPlanNodeOutputPositions(),
merged.getPlanNodeOutputDataSize(),
merged.operatorInputStats,
operatorHashCollisionsStats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.airlift.units.Duration;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.util.MoreMaps.mergeMaps;
Expand All @@ -31,8 +30,6 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toMap;

// TODO: break into operator-specific stats classes instead of having a big union-class aggregating all stats together
@Deprecated
public class PlanNodeStats
implements Mergeable<PlanNodeStats>
{
Expand All @@ -45,9 +42,7 @@ public class PlanNodeStats
private final long planNodeOutputPositions;
private final DataSize planNodeOutputDataSize;

private final Map<String, OperatorInputStats> operatorInputStats;
private final Map<String, OperatorHashCollisionsStats> operatorHashCollisionsStats;
private final Optional<WindowOperatorStats> windowOperatorStats;
protected final Map<String, OperatorInputStats> operatorInputStats;

PlanNodeStats(
PlanNodeId planNodeId,
Expand All @@ -57,9 +52,7 @@ public class PlanNodeStats
DataSize planNodeInputDataSize,
long planNodeOutputPositions,
DataSize planNodeOutputDataSize,
Map<String, OperatorInputStats> operatorInputStats,
Map<String, OperatorHashCollisionsStats> operatorHashCollisionsStats,
Optional<WindowOperatorStats> windowOperatorStats)
Map<String, OperatorInputStats> operatorInputStats)
{
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");

Expand All @@ -71,8 +64,6 @@ public class PlanNodeStats
this.planNodeOutputDataSize = planNodeOutputDataSize;

this.operatorInputStats = requireNonNull(operatorInputStats, "operatorInputStats is null");
this.operatorHashCollisionsStats = requireNonNull(operatorHashCollisionsStats, "operatorHashCollisionsStats is null");
this.windowOperatorStats = requireNonNull(windowOperatorStats, "windowOperatorStats is null");
}

private static double computedStdDev(double sumSquared, double sum, long n)
Expand Down Expand Up @@ -142,46 +133,6 @@ public Map<String, Double> getOperatorInputPositionsStdDevs()
entry.getValue().getTotalDrivers())));
}

public Map<String, Double> getOperatorHashCollisionsAverages()
{
return operatorHashCollisionsStats.entrySet().stream()
.collect(toMap(
Map.Entry::getKey,
entry -> entry.getValue().getWeightedHashCollisions() / operatorInputStats.get(entry.getKey()).getInputPositions()));
}

public Map<String, Double> getOperatorHashCollisionsStdDevs()
{
return operatorHashCollisionsStats.entrySet().stream()
.collect(toMap(
Map.Entry::getKey,
entry -> computedWeightedStdDev(
entry.getValue().getWeightedSumSquaredHashCollisions(),
entry.getValue().getWeightedHashCollisions(),
operatorInputStats.get(entry.getKey()).getInputPositions())));
}

private static double computedWeightedStdDev(double sumSquared, double sum, double totalWeight)
{
double average = sum / totalWeight;
double variance = (sumSquared - 2 * sum * average) / totalWeight + average * average;
// variance might be negative because of numeric inaccuracy, therefore we need to use max
return sqrt(max(variance, 0d));
}

public Map<String, Double> getOperatorExpectedCollisionsAverages()
{
return operatorHashCollisionsStats.entrySet().stream()
.collect(toMap(
Map.Entry::getKey,
entry -> entry.getValue().getWeightedExpectedHashCollisions() / operatorInputStats.get(entry.getKey()).getInputPositions()));
}

public Optional<WindowOperatorStats> getWindowOperatorStats()
{
return windowOperatorStats;
}

@Override
public PlanNodeStats mergeWith(PlanNodeStats other)
{
Expand All @@ -193,17 +144,13 @@ public PlanNodeStats mergeWith(PlanNodeStats other)
DataSize planNodeOutputDataSize = succinctBytes(this.planNodeOutputDataSize.toBytes() + other.planNodeOutputDataSize.toBytes());

Map<String, OperatorInputStats> operatorInputStats = mergeMaps(this.operatorInputStats, other.operatorInputStats, OperatorInputStats::merge);
Map<String, OperatorHashCollisionsStats> operatorHashCollisionsStats = mergeMaps(this.operatorHashCollisionsStats, other.operatorHashCollisionsStats, OperatorHashCollisionsStats::merge);
Optional<WindowOperatorStats> windowNodeStats = Mergeable.merge(this.windowOperatorStats, other.windowOperatorStats);

return new PlanNodeStats(
planNodeId,
new Duration(planNodeScheduledTime.toMillis() + other.getPlanNodeScheduledTime().toMillis(), MILLISECONDS),
new Duration(planNodeCpuTime.toMillis() + other.getPlanNodeCpuTime().toMillis(), MILLISECONDS),
planNodeInputPositions, planNodeInputDataSize,
planNodeOutputPositions, planNodeOutputDataSize,
operatorInputStats,
operatorHashCollisionsStats,
windowNodeStats);
operatorInputStats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.util.MoreMaps.mergeMaps;
import static com.google.common.collect.Iterables.getLast;
import static com.google.common.collect.Lists.reverse;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.succinctDataSize;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -164,22 +162,52 @@ private static List<PlanNodeStats> getPlanNodeStats(TaskStats taskStats)
if (!planNodeInputPositions.containsKey(planNodeId)) {
continue;
}
stats.add(new PlanNodeStats(
planNodeId,
new Duration(planNodeScheduledMillis.get(planNodeId), MILLISECONDS),
new Duration(planNodeCpuMillis.get(planNodeId), MILLISECONDS),
planNodeInputPositions.get(planNodeId),
succinctDataSize(planNodeInputBytes.get(planNodeId), BYTE),
// It's possible there will be no output stats because all the pipelines that we observed were non-output.
// For example in a query like SELECT * FROM a JOIN b ON c = d LIMIT 1
// It's possible to observe stats after the build starts, but before the probe does
// and therefore only have scheduled time, but no output stats
planNodeOutputPositions.getOrDefault(planNodeId, 0L),
succinctDataSize(planNodeOutputBytes.getOrDefault(planNodeId, 0L), BYTE),
operatorInputStats.get(planNodeId),
// Only some operators emit hash collisions statistics
operatorHashCollisionsStats.getOrDefault(planNodeId, emptyMap()),
Optional.ofNullable(windowNodeStats.get(planNodeId))));

PlanNodeStats nodeStats;

// It's possible there will be no output stats because all the pipelines that we observed were non-output.
// For example in a query like SELECT * FROM a JOIN b ON c = d LIMIT 1
// It's possible to observe stats after the build starts, but before the probe does
// and therefore only have scheduled time, but no output stats
long outputPositions = planNodeOutputPositions.getOrDefault(planNodeId, 0L);

if (operatorHashCollisionsStats.containsKey(planNodeId)) {
nodeStats = new HashCollisionPlanNodeStats(
planNodeId,
new Duration(planNodeScheduledMillis.get(planNodeId), MILLISECONDS),
new Duration(planNodeCpuMillis.get(planNodeId), MILLISECONDS),
planNodeInputPositions.get(planNodeId),
succinctDataSize(planNodeInputBytes.get(planNodeId), BYTE),
outputPositions,
succinctDataSize(planNodeOutputBytes.getOrDefault(planNodeId, 0L), BYTE),
operatorInputStats.get(planNodeId),
operatorHashCollisionsStats.get(planNodeId));
}
else if (windowNodeStats.containsKey(planNodeId)) {
nodeStats = new WindowPlanNodeStats(
planNodeId,
new Duration(planNodeScheduledMillis.get(planNodeId), MILLISECONDS),
new Duration(planNodeCpuMillis.get(planNodeId), MILLISECONDS),
planNodeInputPositions.get(planNodeId),
succinctDataSize(planNodeInputBytes.get(planNodeId), BYTE),
outputPositions,
succinctDataSize(planNodeOutputBytes.getOrDefault(planNodeId, 0L), BYTE),
operatorInputStats.get(planNodeId),
windowNodeStats.get(planNodeId));
}
else {
nodeStats = new PlanNodeStats(
planNodeId,
new Duration(planNodeScheduledMillis.get(planNodeId), MILLISECONDS),
new Duration(planNodeCpuMillis.get(planNodeId), MILLISECONDS),
planNodeInputPositions.get(planNodeId),
succinctDataSize(planNodeInputBytes.get(planNodeId), BYTE),
outputPositions,
succinctDataSize(planNodeOutputBytes.getOrDefault(planNodeId, 0L), BYTE),
operatorInputStats.get(planNodeId));
}

stats.add(nodeStats);
}
return stats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import static java.lang.Double.isNaN;
import static java.lang.String.format;
import static java.util.Arrays.stream;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -1165,9 +1166,8 @@ private void addStats(NodeRepresentation output, PlanNode node)

printDistributions(output, nodeStats);

if (nodeStats.getWindowOperatorStats().isPresent()) {
// TODO: Once PlanNodeStats becomes broken into smaller classes, we should rely on toString() method of WindowOperatorStats here
printWindowOperatorStats(output, nodeStats.getWindowOperatorStats().get());
if (nodeStats instanceof WindowPlanNodeStats) {
printWindowOperatorStats(output, ((WindowPlanNodeStats) nodeStats).getWindowOperatorStats());
}
}

Expand Down Expand Up @@ -1252,9 +1252,14 @@ private void printDistributions(NodeRepresentation output, PlanNodeStats stats)
Map<String, Double> inputAverages = stats.getOperatorInputPositionsAverages();
Map<String, Double> inputStdDevs = stats.getOperatorInputPositionsStdDevs();

Map<String, Double> hashCollisionsAverages = stats.getOperatorHashCollisionsAverages();
Map<String, Double> hashCollisionsStdDevs = stats.getOperatorHashCollisionsStdDevs();
Map<String, Double> expectedHashCollisionsAverages = stats.getOperatorExpectedCollisionsAverages();
Map<String, Double> hashCollisionsAverages = emptyMap();
Map<String, Double> hashCollisionsStdDevs = emptyMap();
Map<String, Double> expectedHashCollisionsAverages = emptyMap();
if (stats instanceof HashCollisionPlanNodeStats) {
hashCollisionsAverages = ((HashCollisionPlanNodeStats) stats).getOperatorHashCollisionsAverages();
hashCollisionsStdDevs = ((HashCollisionPlanNodeStats) stats).getOperatorHashCollisionsStdDevs();
expectedHashCollisionsAverages = ((HashCollisionPlanNodeStats) stats).getOperatorExpectedCollisionsAverages();
}

Map<String, String> translatedOperatorTypes = translateOperatorTypes(stats.getOperatorTypes());

Expand Down
Loading

0 comments on commit de2a775

Please sign in to comment.