Skip to content

Commit

Permalink
[FLINK-3160] [web-dashboard] Aggregate operator statistics by TaskMan…
Browse files Browse the repository at this point in the history
…ager

Adds a new per-job tab displaying subtask statistics aggregated by TaskManager.

This closes apache#1564.
  • Loading branch information
greghogan authored and uce committed Feb 15, 2016
1 parent babf84c commit 73bc35f
Show file tree
Hide file tree
Showing 29 changed files with 615 additions and 32 deletions.
41 changes: 41 additions & 0 deletions docs/internals/monitoring_rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
- `/jobs/<jobid>/config`
- `/jobs/<jobid>/exceptions`
- `/jobs/<jobid>/accumulators`
- `/jobs/<jobid>/vertices/<vertexid>`
- `/jobs/<jobid>/vertices/<vertexid>/subtasktimes`
- `/jobs/<jobid>/vertices/<vertexid>/taskmanagers`
- `/jobs/<jobid>/vertices/<vertexid>/accumulators`
- `/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators`
- `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>`
Expand Down Expand Up @@ -364,6 +366,45 @@ Sample Result:
}
~~~

**`/jobs/<jobid>/vertices/<vertexid>/taskmanagers`**

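Subtask statistics for one specific vertex, aggregated by TaskManager. Each entry in `taskmanagers` summarizes the subtasks returned by `/jobs/<jobid>/vertices/<vertexid>` that are assigned to the same TaskManager, identified by `host:port`.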

Sample Result:

~~~
{
"id": "fe20bcc29b87cdc76589ca42114c2499",
"name": "Reduce (SUM(1), at main(WordCount.java:72)",
"now": 1454348282653,
"taskmanagers": [ {
"host": "ip-10-0-43-227:35413",
"status": "FINISHED",
"start-time": 1454347870991,
"end-time": 1454347872111,
"duration": 1120,
"metrics": {
"read-bytes": 32503056, "write-bytes": 9637041, "read-records": 2906087, "write-records": 849467
},
"status-counts": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
},{
"host": "ip-10-0-43-227:41486",
"status": "FINISHED",
"start-time": 1454347871001,
"end-time": 1454347872395,
"duration": 1394,
"metrics": {
"read-bytes": 32389499, "write-bytes": 9608829, "read-records": 2895999, "write-records": 846948
},
"status-counts": {
"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
}
} ]
}
~~~

**`/jobs/<jobid>/vertices/<vertexid>/accumulators`**

The aggregated user-defined accumulators, for a specific vertex.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexTaskManagersHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
Expand Down Expand Up @@ -213,6 +214,7 @@ public WebRuntimeMonitor(

.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public boolean accept(File dir, String name) {
success = success || f.delete();
}
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
gen.writeStartObject();
if (!success) {
// this seems to always fail on Windows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public String handleRequest(Map<String, String> pathParams, Map<String, String>
}

StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
gen.writeStartObject();
gen.writeStringField("jobid", graph.f0.getJobID().toString());
gen.writeEndObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
final ExecutionState status = vertex.getExecutionState();

InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();

long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
if (startTime == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Request handler that reports a job vertex (id, name, current time) together with the
 * runtime and I/O metrics of its subtasks, rolled up per TaskManager.
 *
 * <p>Subtasks are grouped by the string {@code hostname:dataPort} of their currently assigned
 * resource location; subtasks without a location are grouped under {@code "(unassigned)"}.
 */
public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler {

	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder) {
		super(executionGraphHolder);
	}

	/**
	 * Renders the per-TaskManager aggregation of the given job vertex as a JSON string.
	 *
	 * <p>For every TaskManager group the response contains:
	 * <ul>
	 *   <li>{@code start-time}: the smallest positive DEPLOYING timestamp of the group's
	 *       subtasks, or {@code -1} if no subtask has one,</li>
	 *   <li>{@code end-time}: the largest timestamp of the subtasks' current states if all of
	 *       them are in a terminal state, otherwise {@code -1},</li>
	 *   <li>{@code duration}: {@code end-time - start-time} when all subtasks are terminal,
	 *       {@code now - start-time} when some are not, or {@code -1} without a start time,</li>
	 *   <li>{@code metrics}: the merged read/write byte and record counters,</li>
	 *   <li>{@code status-counts}: the number of subtasks per {@link ExecutionState}.</li>
	 * </ul>
	 *
	 * @param jobVertex the job vertex whose subtasks are aggregated
	 * @param params the request parameters (not read by this handler)
	 * @return the JSON response
	 * @throws Exception if writing the JSON fails
	 */
	@Override
	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
		final Map<String, List<ExecutionVertex>> subtasksByTaskManager = groupByTaskManager(jobVertex);

		// the reference time is taken after grouping and before any output is produced
		final long now = System.currentTimeMillis();

		final StringWriter out = new StringWriter();
		final JsonGenerator json = JsonFactory.jacksonFactory.createGenerator(out);

		json.writeStartObject();
		json.writeStringField("id", jobVertex.getJobVertexId().toString());
		json.writeStringField("name", jobVertex.getJobVertex().getName());
		json.writeNumberField("now", now);

		json.writeArrayFieldStart("taskmanagers");

		for (Entry<String, List<ExecutionVertex>> group : subtasksByTaskManager.entrySet()) {
			final List<ExecutionVertex> subtasks = group.getValue();

			// indexed by ExecutionState ordinal, as expected by getAggregateJobVertexState
			final int[] stateCounts = new int[ExecutionState.values().length];

			final LongCounter bytesIn = new LongCounter();
			final LongCounter bytesOut = new LongCounter();
			final LongCounter recordsIn = new LongCounter();
			final LongCounter recordsOut = new LongCounter();

			long earliestStart = Long.MAX_VALUE;
			long latestEnd = 0;
			boolean allTerminal = true;

			for (ExecutionVertex subtask : subtasks) {
				final ExecutionState current = subtask.getExecutionState();
				stateCounts[current.ordinal()]++;

				// only positive DEPLOYING timestamps count as a start time
				final long deployedAt = subtask.getStateTimestamp(ExecutionState.DEPLOYING);
				if (deployedAt > 0 && deployedAt < earliestStart) {
					earliestStart = deployedAt;
				}

				if (!current.isTerminal()) {
					allTerminal = false;
				}

				final long enteredCurrentAt = subtask.getStateTimestamp(current);
				if (enteredCurrentAt > latestEnd) {
					latestEnd = enteredCurrentAt;
				}

				final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> counters =
					subtask.getCurrentExecutionAttempt().getFlinkAccumulators();

				if (counters == null) {
					continue;
				}

				bytesIn.merge((LongCounter) counters.get(AccumulatorRegistry.Metric.NUM_BYTES_IN));
				bytesOut.merge((LongCounter) counters.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT));
				recordsIn.merge((LongCounter) counters.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN));
				recordsOut.merge((LongCounter) counters.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT));
			}

			final long duration;
			if (earliestStart == Long.MAX_VALUE) {
				// no subtask of this group has a DEPLOYING timestamp
				earliestStart = -1L;
				latestEnd = -1L;
				duration = -1L;
			}
			else if (allTerminal) {
				duration = latestEnd - earliestStart;
			}
			else {
				// some subtask is not terminal: no end time, duration measured up to 'now'
				latestEnd = -1L;
				duration = now - earliestStart;
			}

			final ExecutionState aggregateState =
				ExecutionJobVertex.getAggregateJobVertexState(stateCounts, subtasks.size());

			json.writeStartObject();

			json.writeStringField("host", group.getKey());
			json.writeStringField("status", aggregateState.name());

			json.writeNumberField("start-time", earliestStart);
			json.writeNumberField("end-time", latestEnd);
			json.writeNumberField("duration", duration);

			json.writeObjectFieldStart("metrics");
			json.writeNumberField("read-bytes", bytesIn.getLocalValuePrimitive());
			json.writeNumberField("write-bytes", bytesOut.getLocalValuePrimitive());
			json.writeNumberField("read-records", recordsIn.getLocalValuePrimitive());
			json.writeNumberField("write-records", recordsOut.getLocalValuePrimitive());
			json.writeEndObject();

			json.writeObjectFieldStart("status-counts");
			for (ExecutionState executionState : ExecutionState.values()) {
				json.writeNumberField(executionState.name(), stateCounts[executionState.ordinal()]);
			}
			json.writeEndObject();

			json.writeEndObject();
		}

		json.writeEndArray();
		json.writeEndObject();

		json.close();
		return out.toString();
	}

	/**
	 * Groups the subtasks of the given job vertex by the TaskManager they are assigned to.
	 *
	 * @param jobVertex the job vertex whose subtasks are grouped
	 * @return a map from {@code hostname:dataPort} (or {@code "(unassigned)"}) to the subtasks
	 *         currently assigned to that location
	 */
	private static Map<String, List<ExecutionVertex>> groupByTaskManager(ExecutionJobVertex jobVertex) {
		final Map<String, List<ExecutionVertex>> groups = new HashMap<>();

		for (ExecutionVertex subtask : jobVertex.getTaskVertices()) {
			final InstanceConnectionInfo location = subtask.getCurrentAssignedResourceLocation();

			final String key;
			if (location == null) {
				key = "(unassigned)";
			}
			else {
				key = location.getHostname() + ":" + location.dataPort();
			}

			if (!groups.containsKey(key)) {
				groups.put(key, new ArrayList<ExecutionVertex>());
			}
			groups.get(key).add(subtask);
		}

		return groups;
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
th Status

tbody
tr(ng-repeat="job in jobs|orderBy:\"'end-time'\":true" ui-sref="single-job.plan.overview({ jobid: job.jid })")
tr(ng-repeat="job in jobs|orderBy:\"'end-time'\":true" ui-sref="single-job.plan.subtasks({ jobid: job.jid })")
td {{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
td {{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
td(title="{{job.duration | humanizeDuration:false}}") {{job.duration | humanizeDuration:true}}
Expand Down
2 changes: 1 addition & 1 deletion flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
ul.nav.nav-tabs
li(ui-sref-active='active')
a(ui-sref=".plan.overview") Plan
a(ui-sref=".plan.subtasks") Plan

//- li(ui-sref-active='active' ng-if="job['end-time'] > -1")
li(ui-sref-active='active')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
nav.navbar.navbar-default.navbar-secondary-additional
ul.nav.nav-tabs
li(ui-sref-active='active')
a(ui-sref=".overview({nodeid: nodeid})") Overview
a(ui-sref=".subtasks({nodeid: nodeid})") Subtasks

li(ui-sref-active='active')
a(ui-sref=".taskmanagers({nodeid: nodeid})") TaskManagers

li(ui-sref-active='active')
a(ui-sref=".accumulators({nodeid: nodeid})") Accumulators
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
table.table.table-body-hover.table-clickable.table-activable
thead
tr
th Start Time
th End Time
th Duration
th Name
th Bytes received
th Records received
th Bytes sent
th Records sent
th Tasks
th Status

tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)")
tr(ng-if="v.type == 'regular'")
td
span(ng-if="v['start-time'] > -1") {{ v['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
td
span(ng-if="v['end-time'] > -1") {{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
td
span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}") {{v.duration | humanizeDuration:true}}

td.td-long {{ v.name | humanizeText }}
td(title="{{v.metrics['read-bytes']}} bytes") {{ v.metrics['read-bytes'] | humanizeBytes }}
td {{ v.metrics['read-records'] | number }}
td(title="{{v.metrics['write-bytes']}} bytes") {{ v.metrics['write-bytes'] | humanizeBytes }}
td {{ v.metrics['write-records'] | number }}
td
.label-group
bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{v.tasks[status]}}

td
bs-label(status="{{v.status}}") {{v.status}}
tr(ng-if="nodeid && v.id == nodeid")
td(colspan="10")
div(ng-include=" 'partials/jobs/job.plan.node.taskmanagers.html' ")
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="subta
th Status

tbody
tr(ng-repeat="subtask in subtasks")
tr(ng-repeat="subtask in subtasks | orderBy:'host'")
td
span(ng-if="subtask['start-time'] > -1") {{ subtask['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
td
Expand Down
Loading

0 comments on commit 73bc35f

Please sign in to comment.