Skip to content

Commit

Permalink
[Core] Lightweight Resource Report for New Scheduler (ray-project#16527)
Browse files Browse the repository at this point in the history
* check resource diff

* fix

* fix

* comment modified

* fix
  • Loading branch information
lixin-wei authored Jul 1, 2021
1 parent 94149ef commit e00d898
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,6 @@ RAY_CONFIG(bool, gcs_task_scheduling_enabled,
getenv("RAY_GCS_TASK_SCHEDULING_ENABLED") == std::string("true"))

/// NOTE(review): presumably the upper bound, in bytes, on the size of an error
/// message (512 * 1024 = 512 KiB) -- confirm against the code that reads it.
RAY_CONFIG(uint32_t, max_error_msg_size_bytes, 512 * 1024)

/// If enabled, the raylet flags resource data as changed in its resource report
/// only when it differs from what was last reported. If disabled, the report is
/// flagged as changed every time.
RAY_CONFIG(bool, enable_light_weight_resource_report, true)
3 changes: 2 additions & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) {
cluster_resource_scheduler_->UpdateLastResourceUsage(
gcs_client_->NodeResources().GetLastResourceUsage());
cluster_resource_scheduler_->FillResourceUsage(resources_data);
cluster_task_manager_->FillResourceUsage(resources_data);
cluster_task_manager_->FillResourceUsage(
resources_data, gcs_client_->NodeResources().GetLastResourceUsage());
if (RayConfig::instance().gcs_task_scheduling_enabled()) {
FillNormalTaskResourceUsage(resources_data);
}
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,10 @@ void ClusterResourceScheduler::FillResourceUsage(rpc::ResourcesData &resources_d
if (resources != *last_report_resources_.get()) {
last_report_resources_.reset(new NodeResources(resources));
}

if (!RayConfig::instance().enable_light_weight_resource_report()) {
resources_data.set_resources_available_changed(true);
}
}

ray::gcs::NodeResourceInfoAccessor::ResourceMap
Expand Down
20 changes: 16 additions & 4 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -557,13 +557,12 @@ void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) con
}
}

void ClusterTaskManager::FillResourceUsage(rpc::ResourcesData &data) {
void ClusterTaskManager::FillResourceUsage(
rpc::ResourcesData &data,
const std::shared_ptr<SchedulingResources> &last_reported_resources) {
if (max_resource_shapes_per_load_report_ == 0) {
return;
}
// TODO (WangTao): Find a way to check if load changed and combine it with light
// heartbeat. Now we just report it every time.
data.set_resource_load_changed(true);
auto resource_loads = data.mutable_resource_load();
auto resource_load_by_shape =
data.mutable_resource_load_by_shape()->mutable_resource_demands();
Expand Down Expand Up @@ -724,6 +723,19 @@ void ClusterTaskManager::FillResourceUsage(rpc::ResourcesData &data) {
by_shape_entry->set_backlog_size(backlog_it->second);
}
}

if (RayConfig::instance().enable_light_weight_resource_report()) {
// Check whether resources have been changed.
std::unordered_map<std::string, double> local_resource_map(
data.resource_load().begin(), data.resource_load().end());
ResourceSet local_resource(local_resource_map);
if (last_reported_resources == nullptr ||
!last_reported_resources->GetLoadResources().IsEqual(local_resource)) {
data.set_resource_load_changed(true);
}
} else {
data.set_resource_load_changed(true);
}
}

bool ClusterTaskManager::AnyPendingTasks(Task *exemplar, bool *any_pending,
Expand Down
4 changes: 3 additions & 1 deletion src/ray/raylet/scheduling/cluster_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
///
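/// \param data Output parameter. `resource_load` and `resource_load_by_shape` are the
/// only fields used.
/// \param last_reported_resources The previously reported resources, compared against
/// the current load to decide whether `resource_load_changed` is set. A null pointer
/// is treated as "changed".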
void FillResourceUsage(rpc::ResourcesData &data) override;
void FillResourceUsage(rpc::ResourcesData &data,
const std::shared_ptr<SchedulingResources>
&last_reported_resources = nullptr) override;

/// Return if any tasks are pending resource acquisition.
///
Expand Down
4 changes: 3 additions & 1 deletion src/ray/raylet/scheduling/cluster_task_manager_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ class ClusterTaskManagerInterface {
///
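/// \param data Output parameter. `resource_load` and `resource_load_by_shape` are the
/// only fields used.
/// \param last_reported_resources The previously reported resources, compared against
/// the current load to decide whether `resource_load_changed` is set. A null pointer
/// is treated as "changed".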
virtual void FillResourceUsage(rpc::ResourcesData &data) = 0;
virtual void FillResourceUsage(
rpc::ResourcesData &data,
const std::shared_ptr<SchedulingResources> &last_reported_resources = nullptr) = 0;

/// Populate the list of pending or infeasible actor tasks for node stats.
///
Expand Down
19 changes: 19 additions & 0 deletions src/ray/raylet/scheduling/cluster_task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,25 @@ TEST_F(ClusterTaskManagerTest, LargeArgsNoStarvationTest) {
AssertNoLeaks();
}

TEST_F(ClusterTaskManagerTest, TestResourceDiff) {
  // Verifies that FillResourceUsage sets `resource_load_changed` by comparing the
  // current load against the last reported resources passed in by the caller.
  rpc::ResourcesData report;

  // Case 1: there is no last report (null pointer), so the load counts as changed.
  task_manager_.FillResourceUsage(report, nullptr);
  ASSERT_TRUE(report.resource_load_changed());

  // Clears the flag, refills the report against `last_reported`, and returns the
  // resulting value of the flag.
  const auto changed_after_refill =
      [&](const std::shared_ptr<SchedulingResources> &last_reported) {
        report.set_resource_load_changed(false);
        task_manager_.FillResourceUsage(report, last_reported);
        return report.resource_load_changed();
      };

  // Case 2: the last report carries the same (empty) load, so nothing changed.
  std::shared_ptr<SchedulingResources> previous_report =
      std::make_shared<SchedulingResources>();
  ASSERT_FALSE(changed_after_refill(previous_report));

  // Case 3: the last report carries a CPU load that the current load lacks, so the
  // load counts as changed.
  ResourceSet cpu_load;
  cpu_load.AddOrUpdateResource("CPU", 100);
  previous_report->SetLoadResources(std::move(cpu_load));
  ASSERT_TRUE(changed_after_refill(previous_report));
}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down

0 comments on commit e00d898

Please sign in to comment.