Skip to content

Commit

Permalink
Revert "[FLINK-14813][metrics] Provide isBackPressured Task metric"
Browse files Browse the repository at this point in the history
This reverts commit 2772cd0.
  • Loading branch information
pnowojski committed Nov 28, 2019
1 parent 33f5e33 commit 9c9ac52
Show file tree
Hide file tree
Showing 10 changed files with 6 additions and 142 deletions.
26 changes: 2 additions & 24 deletions docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ Metrics related to data exchange between task executors using netty network comm
<td>Gauge</td>
</tr>
<tr>
<th rowspan="16">Task</th>
<th rowspan="8">Task</th>
<td rowspan="2">Shuffle.Netty.Input.Buffers</td>
<td>inputQueueLength</td>
<td>The number of queued input buffers.</td>
Expand Down Expand Up @@ -1164,6 +1164,7 @@ Metrics related to data exchange between task executors using netty network comm
<td>Gauge</td>
</tr>
<tr>
<th rowspan="8"><strong>Task</strong></th>
<td rowspan="8">Shuffle.Netty.Input</td>
<td>numBytesInLocal</td>
<td>The total number of bytes this task has read from a local source.</td>
Expand Down Expand Up @@ -1207,29 +1208,6 @@ Metrics related to data exchange between task executors using netty network comm
</tbody>
</table>

### Common shuffle service metrics

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 18%">Scope</th>
<th class="text-left" style="width: 22%">Infix</th>
<th class="text-left" style="width: 22%">Metrics</th>
<th class="text-left" style="width: 30%">Description</th>
<th class="text-left" style="width: 8%">Type</th>
</tr>
</thead>
<tbody>
<tr>
<th rowspan="1"><strong>Task</strong></th>
<td rowspan="1">Shuffle.BackPressure</td>
<td>isBackPressured</td>
<td>whether the task is back-pressured</td>
<td>Gauge</td>
</tr>
</tbody>
</table>

### Cluster
<table class="table table-bordered">
<thead>
Expand Down
25 changes: 2 additions & 23 deletions docs/monitoring/metrics.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ Metrics related to data exchange between task executors using netty network comm
<td>Gauge</td>
</tr>
<tr>
<th rowspan="16">Task</th>
<th rowspan="8">Task</th>
<td rowspan="2">Shuffle.Netty.Input.Buffers</td>
<td>inputQueueLength</td>
<td>The number of queued input buffers.</td>
Expand Down Expand Up @@ -1164,6 +1164,7 @@ Metrics related to data exchange between task executors using netty network comm
<td>Gauge</td>
</tr>
<tr>
<th rowspan="8"><strong>Task</strong></th>
<td rowspan="8">Shuffle.Netty.Input</td>
<td>numBytesInLocal</td>
<td>The total number of bytes this task has read from a local source.</td>
Expand Down Expand Up @@ -1207,28 +1208,6 @@ Metrics related to data exchange between task executors using netty network comm
</tbody>
</table>

### Common shuffle service metrics
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 18%">Scope</th>
<th class="text-left" style="width: 22%">Infix</th>
<th class="text-left" style="width: 22%">Metrics</th>
<th class="text-left" style="width: 30%">Description</th>
<th class="text-left" style="width: 8%">Type</th>
</tr>
</thead>
<tbody>
<tr>
<th rowspan="1"><strong>Task</strong></th>
<td rowspan="1">Shuffle.BackPressure</td>
<td>isBackPressured</td>
<td>whether the task is back-pressured</td>
<td>Gauge</td>
</tr>
</tbody>
</table>

### Cluster
<table class="table table-bordered">
<thead>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand All @@ -60,7 +59,6 @@

import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.createShuffleBackPressureMetricGroup;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerInputMetrics;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerOutputMetrics;
Expand Down Expand Up @@ -176,14 +174,12 @@ public ShuffleIOOwnerContext createShuffleIOOwnerContext(
ExecutionAttemptID executionAttemptID,
MetricGroup parentGroup) {
MetricGroup nettyGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
MetricGroup backPressureGroup = createShuffleBackPressureMetricGroup(checkNotNull(parentGroup));
return new ShuffleIOOwnerContext(
checkNotNull(ownerName),
checkNotNull(executionAttemptID),
parentGroup,
nettyGroup.addGroup(METRIC_GROUP_INPUT),
nettyGroup.addGroup(METRIC_GROUP_OUTPUT),
backPressureGroup);
nettyGroup.addGroup(METRIC_GROUP_OUTPUT));
}

@Override
Expand Down Expand Up @@ -295,11 +291,6 @@ public int start() throws IOException {
}
}

@Override
public void registgerBackPressureMetric(ShuffleIOOwnerContext ownerContext, Task task) {
NettyShuffleMetricFactory.registerBackPressureMetrics(ownerContext.getBackPreessureGroup(), task);
}

/**
* Tries to shut down all network I/O components.
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskmanager.Task;

import java.util.Arrays;

Expand Down Expand Up @@ -57,7 +56,6 @@ public class NettyShuffleMetricFactory {
public static final String METRIC_GROUP_OUTPUT = "Output";
public static final String METRIC_GROUP_INPUT = "Input";
private static final String METRIC_GROUP_BUFFERS = "Buffers";
public static final String METRIC_GROUP_BACKPRESSURE = "BackPressure";

// task level output metrics: Shuffle.Netty.Output.*

Expand All @@ -71,9 +69,6 @@ public class NettyShuffleMetricFactory {
private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = "inputFloatingBuffersUsage";
private static final String METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE = "inputExclusiveBuffersUsage";

// task level backpressure metric: Shuffle.BackPressure.isBackPressured;
private static final String METRIC_BACKPRESSURE_IS_BACKPRESSURED = "isBackPressured";

private NettyShuffleMetricFactory() {
}

Expand Down Expand Up @@ -103,10 +98,6 @@ public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup parentGrou
return parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_NETTY);
}

public static MetricGroup createShuffleBackPressureMetricGroup(MetricGroup parentGroup) {
return parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_BACKPRESSURE);
}

/**
* Registers legacy network metric groups before shuffle service refactoring.
*
Expand Down Expand Up @@ -203,10 +194,4 @@ private static void registerInputMetrics(
buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
}
}

public static void registerBackPressureMetrics(
MetricGroup backPressureGroup,
Task task) {
backPressureGroup.gauge(METRIC_BACKPRESSURE_IS_BACKPRESSURED, new BackPressureGauge(task));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.taskmanager.Task;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -174,11 +173,4 @@ Collection<G> createInputGates(
boolean updatePartitionInfo(
ExecutionAttemptID consumerID,
PartitionInfo partitionInfo) throws IOException, InterruptedException;

/**
* Register metric for task is back pressured.
* @param ownerContext
* @param task
*/
void registgerBackPressureMetric(ShuffleIOOwnerContext ownerContext, Task task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,18 @@ public class ShuffleIOOwnerContext {
private final MetricGroup parentGroup;
private final MetricGroup outputGroup;
private final MetricGroup inputGroup;
private final MetricGroup backPreessureGroup;

public ShuffleIOOwnerContext(
String ownerName,
ExecutionAttemptID executionAttemptID,
MetricGroup parentGroup,
MetricGroup outputGroup,
MetricGroup inputGroup,
MetricGroup backPreessureGroup) {
MetricGroup inputGroup) {
this.ownerName = checkNotNull(ownerName);
this.executionAttemptID = checkNotNull(executionAttemptID);
this.parentGroup = checkNotNull(parentGroup);
this.outputGroup = checkNotNull(outputGroup);
this.inputGroup = checkNotNull(inputGroup);
this.backPreessureGroup = checkNotNull(backPreessureGroup);
}

public String getOwnerName() {
Expand All @@ -68,8 +65,4 @@ public MetricGroup getOutputGroup() {
public MetricGroup getInputGroup() {
return inputGroup;
}

public MetricGroup getBackPreessureGroup() {
return backPreessureGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,6 @@ public Task(
.registerLegacyNetworkMetrics(metrics.getIOMetricGroup(), resultPartitionWriters, gates);
}

shuffleEnvironment.registgerBackPressureMetric(taskShuffleContext, this);
invokableHasBeenCanceled = new AtomicBoolean(false);

// finally, create the executing thread, but do not start it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -568,11 +567,6 @@ public boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo
return backingShuffleEnvironment.updatePartitionInfo(consumerID, partitionInfo);
}

@Override
public void registgerBackPressureMetric(ShuffleIOOwnerContext ownerContext, Task task) {
backingShuffleEnvironment.registgerBackPressureMetric(ownerContext, task);
}

@Override
public void close() throws Exception {
backingShuffleEnvironment.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.metrics.BackPressureGauge;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -924,13 +923,6 @@ public void testNoBackPressureIfTaskNotStarted() throws Exception {
assertFalse(task.isBackPressured());
}

@Test
public void testBackPressureMetric() throws Exception {
final Task task = createTaskBuilder().build();
BackPressureGauge backPressureGauge = new BackPressureGauge(task);
assertEquals(backPressureGauge.getValue(), task.isBackPressured());
}

// ------------------------------------------------------------------------
// customized TaskManagerActions
// ------------------------------------------------------------------------
Expand Down

0 comments on commit 9c9ac52

Please sign in to comment.