Skip to content

Commit

Permalink
[FLINK-3125] [web dashboard] Web server starts also when JobManager l…
Browse files Browse the repository at this point in the history
…og files cannot be accessed.
  • Loading branch information
StephanEwen committed Dec 6, 2015
1 parent 1ec0d10 commit a1f4dd1
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
Expand Down Expand Up @@ -99,8 +100,6 @@ public class WebRuntimeMonitor implements WebMonitor {

private final Router router;

private final int configuredPort;

private final ServerBootstrap bootstrap;

private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
Expand All @@ -118,6 +117,7 @@ public WebRuntimeMonitor(
Configuration config,
LeaderRetrievalService leaderRetrievalService,
ActorSystem actorSystem) throws IOException, InterruptedException {

this.leaderRetrievalService = checkNotNull(leaderRetrievalService);

final WebMonitorConfig cfg = new WebMonitorConfig(config);
Expand All @@ -127,15 +127,12 @@ public WebRuntimeMonitor(
webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
LOG.info("Using directory {} for the web interface files", webRootDir);

final WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(config);

LOG.info("Serving job manager log from {}", logFiles.logFile.getAbsolutePath());
LOG.info("Serving job manager stdout from {}", logFiles.stdOutFile.getAbsolutePath());

final WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(config);

// port configuration
this.configuredPort = cfg.getWebFrontendPort();
if (this.configuredPort < 0) {
throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
int configuredPort = cfg.getWebFrontendPort();
if (configuredPort < 0) {
throw new IllegalArgumentException("Web frontend port is invalid: " + configuredPort);
}

timeout = AkkaUtils.getTimeout(config);
Expand All @@ -144,7 +141,7 @@ public WebRuntimeMonitor(
retriever = new JobManagerRetriever(this, actorSystem, lookupTimeout, timeout);

ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();

router = new Router()
// config how to interact with this web server
.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
Expand Down Expand Up @@ -183,8 +180,11 @@ public WebRuntimeMonitor(
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))

// log and stdout
.GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile))
.GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))
.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile))

.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))

// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.router.KeepAliveWrite;
import io.netty.handler.codec.http.router.Routed;

import org.apache.flink.runtime.webmonitor.files.MimeTypes;

import java.io.UnsupportedEncodingException;

/**
* Responder that returns a constant String.
*/
@ChannelHandler.Sharable
public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {

private static final String MIME_TYPE = MimeTypes.getMimeTypeForExtension("txt");

private final byte[] encodedText;

public ConstantTextHandler(String text) {
try {
this.encodedText = text.getBytes("UTF-8");
}
catch (UnsupportedEncodingException e) {
throw new RuntimeException(e.getMessage(), e);
}
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
HttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));

response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, MIME_TYPE);

KeepAliveWrite.flush(ctx, routed.request(), response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;

import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,60 +55,53 @@ public final class WebMonitorUtils {
/**
* Singleton to hold the log and stdout file
*/
public static class LogFiles {

private static LogFiles INSTANCE;
public static class LogFileLocation {

public final File logFile;
public final File stdOutFile;

private LogFiles(String logFile) {
this.logFile = checkFileLocation(logFile);
String stdOutFile = logFile.replaceFirst("\\.log$", ".out");
this.stdOutFile = checkFileLocation(stdOutFile);;
private LogFileLocation(File logFile, File stdOutFile) {
this.logFile = logFile;
this.stdOutFile = stdOutFile;
}


/**
* Verify log file location
* @param logFilePath Path to log file
* @return File or null if not a valid log file
* Finds the Flink log directory using log.file Java property that is set during startup.
*/
private static File checkFileLocation (String logFilePath) {
File logFile = new File(logFilePath);
if (logFile.exists() && logFile.canRead()) {
return logFile;
} else {
throw new IllegalConfigurationException("Job manager log file was supposed to be at " +
logFile.getAbsolutePath() + " but it does not exist or is not readable.");
public static LogFileLocation find(Configuration config) {
final String logEnv = "log.file";
String logFilePath = System.getProperty(logEnv);

if (logFilePath == null) {
LOG.warn("Log file environment variable '{}' is not set.", logEnv);
logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
}

// not configured, cannot serve log files
if (logFilePath == null || logFilePath.length() < 4) {
LOG.warn("JobManager log files are unavailable in the web dashboard. " +
"Log file location not found in environment variable '{}' or configuration key '{}'.",
logEnv, ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
return new LogFileLocation(null, null);
}

String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");

LOG.info("Determined location of JobManager log file: {}", logFilePath);
LOG.info("Determined location of JobManager stdout file: {}", outFilePath);

return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath));
}

/**
* Finds the Flink log directory using log.file Java property that is set during startup.
* Verify log file location
* @param logFilePath Path to log file
* @return File or null if not a valid log file
*/
public static LogFiles find(Configuration config) {
if (INSTANCE == null) {

/** Figure out log file location based on 'log.file' VM argument **/
final String logEnv = "log.file";
String logFilePath = System.getProperty(logEnv);

if (logFilePath == null) {
LOG.warn("Log file environment variable '{}' is not set.", logEnv);
logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
}

if (logFilePath == null) {
throw new IllegalConfigurationException("JobManager log file not found. " +
"Can't serve log files. Log file location couldn't be determined via the " +
logEnv + " environment variable or the config constant " +
ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
}

INSTANCE = new LogFiles(logFilePath);
}

return INSTANCE;
private static File resolveFileLocation(String logFilePath) {
File logFile = new File(logFilePath);
return (logFile.exists() && logFile.canRead()) ? logFile : null;
}
}

Expand All @@ -127,9 +122,9 @@ public static WebMonitor startWebRuntimeMonitor(
// try to load and instantiate the class
try {
String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
Class clazz = Class.forName(classname).asSubclass(WebMonitor.class);
@SuppressWarnings("unchecked")
Constructor<WebMonitor> constructor = clazz.getConstructor(Configuration.class,
Class<? extends WebMonitor> clazz = Class.forName(classname).asSubclass(WebMonitor.class);

Constructor<? extends WebMonitor> constructor = clazz.getConstructor(Configuration.class,
LeaderRetrievalService.class,
ActorSystem.class);
return constructor.newInstance(config, leaderRetrievalService, actorSystem);
Expand All @@ -147,7 +142,7 @@ public static WebMonitor startWebRuntimeMonitor(
}
}

public static Map<String, String> fromKeyValueJsonArray (JSONArray parsed) throws JSONException {
public static Map<String, String> fromKeyValueJsonArray(JSONArray parsed) throws JSONException {
Map<String, String> hashMap = new HashMap<>();

for (int i = 0; i < parsed.length(); i++) {
Expand Down Expand Up @@ -194,6 +189,4 @@ public static JobDetails createDetailsForJob(ExecutionGraph job) {
private WebMonitorUtils() {
throw new RuntimeException();
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@RunWith(Parameterized.class)
public class WebFrontendITCase extends MultipleProgramsTestBase {
Expand All @@ -56,8 +53,6 @@ public static void initialize() {
port = webMonitor.getServerPort();
}

static final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

public WebFrontendITCase(TestExecutionMode m) {
super(m);
}
Expand Down Expand Up @@ -115,7 +110,7 @@ public void getTaskmanagers() {
@Test
public void getLogAndStdoutFiles() {
try {
WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(cluster.configuration());
WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());

FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log");
Expand All @@ -124,7 +119,7 @@ public void getLogAndStdoutFiles() {
FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
Assert.assertTrue(logs.contains("job manager out"));
}catch(Throwable e) {
} catch(Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
Expand All @@ -141,7 +136,7 @@ public void getConfiguration() {
Assert.assertEquals(
cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
} catch(Throwable e) {
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
Expand Down

0 comments on commit a1f4dd1

Please sign in to comment.