Skip to content

Commit

Permalink
[FLINK-1501] Add metrics library for monitoring TaskManagers
Browse files Browse the repository at this point in the history
This closes apache#421
  • Loading branch information
rmetzger committed Mar 27, 2015
1 parent 6b9cee3 commit 2d1f8b0
Show file tree
Hide file tree
Showing 42 changed files with 733 additions and 3,234 deletions.
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ The Apache Flink project bundles the following files under the MIT License:
- normalize.css v3.0.0 (http://git.io/normalize) - Copyright (c) Nicolas Gallagher and Jonathan Neal
- Font Awesome - Code (http://fortawesome.github.io/Font-Awesome/) - Copyright (c) 2014 Dave Gandy
- D3 dagre renderer (https://github.com/cpettitt/dagre-d3) - Copyright (c) 2012-2013 Chris Pettitt
- Rickshaw (https://github.com/shutterstock/rickshaw) - Copyright (C) 2011-2013 by Shutterstock Images, LLC

All rights reserved.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,9 @@ public final class ConfigConstants {
* Sets the number of local task managers
*/
public static final String LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER = "localinstancemanager.numtaskmanager";


public static final String LOCAL_INSTANCE_MANAGER_START_WEBSERVER = "localinstancemanager.start-webserver";

// ------------------------------------------------------------------------

Expand Down
21 changes: 21 additions & 0 deletions flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ under the License.

<packaging>jar</packaging>

<properties>
<metrics.version>3.1.0</metrics.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down Expand Up @@ -146,6 +150,23 @@ under the License.
</exclusions>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
<version>${metrics.version}</version>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
<version>${metrics.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ public class Instance {

/** Time when last heat beat has been received from the task manager running on this taskManager. */
private volatile long lastReceivedHeartBeat = System.currentTimeMillis();

private byte[] lastMetricsReport;

/** Flag marking the instance as alive or as dead. */
private volatile boolean isDead;


// --------------------------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -170,6 +173,14 @@ public void reportHeartBeat() {
this.lastReceivedHeartBeat = System.currentTimeMillis();
}

public void setMetricsReport(byte[] lastMetricsReport) {
this.lastMetricsReport = lastMetricsReport;
}

public byte[] getLastMetricsReport() {
return lastMetricsReport;
}

/**
* Checks whether the last heartbeat occurred within the last {@code n} milliseconds
* before the given timestamp {@code now}.
Expand Down Expand Up @@ -332,4 +343,6 @@ public String toString() {
return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
numberOfSlots, (taskManager != null ? taskManager.path() : "ActorRef.noSender"));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ public class InstanceManager {

/** Set of hosts known to run a task manager that are thus able to execute tasks (by connection). */
private final Map<ActorRef, Instance> registeredHostsByConnection;

/** Set of hosts that were present once and have died */
private final Set<ActorRef> deadHosts;

/** Listeners that want to be notified about availability and disappearance of instances */
private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();

/** The total number of task slots that the system has */
private int totalNumberOfAliveTaskSlots;

Expand All @@ -65,7 +65,7 @@ public class InstanceManager {
// ------------------------------------------------------------------------
// Constructor and set-up
// ------------------------------------------------------------------------

/**
* Creates an new instance manager.
*/
Expand All @@ -85,24 +85,24 @@ public void shutdown() {
for (Instance i : this.registeredHostsById.values()) {
i.markDead();
}

this.registeredHostsById.clear();
this.registeredHostsByConnection.clear();
this.deadHosts.clear();
this.totalNumberOfAliveTaskSlots = 0;
}
}

public boolean reportHeartBeat(InstanceID instanceId) {
public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport) {
if (instanceId == null) {
throw new IllegalArgumentException("InstanceID may not be null.");
}

synchronized (this.lock) {
if (this.isShutdown) {
return false;
}

Instance host = registeredHostsById.get(instanceId);

if (host == null){
Expand All @@ -115,6 +115,7 @@ public boolean reportHeartBeat(InstanceID instanceId) {
}

host.reportHeartBeat();
host.setMetricsReport(lastMetricsReport);

if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from TaskManager " + host);
Expand All @@ -124,20 +125,19 @@ public boolean reportHeartBeat(InstanceID instanceId) {
}
}

public InstanceID registerTaskManager(ActorRef taskManager, InstanceConnectionInfo connectionInfo,
HardwareDescription resources, int numberOfSlots){
public InstanceID registerTaskManager(ActorRef taskManager, InstanceConnectionInfo connectionInfo, HardwareDescription resources, int numberOfSlots){
synchronized(this.lock){
if (this.isShutdown) {
throw new IllegalStateException("InstanceManager is shut down.");
}

Instance prior = registeredHostsByConnection.get(taskManager);
if (prior != null) {
LOG.info("Registration attempt from TaskManager at " + taskManager.path() +
". This connection is already registered under ID " + prior.getId());
return null;
}

boolean wasDead = this.deadHosts.remove(taskManager);
if (wasDead) {
LOG.info("Registering TaskManager at " + taskManager.path() +
Expand All @@ -148,25 +148,25 @@ public InstanceID registerTaskManager(ActorRef taskManager, InstanceConnectionIn
do {
id = new InstanceID();
} while (registeredHostsById.containsKey(id));


Instance host = new Instance(taskManager, connectionInfo, id, resources, numberOfSlots);

registeredHostsById.put(id, host);
registeredHostsByConnection.put(taskManager, host);

totalNumberOfAliveTaskSlots += numberOfSlots;

if (LOG.isInfoEnabled()) {
LOG.info(String.format("Registered TaskManager at %s (%s) as %s. Current number of registered hosts is %d.",
connectionInfo.getHostname(), taskManager.path(), id, registeredHostsById.size()));
}

host.reportHeartBeat();

// notify all listeners (for example the scheduler)
notifyNewInstance(host);

return id;
}
}
Expand Down Expand Up @@ -202,7 +202,7 @@ public int getNumberOfRegisteredTaskManagers() {
public int getTotalNumberOfSlots() {
return this.totalNumberOfAliveTaskSlots;
}

public Collection<Instance> getAllRegisteredInstances() {
synchronized (this.lock) {
// return a copy (rather than a Collections.unmodifiable(...) wrapper), such that
Expand All @@ -218,21 +218,21 @@ public Instance getRegisteredInstanceById(InstanceID instanceID) {
public Instance getRegisteredInstance(ActorRef ref) {
return registeredHostsByConnection.get(ref);
}

// --------------------------------------------------------------------------------------------

public void addInstanceListener(InstanceListener listener) {
synchronized (this.instanceListeners) {
this.instanceListeners.add(listener);
}
}

public void removeInstanceListener(InstanceListener listener) {
synchronized (this.instanceListeners) {
this.instanceListeners.remove(listener);
}
}

private void notifyNewInstance(Instance instance) {
synchronized (this.instanceListeners) {
for (InstanceListener listener : this.instanceListeners) {
Expand All @@ -245,7 +245,7 @@ private void notifyNewInstance(Instance instance) {
}
}
}

private void notifyDeadInstance(Instance instance) {
synchronized (this.instanceListeners) {
for (InstanceListener listener : this.instanceListeners) {
Expand All @@ -257,4 +257,4 @@ private void notifyDeadInstance(Instance instance) {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
objInner.put("instanceID", instance.getId());
byte[] report = instance.getLastMetricsReport();
if(report != null) {
objInner.put("metrics", new JSONObject(new String(report, "utf-8")));
}
array.put(objInner);
} catch (JSONException e) {
LOG.warn("Json object creation failed", e);
Expand Down
24 changes: 24 additions & 0 deletions flink-runtime/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


# Convenience file for local debugging of the JobManager/TaskManager.
log4j.rootLogger=OFF, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
<script src="js/bootstrap.js"></script>

<!-- Scripts from Flink -->
<script type="text/javascript" src="js/jquery.flot.min.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>
<script type="text/javascript" src="js/timeline.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
<script src="js/bootstrap.js"></script>

<!-- Scripts from Flink -->
<script type="text/javascript" src="js/jquery.flot.min.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
<script src="js/bootstrap.js"></script>

<!-- Scripts from Flink -->
<script type="text/javascript" src="js/jquery.flot.min.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
<script type="text/javascript" src="js/configuration.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>
Expand Down

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
<script src="js/bootstrap.js"></script>

<!-- Scripts from Flink -->
<script type="text/javascript" src="js/jquery.flot.min.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
<script type="text/javascript" src="js/jobmanagerFrontend.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
<script src="js/bootstrap.js"></script>

<!-- Scripts from Flink -->
<script type="text/javascript" src="js/jquery.flot.min.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
<script type="text/javascript" src="js/jobmanagerFrontend.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>
Expand Down

This file was deleted.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Loading

0 comments on commit 2d1f8b0

Please sign in to comment.