Skip to content

Commit

Permalink
[FLINK-25906][streaming] check explicit env allowed on StreamExecutio…
Browse files Browse the repository at this point in the history
…nEnvironment own.
  • Loading branch information
JingGe authored and fapaul committed Feb 15, 2022
1 parent 52a4018 commit daff5df
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def excluded_methods(cls):
'socketTextStream', 'initializeContextEnvironment', 'readTextFile',
'setNumberOfExecutionRetries', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration',
'generateStreamGraph', 'getTransformations'}
'generateStreamGraph', 'getTransformations', 'areExplicitEnvironmentsAllowed'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
Expand Down Expand Up @@ -53,7 +52,7 @@ public LocalStreamEnvironment(@Nonnull Configuration configuration) {
}

private static Configuration validateAndGetConfiguration(final Configuration configuration) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
if (!areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The LocalStreamEnvironment cannot be used when submitting a program through a client, "
+ "or running in a TestEnvironment context.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2480,6 +2480,19 @@ public void registerCachedFile(String filePath, String name, boolean executable)
name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
}

/**
* Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment or a
* RemoteEnvironment.
*
* @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
* RemoteEnvironment, false otherwise.
*/
@Internal
public static boolean areExplicitEnvironmentsAllowed() {
return contextEnvironmentFactory == null
&& threadLocalContextEnvironmentFactory.get() == null;
}

// Private helpers.
@SuppressWarnings("unchecked")
private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isForceCheckpointing",
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph",
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTransformations",
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" +
".areExplicitEnvironmentsAllowed",


// TypeHints are only needed for Java API, Scala API doesn't need them
Expand Down Expand Up @@ -113,8 +115,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {

checkMethods(
"ConnectedStreams", "ConnectedStreams",
classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_,_]],
classOf[ConnectedStreams[_,_]])
classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_, _]],
classOf[ConnectedStreams[_, _]])

checkMethods(
"WindowedStream", "WindowedStream",
Expand All @@ -133,12 +135,12 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {

checkMethods(
"JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
classOf[JoinedStreams[_,_]#Where[_]#EqualTo#WithWindow[_]])
classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_, _, _, _]],
classOf[JoinedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])

checkMethods(
"CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_, _, _, _]],
classOf[CoGroupedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])
}
}

0 comments on commit daff5df

Please sign in to comment.