Skip to content

Commit

Permalink
[FLINK-3427] [webui] Refactorings to watermark tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed Mar 8, 2017
1 parent d84b65f commit 4ef18f6
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
split
.split#canvas
.canvas-wrapper
div.main-canvas(job-plan, plan="plan", low-watermarks="lowWatermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)")
div.main-canvas(job-plan, plan="plan", watermarks="watermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)")

.split#job-panel
.panel.panel-default.panel-multi(ng-if="plan")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ table.table.table-body-hover.table-clickable.table-activable
th Parallelism
th Status

tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid && hasWatermarks(nodeid) }" ng-click="changeNode(v.id)")
tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)")
tr(ng-if="v.type == 'regular'")

td.td-long {{ v.name | humanizeText }}
td {{ watermarks | lowWatermark:v.id }}
td {{ watermarks[v.id]["lowWatermark"] | humanizeWatermark }}
td {{ v.parallelism }}
td
td
bs-label(status="{{v.status}}") {{v.status}}
tr(ng-if="nodeid && v.id == nodeid && hasWatermarks(nodeid)")
td(colspan="11")
div(ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ")
tr(ng-if="nodeid && v.id == nodeid")
td(colspan="4")
div(ng-show="hasWatermark(v.id)" ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ")
div(ng-show="!hasWatermark(v.id)") No Watermarks
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
See the License for the specific language governing permissions and
limitations under the License.
table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="hasWatermarks(nodeid)")
table.table.table-hover.table-clickable.table-activable.table-inner
thead
tr
th id
th Subtask
th Watermark

tbody
tr(ng-repeat="watermark in watermarksByNode(nodeid)")
td {{ watermark.id }}
td {{ watermark.value | parseWatermark }}
tr(ng-repeat="(subtaskIndex, watermark) in watermarks[nodeid]['watermarks']")
td {{ subtaskIndex | increment }}
td {{ watermark | humanizeWatermark }}
16 changes: 5 additions & 11 deletions flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,13 @@ angular.module('flinkApp')
.filter "percentage", ->
(number) -> (number * 100).toFixed(0) + '%'

.filter "parseWatermark", (watermarksConfig)->
.filter "humanizeWatermark", (watermarksConfig) ->
(value) ->
if value <= watermarksConfig.minValue
if isNaN(value) || value <= watermarksConfig.noWatermark
return 'No Watermark'
else
return value

.filter "lowWatermark", (watermarksConfig)->
(watermarks, nodeid) ->
lowWatermark = "No Watermark"
if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
values = (watermark.value for watermark in watermarks[nodeid])
lowWatermark = Math.min.apply(null, values)
if lowWatermark <= watermarksConfig.minValue
lowWatermark = "No Watermark"
return lowWatermark
.filter "increment", ->
(number) ->
parseInt(number) + 1
9 changes: 5 additions & 4 deletions flink-runtime-web/web-dashboard/app/scripts/index.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])

.value 'flinkConfig', {
jobServer: ''
# jobServer: 'http://localhost:8081/'
# jobServer: 'http://localhost:8081/'
"refresh-interval": 10000
}

# --------------------------------------

.value 'watermarksConfig', {
minValue: -9223372036854776000
# A value of (Java) Long.MIN_VALUE indicates that there is no watermark
# available. This is parsed by Javascript as this number. We have it as
# a constant here to compare available watermarks against.
noWatermark: -9223372036854776000
}

# --------------------------------------
Expand All @@ -52,7 +55,6 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
JobsService.listJobs()
, flinkConfig["refresh-interval"]


# --------------------------------------

.config ($uiViewScrollProvider) ->
Expand Down Expand Up @@ -125,7 +127,6 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
views:
'node-details':
templateUrl: "partials/jobs/job.plan.node-list.watermarks.html"
controller: 'JobPlanWatermarksController'

.state "single-job.plan.taskmanagers",
url: "/taskmanagers"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ angular.module('flinkApp')
$scope.jobid = $stateParams.jobid
$scope.job = null
$scope.plan = null
$scope.watermarks = null
$scope.lowWatermarks = null
$scope.watermarks = {}
$scope.vertices = null
$scope.backPressureOperatorStats = {}

Expand All @@ -61,8 +60,7 @@ angular.module('flinkApp')
$scope.$on '$destroy', ->
$scope.job = null
$scope.plan = null
$scope.watermarks = null
$scope.lowWatermarks = null
$scope.watermarks = {}
$scope.vertices = null
$scope.backPressureOperatorStats = null

Expand All @@ -84,43 +82,80 @@ angular.module('flinkApp')
$scope.plan = data.plan
MetricsService.setupMetrics($stateParams.jobid, data.vertices)

getWatermarks = (nodes)->
# This function uses a promise to resolve watermarks once fetched via the metrics service, since watermarks have to be fetched individually for each node, we have to wait until all API calls have been made before we can resolve the promise. In the end we will have an array of low watermarks for each node: e.g. {somenodeid: [{id: 0, value: -9223372036854776000}], anothernodeid: [{id: 0, value: -9223372036854776000}, {id: 1, value: -9223372036854776000}]}.
# Asynchronously requests the watermark metrics for the given nodes. The
# returned object has the following structure:
#
# {
# "<nodeId>": {
# "lowWatermark": <lowWatermark>
# "watermarks": {
# 0: <watermark for subtask 0>
# ...
# n: <watermark for subtask n>
# }
# }
# }
#
# If no watermark is available, lowWatermark will be NaN and
# the watermarks will be empty.
getWatermarks = (nodes) ->
# Requests the watermarks for a single vertex. Triggers a request
# to the Metrics service.
requestWatermarkForNode = (node) =>
deferred = $q.defer()

jid = $scope.job.jid

# Request metrics for each subtask
metricIds = (i + ".currentLowWatermark" for i in [0..node.parallelism - 1])
MetricsService.getMetrics(jid, node.id, metricIds).then (metrics) ->
minValue = NaN
watermarks = {}

for key, value of metrics.values
subtaskIndex = key.replace('.currentLowWatermark', '')
watermarks[subtaskIndex] = value

if (isNaN(minValue) || value < minValue)
minValue = value

if (!isNaN(minValue) && minValue > watermarksConfig.noWatermark)
lowWatermark = minValue
else
# NaN indicates no watermark available
lowWatermark = NaN

deferred.resolve({"lowWatermark": lowWatermark, "watermarks": watermarks})

deferred.promise

deferred = $q.defer()
watermarks = {}
jid = $scope.job.jid

# Request watermarks for each node and update watermarks
len = nodes.length
angular.forEach nodes, (node, index) =>
metricIds = []
# for each node, we need to specify which metrics we want to collect, for each subtask, we need to fetch the currentLowWatermark, and each param is formed by concatenating subtask index to '.currentLowWatermark'.
for num in [0..node.parallelism - 1]
metricIds.push(num + ".currentLowWatermark")
MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
values = []
for key, value of data.values
values.push(id: key.replace('.currentLowWatermark', ''), value: value)
watermarks[node.id] = values
if index >= $scope.plan.nodes.length - 1
nodeId = node.id
requestWatermarkForNode(node).then (data) ->
watermarks[nodeId] = data
if (index >= len - 1)
deferred.resolve(watermarks)

deferred.promise

getLowWatermarks = (watermarks)->
lowWatermarks = []
for k,v of watermarks
minValue = Math.min.apply(null,(watermark.value for watermark in v))
lowWatermarks[k] = if minValue <= watermarksConfig.minValue || v.length == 0 then 'No Watermark' else minValue
return lowWatermarks
# Returns true if the lowWatermark is != NaN
$scope.hasWatermark = (nodeid) ->
$scope.watermarks[nodeid] && !isNaN($scope.watermarks[nodeid]["lowWatermark"])

$scope.$watch 'plan', (newPlan) ->
if newPlan
getWatermarks(newPlan.nodes).then (data) ->
$scope.watermarks = data
$scope.lowWatermarks = getLowWatermarks(data)

$scope.$on 'reload', (event) ->
$scope.$on 'reload', () ->
if $scope.plan
getWatermarks($scope.plan.nodes).then (data) ->
$scope.watermarks = data
$scope.lowWatermarks = getLowWatermarks(data)

# --------------------------------------

Expand Down Expand Up @@ -359,14 +394,3 @@ angular.module('flinkApp')
loadMetrics() if $scope.nodeid

# --------------------------------------

.controller 'JobPlanWatermarksController', ($scope, $filter) ->
$scope.hasWatermarks = (nodeid) ->
return true if $scope.watermarksByNode(nodeid).length

$scope.watermarksByNode = (nodeid) ->
if $scope.watermarks != null && $scope.watermarks[nodeid] && $scope.watermarks[nodeid].length
return $scope.watermarks[nodeid]
return []

# --------------------------------------
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ angular.module('flinkApp')

# ----------------------------------------------

.directive 'split', () ->
.directive 'split', () ->
return compile: (tElem, tAttrs) ->
Split(tElem.children(), (
sizes: [50, 50]
Expand All @@ -189,7 +189,7 @@ angular.module('flinkApp')

scope:
plan: '='
lowWatermarks: '='
watermarks: '='
setNode: '&'

link: (scope, elem, attrs) ->
Expand Down Expand Up @@ -436,10 +436,11 @@ angular.module('flinkApp')
return el.step_function[j] if el.step_function[j].id is nodeID

mergeWatermarks = (data, watermarks) ->
for k,v of watermarks
if (!_.isEmpty(watermarks))
for node in data.nodes
if node.id == k
node.lowWatermark = v
if (watermarks[node.id] && !isNaN(watermarks[node.id]["lowWatermark"]))
node.lowWatermark = watermarks[node.id]["lowWatermark"]

return data

lastPosition = 0
Expand All @@ -456,7 +457,7 @@ angular.module('flinkApp')
marginy: 40
})

loadJsonToDagre(g, mergeWatermarks(scope.plan, scope.lowWatermarks))
loadJsonToDagre(g, mergeWatermarks(scope.plan, scope.watermarks))

d3mainSvgG.selectAll("*").remove()

Expand Down Expand Up @@ -494,7 +495,7 @@ angular.module('flinkApp')
scope.$watch attrs.plan, (newPlan) ->
drawGraph() if newPlan

scope.$watch attrs.lowWatermarks, (newLowWatermarks) ->
drawGraph() if newLowWatermarks && scope.plan
scope.$watch attrs.watermarks, (newWatermarks) ->
drawGraph() if newWatermarks && scope.plan

return

0 comments on commit 4ef18f6

Please sign in to comment.