Skip to content

Commit

Permalink
[FLINK-24928][ui] Remove deepFind
Browse files Browse the repository at this point in the history
deepFind was inherently type-unsafe, and actually not really needed in
most cases as we can use selector functions instead.

This actually surfaced several type mismatches were paths given to
deepFind didn't exist on the interface.
  • Loading branch information
Airblader authored and AHeise committed Nov 22, 2021
1 parent 85f9294 commit 15b217e
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,32 @@ export interface CheckPointDetailInterface {
}>;
}

export interface CompletedSubTaskCheckPointStatistics {
ack_timestamp: number;
end_to_end_duration: number;
state_size: number;
checkpoint: {
sync: number;
async: number;
};
alignment: {
buffer: number;
processed: number;
persisted: number;
duration: number;
};
start_delay: number;
unaligned_checkpoint: boolean;
aborted: boolean;
}

export interface PendingSubTaskCheckPointStatistics {}

export type SubTaskCheckPointStatisticsItem = {
index: number;
status: string;
} & (CompletedSubTaskCheckPointStatistics | PendingSubTaskCheckPointStatistics);

export interface CheckPointSubTaskInterface {
id: number;
status: string;
Expand All @@ -181,8 +207,5 @@ export interface CheckPointSubTaskInterface {
};
start_delay: CheckPointMinMaxAvgStatisticsInterface;
};
subtasks: Array<{
index: number;
status: string;
}>;
subtasks: SubTaskCheckPointStatisticsItem[];
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,24 @@ import { first } from 'rxjs/operators';

import { NzTableSortFn } from 'ng-zorro-antd/table/src/table.types';

import { CheckPointSubTaskInterface, JobDetailCorrectInterface, VerticesItemInterface } from 'interfaces';
import {
CheckPointSubTaskInterface,
CompletedSubTaskCheckPointStatistics,
JobDetailCorrectInterface,
SubTaskCheckPointStatisticsItem,
VerticesItemInterface
} from 'interfaces';
import { JobService } from 'services';
import { deepFind } from 'utils';

function createSortFn(
selector: (item: CompletedSubTaskCheckPointStatistics) => number | boolean
): NzTableSortFn<SubTaskCheckPointStatisticsItem> {
// FIXME This type-asserts that pre / next are a specific subtype.
return (pre, next) =>
selector(pre as CompletedSubTaskCheckPointStatistics) > selector(next as CompletedSubTaskCheckPointStatistics)
? 1
: -1;
}

@Component({
selector: 'flink-job-checkpoints-subtask',
Expand All @@ -40,31 +55,42 @@ import { deepFind } from 'utils';
changeDetection: ChangeDetectionStrategy.OnPush
})
export class JobCheckpointsSubtaskComponent implements OnInit, OnChanges {
@Input() vertex: VerticesItemInterface;
@Input() checkPointId: number;
jobDetail: JobDetailCorrectInterface;
subTaskCheckPoint: CheckPointSubTaskInterface;
listOfSubTaskCheckPoint: Array<{ index: number; status: string }> = [];
isLoading = true;
sortName: string;
sortValue: string;
@Input() public vertex: VerticesItemInterface;
@Input() public checkPointId: number;

public jobDetail: JobDetailCorrectInterface;
public subTaskCheckPoint: CheckPointSubTaskInterface;
public listOfSubTaskCheckPoint: SubTaskCheckPointStatisticsItem[] = [];
public isLoading = true;
public sortName: string;
public sortValue: string;

sortAckTimestampFn = this.sortFn('ack_timestamp');
sortEndToEndDurationFn = this.sortFn('end_to_end_duration');
sortStateSizeFn = this.sortFn('state_size');
sortCpSyncFn = this.sortFn('checkpoint.sync');
sortCpAsyncFn = this.sortFn('checkpoint.async');
sortAlignmentProcessedFn = this.sortFn('alignment.processed');
sortAlignmentDurationFn = this.sortFn('alignment.duration');
sortStartDelayFn = this.sortFn('start_delay');
sortUnalignedCpFn = this.sortFn('unaligned_checkpoint');
public readonly sortAckTimestampFn = createSortFn(item => item.ack_timestamp);
public readonly sortEndToEndDurationFn = createSortFn(item => item.end_to_end_duration);
public readonly sortStateSizeFn = createSortFn(item => item.state_size);
public readonly sortCpSyncFn = createSortFn(item => item.checkpoint?.sync);
public readonly sortCpAsyncFn = createSortFn(item => item.checkpoint?.async);
public readonly sortAlignmentProcessedFn = createSortFn(item => item.alignment?.processed);
public readonly sortAlignmentDurationFn = createSortFn(item => item.alignment?.duration);
public readonly sortStartDelayFn = createSortFn(item => item.start_delay);
public readonly sortUnalignedCpFn = createSortFn(item => item.unaligned_checkpoint);

sortFn(path: string): NzTableSortFn<{ index: number; status: string }> {
return (pre: { index: number; status: string }, next: { index: number; status: string }) =>
deepFind(pre, path) > deepFind(next, path) ? 1 : -1;
constructor(private readonly jobService: JobService, private readonly cdr: ChangeDetectorRef) {}

public ngOnInit(): void {
this.jobService.jobDetail$.pipe(first()).subscribe(job => {
this.jobDetail = job;
this.refresh();
});
}

public ngOnChanges(changes: SimpleChanges): void {
if (changes.checkPointId) {
this.refresh();
}
}

refresh(): void {
public refresh(): void {
if (this.jobDetail && this.jobDetail.jid) {
this.jobService.loadCheckpointSubtaskDetails(this.jobDetail.jid, this.checkPointId, this.vertex.id).subscribe(
data => {
Expand All @@ -80,19 +106,4 @@ export class JobCheckpointsSubtaskComponent implements OnInit, OnChanges {
);
}
}

constructor(private jobService: JobService, private cdr: ChangeDetectorRef) {}

ngOnInit(): void {
this.jobService.jobDetail$.pipe(first()).subscribe(job => {
this.jobDetail = job;
this.refresh();
});
}

ngOnChanges(changes: SimpleChanges): void {
if (changes.checkPointId) {
this.refresh();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
</thead>
<tbody>
<tr
*ngFor="let node of table.data; trackBy: trackJobBy"
*ngFor="let node of table.data; trackBy: trackById"
class="clickable"
(click)="clickNode(node)"
[class.selected]="selectedNode?.id === node.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
* limitations under the License.
*/

import { Component, EventEmitter, Input, Output, ChangeDetectionStrategy, ElementRef } from '@angular/core';
import { ChangeDetectionStrategy, Component, ElementRef, EventEmitter, Input, Output } from '@angular/core';

import { NzTableSortFn } from 'ng-zorro-antd/table/src/table.types';

import { NodesItemCorrectInterface } from 'interfaces';
import { deepFind } from 'utils';

function createSortFn(
selector: (item: NodesItemCorrectInterface) => number | string | undefined
): NzTableSortFn<NodesItemCorrectInterface> {
return (pre, next) => (selector(pre)! > selector(next)! ? 1 : -1);
}

@Component({
selector: 'flink-job-overview-list',
Expand All @@ -30,44 +35,39 @@ import { deepFind } from 'utils';
changeDetection: ChangeDetectionStrategy.OnPush
})
export class JobOverviewListComponent {
innerNodes: NodesItemCorrectInterface[] = [];
sortName: string;
sortValue: string;
left = 390;
@Output() nodeClick = new EventEmitter();
@Input() selectedNode: NodesItemCorrectInterface;
public readonly trackById = (_: number, node: NodesItemCorrectInterface): string => node.id;

public readonly sortStatusFn = createSortFn(item => item.detail?.status);
public readonly sortReadBytesFn = createSortFn(item => item.detail?.metrics?.['read-bytes']);
public readonly sortReadRecordsFn = createSortFn(item => item.detail?.metrics?.['read-records']);
public readonly sortWriteBytesFn = createSortFn(item => item.detail?.metrics?.['write-bytes']);
public readonly sortWriteRecordsFn = createSortFn(item => item.detail?.metrics?.['write-records']);
public readonly sortParallelismFn = createSortFn(item => item.parallelism);
public readonly sortStartTimeFn = createSortFn(item => item.detail?.['start-time']);
public readonly sortDurationFn = createSortFn(item => item.detail?.duration);
public readonly sortEndTimeFn = createSortFn(item => item.detail?.['end-time']);

public innerNodes: NodesItemCorrectInterface[] = [];
public sortName: string;
public sortValue: string;
public left = 390;

@Output() public readonly nodeClick = new EventEmitter<NodesItemCorrectInterface>();

@Input() public selectedNode: NodesItemCorrectInterface;

@Input()
set nodes(value: NodesItemCorrectInterface[]) {
public set nodes(value: NodesItemCorrectInterface[]) {
this.innerNodes = value;
}

get nodes(): NodesItemCorrectInterface[] {
public get nodes(): NodesItemCorrectInterface[] {
return this.innerNodes;
}

sortStatusFn = this.sortFn('detail.status');
sortReadBytesFn = this.sortFn('detail.metrics.read-bytes');
sortReadRecordsFn = this.sortFn('detail.metrics.read-records');
sortWriteBytesFn = this.sortFn('detail.metrics.write-bytes');
sortWriteRecordsFn = this.sortFn('detail.metrics.write-records');
sortParallelismFn = this.sortFn('parallelism');
sortStartTimeFn = this.sortFn('detail.start-time');
sortDurationFn = this.sortFn('detail.duration');
sortEndTimeFn = this.sortFn('detail.end-time');
constructor(public readonly elementRef: ElementRef) {}

sortFn(path: string): NzTableSortFn<NodesItemCorrectInterface> {
return (pre: NodesItemCorrectInterface, next: NodesItemCorrectInterface) =>
deepFind(pre, path) > deepFind(next, path) ? 1 : -1;
}

trackJobBy(_: number, node: NodesItemCorrectInterface): string {
return node.id;
}

clickNode(node: NodesItemCorrectInterface): void {
public clickNode(node: NodesItemCorrectInterface): void {
this.nodeClick.emit(node);
}

constructor(public elementRef: ElementRef) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
</tr>
</thead>
<tbody>
<tr *ngFor="let task of table.data; trackBy: trackTaskBy">
<tr *ngFor="let task of table.data; trackBy: trackBySubtask">
<td nzLeft="0">
{{ task.subtask }}
</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@
* limitations under the License.
*/

import { Component, OnInit, ChangeDetectionStrategy, OnDestroy, ChangeDetectorRef } from '@angular/core';
import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { flatMap, takeUntil } from 'rxjs/operators';
import { mergeMap, takeUntil } from 'rxjs/operators';

import { NzTableSortFn } from 'ng-zorro-antd/table/src/table.types';

import { JobSubTaskInterface } from 'interfaces';
import { JobService } from 'services';
import { deepFind } from 'utils';

function createSortFn(selector: (item: JobSubTaskInterface) => number | string): NzTableSortFn<JobSubTaskInterface> {
return (pre, next) => (selector(pre) > selector(next) ? 1 : -1);
}

@Component({
selector: 'flink-job-overview-drawer-subtasks',
Expand All @@ -33,39 +36,33 @@ import { deepFind } from 'utils';
changeDetection: ChangeDetectionStrategy.OnPush
})
export class JobOverviewDrawerSubtasksComponent implements OnInit, OnDestroy {
listOfTask: JobSubTaskInterface[] = [];
destroy$ = new Subject();
sortName: string;
sortValue: string;
isLoading = true;
public readonly trackBySubtask = (_: number, node: JobSubTaskInterface): number => node.subtask;

sortReadBytesFn = this.sortFn('metrics.read-bytes');
sortReadRecordsFn = this.sortFn('metrics.read-records');
sortWriteBytesFn = this.sortFn('metrics.write-bytes');
sortWriteRecordsFn = this.sortFn('metrics.write-records');
sortAttemptFn = this.sortFn('attempt');
sortHostFn = this.sortFn('host');
sortStartTimeFn = this.sortFn('detail.start-time');
sortDurationFn = this.sortFn('detail.duration');
sortEndTimeFn = this.sortFn('detail.end-time');
sortStatusFn = this.sortFn('status');
public readonly sortReadBytesFn = createSortFn(item => item.metrics?.['read-bytes']);
public readonly sortReadRecordsFn = createSortFn(item => item.metrics?.['read-records']);
public readonly sortWriteBytesFn = createSortFn(item => item.metrics?.['write-bytes']);
public readonly sortWriteRecordsFn = createSortFn(item => item.metrics?.['write-records']);
public readonly sortAttemptFn = createSortFn(item => item.attempt);
public readonly sortHostFn = createSortFn(item => item.host);
public readonly sortStartTimeFn = createSortFn(item => item['start_time']);
public readonly sortDurationFn = createSortFn(item => item.duration);
public readonly sortEndTimeFn = createSortFn(item => item['end-time']);
public readonly sortStatusFn = createSortFn(item => item.status);

sortFn(path: string): NzTableSortFn<JobSubTaskInterface> {
return (pre: JobSubTaskInterface, next: JobSubTaskInterface) =>
deepFind(pre, path) > deepFind(next, path) ? 1 : -1;
}
public listOfTask: JobSubTaskInterface[] = [];
public sortName: string;
public sortValue: string;
public isLoading = true;

trackTaskBy(_: number, node: JobSubTaskInterface): number {
return node.subtask;
}
private readonly destroy$ = new Subject<void>();

constructor(private jobService: JobService, private cdr: ChangeDetectorRef) {}
constructor(private readonly jobService: JobService, private readonly cdr: ChangeDetectorRef) {}

ngOnInit(): void {
public ngOnInit(): void {
this.jobService.jobWithVertex$
.pipe(
takeUntil(this.destroy$),
flatMap(data => this.jobService.loadSubTasks(data.job.jid, data.vertex!.id))
mergeMap(data => this.jobService.loadSubTasks(data.job.jid, data.vertex!.id))
)
.subscribe(
data => {
Expand All @@ -80,7 +77,7 @@ export class JobOverviewDrawerSubtasksComponent implements OnInit, OnDestroy {
);
}

ngOnDestroy(): void {
public ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
</tr>
</thead>
<tbody>
<tr *ngFor="let taskManager of table.data; trackBy: trackTaskManagerBy">
<tr *ngFor="let taskManager of table.data; trackBy: trackByHost">
<td nzLeft="0px">{{ taskManager.host }}</td>
<td nzLeft="160px">
<span
Expand Down
Loading

0 comments on commit 15b217e

Please sign in to comment.