Skip to content

Commit

Permalink
Update Progress bar to use task/summarize state-api endpoint. (ray-pr…
Browse files Browse the repository at this point in the history
…oject#31577)

This removes our dependency on Prometheus for the progress bar, but there is now a limit of 10,000 tasks per job. It also updates the task summarize endpoint to accept a job ID as a filter.

Progress bar updates are now much more frequent, because Prometheus previously scraped only every 15 seconds.
Otherwise, the UI is unchanged.
  • Loading branch information
alanwguo authored Jan 17, 2023
1 parent 22a8ab6 commit ef78f2c
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 61 deletions.
128 changes: 89 additions & 39 deletions dashboard/client/src/pages/job/hook/useJobProgress.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,51 @@
import _ from "lodash";
import { useRef, useState } from "react";
import { useState } from "react";
import useSWR from "swr";
import { API_REFRESH_INTERVAL_MS } from "../../../common/constants";
import { getJobProgress, getJobProgressByTaskName } from "../../../service/job";
import { getStateApiJobProgressByTaskName } from "../../../service/job";
import { StateApiJobProgressByTaskName, TaskProgress } from "../../../type/job";
import { TypeTaskStatus } from "../../../type/task";

/**
 * Lookup table from a task state name (a `TypeTaskStatus` member) to the
 * `TaskProgress` counter that tasks in that state are tallied under.
 *
 * The mapping is many-to-one: several states share a single counter, so
 * consumers must add to a counter rather than overwrite it.
 */
const TASK_STATE_NAME_TO_PROGRESS_KEY: Record<
TypeTaskStatus,
keyof TaskProgress
> = {
[TypeTaskStatus.PENDING_ARGS_AVAIL]: "numPendingArgsAvail",
[TypeTaskStatus.PENDING_NODE_ASSIGNMENT]: "numPendingNodeAssignment",
// These two pending states have no counter of their own here; they are
// counted together with PENDING_NODE_ASSIGNMENT.
[TypeTaskStatus.PENDING_OBJ_STORE_MEM_AVAIL]: "numPendingNodeAssignment",
[TypeTaskStatus.PENDING_ARGS_FETCH]: "numPendingNodeAssignment",
[TypeTaskStatus.SUBMITTED_TO_WORKER]: "numSubmittedToWorker",
// All three RUNNING* states are reported under the single running counter.
[TypeTaskStatus.RUNNING]: "numRunning",
[TypeTaskStatus.RUNNING_IN_RAY_GET]: "numRunning",
[TypeTaskStatus.RUNNING_IN_RAY_WAIT]: "numRunning",
[TypeTaskStatus.FINISHED]: "numFinished",
[TypeTaskStatus.FAILED]: "numFailed",
[TypeTaskStatus.NIL]: "numUnknown",
};

/**
 * Shared SWR hook that fetches the per-task progress summary for a job.
 *
 * @param jobId Job to query; when falsy the SWR key is `null`, so no request
 *   is issued.
 * @param isRefreshing When true the request is repeated every
 *   `API_REFRESH_INTERVAL_MS`; when false the refresh interval is 0.
 * @param setMsg Receives the `msg` field of every response.
 * @param setError Called with `true` when the response reports `result: false`.
 * @param setRefresh Called with `false` when the response reports
 *   `result: false`.
 * @returns The `useSWR` result. Its data is the output of
 *   `formatSummaryToTaskProgress` on success and `undefined` when the response
 *   reports failure.
 */
const useFetchStateApiProgressByTaskName = (
  jobId: string | undefined,
  isRefreshing: boolean,
  setMsg: (msg: string) => void,
  setError: (error: boolean) => void,
  setRefresh: (refresh: boolean) => void,
) => {
  const swrKey = jobId ? ["useJobProgressByTaskName", jobId] : null;
  const refreshInterval = isRefreshing ? API_REFRESH_INTERVAL_MS : 0;

  return useSWR(
    swrKey,
    async (_, requestedJobId) => {
      const { data: body } = await getStateApiJobProgressByTaskName(
        requestedJobId,
      );
      setMsg(body.msg);

      if (!body.result) {
        // Surface the failure and turn off auto-refresh.
        setError(true);
        setRefresh(false);
        return undefined;
      }

      return formatSummaryToTaskProgress(body.data.result.result);
    },
    { refreshInterval },
  );
};

/**
* Hook for fetching a job's task progress.
Expand All @@ -16,30 +59,28 @@ export const useJobProgress = (jobId?: string) => {
const [msg, setMsg] = useState("Loading progress...");
const [error, setError] = useState(false);
const [isRefreshing, setRefresh] = useState(true);
const refreshRef = useRef(isRefreshing);
const onSwitchChange = (event: React.ChangeEvent<HTMLInputElement>) => {
setRefresh(event.target.checked);
};
refreshRef.current = isRefreshing;
const { data: progress } = useSWR(
jobId ? ["useJobProgress", jobId] : null,
async (_, jobId) => {
const rsp = await getJobProgress(jobId);

setMsg(rsp.data.msg);
if (rsp.data.result) {
return rsp.data.data.detail;
} else {
setError(true);
setRefresh(false);
}
},
{ refreshInterval: isRefreshing ? API_REFRESH_INTERVAL_MS : 0 },
const { data: tasks } = useFetchStateApiProgressByTaskName(
jobId,
isRefreshing,
setMsg,
setError,
setRefresh,
);

const summed = (tasks ?? []).reduce((acc, task) => {
Object.entries(task.progress).forEach(([k, count]) => {
const key = k as keyof TaskProgress;
acc[key] = (acc[key] ?? 0) + count;
});
return acc;
}, {} as TaskProgress);

const driverExists = !jobId ? false : true;
return {
progress,
progress: summed,
msg,
error,
isRefreshing,
Expand All @@ -61,30 +102,19 @@ export const useJobProgressByTaskName = (jobId: string) => {
const [msg, setMsg] = useState("Loading progress...");
const [error, setError] = useState(false);
const [isRefreshing, setRefresh] = useState(true);
const refreshRef = useRef(isRefreshing);
const onSwitchChange = (event: React.ChangeEvent<HTMLInputElement>) => {
setRefresh(event.target.checked);
};
refreshRef.current = isRefreshing;

const { data: progress } = useSWR(
["useJobProgressByTaskName", jobId],
async (_, jobId) => {
const rsp = await getJobProgressByTaskName(jobId);
setMsg(rsp.data.msg);

if (rsp.data.result) {
return rsp.data.data.detail;
} else {
setError(true);
setRefresh(false);
}
},
{ refreshInterval: isRefreshing ? API_REFRESH_INTERVAL_MS : 0 },
const { data: tasks } = useFetchStateApiProgressByTaskName(
jobId,
isRefreshing,
setMsg,
setError,
setRefresh,
);

const tasks = progress?.tasks ?? [];
const tasksWithTotals = tasks.map((task) => {
const formattedTasks = (tasks ?? []).map((task) => {
const {
numFailed = 0,
numPendingArgsAvail = 0,
Expand All @@ -103,7 +133,7 @@ export const useJobProgressByTaskName = (jobId: string) => {
return { ...task, numFailed, numActive, numFinished };
});
const sortedTasks = _.orderBy(
tasksWithTotals,
formattedTasks,
["numFailed", "numActive", "numFinished"],
["desc", "desc", "desc"],
);
Expand All @@ -112,11 +142,31 @@ export const useJobProgressByTaskName = (jobId: string) => {
return {
progress: paginatedTasks,
page: { pageNo: page, pageSize: 10 },
total: tasks.length,
total: formattedTasks.length,
setPage,
msg,
error,
isRefreshing,
onSwitchChange,
};
};

/**
 * Converts a task summary into a list of per-task progress counters.
 *
 * Each entry of `summary.node_id_to_summary.cluster.summary` becomes one
 * `{ name, progress }` item, where `name` is the entry's key. State counts are
 * added into the `TaskProgress` counter given by
 * `TASK_STATE_NAME_TO_PROGRESS_KEY`; states missing from that table are
 * counted under `numUnknown`. Counters that receive no counts are left unset.
 *
 * @param summary Task summary to convert.
 * @returns One `{ name, progress }` object per task name.
 */
export const formatSummaryToTaskProgress = (
  summary: StateApiJobProgressByTaskName,
) => {
  const summariesByTaskName = summary.node_id_to_summary.cluster.summary;

  return Object.entries(summariesByTaskName).map(([name, taskSummary]) => {
    const progress: TaskProgress = {};

    for (const [stateName, count] of Object.entries(
      taskSummary.state_counts,
    )) {
      const counterKey: keyof TaskProgress =
        TASK_STATE_NAME_TO_PROGRESS_KEY[stateName as TypeTaskStatus] ??
        "numUnknown";

      // Several states can share a counter, so accumulate instead of assign.
      progress[counterKey] = (progress[counterKey] ?? 0) + count;
    }

    return { name, progress };
  });
};
68 changes: 68 additions & 0 deletions dashboard/client/src/pages/job/hook/useJobProgress.unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { StateApiJobProgressByTaskName, TaskProgress } from "../../../type/job";
import { TypeTaskStatus } from "../../../type/task";
import { formatSummaryToTaskProgress } from "./useJobProgress";

// Unit test for formatSummaryToTaskProgress: feeds it a three-task summary and
// checks the resulting per-task progress counters.
describe("formatSummaryToTaskProgress", () => {
it("formats correctly", () => {
const summary: StateApiJobProgressByTaskName = {
node_id_to_summary: {
cluster: {
summary: {
// task_1: RUNNING and RUNNING_IN_RAY_GET are both present, so the
// expected numRunning below is their sum (3 + 4).
task_1: {
func_or_class_name: "task_1",
state_counts: {
[TypeTaskStatus.FINISHED]: 1,
[TypeTaskStatus.FAILED]: 2,
[TypeTaskStatus.RUNNING]: 3,
[TypeTaskStatus.RUNNING_IN_RAY_GET]: 4,
[TypeTaskStatus.PENDING_ARGS_AVAIL]: 5,
},
},
// task_2: the NIL state is expected to be reported as numUnknown.
task_2: {
func_or_class_name: "task_2",
state_counts: {
[TypeTaskStatus.FINISHED]: 100,
[TypeTaskStatus.NIL]: 5,
},
},
// task_3: "someNewState" is not a TypeTaskStatus member; it is expected
// to be counted under numUnknown.
task_3: {
func_or_class_name: "task_3",
state_counts: {
[TypeTaskStatus.RUNNING]: 1,
someNewState: 1,
},
},
},
},
},
};

// Only counters that received counts appear in each expected progress object.
const expected: { name: string; progress: TaskProgress }[] = [
{
name: "task_1",
progress: {
numFinished: 1,
numFailed: 2,
numRunning: 7,
numPendingArgsAvail: 5,
},
},
{
name: "task_2",
progress: {
numFinished: 100,
numUnknown: 5,
},
},
{
name: "task_3",
progress: {
numRunning: 1,
numUnknown: 1,
},
},
];

expect(formatSummaryToTaskProgress(summary)).toEqual(expected);
});
});
7 changes: 7 additions & 0 deletions dashboard/client/src/service/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
JobListRsp,
JobProgressByTaskNameRsp,
JobProgressRsp,
StateApiJobProgressByTaskNameRsp,
UnifiedJob,
} from "../type/job";
import { get } from "./requestHandlers";
Expand All @@ -24,3 +25,9 @@ export const getJobProgressByTaskName = (jobId: string) => {
`api/progress_by_task_name?job_id=${jobId}`,
);
};

/**
 * Requests the task summary for a single job from the
 * `api/v0/tasks/summarize` endpoint, filtered with `job_id = <jobId>`.
 *
 * @param jobId Job whose tasks should be summarized. The value is
 *   percent-encoded before being placed in the query string so that reserved
 *   characters (`&`, `=`, `#`, ...) cannot alter or add query parameters.
 * @returns The pending `get` request typed as
 *   `StateApiJobProgressByTaskNameRsp`.
 */
export const getStateApiJobProgressByTaskName = (jobId: string) => {
  // "%3D" is the already-encoded "=" predicate.
  const encodedJobId = encodeURIComponent(jobId);
  return get<StateApiJobProgressByTaskNameRsp>(
    `api/v0/tasks/summarize?filter_keys=job_id&filter_predicates=%3D&filter_values=${encodedJobId}`,
  );
};
25 changes: 25 additions & 0 deletions dashboard/client/src/type/job.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,28 @@ export type JobProgressByTaskNameRsp = {
msg: string;
result: boolean;
};

/**
 * Task summary payload consumed by `formatSummaryToTaskProgress`.
 *
 * `node_id_to_summary.cluster.summary` is keyed by task name; each entry
 * carries the function/class name and a map from state name to the number of
 * tasks in that state.
 */
export type StateApiJobProgressByTaskName = {
  node_id_to_summary: {
    cluster: {
      summary: Record<
        string,
        {
          func_or_class_name: string;
          state_counts: Record<string, number>;
        }
      >;
    };
  };
};

/**
 * Response envelope for the task summarize request.
 *
 * - `result`: success flag for the request.
 * - `msg`: status message accompanying the response.
 * - `data.result.result`: the task summary itself.
 */
export type StateApiJobProgressByTaskNameRsp = {
  result: boolean;
  msg: string;
  data: {
    result: {
      result: StateApiJobProgressByTaskName;
    };
  };
};
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export enum TypeTaskType {

export type Task = {
task_id: string;
name: str;
name: string;
scheduling_state: TypeTaskStatus;
job_id: string;
node_id: string;
Expand Down
2 changes: 2 additions & 0 deletions dashboard/modules/metrics/metrics_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ async def prometheus_health(self, req) -> bool:
success=False, message="prometheus healthcheck failed.", reason=str(e)
)

# TODO(aguo): DEPRECATED: Delete this endpoint
@routes.get("/api/progress")
async def get_progress(self, req):
"""
Expand Down Expand Up @@ -237,6 +238,7 @@ async def get_progress(self, req):
message=str(e),
)

# TODO(aguo): DEPRECATED: Delete this endpoint
@routes.get("/api/progress_by_task_name")
async def get_progress_by_task_name(self, req):
"""
Expand Down
27 changes: 18 additions & 9 deletions dashboard/modules/state/state_head.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
from dataclasses import asdict
from typing import Callable, Optional
from typing import Callable, List, Tuple, Optional

import aiohttp.web
from abc import ABC, abstractmethod
Expand All @@ -22,6 +22,8 @@
RAY_MAX_LIMIT_FROM_API_SERVER,
ListApiOptions,
GetLogOptions,
PredicateType,
SupportedFilterType,
SummaryApiOptions,
SummaryApiResponse,
DEFAULT_RPC_TIMEOUT,
Expand Down Expand Up @@ -161,6 +163,18 @@ async def limit_handler_(self):
result=None,
)

def _get_filters_from_req(
self, req: aiohttp.web.Request
) -> List[Tuple[str, PredicateType, SupportedFilterType]]:
filter_keys = req.query.getall("filter_keys", [])
filter_predicates = req.query.getall("filter_predicates", [])
filter_values = req.query.getall("filter_values", [])
assert len(filter_keys) == len(filter_values)
filters = []
for key, predicate, val in zip(filter_keys, filter_predicates, filter_values):
filters.append((key, predicate, val))
return filters

def _options_from_req(self, req: aiohttp.web.Request) -> ListApiOptions:
"""Obtain `ListApiOptions` from the aiohttp request."""
limit = int(
Expand All @@ -176,13 +190,7 @@ def _options_from_req(self, req: aiohttp.web.Request) -> ListApiOptions:
)

timeout = int(req.query.get("timeout", 30))
filter_keys = req.query.getall("filter_keys", [])
filter_predicates = req.query.getall("filter_predicates", [])
filter_values = req.query.getall("filter_values", [])
assert len(filter_keys) == len(filter_values)
filters = []
for key, predicate, val in zip(filter_keys, filter_predicates, filter_values):
filters.append((key, predicate, val))
filters = self._get_filters_from_req(req)
detail = convert_string_to_type(req.query.get("detail", False), bool)

return ListApiOptions(
Expand All @@ -191,7 +199,8 @@ def _options_from_req(self, req: aiohttp.web.Request) -> ListApiOptions:

def _summary_options_from_req(self, req: aiohttp.web.Request) -> SummaryApiOptions:
timeout = int(req.query.get("timeout", DEFAULT_RPC_TIMEOUT))
return SummaryApiOptions(timeout=timeout)
filters = self._get_filters_from_req(req)
return SummaryApiOptions(timeout=timeout, filters=filters)

def _reply(self, success: bool, error_message: str, result: dict, **kwargs):
"""Reply to the client."""
Expand Down
Loading

0 comments on commit ef78f2c

Please sign in to comment.