Skip to content

Commit

Permalink
[FLINK-31286][python] Make sure Python processes are cleaned up when …
Browse files Browse the repository at this point in the history
…TaskManager crashes

This closes apache#22065.
  • Loading branch information
dianfu committed Mar 2, 2023
1 parent 95dd542 commit ab4e85e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/** The base interface of runner which is responsible for the execution of Python functions. */
@Internal
public interface PythonFunctionRunner {
public interface PythonFunctionRunner extends AutoCloseable {

/**
* Prepares the Python function runner, such as preparing the Python execution environment, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistration;
import org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.function.LongFunctionWithException;

import org.apache.beam.model.fnexecution.v1.BeamFnApi;
Expand Down Expand Up @@ -186,6 +187,8 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
/** The shared resource among Python operators of the same slot. */
private transient OpaqueMemoryResource<PythonSharedResources> sharedResources;

private transient Thread shutdownHook;

public BeamPythonFunctionRunner(
String taskName,
ProcessPythonEnvironmentManager environmentManager,
Expand Down Expand Up @@ -287,6 +290,10 @@ public void open(ReadableConfig config) throws Exception {
jobBundleFactory, createPythonExecutionEnvironment(config, -1));
}
progressHandler = getProgressHandler(flinkMetricContainer);

shutdownHook =
ShutdownHookUtil.addShutdownHook(
this, BeamPythonFunctionRunner.class.getSimpleName(), LOG);
}

@Override
Expand All @@ -311,6 +318,12 @@ public void close() throws Exception {
} finally {
sharedResources = null;
}

if (shutdownHook != null) {
ShutdownHookUtil.removeShutdownHook(
shutdownHook, BeamPythonFunctionRunner.class.getSimpleName(), LOG);
shutdownHook = null;
}
}

@Override
Expand Down

0 comments on commit ab4e85e

Please sign in to comment.