Skip to content

Commit

Permalink
[Core] Remove dead code (ray-project#47074)
Browse files Browse the repository at this point in the history
The code is dead
  • Loading branch information
jjyao authored Aug 11, 2024
1 parent 2572bba commit 768108f
Show file tree
Hide file tree
Showing 7 changed files with 8 additions and 73 deletions.
2 changes: 0 additions & 2 deletions src/ray/core_worker/test/actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ class MockActorTaskSubmitter : public ActorTaskSubmitterInterface {
int64_t num_restarts,
bool dead,
const rpc::ActorDeathCause &death_cause));
MOCK_METHOD3(KillActor,
void(const ActorID &actor_id, bool force_kill, bool no_restart));

MOCK_METHOD0(CheckTimeoutTasks, void());

Expand Down
39 changes: 0 additions & 39 deletions src/ray/core_worker/transport/actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,6 @@ void ActorTaskSubmitter::AddActorQueueIfNotExists(const ActorID &actor_id,
actor_id, execute_out_of_order, max_pending_calls, fail_if_actor_unreachable));
}

void ActorTaskSubmitter::KillActor(const ActorID &actor_id,
bool force_kill,
bool no_restart) {
absl::MutexLock lock(&mu_);
rpc::KillActorRequest request;
request.set_intended_actor_id(actor_id.Binary());
request.set_force_kill(force_kill);
request.set_no_restart(no_restart);

auto it = client_queues_.find(actor_id);
// The language frontend can only kill actors that it has a reference to.
RAY_CHECK(it != client_queues_.end());

if (!it->second.pending_force_kill) {
it->second.pending_force_kill = request;
} else if (force_kill) {
// Overwrite the previous request to kill the actor if the new request is a
// force kill.
it->second.pending_force_kill->set_force_kill(true);
if (no_restart) {
// Overwrite the previous request to disable restart if the new request's
// no_restart flag is set to true.
it->second.pending_force_kill->set_no_restart(true);
}
}

SendPendingTasks(actor_id);
}

Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
auto task_id = task_spec.TaskId();
auto actor_id = task_spec.ActorId();
Expand Down Expand Up @@ -166,7 +137,6 @@ void ActorTaskSubmitter::DisconnectRpcClient(ClientQueue &queue) {
queue.rpc_client = nullptr;
core_worker_client_pool_.Disconnect(WorkerID::FromBinary(queue.worker_id));
queue.worker_id.clear();
queue.pending_force_kill.reset();
}

void ActorTaskSubmitter::FailInflightTasks(
Expand Down Expand Up @@ -409,15 +379,6 @@ void ActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) {
return;
}

// Check if there is a pending force kill. If there is, send it and disconnect the
// client.
if (client_queue.pending_force_kill) {
RAY_LOG(INFO).WithField(actor_id) << "Sending KillActor request to actor";
// It's okay if this fails because this means the worker is already dead.
client_queue.rpc_client->KillActor(*client_queue.pending_force_kill, nullptr);
client_queue.pending_force_kill.reset();
}

// Submit all pending actor_submit_queue->
while (true) {
auto task = actor_submit_queue->PopNextTaskToSend();
Expand Down
14 changes: 0 additions & 14 deletions src/ray/core_worker/transport/actor_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class ActorTaskSubmitterInterface {
int64_t num_restarts,
bool dead,
const rpc::ActorDeathCause &death_cause) = 0;
virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0;

virtual void CheckTimeoutTasks() = 0;

Expand Down Expand Up @@ -117,15 +116,6 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
/// \return Status::Invalid if the task is not yet supported.
Status SubmitTask(TaskSpecification task_spec);

/// Tell this actor to exit immediately.
///
/// \param[in] actor_id The actor_id of the actor to kill.
/// \param[in] force_kill Whether to force kill the actor, or let the actor
/// try a clean exit.
/// \param[in] no_restart If set to true, the killed actor will not be
/// restarted anymore.
void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart);

/// Create connection to actor and send all pending tasks.
///
/// \param[in] actor_id Actor ID.
Expand Down Expand Up @@ -317,10 +307,6 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
/// case we hard code an error info.
std::deque<std::shared_ptr<PendingTaskWaitingForDeathInfo>> wait_for_death_info_tasks;

/// A force-kill request that should be sent to the actor once an RPC
/// client to the actor is available.
absl::optional<rpc::KillActorRequest> pending_force_kill;

/// Stores all callbacks of inflight tasks. Note that this doesn't include tasks
/// without replies.
absl::flat_hash_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
Expand Down
14 changes: 5 additions & 9 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ void GcsActorManager::HandleKillActorViaGcs(rpc::KillActorViaGcsRequest request,
if (no_restart) {
DestroyActor(actor_id, GenKilledByApplicationCause(GetActor(actor_id)));
} else {
KillActor(actor_id, force_kill, no_restart);
KillActor(actor_id, force_kill);
}

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
Expand Down Expand Up @@ -1519,13 +1519,11 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &acto

void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
const rpc::ActorDeathCause &death_cause,
bool force_kill,
bool no_restart) {
bool force_kill) {
rpc::KillActorRequest request;
request.set_intended_actor_id(actor->GetActorID().Binary());
request.mutable_death_cause()->CopyFrom(death_cause);
request.set_force_kill(force_kill);
request.set_no_restart(no_restart);
auto actor_client = worker_client_factory_(actor->GetAddress());
RAY_LOG(DEBUG) << "Send request to kill actor " << actor->GetActorID() << " to worker "
<< actor->GetWorkerID() << " at node " << actor->GetNodeID();
Expand All @@ -1534,9 +1532,7 @@ void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor
});
}

void GcsActorManager::KillActor(const ActorID &actor_id,
bool force_kill,
bool no_restart) {
void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
RAY_LOG(DEBUG) << "Killing actor, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id << ", force_kill = " << force_kill;
auto it = registered_actors_.find(actor_id);
Expand All @@ -1559,7 +1555,7 @@ void GcsActorManager::KillActor(const ActorID &actor_id,
// The actor has already been created. Destroy the process by force-killing
// it.
NotifyCoreWorkerToKillActor(
actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill, no_restart);
actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill);
} else {
const auto &task_id = actor->GetCreationTaskSpecification().TaskId();
RAY_LOG(DEBUG) << "The actor " << actor->GetActorID()
Expand All @@ -1568,7 +1564,7 @@ void GcsActorManager::KillActor(const ActorID &actor_id,
// The actor is in phase of creating, so we need to notify the core
// worker exit to avoid process and resource leak.
NotifyCoreWorkerToKillActor(
actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill, no_restart);
actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill);
}
CancelActorInScheduling(actor, task_id);
ReconstructActor(actor_id,
Expand Down
7 changes: 2 additions & 5 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -525,19 +525,16 @@ class GcsActorManager : public rpc::ActorInfoHandler {
///
/// \param actor_id ID of the actor to kill.
/// \param force_kill Whether to force kill an actor by killing the worker.
/// \param no_restart If set to true, the killed actor will not be restarted anymore.
void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart);
void KillActor(const ActorID &actor_id, bool force_kill);

/// Notify CoreWorker to kill the specified actor.
///
/// \param actor The actor to be killed.
/// \param death_cause Context about why this actor is dead.
/// \param force_kill Whether to force kill an actor by killing the worker.
/// \param no_restart If set to true, the killed actor will not be restarted anymore.
void NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
const rpc::ActorDeathCause &death_cause,
bool force_kill = true,
bool no_restart = true);
bool force_kill = true);

/// Add the destroyed actor to the cache. If the cache is full, one actor is randomly
/// evicted.
Expand Down
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ bool GcsActorScheduler::KillActorOnWorker(const rpc::Address &worker_address,
// Set it to be Nil() since it hasn't been setup yet.
request.set_intended_actor_id(actor_id.Binary());
request.set_force_kill(true);
request.set_no_restart(true);
cli->KillActor(request, [actor_id](auto &status, auto &&) {
RAY_LOG(DEBUG) << "Killing actor " << actor_id
<< " with return status: " << status.ToString();
Expand Down
4 changes: 1 addition & 3 deletions src/ray/protobuf/core_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,8 @@ message KillActorRequest {
bytes intended_actor_id = 1;
// Whether to force kill the actor.
bool force_kill = 2;
// If set to true, the killed actor will not be restarted anymore.
bool no_restart = 3;
// The precise reason why this actor receives a kill request.
ActorDeathCause death_cause = 4;
ActorDeathCause death_cause = 3;
}

message KillActorReply {}
Expand Down

0 comments on commit 768108f

Please sign in to comment.