Skip to content

Commit

Permalink
[FLINK-11952][4/4] Integrate plugin mechanism with FileSystem initial…
Browse files Browse the repository at this point in the history
…ization in process entry points

This integration currently still does not provide a proper plugin root folder because this
requires more changes, in particular FLINK-11952.

This closes apache#8038.
  • Loading branch information
StefanRRichter committed Apr 15, 2019
1 parent c7b141b commit c6307b5
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
Expand Down Expand Up @@ -80,6 +81,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -127,7 +129,8 @@ public CliFrontend(
this.configuration = Preconditions.checkNotNull(configuration);
this.customCommandLines = Preconditions.checkNotNull(customCommandLines);

FileSystem.initialize(this.configuration);
//TODO provide plugin path.
FileSystem.initialize(this.configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));

this.customCommandLineOptions = new Options();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand All @@ -42,6 +43,7 @@

import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.Optional;

/**
* The entry point for running a TaskManager in a Mesos container.
Expand Down Expand Up @@ -85,7 +87,8 @@ public static void main(String[] args) throws Exception {
final Map<String, String> envs = System.getenv();

// configure the filesystems
FileSystem.initialize(configuration);
//TODO provide plugin path.
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));

// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.net.SSLUtils;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -109,7 +111,8 @@ public static void main(String[] args) throws Exception {
LOG.info("Loading configuration from {}", configDir);
final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir);

FileSystem.initialize(flinkConfig);
//TODO provide plugin path.
FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));

// run the history server
SecurityUtils.install(new SecurityConfiguration(flinkConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
Expand Down Expand Up @@ -75,6 +76,7 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -153,6 +155,7 @@ public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());

try {

configureFileSystems(configuration);

SecurityContext securityContext = installSecurityContext(configuration);
Expand Down Expand Up @@ -183,7 +186,8 @@ public void startCluster() throws ClusterEntrypointException {

private void configureFileSystems(Configuration configuration) {
LOG.info("Install default filesystem.");
FileSystem.initialize(configuration);
//TODO provide plugin path
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
}

protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -277,7 +279,8 @@ public static void main(String[] args) throws Exception {

final Configuration configuration = loadConfiguration(args);

FileSystem.initialize(configuration);
//TODO provide plugin path.
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));

SecurityUtils.install(new SecurityConfiguration(configuration));

Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.Table;
Expand Down Expand Up @@ -67,6 +68,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Executor that performs the Flink communication locally. The calls are blocking depending on the
Expand Down Expand Up @@ -110,7 +112,8 @@ public LocalExecutor(URL defaultEnv, List<URL> jars, List<URL> libraries) {
this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir);

// initialize default file system
FileSystem.initialize(this.flinkConfig);
//TODO provide plugin path.
FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));

// load command lines for deployment
this.commandLines = CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
Expand Down Expand Up @@ -97,6 +98,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
Expand Down Expand Up @@ -661,7 +663,8 @@ public ApplicationReport startAppMaster(

// ------------------ Initialize the file systems -------------------------

org.apache.flink.core.fs.FileSystem.initialize(configuration);
//TODO provide plugin path.
org.apache.flink.core.fs.FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));

// initialize file system
// Copy the application master jar to the filesystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
Expand All @@ -45,6 +46,7 @@
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

/**
Expand Down Expand Up @@ -92,7 +94,9 @@ private static void run(String[] args) {
LOG.info("Current working Directory: {}", currDir);

final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);

//TODO provide path.
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));

setupConfigurationAndInstallSecurityContext(configuration, currDir, ENV);

Expand Down

0 comments on commit c6307b5

Please sign in to comment.