[core] Add leased worker reporting in idle nodes (ray-project#39582)

---------

Signed-off-by: vitsai <[email protected]>

vitsai authored Sep 15, 2023
1 parent 71f0ae7 commit 28c04d8
Showing 9 changed files with 222 additions and 20 deletions.
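In brief: LocalResourceManager used to track idleness only per schedulable resource, in `resources_last_idle_time_`. This commit renames that map to `last_idle_times_` and re-keys it by a `WorkArtifact`, a variant that is either a `scheduling::ResourceID` or a non-schedulable `WorkFootprint` (currently just `NODE_WORKERS`). The raylet marks `NODE_WORKERS` busy when it dispatches a worker lease and idle again once `leased_workers_` empties, so a node whose workers are merely blocked on `ray.get` no longer reports as idle. The Python sketch below models that bookkeeping; `IdleTracker` and its method names are illustrative stand-ins, not Ray APIs.

import time
from enum import Enum
from typing import Dict, Optional, Union


class WorkFootprint(Enum):
    NODE_WORKERS = 1


# A work artifact is either a schedulable resource (modeled here as its
# name, e.g. "CPU") or a non-schedulable work footprint.
Artifact = Union[str, WorkFootprint]


class IdleTracker:
    """Models last_idle_times_: artifact -> last-idle timestamp,
    or None while the artifact is busy."""

    def __init__(self, resources):
        now = time.monotonic()
        self.last_idle_times: Dict[Artifact, Optional[float]] = {
            r: now for r in resources
        }

    def set_busy(self, item: Artifact) -> None:
        self.last_idle_times[item] = None  # busy artifacts have no timestamp

    def set_idle(self, item: Artifact) -> None:
        self.last_idle_times[item] = time.monotonic()

    def idle_duration_ms(self) -> Optional[float]:
        # The node is idle only if every artifact is idle, and it became idle
        # when the most recent artifact did (mirrors GetResourceIdleTime).
        times = list(self.last_idle_times.values())
        if not times or any(t is None for t in times):
            return None
        return (time.monotonic() - max(times)) * 1000.0


tracker = IdleTracker(["CPU", "object_store_memory"])
tracker.set_busy(WorkFootprint.NODE_WORKERS)   # first worker lease granted
assert tracker.idle_duration_ms() is None      # node reports busy
tracker.set_idle(WorkFootprint.NODE_WORKERS)   # last lease released
assert tracker.idle_duration_ms() is not None  # node reports idle again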
125 changes: 125 additions & 0 deletions python/ray/autoscaler/v2/tests/test_sdk.py
@@ -460,6 +460,131 @@ def verify_cluster_no_node():
wait_for_condition(verify_cluster_no_node)


# We test that a node whose only leased worker is blocked on
# `ray.get` is still reported busy rather than idle.
def test_idle_node_blocked(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)

stub = _autoscaler_state_service_stub()

    # We don't have the node id from `add_node`, unfortunately.
def nodes_up():
nodes = list_nodes()
assert len(nodes) == 1
return True

wait_for_condition(nodes_up)

head_node_id = get_node_ids()

def verify_cluster_idle():
state = get_cluster_resource_state(stub)
assert_node_states(
state,
[
ExpectedNodeState(
head_node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0
),
],
)
return True

wait_for_condition(verify_cluster_idle)

# Unschedulable
@ray.remote(num_cpus=10000)
def f():
pass

    # Schedule a task that stays running, blocked on the unschedulable task.
@ray.remote(num_cpus=1)
def g():
ray.get(f.remote())

t = g.remote()

def verify_cluster_busy():
state = get_cluster_resource_state(stub)
assert_node_states(
state,
[
ExpectedNodeState(
head_node_id, NodeStatus.RUNNING, lambda idle_ms: idle_ms == 0
),
],
)
return True

wait_for_condition(verify_cluster_busy)
for _ in range(10):
time.sleep(0.5)
verify_cluster_busy()

# Kill the task
ray.cancel(t, force=True)
wait_for_condition(verify_cluster_idle)


def test_idle_node_no_resource(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)

stub = _autoscaler_state_service_stub()

    # We don't have the node id from `add_node`, unfortunately.
def nodes_up():
nodes = list_nodes()
assert len(nodes) == 1
return True

wait_for_condition(nodes_up)

head_node_id = get_node_ids()

def verify_cluster_idle():
state = get_cluster_resource_state(stub)
assert_node_states(
state,
[
ExpectedNodeState(
head_node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0
),
],
)
return True

wait_for_condition(verify_cluster_idle)

    # Schedule a zero-CPU task that runs forever.
@ray.remote(num_cpus=0)
def f():
while True:
pass

t = f.remote()

def verify_cluster_busy():
state = get_cluster_resource_state(stub)
assert_node_states(
state,
[
ExpectedNodeState(
head_node_id, NodeStatus.RUNNING, lambda idle_ms: idle_ms == 0
),
],
)
return True

wait_for_condition(verify_cluster_busy)

# Kill the task
ray.cancel(t, force=True)
wait_for_condition(verify_cluster_idle)


def test_get_cluster_status_resources(ray_start_cluster):
cluster = ray_start_cluster
# Head node
@@ -482,7 +482,7 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
auto callback_it = queue.inflight_task_callbacks.find(task_id);
if (callback_it == queue.inflight_task_callbacks.end()) {
RAY_LOG(DEBUG) << "The task " << task_id
<< " has already been marked as failed. Ingore the reply.";
<< " has already been marked as failed. Ignore the reply.";
return;
}
reply_callback = std::move(callback_it->second);
2 changes: 2 additions & 0 deletions src/ray/raylet/local_task_manager.cc
@@ -872,6 +872,8 @@ void LocalTaskManager::Dispatch(

RAY_CHECK(leased_workers.find(worker->WorkerId()) == leased_workers.end());
leased_workers[worker->WorkerId()] = worker;
cluster_resource_scheduler_->GetLocalResourceManager().SetBusyFootprint(
WorkFootprint::NODE_WORKERS);

// Update our internal view of the cluster state.
std::shared_ptr<TaskResourceInstances> allocated_resources;
10 changes: 4 additions & 6 deletions src/ray/raylet/node_manager.cc
@@ -1456,7 +1456,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
const rpc::RayException *creation_task_exception) {
RAY_LOG(INFO) << "NodeManager::DisconnectClient, disconnect_type=" << disconnect_type
<< ", has creation task exception = " << std::boolalpha
-                << bool(creation_task_exception == nullptr);
+                << bool(creation_task_exception != nullptr);
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
bool is_worker = false, is_driver = false;
if (worker) {
@@ -1480,7 +1480,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
dependency_manager_.CancelWaitRequest(worker->WorkerId());

// Erase any lease metadata.
-  leased_workers_.erase(worker->WorkerId());
+  ReleaseWorker(worker->WorkerId());

if (creation_task_exception != nullptr) {
RAY_LOG(INFO) << "Formatted creation task exception: "
@@ -1907,7 +1907,7 @@ void NodeManager::HandleReturnWorker(rpc::ReturnWorkerRequest request,
std::shared_ptr<WorkerInterface> worker = leased_workers_[worker_id];

Status status;
-  leased_workers_.erase(worker_id);
+  ReleaseWorker(worker_id);

if (worker) {
if (request.disconnect_worker()) {
@@ -2005,9 +2005,7 @@ void NodeManager::HandleReleaseUnusedWorkers(rpc::ReleaseUnusedWorkersRequest re
}
}

-  for (auto &iter : unused_worker_ids) {
-    leased_workers_.erase(iter);
-  }
+  ReleaseWorkers(unused_worker_ids);

send_reply_callback(Status::OK(), nullptr, nullptr);
}
19 changes: 19 additions & 0 deletions src/ray/raylet/node_manager.h
@@ -224,6 +224,25 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
const std::function<void()> &on_all_replied);

private:
void ReleaseWorker(const WorkerID &worker_id) {
leased_workers_.erase(worker_id);
SetIdleIfLeaseEmpty();
}

void ReleaseWorkers(const std::vector<WorkerID> &worker_ids) {
for (auto &it : worker_ids) {
leased_workers_.erase(it);
}
SetIdleIfLeaseEmpty();
}

inline void SetIdleIfLeaseEmpty() {
if (leased_workers_.empty()) {
cluster_resource_scheduler_->GetLocalResourceManager().SetIdleFootprint(
WorkFootprint::NODE_WORKERS);
}
}

/// If the primary objects' usage is over the threshold
/// specified in RayConfig, spill objects up to the max
/// throughput.
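The helpers above funnel every lease-release path (worker disconnect, HandleReturnWorker, HandleReleaseUnusedWorkers) through ReleaseWorker/ReleaseWorkers, so the busy-to-idle transition cannot be missed, while LocalTaskManager::Dispatch marks the footprint busy on every grant. A standalone Python sketch of that pairing follows; LeaseLedger and FootprintRecorder are hypothetical names for illustration only, not Ray types.

from enum import Enum
from typing import Set


class WorkFootprint(Enum):
    NODE_WORKERS = 1


class FootprintRecorder:
    """Minimal stub standing in for LocalResourceManager in this sketch."""

    def __init__(self) -> None:
        self.busy = False

    def set_busy_footprint(self, item: WorkFootprint) -> None:
        self.busy = True

    def set_idle_footprint(self, item: WorkFootprint) -> None:
        self.busy = False


class LeaseLedger:
    """Pairs dispatch/release the way the patch does: NODE_WORKERS goes
    busy on the first lease and idle only when leased_workers_ empties."""

    def __init__(self, recorder: FootprintRecorder) -> None:
        self.leased_workers: Set[str] = set()
        self.recorder = recorder

    def dispatch(self, worker_id: str) -> None:
        # Mirrors LocalTaskManager::Dispatch().
        assert worker_id not in self.leased_workers
        self.leased_workers.add(worker_id)
        self.recorder.set_busy_footprint(WorkFootprint.NODE_WORKERS)

    def release(self, worker_id: str) -> None:
        # Mirrors NodeManager::ReleaseWorker().
        self.leased_workers.discard(worker_id)
        self._set_idle_if_lease_empty()

    def release_many(self, worker_ids) -> None:
        # Mirrors NodeManager::ReleaseWorkers(): one idle check per batch.
        for wid in worker_ids:
            self.leased_workers.discard(wid)
        self._set_idle_if_lease_empty()

    def _set_idle_if_lease_empty(self) -> None:
        if not self.leased_workers:
            self.recorder.set_idle_footprint(WorkFootprint.NODE_WORKERS)


recorder = FootprintRecorder()
ledger = LeaseLedger(recorder)
ledger.dispatch("w1")
ledger.dispatch("w2")
ledger.release("w1")
assert recorder.busy       # w2 still holds a lease
ledger.release_many(["w2"])
assert not recorder.busy   # last lease gone -> idle footprint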
27 changes: 27 additions & 0 deletions src/ray/raylet/scheduling/cluster_task_manager_test.cc
@@ -991,6 +991,33 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
AssertNoLeaks();
}

TEST_F(ClusterTaskManagerTest, TestIdleNode) {
RayTask task = CreateTask({{}});
rpc::RequestWorkerLeaseReply reply;
bool callback_occurred = false;
bool *callback_occurred_ptr = &callback_occurred;
auto callback = [callback_occurred_ptr](
Status, std::function<void()>, std::function<void()>) {
*callback_occurred_ptr = true;
};

task_manager_.QueueAndScheduleTask(task, false, false, &reply, callback);
pool_.TriggerCallbacks();
ASSERT_TRUE(scheduler_->GetLocalResourceManager().IsLocalNodeIdle());
ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0);

std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker));
pool_.TriggerCallbacks();

ASSERT_TRUE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 1);
ASSERT_FALSE(scheduler_->GetLocalResourceManager().IsLocalNodeIdle());
ASSERT_EQ(node_info_calls_, 0);
}

TEST_F(ClusterTaskManagerTest, NotOKPopWorkerAfterDrainingTest) {
/*
Test cases where the node is being drained after PopWorker is called
32 changes: 25 additions & 7 deletions src/ray/raylet/scheduling/local_resource_manager.cc
@@ -39,7 +39,7 @@ LocalResourceManager::LocalResourceManager(
local_resources_.labels = node_resources.labels;
const auto now = absl::Now();
for (const auto &resource_id : node_resources.total.ExplicitResourceIds()) {
-    resources_last_idle_time_[resource_id] = now;
+    last_idle_times_[resource_id] = now;
}
RAY_LOG(DEBUG) << "local resources: " << local_resources_.DebugString();
}
@@ -55,7 +55,7 @@ void LocalResourceManager::AddLocalResourceInstances(
void LocalResourceManager::DeleteLocalResource(scheduling::ResourceID resource_id) {
local_resources_.available.Remove(resource_id);
local_resources_.total.Remove(resource_id);
-  resources_last_idle_time_.erase(resource_id);
+  last_idle_times_.erase(resource_id);
OnResourceOrStateChanged();
}

@@ -114,6 +114,24 @@ void LocalResourceManager::FreeTaskResourceInstances(
}
}
}

void LocalResourceManager::SetBusyFootprint(WorkFootprint item) {
auto prev = last_idle_times_.find(item);
if (prev != last_idle_times_.end() && !prev->second.has_value()) {
return;
}
last_idle_times_[item] = absl::nullopt;
OnResourceOrStateChanged();
}

void LocalResourceManager::SetIdleFootprint(WorkFootprint item) {
auto prev = last_idle_times_.find(item);
bool state_change = prev == last_idle_times_.end() || !prev->second.has_value();

last_idle_times_[item] = absl::Now();
if (state_change) {
OnResourceOrStateChanged();
}
}

void LocalResourceManager::AddResourceInstances(
scheduling::ResourceID resource_id, const std::vector<double> &resource_instances) {
@@ -173,21 +191,21 @@ void LocalResourceManager::SetResourceNonIdle(const scheduling::ResourceID &reso
if (resource_id.IsImplicitResource()) {
return;
}
-  resources_last_idle_time_[resource_id] = absl::nullopt;
+  last_idle_times_[resource_id] = absl::nullopt;
}

void LocalResourceManager::SetResourceIdle(const scheduling::ResourceID &resource_id) {
if (resource_id.IsImplicitResource()) {
return;
}
-  resources_last_idle_time_[resource_id] = absl::Now();
+  last_idle_times_[resource_id] = absl::Now();
}

absl::optional<absl::Time> LocalResourceManager::GetResourceIdleTime() const {
// If all the resources are idle.
absl::Time all_idle_time = absl::InfinitePast();

-  for (const auto &iter : resources_last_idle_time_) {
+  for (const auto &iter : last_idle_times_) {
const auto &idle_time_or_busy = iter.second;

if (idle_time_or_busy == absl::nullopt) {
@@ -261,11 +279,11 @@ void LocalResourceManager::UpdateAvailableObjectStoreMemResource() {
if (used == 0.0) {
// Set it to idle as of now.
RAY_LOG(INFO) << "Object store memory is idle.";
-    resources_last_idle_time_[ResourceID::ObjectStoreMemory()] = absl::Now();
+    last_idle_times_[ResourceID::ObjectStoreMemory()] = absl::Now();
} else {
// Clear the idle info since we know it's being used.
RAY_LOG(INFO) << "Object store memory is not idle.";
-    resources_last_idle_time_[ResourceID::ObjectStoreMemory()] = absl::nullopt;
+    last_idle_times_[ResourceID::ObjectStoreMemory()] = absl::nullopt;
}

OnResourceOrStateChanged();
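Two details of SetBusyFootprint/SetIdleFootprint above are easy to miss: a footprint that is already busy is left untouched, so no redundant OnResourceOrStateChanged broadcast fires, while SetIdleFootprint always refreshes the timestamp but broadcasts only on an actual busy-to-idle flip. A minimal Python sketch of just those guards, with notify() as a stand-in for OnResourceOrStateChanged():

import time
from typing import Dict, Optional

last_idle_times: Dict[str, Optional[float]] = {}
notifications = []


def notify() -> None:
    # Stand-in for OnResourceOrStateChanged().
    notifications.append(time.monotonic())


def set_busy_footprint(item: str) -> None:
    if item in last_idle_times and last_idle_times[item] is None:
        return  # already busy: no state change, skip the broadcast
    last_idle_times[item] = None
    notify()


def set_idle_footprint(item: str) -> None:
    state_change = last_idle_times.get(item) is None  # absent or busy
    last_idle_times[item] = time.monotonic()  # timestamp always refreshed
    if state_change:
        notify()  # broadcast only on a busy -> idle transition


set_busy_footprint("NODE_WORKERS")
set_busy_footprint("NODE_WORKERS")  # no-op: still busy
set_idle_footprint("NODE_WORKERS")
set_idle_footprint("NODE_WORKERS")  # refreshes timestamp, no broadcast
assert len(notifications) == 2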
19 changes: 17 additions & 2 deletions src/ray/raylet/scheduling/local_resource_manager.h
@@ -34,6 +34,16 @@

namespace ray {

/// Encapsulates non-resource artifacts that evidence work when present.
enum WorkFootprint {
NODE_WORKERS = 1,
};

// Represents artifacts of a node that can be busy or idle.
// Resources are schedulable, such as gpu or cpu.
// WorkFootprints are not, such as leased workers on a node.
using WorkArtifact = std::variant<WorkFootprint, scheduling::ResourceID>;

/// Class manages the resources of the local node.
/// It is responsible for allocating/deallocating resources for (task) resource request;
/// it also supports creating a new resource or delete an existing resource.
@@ -108,6 +118,11 @@ class LocalResourceManager : public syncer::ReporterInterface {

void ReleaseWorkerResources(std::shared_ptr<TaskResourceInstances> task_allocation);

// Removes idle time for a WorkFootprint, thereby marking it busy.
void SetBusyFootprint(WorkFootprint item);
// Sets the idle time for a WorkFootprint to now.
void SetIdleFootprint(WorkFootprint item);

double GetLocalAvailableCpus() const;

/// Return human-readable string for this scheduler state.
@@ -193,9 +208,9 @@ class LocalResourceManager : public syncer::ReporterInterface {
scheduling::NodeID local_node_id_;
/// Resources of local node.
NodeResourceInstances local_resources_;

/// A map storing when the resource was last idle.
-  absl::flat_hash_map<scheduling::ResourceID, absl::optional<absl::Time>>
-      resources_last_idle_time_;
+  absl::flat_hash_map<WorkArtifact, absl::optional<absl::Time>> last_idle_times_;
/// Cached resources, used to compare with newest one in light heartbeat mode.
std::unique_ptr<NodeResources> last_report_resources_;
/// Function to get used object store memory.
6 changes: 2 additions & 4 deletions src/ray/raylet/worker_pool.cc
@@ -1189,10 +1189,8 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
}
};

-  if (task_spec.IsActorTask()) {
-    // Code path of actor task.
-    RAY_CHECK(false) << "Direct call shouldn't reach here.";
-  }
+  // Code path of actor task.
+  RAY_CHECK(!task_spec.IsActorTask()) << "Direct call shouldn't reach here.";

bool is_actor_creation = task_spec.IsActorCreationTask();
std::vector<std::string> dynamic_options{};
