Skip to content

Commit

Permalink
[FLINK-23452][streaming] Integration ThroughputCalculator in StreamTa…
Browse files Browse the repository at this point in the history
…sk for the calculation of the subtask level throughput
  • Loading branch information
akalash authored and pnowojski committed Jul 29, 2021
1 parent 29d7bcd commit 230f659
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.clock.SystemClock;

import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -285,6 +287,13 @@ public TaskEventDispatcher getTaskEventDispatcher() {
throw new UnsupportedOperationException(ERROR_MSG);
}

@Override
public ThroughputCalculator getThroughputMeter() {
// The throughput calculator doesn't make sense for savepoint but the not null value is
// preferable when StreamTask is instantiated.
return new ThroughputCalculator(SystemClock.getInstance(), 10);
}

/** {@link SavepointEnvironment} builder. */
public static class Builder {
private RuntimeContext ctx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.UserCodeClassLoader;

import java.util.Map;
Expand Down Expand Up @@ -235,6 +236,13 @@ void acknowledgeCheckpoint(

TaskEventDispatcher getTaskEventDispatcher();

/**
* Returns the throughput meter for calculation the throughput for certain period.
*
* @return the throughput calculation service.
*/
ThroughputCalculator getThroughputMeter();

// --------------------------------------------------------------------------------------------
// Fields set in the StreamTask to provide access to mailbox and other runtime resources
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.throughput.ThroughputCalculator;

import java.io.IOException;
import java.util.List;
Expand All @@ -45,9 +46,15 @@ public class InputGateWithMetrics extends IndexedInputGate {

private final Counter numBytesIn;

public InputGateWithMetrics(IndexedInputGate inputGate, Counter numBytesIn) {
private final ThroughputCalculator throughputCalculator;

public InputGateWithMetrics(
IndexedInputGate inputGate,
Counter numBytesIn,
ThroughputCalculator throughputCalculator) {
this.inputGate = checkNotNull(inputGate);
this.numBytesIn = checkNotNull(numBytesIn);
this.throughputCalculator = throughputCalculator;
}

@Override
Expand Down Expand Up @@ -141,7 +148,11 @@ public void finishReadRecoveredState() throws IOException {
}

private BufferOrEvent updateMetrics(BufferOrEvent bufferOrEvent) {
numBytesIn.inc(bufferOrEvent.getSize());
int incomingDataSize = bufferOrEvent.getSize();

numBytesIn.inc(incomingDataSize);
throughputCalculator.incomingDataSize(incomingDataSize);

return bufferOrEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.UserCodeClassLoader;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -100,6 +101,8 @@ public class RuntimeEnvironment implements Environment {

@Nullable private ExecutorService asyncOperationsThreadPool;

private final ThroughputCalculator throughputCalculator;

// ------------------------------------------------------------------------

public RuntimeEnvironment(
Expand Down Expand Up @@ -128,7 +131,8 @@ public RuntimeEnvironment(
TaskManagerRuntimeInfo taskManagerInfo,
TaskMetricGroup metrics,
Task containingTask,
ExternalResourceInfoProvider externalResourceInfoProvider) {
ExternalResourceInfoProvider externalResourceInfoProvider,
ThroughputCalculator throughputCalculator) {

this.jobId = checkNotNull(jobId);
this.jobVertexId = checkNotNull(jobVertexId);
Expand Down Expand Up @@ -156,6 +160,7 @@ public RuntimeEnvironment(
this.containingTask = containingTask;
this.metrics = metrics;
this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
this.throughputCalculator = throughputCalculator;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -338,4 +343,9 @@ public ExecutorService getAsyncOperationsThreadPool() {
asyncOperationsThreadPool,
"asyncOperationsThreadPool has not been initialized yet!");
}

@Override
public ThroughputCalculator getThroughputMeter() {
return throughputCalculator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
Expand All @@ -85,6 +86,7 @@
import org.apache.flink.util.TaskManagerExceptionUtils;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -112,6 +114,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import static org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_SAMPLES;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -294,6 +297,9 @@ public class Task
*/
private UserCodeClassLoader userCodeClassLoader;

/** The only one throughput meter per subtask. */
private ThroughputCalculator throughputCalculator;

/**
* <b>IMPORTANT:</b> This constructor may not start any work that would need to be undone in the
* case of a failing task deployment.
Expand Down Expand Up @@ -417,11 +423,17 @@ public Task(
.toArray(new IndexedInputGate[0]);

this.inputGates = new IndexedInputGate[gates.length];
this.throughputCalculator =
new ThroughputCalculator(
SystemClock.getInstance(),
taskConfiguration.get(AUTOMATIC_BUFFER_ADJUSTMENT_SAMPLES));
int counter = 0;
for (IndexedInputGate gate : gates) {
inputGates[counter++] =
new InputGateWithMetrics(
gate, metrics.getIOMetricGroup().getNumBytesInCounter());
gate,
metrics.getIOMetricGroup().getNumBytesInCounter(),
throughputCalculator);
}

if (shuffleEnvironment instanceof NettyShuffleEnvironment) {
Expand Down Expand Up @@ -714,7 +726,8 @@ private void doRun() {
taskManagerConfig,
metrics,
this,
externalResourceInfoProvider);
externalResourceInfoProvider,
throughputCalculator);

// Make sure the user code classloader is accessible thread-locally.
// We are setting the correct context class loader before instantiating the invokable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.clock.SystemClock;

import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -249,6 +251,11 @@ public TaskEventDispatcher getTaskEventDispatcher() {
throw new UnsupportedOperationException();
}

@Override
public ThroughputCalculator getThroughputMeter() {
return new ThroughputCalculator(SystemClock.getInstance(), 10);
}

public void setTaskStateManager(TaskStateManager taskStateManager) {
this.taskStateManager = taskStateManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -130,6 +131,8 @@ public class MockEnvironment implements Environment, AutoCloseable {

private ExecutorService asyncOperationsThreadPool;

private final ThroughputCalculator throughputCalculator;

public static MockEnvironmentBuilder builder() {
return new MockEnvironmentBuilder();
}
Expand All @@ -152,10 +155,12 @@ protected MockEnvironment(
TaskMetricGroup taskMetricGroup,
TaskManagerRuntimeInfo taskManagerRuntimeInfo,
MemoryManager memManager,
ExternalResourceInfoProvider externalResourceInfoProvider) {
ExternalResourceInfoProvider externalResourceInfoProvider,
ThroughputCalculator throughputCalculator) {

this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.throughputCalculator = throughputCalculator;

this.taskInfo = new TaskInfo(taskName, maxParallelism, subtaskIndex, parallelism, 0);
this.jobConfiguration = new Configuration();
Expand Down Expand Up @@ -312,6 +317,11 @@ public TaskEventDispatcher getTaskEventDispatcher() {
return taskEventDispatcher;
}

@Override
public ThroughputCalculator getThroughputMeter() {
return throughputCalculator;
}

@Override
public JobVertexID getJobVertexId() {
return jobVertexID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.clock.SystemClock;

public class MockEnvironmentBuilder {
private String taskName = "mock-task";
Expand All @@ -61,6 +63,8 @@ public class MockEnvironmentBuilder {
buildMemoryManager(1024 * MemoryManager.DEFAULT_PAGE_SIZE);
private ExternalResourceInfoProvider externalResourceInfoProvider =
ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
private ThroughputCalculator throughputCalculator =
new ThroughputCalculator(SystemClock.getInstance(), 10);

private MemoryManager buildMemoryManager(long memorySize) {
return MemoryManagerBuilder.newBuilder().setMemorySize(memorySize).build();
Expand Down Expand Up @@ -159,6 +163,11 @@ public MockEnvironmentBuilder setExternalResourceInfoProvider(
return this;
}

public MockEnvironmentBuilder setThroughputMeter(ThroughputCalculator throughputCalculator) {
this.throughputCalculator = throughputCalculator;
return this;
}

public MockEnvironment build() {
if (ioManager == null) {
ioManager = new IOManagerAsync();
Expand All @@ -181,6 +190,7 @@ public MockEnvironment build() {
taskMetricGroup,
taskManagerRuntimeInfo,
memoryManager,
externalResourceInfoProvider);
externalResourceInfoProvider,
throughputCalculator);
}
}
Loading

0 comments on commit 230f659

Please sign in to comment.