Skip to content

Commit

Permalink
[FLINK-31471] Allow setting JobResourceRequirements through WEB UI.
Browse files Browse the repository at this point in the history
  • Loading branch information
dmvk committed Apr 14, 2023
1 parent a81ffa6 commit 40d2cfb
Show file tree
Hide file tree
Showing 21 changed files with 375 additions and 27 deletions.
3 changes: 3 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
"web-history" : {
"type" : "boolean"
},
"web-rescale" : {
"type" : "boolean"
},
"web-submit" : {
"type" : "boolean"
}
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/web_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>Long</td>
<td>Refresh interval for the web-frontend in milliseconds.</td>
</tr>
<tr>
<td><h5>web.rescale.enable</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Flag indicating whether jobs can be rescaled from the web-frontend.</td>
</tr>
<tr>
<td><h5>web.submit.enable</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
2 changes: 2 additions & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,8 @@ components:
type: boolean
web-history:
type: boolean
web-rescale:
type: boolean
web-submit:
type: boolean
GarbageCollectorInfo:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public static JobManagerOptions.SchedulerType getSchedulerType(Configuration con
}
}

private static boolean isReactiveModeEnabled(Configuration configuration) {
public static boolean isReactiveModeEnabled(Configuration configuration) {
return configuration.get(JobManagerOptions.SCHEDULER_MODE)
== SchedulerExecutionMode.REACTIVE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,22 @@ public class WebOptions {
.withDescription(
"Flag indicating whether jobs can be uploaded and run from the web-frontend.");

/** Config parameter indicating whether jobs can be cancel from the web-frontend. */
/** Config parameter indicating whether jobs can be canceled from the web-frontend. */
public static final ConfigOption<Boolean> CANCEL_ENABLE =
key("web.cancel.enable")
.booleanType()
.defaultValue(true)
.withDescription(
"Flag indicating whether jobs can be canceled from the web-frontend.");

/** Config parameter indicating whether jobs can be rescaled from the web-frontend. */
public static final ConfigOption<Boolean> RESCALE_ENABLE =
key("web.rescale.enable")
.booleanType()
.defaultValue(true)
.withDescription(
"Flag indicating whether jobs can be rescaled from the web-frontend.");

/** Config parameter defining the number of checkpoints to remember for recent history. */
public static final ConfigOption<Integer> CHECKPOINTS_HISTORY_SIZE =
key("web.checkpoints.history")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ private void createDashboardConfigFile() throws IOException {
ZonedDateTime.now(),
false,
false,
false,
true)));
fw.flush();
} catch (IOException ioe) {
Expand Down
3 changes: 3 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
"web-cancel" : {
"type" : "boolean"
},
"web-rescale" : {
"type" : "boolean"
},
"web-history" : {
"type" : "boolean"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ export interface Configuration {
'web-history': boolean;
'web-submit': boolean;
'web-cancel': boolean;
'web-rescale': boolean;
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export interface JobResourceRequirements {
[key: string]: JobVertexResourceRequirements;
}

export interface JobVertexResourceRequirements {
parallelism: {
lowerBound: number;
upperBound: number;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<ng-container *ngIf="nodes.length > 0">
<flink-job-overview-list
(nodeClick)="onNodeClick($event)"
(rescale)="onRescale($event)"
[nodes]="nodes"
[selectedNode]="selectedNode"
></flink-job-overview-list>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import { DagreComponent } from '@flink-runtime-web/components/dagre/dagre.compon
import { ResizeComponent } from '@flink-runtime-web/components/resize/resize.component';
import { NodesItemCorrect, NodesItemLink } from '@flink-runtime-web/interfaces';
import { JobOverviewListComponent } from '@flink-runtime-web/pages/job/overview/list/job-overview-list.component';
import { MetricsService } from '@flink-runtime-web/services';
import { JobService, MetricsService } from '@flink-runtime-web/services';
import { NzAlertModule } from 'ng-zorro-antd/alert';
import { NzNotificationService } from 'ng-zorro-antd/notification';

import { JobLocalService } from '../job-local.service';

Expand Down Expand Up @@ -65,6 +66,8 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
public readonly elementRef: ElementRef,
private readonly metricService: MetricsService,
private readonly jobLocalService: JobLocalService,
private readonly jobService: JobService,
private readonly notificationService: NzNotificationService,
private readonly cdr: ChangeDetectorRef
) {}

Expand Down Expand Up @@ -115,6 +118,15 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
}
}

public onRescale(desiredParallelism: Map<string, number>): void {
this.jobService.changeDesiredParallelism(this.jobId, desiredParallelism).subscribe(() => {
this.notificationService.success(
'Rescaling operation.',
'Job resources requirements have been updated. Job will now try to rescale.'
);
});
}

public onResizeEnd(): void {
if (!this.selectedNode) {
this.dagreComponent.moveToCenter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
<th [nzSortFn]="sortReadRecordsFn" nzWidth="150px">Records Received</th>
<th [nzSortFn]="sortWriteBytesFn" nzWidth="150px">Bytes Sent</th>
<th [nzSortFn]="sortWriteRecordsFn" nzWidth="120px">Records Sent</th>
<th [nzSortFn]="sortParallelismFn" nzWidth="120px">Parallelism</th>
<th [nzSortFn]="sortParallelismFn" nzWidth="100px">Parallelism</th>
<th [nzSortFn]="sortStartTimeFn" nzWidth="150px">Start Time</th>
<th [nzSortFn]="sortDurationFn" nzWidth="150px">Duration</th>
<th [nzSortFn]="sortEndTimeFn" nzWidth="150px">End Time</th>
<th nzWidth="100px" nzRight>Tasks</th>
<th nzWidth="60px" nzRight>Tasks</th>
<th *ngIf="webRescaleEnabled" nzWidth="80px" nzRight>Scale</th>
</tr>
</thead>
<tbody>
Expand Down Expand Up @@ -95,13 +96,43 @@
{{ node.detail.metrics['write-records'] | number: '1.0-0' }}
</span>
</td>
<td>{{ node.parallelism }}</td>
<td>
<span>
{{ node.parallelism }}
</span>
<span *ngIf="desiredParallelism.has(node.id)">
<b>
<i nz-icon nzType="sync"></i>
{{ desiredParallelism.get(node.id) }}
</b>
</span>
</td>
<td>{{ node.detail['start-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }}</td>
<td>{{ node.detail?.duration | humanizeDuration }}</td>
<td>{{ node.detail['end-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }}</td>
<td nzRight [class.selected]="selectedNode?.id === node.id">
<flink-task-badge [tasks]="node.detail?.tasks"></flink-task-badge>
</td>
<td *ngIf="webRescaleEnabled" nzRight>
<nz-button-group>
<button
nz-button
nzSize="small"
nzType="default"
(click)="clickScaleUp(node); $event.stopPropagation()"
>
<span nz-icon nzType="plus"></span>
</button>
<button
nz-button
nzSize="small"
nzType="default"
(click)="clickScaleDown(node); $event.stopPropagation()"
>
<span nz-icon nzType="minus"></span>
</button>
</nz-button-group>
</td>
</tr>
</tbody>
</nz-table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import { JobBadgeComponent } from '@flink-runtime-web/components/job-badge/job-b
import { ResizeComponent } from '@flink-runtime-web/components/resize/resize.component';
import { TaskBadgeComponent } from '@flink-runtime-web/components/task-badge/task-badge.component';
import { NodesItemCorrect } from '@flink-runtime-web/interfaces';
import { StatusService } from '@flink-runtime-web/services';
import { NzBadgeModule } from 'ng-zorro-antd/badge';
import { NzButtonModule } from 'ng-zorro-antd/button';
import { NzIconModule } from 'ng-zorro-antd/icon';
import { NzTableModule } from 'ng-zorro-antd/table';
import { NzTableSortFn } from 'ng-zorro-antd/table/src/table.types';
import { NzToolTipModule } from 'ng-zorro-antd/tooltip';
Expand All @@ -36,6 +40,8 @@ function createSortFn(
return (pre, next) => (selector(pre)! > selector(next)! ? 1 : -1);
}

const rescaleTimeout = 2500;

@Component({
selector: 'flink-job-overview-list',
templateUrl: './job-overview-list.component.html',
Expand All @@ -52,12 +58,16 @@ function createSortFn(
HumanizeDatePipe,
HumanizeDurationPipe,
TaskBadgeComponent,
ResizeComponent
ResizeComponent,
NzButtonModule,
NzIconModule,
NzBadgeModule
],
standalone: true
})
export class JobOverviewListComponent {
public readonly trackById = (_: number, node: NodesItemCorrect): string => node.id;
public readonly webRescaleEnabled = this.statusService.configuration.features['web-rescale'];

public readonly sortStatusFn = createSortFn(item => item.detail?.status);
public readonly sortReadBytesFn = createSortFn(item => item.detail?.metrics?.['read-bytes']);
Expand All @@ -70,26 +80,69 @@ export class JobOverviewListComponent {
public readonly sortEndTimeFn = createSortFn(item => item.detail?.['end-time']);

public innerNodes: NodesItemCorrect[] = [];
public sortName: string;
public sortValue: string;
public left = 390;

public desiredParallelism = new Map<string, number>();

public rescaleTimeoutId: number | undefined;

@Output() public readonly nodeClick = new EventEmitter<NodesItemCorrect>();

@Output() public readonly rescale = new EventEmitter<Map<string, number>>();

@Input() public selectedNode: NodesItemCorrect;

@Input()
public set nodes(value: NodesItemCorrect[]) {
this.innerNodes = value;
for (const node of value) {
if (node.parallelism == this.desiredParallelism.get(node.id)) {
this.desiredParallelism.delete(node.id);
}
}
}

public get nodes(): NodesItemCorrect[] {
return this.innerNodes;
}

constructor(public readonly elementRef: ElementRef) {}
constructor(public readonly elementRef: ElementRef, private readonly statusService: StatusService) {}

public clickNode(node: NodesItemCorrect): void {
this.nodeClick.emit(node);
}

public clickScaleUp(node: NodesItemCorrect): void {
let currentDesiredParallelism = this.desiredParallelism.get(node.id);
if (currentDesiredParallelism == undefined) {
currentDesiredParallelism = node.parallelism;
}
const newDesiredParallelism = currentDesiredParallelism + 1;
this.changeDesiredParallelism(node, newDesiredParallelism);
}

public clickScaleDown(node: NodesItemCorrect): void {
let currentDesiredParallelism = this.desiredParallelism.get(node.id);
if (currentDesiredParallelism == undefined) {
currentDesiredParallelism = node.parallelism;
}
const newDesiredParallelism = Math.max(1, currentDesiredParallelism - 1);
this.changeDesiredParallelism(node, newDesiredParallelism);
}

private changeDesiredParallelism(node: NodesItemCorrect, newDesiredParallelism: number): void {
if (newDesiredParallelism == node.parallelism) {
this.desiredParallelism.delete(node.id);
} else {
this.desiredParallelism.set(node.id, newDesiredParallelism);
}
if (this.rescaleTimeoutId != undefined) {
window.clearTimeout(this.rescaleTimeoutId);
}
this.rescaleTimeoutId = window.setTimeout(() => {
if (this.desiredParallelism.size > 0) {
this.rescale.emit(this.desiredParallelism);
}
}, rescaleTimeout);
}
}
Loading

0 comments on commit 40d2cfb

Please sign in to comment.