diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java index 078778a3371a1..6ef6a70b59ed3 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java @@ -24,7 +24,7 @@ /** The base interface of runner which is responsible for the execution of Python functions. */ @Internal -public interface PythonFunctionRunner { +public interface PythonFunctionRunner extends AutoCloseable { /** * Prepares the Python function runner, such as preparing the Python execution environment, etc. diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java index 9f1aee39f41ee..85268014f9011 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistration; import org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import org.apache.flink.util.function.LongFunctionWithException; import org.apache.beam.model.fnexecution.v1.BeamFnApi; @@ -186,6 +187,8 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { /** The shared resource among Python operators of the same slot. */ private transient OpaqueMemoryResource sharedResources; + private transient Thread shutdownHook; + public BeamPythonFunctionRunner( String taskName, ProcessPythonEnvironmentManager environmentManager, @@ -287,6 +290,10 @@ public void open(ReadableConfig config) throws Exception { jobBundleFactory, createPythonExecutionEnvironment(config, -1)); } progressHandler = getProgressHandler(flinkMetricContainer); + + shutdownHook = + ShutdownHookUtil.addShutdownHook( + this, BeamPythonFunctionRunner.class.getSimpleName(), LOG); } @Override @@ -311,6 +318,12 @@ public void close() throws Exception { } finally { sharedResources = null; } + + if (shutdownHook != null) { + ShutdownHookUtil.removeShutdownHook( + shutdownHook, BeamPythonFunctionRunner.class.getSimpleName(), LOG); + shutdownHook = null; + } } @Override