Skip to content

Commit

Permalink
[FLINK-16744][task][hotfix] Finalize StreamTask methods used during c…
Browse files Browse the repository at this point in the history
…onstruction

Motivation: prevent access to uninitialized descendant state from
StreamTask constructor which otherwise leads to NPE
  • Loading branch information
rkhachatryan authored and pnowojski committed Apr 10, 2020
1 parent 492d8fd commit e5d8f3d
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public boolean shouldInterruptOnCancel() {
*
* @return The environment of this task.
*/
public Environment getEnvironment() {
public final Environment getEnvironment() {
return this.environment;
}

Expand All @@ -147,7 +147,7 @@ public Environment getEnvironment() {
*
* @return user code class loader of this invokable.
*/
public ClassLoader getUserCodeClassLoader() {
public final ClassLoader getUserCodeClassLoader() {
return getEnvironment().getUserClassLoader();
}

Expand All @@ -174,7 +174,7 @@ public int getIndexInSubtaskGroup() {
*
* @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}
*/
public Configuration getTaskConfiguration() {
public final Configuration getTaskConfiguration() {
return this.environment.getTaskConfiguration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,9 @@ public DummyInvokable() {
super(new DummyEnvironment("test", 1, 0));
}

public DummyInvokable(Environment environment, @Nullable TaskStateSnapshot initialState) {
super(environment);
}

@Override
public void invoke() {}

@Override
public ClassLoader getUserCodeClassLoader() {
return getClass().getClassLoader();
}

@Override
public int getCurrentNumberOfSubtasks() {
return 1;
Expand All @@ -57,11 +48,6 @@ public int getIndexInSubtaskGroup() {
return 0;
}

@Override
public final Configuration getTaskConfiguration() {
return new Configuration();
}

@Override
public final Configuration getJobConfiguration() {
return new Configuration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,23 @@ public static MockEnvironmentBuilder builder() {
}

protected MockEnvironment(
JobID jobID,
JobVertexID jobVertexID,
String taskName,
long offHeapMemorySize,
MockInputSplitProvider inputSplitProvider,
int bufferSize,
Configuration taskConfiguration,
ExecutionConfig executionConfig,
IOManager ioManager,
TaskStateManager taskStateManager,
GlobalAggregateManager aggregateManager,
int maxParallelism,
int parallelism,
int subtaskIndex,
ClassLoader userCodeClassLoader,
TaskMetricGroup taskMetricGroup,
TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
JobID jobID,
JobVertexID jobVertexID,
String taskName,
MockInputSplitProvider inputSplitProvider,
int bufferSize,
Configuration taskConfiguration,
ExecutionConfig executionConfig,
IOManager ioManager,
TaskStateManager taskStateManager,
GlobalAggregateManager aggregateManager,
int maxParallelism,
int parallelism,
int subtaskIndex,
ClassLoader userCodeClassLoader,
TaskMetricGroup taskMetricGroup,
TaskManagerRuntimeInfo taskManagerRuntimeInfo,
MemoryManager memManager) {

this.jobID = jobID;
this.jobVertexID = jobVertexID;
Expand All @@ -150,7 +150,7 @@ protected MockEnvironment(
this.inputs = new LinkedList<InputGate>();
this.outputs = new LinkedList<ResultPartitionWriter>();

this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, offHeapMemorySize).build();
this.memManager = memManager;
this.ioManager = ioManager;
this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;

Expand Down Expand Up @@ -335,7 +335,7 @@ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpoin

@Override
public void declineCheckpoint(long checkpointId, Throwable cause) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException(cause);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.TaskStateManager;
Expand All @@ -34,13 +36,12 @@
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;

public class MockEnvironmentBuilder {
public class MockEnvironmentBuilder {
private String taskName = "mock-task";
private long managedMemorySize = 1024 * MemoryManager.DEFAULT_PAGE_SIZE;
private MockInputSplitProvider inputSplitProvider = null;
private int bufferSize = 16;
private TaskStateManager taskStateManager = new TestTaskStateManager();
private GlobalAggregateManager aggregateManager= new TestGlobalAggregateManager();
private GlobalAggregateManager aggregateManager = new TestGlobalAggregateManager();
private Configuration taskConfiguration = new Configuration();
private ExecutionConfig executionConfig = new ExecutionConfig();
private int maxParallelism = 1;
Expand All @@ -52,14 +53,19 @@ public class MockEnvironmentBuilder {
private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
private IOManager ioManager;
private MemoryManager memoryManager = buildMemoryManager(1024 * MemoryManager.DEFAULT_PAGE_SIZE);

private MemoryManager buildMemoryManager(long memorySize) {
return MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, memorySize).build();
}

public MockEnvironmentBuilder setTaskName(String taskName) {
this.taskName = taskName;
return this;
}

public MockEnvironmentBuilder setManagedMemorySize(long managedMemorySize) {
this.managedMemorySize = managedMemorySize;
this.memoryManager = buildMemoryManager(managedMemorySize);
return this;
}

Expand Down Expand Up @@ -88,7 +94,7 @@ public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig
return this;
}

public MockEnvironmentBuilder setTaskManagerRuntimeInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo){
public MockEnvironmentBuilder setTaskManagerRuntimeInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
return this;
}
Expand Down Expand Up @@ -133,6 +139,11 @@ public MockEnvironmentBuilder setIOManager(IOManager ioManager) {
return this;
}

public MockEnvironmentBuilder setMemoryManager(MemoryManager memoryManager) {
this.memoryManager = memoryManager;
return this;
}

public MockEnvironment build() {
if (ioManager == null) {
ioManager = new IOManagerAsync();
Expand All @@ -141,7 +152,6 @@ public MockEnvironment build() {
jobID,
jobVertexID,
taskName,
managedMemorySize,
inputSplitProvider,
bufferSize,
taskConfiguration,
Expand All @@ -154,6 +164,7 @@ public MockEnvironment build() {
subtaskIndex,
userCodeClassLoader,
taskMetricGroup,
taskManagerRuntimeInfo);
taskManagerRuntimeInfo,
memoryManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ boolean isSerializingTimestamps() {
* Gets the name of the task, in the form "taskname (2/5)".
* @return The name of the task.
*/
public String getName() {
public final String getName() {
return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
}

Expand Down Expand Up @@ -842,7 +842,7 @@ protected void handleCheckpointException(Exception exception) {
handleException(exception);
}

public ExecutorService getAsyncOperationsThreadPool() {
public final ExecutorService getAsyncOperationsThreadPool() {
return asyncOperationsThreadPool;
}

Expand Down Expand Up @@ -1034,7 +1034,7 @@ void handleAsyncException(String message, Throwable exception) {
}
}

public CloseableRegistry getCancelables() {
public final CloseableRegistry getCancelables() {
return cancelables;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
Expand Down Expand Up @@ -115,8 +114,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {

protected StreamTaskStateInitializer streamTaskStateInitializer;

CloseableRegistry closableRegistry;

private final TaskMailbox taskMailbox;

// use this as default for tests
Expand Down Expand Up @@ -234,7 +231,6 @@ private AbstractStreamOperatorTestHarness(
this.config.setCheckpointingEnabled(true);
this.config.setOperatorID(operatorID);
this.executionConfig = env.getExecutionConfig();
this.closableRegistry = new CloseableRegistry();
this.checkpointLock = new Object();

this.environment = Preconditions.checkNotNull(env);
Expand All @@ -258,7 +254,6 @@ private AbstractStreamOperatorTestHarness(
.setConfig(config)
.setExecutionConfig(executionConfig)
.setStreamTaskStateInitializer(streamTaskStateInitializer)
.setClosableRegistry(closableRegistry)
.setCheckpointStorage(checkpointStorage)
.setTimerService(processingTimeService)
.setHandleAsyncException(handleAsyncException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.streaming.util;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
Expand All @@ -42,25 +41,21 @@
*/
public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamTask<OUT, OP> {

private final String name;
private final Object checkpointLock;
private final StreamConfig config;
private final ExecutionConfig executionConfig;
private StreamTaskStateInitializer streamTaskStateInitializer;
private final CloseableRegistry closableRegistry;
private final StreamStatusMaintainer streamStatusMaintainer;
private final CheckpointStorageWorkerView checkpointStorage;
private final ProcessingTimeService processingTimeService;
private final BiConsumer<String, Throwable> handleAsyncException;

public MockStreamTask(
Environment environment,
String name,
Object checkpointLock,
StreamConfig config,
ExecutionConfig executionConfig,
StreamTaskStateInitializer streamTaskStateInitializer,
CloseableRegistry closableRegistry,
StreamStatusMaintainer streamStatusMaintainer,
CheckpointStorageWorkerView checkpointStorage,
TimerService timerService,
Expand All @@ -70,12 +65,10 @@ public MockStreamTask(
StreamInputProcessor inputProcessor) throws Exception {

super(environment, timerService, FatalExitExceptionHandler.INSTANCE, taskActionExecutor, taskMailbox);
this.name = name;
this.checkpointLock = checkpointLock;
this.config = config;
this.executionConfig = executionConfig;
this.streamTaskStateInitializer = streamTaskStateInitializer;
this.closableRegistry = closableRegistry;
this.streamStatusMaintainer = streamStatusMaintainer;
this.checkpointStorage = checkpointStorage;
this.processingTimeService = timerService;
Expand All @@ -92,11 +85,6 @@ protected void cleanup() {
mailboxProcessor.allActionsCompleted();
}

@Override
public String getName() {
return name;
}

/**
* Checkpoint lock in {@link StreamTask} is replaced by {@link StreamTaskActionExecutor}.
* <code>getCheckpointLock</code> method was moved from to the {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
Expand All @@ -111,11 +99,6 @@ public StreamConfig getConfiguration() {
return config;
}

@Override
public Environment getEnvironment() {
return super.getEnvironment();
}

@Override
public ExecutionConfig getExecutionConfig() {
return executionConfig;
Expand All @@ -130,11 +113,6 @@ public void setStreamTaskStateInitializer(StreamTaskStateInitializer streamTaskS
this.streamTaskStateInitializer = streamTaskStateInitializer;
}

@Override
public CloseableRegistry getCancelables() {
return closableRegistry;
}

@Override
public StreamStatusMaintainer getStreamStatusMaintainer() {
return streamStatusMaintainer;
Expand Down
Loading

0 comments on commit e5d8f3d

Please sign in to comment.