Skip to content

Commit

Permalink
[FLINK-8949] Add dedicated watermarks metric retrieval endpoint (apac…
Browse files Browse the repository at this point in the history
…he#10238)

Without this, watermarks for jobs with a parallelism of >= 160 cannot be
displayed correctly and will result in a "/bad-request" error message. The
reason is that the watermark for each subtask will be retrieved in a giant
metrics request like the following (abbreviated):

```
http://localhost:8081/jobs/32f2205231d280ad105b011198dd9e5f/vertices/8b69eb2c39b9caf941896fdafa7ca05f/metrics?get=0.currentInputWatermark,1.currentInputWatermark,2.currentInputWatermark,3.currentInputWatermark,...,160.currentInputWatermark
```

We debated raising the maximum header length or lifting the header length
restriction. Instead, we opted for a separate metrics endpoint which returns the
watermarks for the entire job vertex:

```
/jobs/:jobid/vertices/:vertexid/watermarks
```
  • Loading branch information
mxm authored Dec 7, 2019
1 parent 383beb0 commit c1d0e99
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 51 deletions.
49 changes: 49 additions & 0 deletions docs/_includes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -3540,6 +3540,55 @@
</tr>
</tbody>
</table>
<table class="table table-bordered">
<tbody>
<tr>
<td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/vertices/:vertexid/watermarks</strong></h5></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
<td class="text-left">Response code: <code>200 OK</code></td>
</tr>
<tr>
<td colspan="2">Returns the watermarks for all subtasks of a task.</td>
</tr>
<tr>
<td colspan="2">Path parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
<li><code>vertexid</code> - 32-character hexadecimal string value that identifies a job vertex.</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#1153325552">Request</button>
<div id="1153325552" class="collapse">
<pre>
<code>
{} </code>
</pre>
</div>
</td>
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#1464053913">Response</button>
<div id="1464053913" class="collapse">
<pre>
<code>
{
"type" : "any"
} </code>
</pre>
</div>
</td>
</tr>
</tbody>
</table>
<table class="table table-bordered">
<tbody>
<tr>
Expand Down
21 changes: 21 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -2329,6 +2329,27 @@
}
}
}
}, {
"url" : "/jobs/:jobid/vertices/:vertexid/watermarks",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "jobid"
}, {
"key" : "vertexid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "any"
},
"response" : {
"type" : "any"
}
}, {
"url" : "/overview",
"method" : "GET",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import {
ViewChild
} from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
import { LONG_MIN_VALUE } from 'config';
import { forkJoin, Observable, of, Subject } from 'rxjs';
import { catchError, filter, map, takeUntil } from 'rxjs/operators';
import { NodesItemCorrectInterface, NodesItemLinkInterface } from 'interfaces';
Expand Down Expand Up @@ -66,29 +65,9 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
mergeWithWatermarks(nodes: NodesItemCorrectInterface[]): Observable<NodesItemCorrectInterface[]> {
return forkJoin(
nodes.map(node => {
const listOfMetricId = [];
let lowWatermark = NaN;
for (let i = 0; i < node.parallelism; i++) {
listOfMetricId.push(`${i}.currentInputWatermark`);
}
return this.metricService.getMetrics(this.jobId, node.id, listOfMetricId).pipe(
map(metrics => {
let minValue = NaN;
const watermarks: { [index: string]: number } = {};
for (const key in metrics.values) {
const value = metrics.values[key];
const subtaskIndex = key.replace('.currentInputWatermark', '');
watermarks[subtaskIndex] = value;
if (isNaN(minValue) || value < minValue) {
minValue = value;
}
}
if (!isNaN(minValue) && minValue > LONG_MIN_VALUE) {
lowWatermark = minValue;
} else {
lowWatermark = NaN;
}
return { ...node, lowWatermark };
return this.metricService.getWatermarks(this.jobId, node.id).pipe(
map(result => {
return { ...node, lowWatermark: result.lowWatermark };
})
);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class JobOverviewDrawerWatermarksComponent implements OnInit, OnDestroy {
this.jobService.jobWithVertex$
.pipe(
takeUntil(this.destroy$),
flatMap(data => this.metricsService.getWatermarks(data.job.jid, data.vertex!.id, data.vertex!.parallelism))
flatMap(data => this.metricsService.getWatermarks(data.job.jid, data.vertex!.id))
)
.subscribe(
data => {
Expand Down
53 changes: 27 additions & 26 deletions flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,37 @@ export class MetricsService {
}

/**
* Get watermarks data
* Gets the watermarks for a given vertex id.
* @param jobId
* @param vertexId
* @param parallelism
*/
getWatermarks(jobId: string, vertexId: string, parallelism: number) {
const listOfMetricName = new Array(parallelism).fill(0).map((_, index) => `${index}.currentInputWatermark`);
return this.getMetrics(jobId, vertexId, listOfMetricName).pipe(
map(metrics => {
let minValue = NaN;
let lowWatermark = NaN;
const watermarks: { [id: string]: number } = {};
const ref = metrics.values;
for (const key in ref) {
const value = ref[key];
const subTaskIndex = key.replace('.currentInputWatermark', '');
watermarks[subTaskIndex] = value;
if (isNaN(minValue) || value < minValue) {
minValue = value;
getWatermarks(jobId: string, vertexId: string) {
return this.httpClient
.get<Array<{ id: string; value: string }>>(
`${BASE_URL}/jobs/${jobId}/vertices/${vertexId}/watermarks`
)
.pipe(
map(arr => {
let minValue = NaN;
let lowWatermark = NaN;
const watermarks: { [id: string]: number } = {};
arr.forEach(item => {
const value = parseInt(item.value, 10);
const subTaskIndex = item.id.replace('.currentInputWatermark', '');
watermarks[subTaskIndex] = value;
if (isNaN(minValue) || value < minValue) {
minValue = value;
}
});
if (!isNaN(minValue) && minValue > LONG_MIN_VALUE) {
lowWatermark = minValue;
} else {
lowWatermark = NaN;
}
}
if (!isNaN(minValue) && minValue > LONG_MIN_VALUE) {
lowWatermark = minValue;
} else {
lowWatermark = NaN;
}
return {
lowWatermark,
watermarks
};
return {
lowWatermark,
watermarks
};
})
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job.metrics;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractJobVertexHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexWatermarksHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;


/**
* Handler that returns the watermarks given a {@link JobID} and {@link JobVertexID}.
*/
public class JobVertexWatermarksHandler extends AbstractJobVertexHandler<MetricCollectionResponseBody, JobVertexMessageParameters> {

private final MetricFetcher metricFetcher;

public JobVertexWatermarksHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MetricFetcher metricFetcher,
ExecutionGraphCache executionGraphCache,
Executor executor) {
super(leaderRetriever,
timeout,
responseHeaders,
JobVertexWatermarksHeaders.INSTANCE,
executionGraphCache,
executor);
this.metricFetcher = metricFetcher;
}

@Override
protected MetricCollectionResponseBody handleRequest(
HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
AccessExecutionJobVertex jobVertex) throws RestHandlerException {

String jobID = request.getPathParameter(JobIDPathParameter.class).toString();
String taskID = jobVertex.getJobVertexId().toString();

metricFetcher.update();
MetricStore.TaskMetricStore taskMetricStore = metricFetcher.getMetricStore().getTaskMetricStore(jobID, taskID);
if (taskMetricStore == null) {
return new MetricCollectionResponseBody(Collections.emptyList());
}

AccessExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
List<Metric> metrics = new ArrayList<>(taskVertices.length);

for (AccessExecutionVertex taskVertex : taskVertices) {
String id = taskVertex.getParallelSubtaskIndex() + "." + MetricNames.IO_CURRENT_INPUT_WATERMARK;
String watermarkValue = taskMetricStore.getMetric(id);
if (watermarkValue != null) {
metrics.add(new Metric(id, watermarkValue));
}
}

return new MetricCollectionResponseBody(metrics);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.job.metrics;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* {@link MessageHeaders} for retrieving watermarks.
*/
public final class JobVertexWatermarksHeaders implements MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, JobVertexMessageParameters> {

public static final JobVertexWatermarksHeaders INSTANCE = new JobVertexWatermarksHeaders();

private JobVertexWatermarksHeaders() {
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY + "/watermarks";
}

@Override
public String getDescription() {
return "Returns the watermarks for all subtasks of a task.";
}

@Override
public Class<MetricCollectionResponseBody> getResponseClass() {
return MetricCollectionResponseBody.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public JobVertexMessageParameters getUnresolvedMessageParameters() {
return new JobVertexMessageParameters();
}
}
Loading

0 comments on commit c1d0e99

Please sign in to comment.