Skip to content

Commit

Permalink
[FLINK-2732] Display TM logs in Dashboard
Browse files Browse the repository at this point in the history
This closes apache#1790
  • Loading branch information
zentol committed Apr 18, 2016
1 parent 5f993c6 commit 6d53bbc
Show file tree
Hide file tree
Showing 22 changed files with 812 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_TMP_DIR_KEY = "taskmanager.tmp.dirs";

/**
* The config parameter defining the location of the task manager's log file.
*/
public static final String TASK_MANAGER_LOG_PATH_KEY = "taskmanager.log.path";

/**
* The config parameter defining the amount of memory to be allocated by the task manager's
* memory manager (in megabytes). If not set, a relative fraction will be allocated, as defined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,19 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.router.KeepAliveWrite;
import io.netty.handler.codec.http.router.Routed;

import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

Expand All @@ -58,68 +52,27 @@
* proper codes, like OK, NOT_FOUND, or SERVER_ERROR.
*/
@ChannelHandler.Sharable
public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {
public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {

private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class);

private static final Charset ENCODING = Charset.forName("UTF-8");

public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";


private final RequestHandler handler;

private final JobManagerRetriever retriever;

private final Future<String> localJobManagerAddressFuture;

private final FiniteDuration timeout;

private String localJobManagerAddress;

private final RequestHandler handler;

public RuntimeMonitorHandler(
RequestHandler handler,
JobManagerRetriever retriever,
Future<String> localJobManagerAddressFuture,
FiniteDuration timeout) {

super(retriever, localJobManagerAddressFuture, timeout);
this.handler = checkNotNull(handler);
this.retriever = checkNotNull(retriever);
this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
this.timeout = checkNotNull(timeout);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
if (localJobManagerAddressFuture.isCompleted()) {
if (localJobManagerAddress == null) {
localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
}

Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();

if (jobManager.isDefined()) {
Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get();
String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
localJobManagerAddress, gatewayPort);

if (redirectAddress != null) {
HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
KeepAliveWrite.flush(ctx, routed.request(), redirect);
}
else {
respondAsLeader(ctx, routed, gatewayPort._1());
}
} else {
KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
}
} else {
KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
}
}

private void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
DefaultFullHttpResponse response;

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.router.KeepAliveWrite;
import io.netty.handler.codec.http.router.Routed;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * Common base for the web monitor's Netty channel handlers that serve {@link Routed} requests.
 *
 * <p>For every request this class decides between three outcomes: answering with the
 * "unavailable" response, redirecting to the address computed by {@link HandlerRedirectUtils},
 * or handing the request to
 * {@link #respondAsLeader(ChannelHandlerContext, Routed, ActorGateway)}. Concrete handlers,
 * such as the one wrapping a {@link RequestHandler}, supply that last step.
 */
@ChannelHandler.Sharable
public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHandler<Routed> {

	/** Source of the current job manager gateway and its web port. */
	private final JobManagerRetriever retriever;

	/** Future that eventually yields the address of the local job manager. */
	protected final Future<String> localJobManagerAddressFuture;

	/** Maximum time to wait when reading the value of {@link #localJobManagerAddressFuture}. */
	protected final FiniteDuration timeout;

	/** Local job manager address, cached on the first request after the future has completed. */
	protected String localJobManagerAddress;

	/**
	 * Creates the handler base.
	 *
	 * @param retriever provides the job manager gateway and web port; must not be null
	 * @param localJobManagerAddressFuture future for the local job manager address; must not be null
	 * @param timeout timeout used when reading the completed address future; must not be null
	 */
	public RuntimeMonitorHandlerBase(
			JobManagerRetriever retriever,
			Future<String> localJobManagerAddressFuture,
			FiniteDuration timeout) {

		this.retriever = checkNotNull(retriever);
		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
		this.timeout = checkNotNull(timeout);
	}

	/**
	 * Routes a request: replies "unavailable" while the local address or the job manager is
	 * unknown, sends a redirect when a redirect address is computed, and otherwise calls
	 * {@link #respondAsLeader(ChannelHandlerContext, Routed, ActorGateway)}.
	 *
	 * @param ctx the channel handler context used to write the response
	 * @param routed the routed HTTP request
	 * @throws Exception if reading the address future fails
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
		// The local address is not known yet, so no redirect decision can be made.
		if (!localJobManagerAddressFuture.isCompleted()) {
			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
			return;
		}

		// The future is already completed here; the result is cached for later requests.
		if (localJobManagerAddress == null) {
			localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
		}

		final Option<Tuple2<ActorGateway, Integer>> leader = retriever.getJobManagerGatewayAndWebPort();
		if (!leader.isDefined()) {
			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
			return;
		}

		final Tuple2<ActorGateway, Integer> gatewayAndPort = leader.get();
		final String target = HandlerRedirectUtils.getRedirectAddress(
			localJobManagerAddress, gatewayAndPort);

		// A null redirect address means this handler answers the request itself.
		if (target == null) {
			respondAsLeader(ctx, routed, gatewayAndPort._1());
			return;
		}

		final HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(target, routed.path());
		KeepAliveWrite.flush(ctx, routed.request(), redirect);
	}

	/**
	 * Answers the request directly instead of redirecting it.
	 *
	 * @param ctx the channel handler context used to write the response
	 * @param routed the routed HTTP request
	 * @param jobManager gateway of the job manager obtained from the retriever
	 */
	protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,21 @@
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -126,6 +131,8 @@ public class WebRuntimeMonitor implements WebMonitor {

private AtomicBoolean cleanedUp = new AtomicBoolean();

private ExecutorService executorService;

public WebRuntimeMonitor(
Configuration config,
LeaderRetrievalService leaderRetrievalService,
Expand Down Expand Up @@ -193,6 +200,10 @@ public WebRuntimeMonitor(

// --------------------------------------------------------------------

executorService = new ForkJoinPool();

ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);

router = new Router()
// config how to interact with this web server
.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
Expand Down Expand Up @@ -234,7 +245,11 @@ public WebRuntimeMonitor(
.GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs)))

.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log",
new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.LOG, config))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout",
new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.STDOUT, config))

// log and stdout
.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
Expand Down Expand Up @@ -377,6 +392,8 @@ public void stop() throws Exception {

backPressureStatsTracker.shutDown();

executorService.shutdownNow();

cleanup();
}
}
Expand Down
Loading

0 comments on commit 6d53bbc

Please sign in to comment.