forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints …
…+ MesosTaskExecutorRunner - bin: new entrypoints scripts for flip-6 - ClusterEntrypoint: Refactor the shutdown method - ClusterEntrypoint: Install default FileSystem (for parity with legacy entrypoints) - ClusterEntrypoint: new MesosJobClusterEntrypoint, MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner - MesosServices: enhanced with artifactServer, localActorSystem - MesosResourceManager: Fallback to old TM params when UNKNOWN resource profile is provided - MesosResourceManager: config setting for taskmanager startup script (mesos.resourcemanager.tasks.taskmanager-cmd) - test: added a 'noop' job graph for testing purposes This closes apache#4555.
- Loading branch information
1 parent
76f1022
commit bbac4a6
Showing
22 changed files
with
1,186 additions
and
184 deletions.
There are no files selected for viewing
47 changes: 47 additions & 0 deletions
47
flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
#!/usr/bin/env bash | ||
################################################################################ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
################################################################################ | ||
|
||
bin=`dirname "$0"` | ||
bin=`cd "$bin"; pwd` | ||
|
||
# get Flink config | ||
. "$bin"/config.sh | ||
|
||
if [ "$FLINK_IDENT_STRING" = "" ]; then | ||
FLINK_IDENT_STRING="$USER" | ||
fi | ||
|
||
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` | ||
|
||
log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log" | ||
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" | ||
|
||
export FLINK_CONF_DIR | ||
export FLINK_BIN_DIR | ||
export FLINK_LIB_DIR | ||
|
||
exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint "$@" | ||
|
||
rc=$? | ||
|
||
if [[ $rc -ne 0 ]]; then | ||
echo "Error while starting the mesos application master. Please check ${log} for more details." | ||
fi | ||
|
||
exit $rc |
47 changes: 47 additions & 0 deletions
47
flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
#!/usr/bin/env bash | ||
################################################################################ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
################################################################################ | ||
|
||
bin=`dirname "$0"` | ||
bin=`cd "$bin"; pwd` | ||
|
||
# get Flink config | ||
. "$bin"/config.sh | ||
|
||
if [ "$FLINK_IDENT_STRING" = "" ]; then | ||
FLINK_IDENT_STRING="$USER" | ||
fi | ||
|
||
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` | ||
|
||
log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log" | ||
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" | ||
|
||
export FLINK_CONF_DIR | ||
export FLINK_BIN_DIR | ||
export FLINK_LIB_DIR | ||
|
||
exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint "$@" | ||
|
||
rc=$? | ||
|
||
if [[ $rc -ne 0 ]]; then | ||
echo "Error while starting the mesos application master. Please check ${log} for more details." | ||
fi | ||
|
||
exit $rc |
45 changes: 45 additions & 0 deletions
45
flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
#!/usr/bin/env bash | ||
################################################################################ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
################################################################################ | ||
|
||
bin=`dirname "$0"` | ||
bin=`cd "$bin"; pwd` | ||
|
||
# get Flink config | ||
. "$bin"/config.sh | ||
|
||
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` | ||
|
||
log=flink-taskmanager.log | ||
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" | ||
|
||
# Add precomputed memory JVM options | ||
if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then | ||
FLINK_ENV_JAVA_OPTS_MEM="" | ||
fi | ||
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}" | ||
|
||
# Add TaskManager-specific JVM options | ||
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" | ||
|
||
export FLINK_CONF_DIR | ||
export FLINK_BIN_DIR | ||
export FLINK_LIB_DIR | ||
|
||
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner "$@" | ||
|
177 changes: 177 additions & 0 deletions
177
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.mesos.entrypoint; | ||
|
||
import org.apache.flink.configuration.Configuration; | ||
import org.apache.flink.configuration.GlobalConfiguration; | ||
import org.apache.flink.configuration.IllegalConfigurationException; | ||
import org.apache.flink.mesos.configuration.MesosOptions; | ||
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; | ||
import org.apache.flink.mesos.util.MesosConfiguration; | ||
import org.apache.flink.runtime.clusterframework.BootstrapTools; | ||
import org.apache.flink.runtime.clusterframework.ContainerSpecification; | ||
import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; | ||
import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; | ||
import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay; | ||
import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay; | ||
import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; | ||
import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; | ||
import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; | ||
|
||
import org.apache.commons.cli.CommandLine; | ||
import org.apache.mesos.Protos; | ||
import org.slf4j.Logger; | ||
|
||
import java.io.IOException; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import scala.concurrent.duration.Duration; | ||
import scala.concurrent.duration.FiniteDuration; | ||
|
||
/** | ||
* Utils for Mesos entrpoints. | ||
*/ | ||
public class MesosEntrypointUtils { | ||
|
||
/** | ||
* Loads the global configuration and adds the dynamic properties parsed from | ||
* the given command line. | ||
* | ||
* @param cmd command line to parse for dynamic properties | ||
* @return Global configuration with dynamic properties set | ||
* @deprecated replace once FLINK-7269 has been merged | ||
*/ | ||
@Deprecated | ||
public static Configuration loadConfiguration(CommandLine cmd) { | ||
|
||
// merge the dynamic properties from the command-line | ||
Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); | ||
GlobalConfiguration.setDynamicProperties(dynamicProperties); | ||
Configuration config = GlobalConfiguration.loadConfiguration(); | ||
|
||
return config; | ||
} | ||
|
||
/** | ||
* Loads and validates the Mesos scheduler configuration. | ||
* @param flinkConfig the global configuration. | ||
* @param hostname the hostname to advertise to the Mesos master. | ||
*/ | ||
public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) { | ||
|
||
Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() | ||
.setHostname(hostname); | ||
Protos.Credential.Builder credential = null; | ||
|
||
if (!flinkConfig.contains(MesosOptions.MASTER_URL)) { | ||
throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured."); | ||
} | ||
String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL); | ||
|
||
Duration failoverTimeout = FiniteDuration.apply( | ||
flinkConfig.getInteger( | ||
MesosOptions.FAILOVER_TIMEOUT_SECONDS), | ||
TimeUnit.SECONDS); | ||
frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds()); | ||
|
||
frameworkInfo.setName(flinkConfig.getString( | ||
MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME)); | ||
|
||
frameworkInfo.setRole(flinkConfig.getString( | ||
MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE)); | ||
|
||
frameworkInfo.setUser(flinkConfig.getString( | ||
MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER)); | ||
|
||
if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { | ||
frameworkInfo.setPrincipal(flinkConfig.getString( | ||
MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)); | ||
|
||
credential = Protos.Credential.newBuilder(); | ||
credential.setPrincipal(frameworkInfo.getPrincipal()); | ||
|
||
// some environments use a side-channel to communicate the secret to Mesos, | ||
// and thus don't set the 'secret' configuration setting | ||
if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) { | ||
credential.setSecret(flinkConfig.getString( | ||
MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)); | ||
} | ||
} | ||
|
||
MesosConfiguration mesos = | ||
new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential)); | ||
|
||
return mesos; | ||
} | ||
|
||
public static MesosTaskManagerParameters createTmParameters(Configuration configuration, Logger log) { | ||
// TM configuration | ||
final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(configuration); | ||
|
||
log.info("TaskManagers will be created with {} task slots", | ||
taskManagerParameters.containeredParameters().numSlots()); | ||
log.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " + | ||
"JVM direct memory limit {} MB, {} cpus", | ||
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(), | ||
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(), | ||
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(), | ||
taskManagerParameters.cpus()); | ||
|
||
return taskManagerParameters; | ||
} | ||
|
||
public static ContainerSpecification createContainerSpec(Configuration configuration, Configuration dynamicProperties) | ||
throws Exception { | ||
// generate a container spec which conveys the artifacts/vars needed to launch a TM | ||
ContainerSpecification spec = new ContainerSpecification(); | ||
|
||
// propagate the AM dynamic configuration to the TM | ||
spec.getDynamicConfiguration().addAll(dynamicProperties); | ||
|
||
applyOverlays(configuration, spec); | ||
|
||
return spec; | ||
} | ||
|
||
/** | ||
* Generate a container specification as a TaskManager template. | ||
* | ||
* <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager | ||
* needs (such as JAR file, config file, ...) and all environment variables into a container specification. | ||
* The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory. | ||
* A lightweight HTTP server serves the artifacts to the fetcher. | ||
*/ | ||
public static void applyOverlays( | ||
Configuration configuration, ContainerSpecification containerSpec) throws IOException { | ||
|
||
// create the overlays that will produce the specification | ||
CompositeContainerOverlay overlay = new CompositeContainerOverlay( | ||
FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(), | ||
HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(), | ||
HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(), | ||
KeytabOverlay.newBuilder().fromEnvironment(configuration).build(), | ||
Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(), | ||
SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build() | ||
); | ||
|
||
// apply the overlays | ||
overlay.configure(containerSpec); | ||
} | ||
|
||
} |
Oops, something went wrong.