Skip to content

Commit

Permalink
[FLINK-12143][dist] Ship plugins in the cluster the same way as lib jars
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Jun 13, 2019
1 parent b5dc213 commit 186a2dd
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -124,8 +123,7 @@ public CliFrontend(
this.configuration = Preconditions.checkNotNull(configuration);
this.customCommandLines = Preconditions.checkNotNull(customCommandLines);

//TODO provide plugin path.
FileSystem.initialize(this.configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

this.customCommandLineOptions = new Options();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,34 @@ public static Configuration loadConfiguration(final String configDir, @Nullable
configuration.addAll(dynamicProperties);
}

return enrichWithEnvironmentVariables(configuration);
}

private static Configuration enrichWithEnvironmentVariables(Configuration configuration) {
enrichWithEnvironmentVariable(ConfigConstants.ENV_FLINK_PLUGINS_DIR, configuration);
return configuration;
}

private static void enrichWithEnvironmentVariable(String environmentVariable, Configuration configuration) {
String pluginsDirFromEnv = System.getenv(environmentVariable);

if (pluginsDirFromEnv == null) {
return;
}

String pluginsDirFromConfig = configuration.getString(environmentVariable, pluginsDirFromEnv);

if (!pluginsDirFromEnv.equals(pluginsDirFromConfig)) {
throw new IllegalConfigurationException(
"The given configuration file already contains a value (" + pluginsDirFromEnv +
") for the key (" + environmentVariable +
") that would have been overwritten with (" + pluginsDirFromConfig +
") by an environment with the same name.");
}

configuration.setString(environmentVariable, pluginsDirFromEnv);
}

/**
* Loads a YAML-file of key-value pairs.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.plugin;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.nio.file.Path;
import java.util.Optional;

/**
* Stores the configuration for plugins mechanism.
*/
public class PluginConfig {
private final Optional<Path> pluginsPath;

private PluginConfig() {
this.pluginsPath = Optional.empty();
}

private PluginConfig(Path pluginsPath) {
this.pluginsPath = Optional.of(pluginsPath);
}

public Optional<Path> getPluginsPath() {
return pluginsPath;
}

public static PluginConfig fromConfiguration(Configuration configuration) {
String pluginsDir = configuration.getString(ConfigConstants.ENV_FLINK_PLUGINS_DIR, null);
if (pluginsDir == null) {
return new PluginConfig();
}

File pluginsDirFile = new File(pluginsDir);
if (!pluginsDirFile.isDirectory()) {
return new PluginConfig();
}
return new PluginConfig(pluginsDirFile.toPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@

package org.apache.flink.core.plugin;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

/**
* Utility functions for the plugin mechanism.
Expand All @@ -35,20 +34,22 @@ private PluginUtils() {
throw new AssertionError("Singleton class.");
}

public static PluginManager createPluginManagerFromRootFolder(Optional<Path> pluginsRootPath) {
Collection<PluginDescriptor> pluginDescriptorsForDirectory;
public static PluginManager createPluginManagerFromRootFolder(Configuration configuration) {
return createPluginManagerFromRootFolder(PluginConfig.fromConfiguration(configuration));
}

if (pluginsRootPath.isPresent()) {
private static PluginManager createPluginManagerFromRootFolder(PluginConfig pluginConfig) {
if (pluginConfig.getPluginsPath().isPresent()) {
try {
pluginDescriptorsForDirectory =
new DirectoryBasedPluginFinder(pluginsRootPath.get()).findPlugins();
Collection<PluginDescriptor> pluginDescriptors =
new DirectoryBasedPluginFinder(pluginConfig.getPluginsPath().get()).findPlugins();
return new PluginManager(pluginDescriptors);
} catch (IOException e) {
throw new FlinkRuntimeException("Exception when trying to initialize plugin system.", e);
}
} else {
pluginDescriptorsForDirectory = Collections.emptyList();
}

return new PluginManager(pluginDescriptorsForDirectory);
else {
return new PluginManager(Collections.emptyList());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testConfigurationYAML() {

@Test(expected = IllegalArgumentException.class)
public void testFailIfNull() {
GlobalConfiguration.loadConfiguration(null);
GlobalConfiguration.loadConfiguration((String) null);
}

@Test(expected = IllegalConfigurationException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.Optional;

/**
* The entry point for running a TaskManager in a Mesos container.
Expand Down Expand Up @@ -87,8 +86,7 @@ public static void main(String[] args) throws Exception {
final Map<String, String> envs = System.getenv();

// configure the filesystems
//TODO provide plugin path.
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -111,8 +110,7 @@ public static void main(String[] args) throws Exception {
LOG.info("Loading configuration from {}", configDir);
final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir);

//TODO provide plugin path.
FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));

// run the history server
SecurityUtils.install(new SecurityConfiguration(flinkConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -187,8 +186,7 @@ public void startCluster() throws ClusterEntrypointException {

private void configureFileSystems(Configuration configuration) {
LOG.info("Install default filesystem.");
//TODO provide plugin path
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
}

protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -285,8 +284,7 @@ public static void main(String[] args) throws Exception {

final Configuration configuration = loadConfiguration(args);

//TODO provide plugin path.
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

SecurityUtils.install(new SecurityConfiguration(configuration));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Executor that performs the Flink communication locally. The calls are blocking depending on the
Expand Down Expand Up @@ -113,8 +112,7 @@ public LocalExecutor(URL defaultEnv, List<URL> jars, List<URL> libraries) {
this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir);

// initialize default file system
//TODO provide plugin path.
FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));

// load command lines for deployment
this.commandLines = CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -660,8 +659,9 @@ public ApplicationReport startAppMaster(

// ------------------ Initialize the file systems -------------------------

//TODO provide plugin path.
org.apache.flink.core.fs.FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
org.apache.flink.core.fs.FileSystem.initialize(
configuration,
PluginUtils.createPluginManagerFromRootFolder(configuration));

// initialize file system
// Copy the application master jar to the filesystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

/**
Expand Down Expand Up @@ -95,8 +94,7 @@ private static void run(String[] args) {

final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);

//TODO provide path.
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

setupConfigurationAndInstallSecurityContext(configuration, currDir, ENV);

Expand Down

0 comments on commit 186a2dd

Please sign in to comment.