[core] Force kill worker whose job has exited (ray-project#32217)
## Why are these changes needed?

Currently the worker leaks when its task references a global import such as `tensorflow`. A couple of issues led to this bug:

- when the worker finishes executing, it does not clean up all of its borrowed references
- the reference counting code treats borrowed references as objects the worker owns
- if the worker thinks it owns references, it will not exit
- the worker pool will not force-exit an idle worker whose job is dead if the worker refuses to exit due to the aforementioned object ownership
This PR implements logic in the worker pool to force-kill an idle worker whose job has exited.
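
For illustration, a minimal sketch of the leak pattern, modeled on the regression test added in this commit (the `tensorflow` import and the Ray Data calls mirror that test; any global module referenced from the task body would behave the same way):

```python
import ray
import numpy as np
import tensorflow  # any heavyweight global import; tensorflow mirrors the test

def leak_repro(obj):
    # Referencing the global module from the task body is enough to leave the
    # executing worker holding borrowed references after the task finishes.
    tensorflow
    return []

ds = ray.data.from_numpy(np.ones((100_000)))
# Before this change, the worker that ran this task could refuse to exit after
# the driver finished, because it believed it still owned objects.
ds.map(leak_repro, max_retries=0)
```

With this PR, once the driver exits, the raylet's worker pool sends the idle worker an `Exit` RPC with `force_exit` set, so the worker is cleaned up even though it still reports owned objects.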
clarng authored Feb 10, 2023
1 parent 37086a5 commit 704fd4a
Showing 8 changed files with 215 additions and 12 deletions.
2 changes: 1 addition & 1 deletion python/ray/tests/BUILD
@@ -120,6 +120,7 @@ py_test_module_list(
"test_multi_node_2.py",
"test_multinode_failures.py",
"test_multinode_failures_2.py",
"test_node_manager.py",
"test_object_assign_owner.py",
"test_placement_group.py",
"test_placement_group_2.py",
@@ -180,7 +181,6 @@ py_test_module_list(
"test_metrics_agent_2.py",
"test_microbenchmarks.py",
"test_mini.py",
"test_node_manager.py",
"test_numba.py",
"test_redis_tls.py",
"test_raylet_output.py",
34 changes: 34 additions & 0 deletions python/ray/tests/test_node_manager.py
@@ -8,6 +8,8 @@
)
import pytest
import os
from ray.experimental.state.api import list_objects
import subprocess


# This tests the queue transitions for infeasible tasks. This has been an issue
@@ -221,6 +223,38 @@ def fn(self):
ray.get(det_actor.fn.remote())


@pytest.mark.parametrize(
    "call_ray_start",
    ["""ray start --head"""],
    indirect=True,
)
def test_reference_global_import_does_not_leak_worker_upon_driver_exit(call_ray_start):
    driver = """
import ray
import numpy as np
import tensorflow
def leak_repro(obj):
    tensorflow
    return []
ds = ray.data.from_numpy(np.ones((100_000)))
ds.map(leak_repro, max_retries=0)
"""
    try:
        run_string_as_driver(driver)
    except subprocess.CalledProcessError:
        pass

    ray.init(address=call_ray_start)

    def no_object_leaks():
        objects = list_objects(_explain=True, timeout=3)
        return len(objects) == 0

    wait_for_condition(no_object_leaks, timeout=10, retry_interval_ms=1000)


if __name__ == "__main__":
import sys

3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -775,3 +775,6 @@ RAY_CONFIG(int64_t,
/// the mapped plasma pages.
RAY_CONFIG(bool, worker_core_dump_exclude_plasma_store, true)
RAY_CONFIG(bool, raylet_core_dump_exclude_plasma_store, true)

/// Whether to kill idle workers of a terminated job.
RAY_CONFIG(bool, kill_idle_workers_of_terminated_job, true)
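
The flag defaults to true. As a hedged sketch of how it could be disabled for debugging, assuming Ray's internal `_system_config` override (as used in Ray's tests) applies to this entry like any other:

```python
import ray

# Illustrative only: start a local cluster with the force-kill behavior disabled.
# In normal deployments the default (true) should be kept so that idle workers of
# terminated jobs are reclaimed.
ray.init(_system_config={"kill_idle_workers_of_terminated_job": False})
```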
15 changes: 12 additions & 3 deletions src/ray/core_worker/core_worker.cc
@@ -3588,12 +3588,21 @@ void CoreWorker::HandleExit(rpc::ExitRequest request,
// We consider the worker to be idle if it doesn't own any objects and it doesn't have
// any object pinning RPCs in flight.
bool is_idle = !own_objects && pins_in_flight == 0;
reply->set_success(is_idle);
bool force_exit = request.force_exit();
RAY_LOG(DEBUG) << "Exiting: is_idle: " << is_idle << " force_exit: " << force_exit;
if (!is_idle && force_exit) {
RAY_LOG(INFO) << "Force exiting worker that owns object. This may cause other "
"workers that depends on the object to lose it. "
<< "Own objects: " << own_objects
<< " # Pins in flight: " << pins_in_flight;
}
bool will_exit = is_idle || force_exit;
reply->set_success(will_exit);
send_reply_callback(
Status::OK(),
[this, is_idle]() {
[this, will_exit]() {
// If the worker is idle, we exit.
if (is_idle) {
if (will_exit) {
Exit(rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
"Worker exits because it was idle (it doesn't have objects it owns while "
"no task or actor has been scheduled) for a long time.");
6 changes: 5 additions & 1 deletion src/ray/protobuf/core_worker.proto
@@ -360,7 +360,11 @@ message DeleteSpilledObjectsRequest {

message DeleteSpilledObjectsReply {}

message ExitRequest {}
message ExitRequest {
/// Whether to force exit the worker, regardless of whether the core worker
/// owns objects.
bool force_exit = 1;
}

message ExitReply {
/// Whether or not the exit succeeds. If the core worker owns any object and force_exit is not set, the request fails.
3 changes: 3 additions & 0 deletions src/ray/raylet/worker.h
@@ -121,6 +121,9 @@ class WorkerInterface {
FRIEND_TEST(WorkerPoolTest, PopWorkerMultiTenancy);
FRIEND_TEST(WorkerPoolTest, TestWorkerCapping);
FRIEND_TEST(WorkerPoolTest, TestWorkerCappingLaterNWorkersNotOwningObjects);
FRIEND_TEST(WorkerPoolTest, TestJobFinishedForceKillIdleWorker);
FRIEND_TEST(WorkerPoolTest,
WorkerFromAliveJobDoesNotBlockWorkerFromDeadJobFromGettingKilled);
FRIEND_TEST(WorkerPoolTest, TestWorkerCappingWithExitDelay);
FRIEND_TEST(WorkerPoolTest, MaximumStartupConcurrency);
FRIEND_TEST(WorkerPoolTest, HandleWorkerRegistration);
41 changes: 37 additions & 4 deletions src/ray/raylet/worker_pool.cc
@@ -734,7 +734,8 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
STATS_worker_register_time_ms.Record(duration.count());
RAY_LOG(DEBUG) << "Registering worker " << worker->WorkerId() << " with pid " << pid
<< ", port: " << port << ", register cost: " << duration.count()
<< ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType());
<< ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType())
<< ", startup token: " << worker_startup_token;
worker->SetAssignedPort(port);

state.registered_workers.insert(worker);
@@ -975,12 +976,17 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
&found,
&used,
&task_id);
RAY_LOG(DEBUG) << "PushWorker " << worker->WorkerId() << " used: " << used;
if (!used) {
// Put the worker to the idle pool.
state.idle.insert(worker);
int64_t now = get_time_();
idle_of_all_languages_.emplace_back(worker, now);
idle_of_all_languages_map_[worker] = now;
} else if (!found) {
RAY_LOG(INFO) << "Worker not returned to the idle pool after being used. This may "
"cause a worker leak, worker id:"
<< worker->WorkerId();
}
// We either have an idle worker or a slot to start a new worker.
if (worker->GetWorkerType() == rpc::WorkerType::WORKER) {
@@ -1006,11 +1012,18 @@ void WorkerPool::TryKillingIdleWorkers() {
for (const auto &idle_pair : idle_of_all_languages_) {
const auto &idle_worker = idle_pair.first;
const auto &job_id = idle_worker->GetAssignedJobId();

RAY_LOG(DEBUG) << " Checking idle worker "
<< idle_worker->GetAssignedTask().GetTaskSpecification().DebugString()
<< " worker id " << idle_worker->WorkerId();

if (running_size <= static_cast<size_t>(num_workers_soft_limit_)) {
if (!finished_jobs_.count(job_id)) {
if (!finished_jobs_.contains(job_id)) {
// Ignore the soft limit for jobs that have already finished, as we
// should always clean up these workers.
break;
RAY_LOG(DEBUG) << "job not finished. Not going to kill worker "
<< idle_worker->WorkerId();
continue;
}
}

@@ -1020,6 +1033,8 @@
}

if (idle_worker->IsDead()) {
RAY_LOG(DEBUG) << "idle worker is already dead. Not going to kill worker "
<< idle_worker->WorkerId();
// This worker has already been killed.
// This is possible because a Java worker process may hold multiple workers.
continue;
Expand All @@ -1034,6 +1049,8 @@ void WorkerPool::TryKillingIdleWorkers() {
continue;
}

// TODO(clarng): get rid of the multiple-workers-per-process code here, as that is
// no longer supported.
auto process = idle_worker->GetProcess();
// Make sure all workers in this worker process are idle.
// This block of code is needed by Java workers.
@@ -1079,8 +1096,10 @@
<< " with pid " << process.GetId()
<< " has been idle for a a while. Kill it.";
// To avoid object lost issue caused by forcibly killing, send an RPC request to the
// worker to allow it to do cleanup before exiting.
// worker to allow it to do cleanup before exiting. We kill it anyway if the driver
// is already exited.
if (!worker->IsDead()) {
RAY_LOG(DEBUG) << "Sending exit message to worker " << worker->WorkerId();
// Register the worker to pending exit so that we can correctly calculate the
// running_size.
// This also means that there's an inflight `Exit` RPC request to the worker.
@@ -1090,6 +1109,12 @@
RAY_CHECK(running_size > 0);
running_size--;
rpc::ExitRequest request;
if (finished_jobs_.contains(job_id) &&
RayConfig::instance().kill_idle_workers_of_terminated_job()) {
RAY_LOG(INFO) << "Force exiting worker whose job has exited "
<< worker->WorkerId();
request.set_force_exit(true);
}
rpc_client->Exit(
request, [this, worker](const ray::Status &status, const rpc::ExitReply &r) {
RAY_CHECK(pending_exit_idle_workers_.erase(worker->WorkerId()));
@@ -1100,6 +1125,7 @@
// In case of failed to send request, we remove it from pool as well
// TODO (iycheng): We should handle the grpc failure in better way.
if (!status.ok() || r.success()) {
RAY_LOG(DEBUG) << "Removed worker " << worker->WorkerId();
auto &worker_state = GetStateForLanguage(worker->GetLanguage());
// If we could kill the worker properly, we remove them from the idle
// pool.
@@ -1111,6 +1137,7 @@
worker->MarkDead();
}
} else {
RAY_LOG(DEBUG) << "Failed to remove worker " << worker->WorkerId();
// We re-insert the idle worker to the back of the queue if it fails to
// kill the worker (e.g., when the worker owns the object). Without this,
// if the first N workers own objects, it can't kill idle workers that are
@@ -1123,6 +1150,8 @@
}
});
} else {
RAY_LOG(DEBUG) << "Removing dead worker " << worker->WorkerId();

// Even if it's a dead worker, we still need to remove it from the pool.
RemoveWorker(worker_state.idle, worker);
}
@@ -1287,6 +1316,10 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
int64_t backlog_size,
int64_t num_available_cpus) {
// Code path of task that needs a dedicated worker.
RAY_LOG(DEBUG) << "PrestartWorkers, num_available_cpus " << num_available_cpus
<< " backlog_size " << backlog_size << " task spec "
<< task_spec.DebugString() << " has runtime env "
<< task_spec.HasRuntimeEnv();
if ((task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) ||
task_spec.HasRuntimeEnv() || task_spec.GetLanguage() != ray::Language::PYTHON) {
return; // Not handled.

