
Revert "Use Boost.Process instead of pid_t (ray-project#6510)" (ray-p…
Browse files Browse the repository at this point in the history
…roject#6909)

This reverts commit fb8e361.
mehrdadn authored and edoakes committed Jan 26, 2020
1 parent 2fca550 commit 38ec2e7
Showing 11 changed files with 205 additions and 335 deletions.
1 change: 0 additions & 1 deletion BUILD.bazel
@@ -399,7 +399,6 @@ cc_library(
":stats_lib",
":worker_rpc",
"@boost//:asio",
"@boost//:process",
"@com_github_jupp0r_prometheus_cpp//pull",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/container:flat_hash_set",
2 changes: 0 additions & 2 deletions bazel/ray_deps_setup.bzl
@@ -123,8 +123,6 @@ def ray_deps_setup():
# Backport Clang-Cl patch on Boost 1.69 to Boost <= 1.68:
# https://lists.boost.org/Archives/boost/2018/09/243420.php
"//thirdparty/patches:boost-type_traits-trivial_move.patch",
# Partially backport waitpid() patch on Boost 1.72 to Boost <= 1.68
"//thirdparty/patches:boost-process-teminate-waitpid-nohang.patch",
],
)

6 changes: 3 additions & 3 deletions python/ray/test_utils.py
@@ -39,7 +39,7 @@ def wait_for_pid_to_exit(pid, timeout=20):
return
time.sleep(0.1)
raise RayTestTimeoutException(
"Timed out while waiting for process {} to exit.".format(pid))
"Timed out while waiting for process to exit.")


def wait_for_children_of_pid(pid, num_children=1, timeout=20):
@@ -51,8 +51,8 @@ def wait_for_children_of_pid(pid, num_children=1, timeout=20):
return
time.sleep(0.1)
raise RayTestTimeoutException(
"Timed out while waiting for process {} children to start "
"({}/{} started).".format(pid, num_alive, num_children))
"Timed out while waiting for process children to start "
"({}/{} started).".format(num_alive, num_children))


def wait_for_children_of_pid_to_exit(pid, timeout=20):
61 changes: 27 additions & 34 deletions src/ray/raylet/node_manager.cc
@@ -89,9 +89,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
object_manager_profile_timer_(io_service),
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(io_service, config.num_initial_workers,
config.maximum_startup_concurrency, gcs_client_,
config.worker_commands),
worker_pool_(config.num_initial_workers, config.maximum_startup_concurrency,
gcs_client_, config.worker_commands),
scheduling_policy_(local_queues_),
reconstruction_policy_(
io_service_,
@@ -229,23 +228,22 @@ void NodeManager::HandleUnexpectedWorkerFailure(
}

void NodeManager::KillWorker(std::shared_ptr<Worker> worker) {
#ifdef _WIN32
// TODO(mehrdadn): Implement graceful process termination mechanism
#else
// If we're just cleaning up a single worker, allow it some time to clean
// up its state before force killing. The client socket will be closed
// and the worker struct will be freed after the timeout.
kill(worker->Process().get()->id(), SIGTERM);
#endif
kill(worker->Pid(), SIGTERM);

auto retry_timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
auto retry_duration = boost::posix_time::milliseconds(
RayConfig::instance().kill_worker_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) {
RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Process().get()->id();
// Force kill worker
worker->Process().get()->terminate();
RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Pid();
// Force kill worker. TODO(mehrdadn, rkn): The worker may have already died
// and had its PID reassigned to a different process, at least on Windows.
// On Linux, this may or may not be the case, depending on e.g. whether
// the process has been already waited on. Regardless, this must be fixed.
kill(worker->Pid(), SIGKILL);
});
}
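For context, the reverted KillWorker path above is plain POSIX signalling plus a Boost.Asio timer: send SIGTERM, then schedule a SIGKILL after a grace period. The following is a minimal standalone sketch of that pattern, not the raylet's actual implementation; the function name, pid, and grace_period_ms parameters are illustrative.

// Minimal sketch of the SIGTERM-then-SIGKILL pattern shown in the hunk above.
// Assumes a POSIX system; the names here are illustrative, not Ray's API.
#include <signal.h>
#include <sys/types.h>

#include <cstdint>
#include <memory>

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

void KillGracefully(boost::asio::io_service &io_service, pid_t pid,
                    int64_t grace_period_ms) {
  // Ask the process to shut down cleanly first.
  kill(pid, SIGTERM);

  // Capture the timer in its own handler so it stays alive until it fires.
  auto timer = std::make_shared<boost::asio::deadline_timer>(io_service);
  timer->expires_from_now(boost::posix_time::milliseconds(grace_period_ms));
  timer->async_wait([timer, pid](const boost::system::error_code &error) {
    if (!error) {
      // Grace period elapsed; force-kill. As the TODO above notes, the PID
      // may already have been reused by an unrelated process at this point.
      kill(pid, SIGKILL);
    }
  });
}

In the diff itself the timer runs on the node manager's io_service_ and the captured worker shared_ptr keeps the Worker object alive; this sketch only isolates the signalling flow.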

@@ -857,9 +855,8 @@ void NodeManager::ProcessClientMessage(
RAY_LOG(DEBUG) << "[Worker] Message "
<< protocol::EnumNameMessageType(message_type_value) << "("
<< message_type << ") from worker with PID "
<< (registered_worker
? std::to_string(registered_worker->Process().get()->id())
: "nil");
<< (registered_worker ? std::to_string(registered_worker->Pid())
: "nil");
if (registered_worker && registered_worker->IsDead()) {
// For a worker that is marked as dead (because the job has died already),
// all the messages are ignored except DisconnectClient.
@@ -966,6 +963,12 @@ void NodeManager::ProcessClientMessage(
void NodeManager::ProcessRegisterClientRequestMessage(
const std::shared_ptr<LocalClientConnection> &client, const uint8_t *message_data) {
client->Register();
auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
Language language = static_cast<Language>(message->language());
WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
auto worker = std::make_shared<Worker>(worker_id, message->worker_pid(), language,
message->port(), client, client_call_manager_);
Status status;
flatbuffers::FlatBufferBuilder fbb;
auto reply =
ray::protocol::CreateRegisterClientReply(fbb, to_flatbuf(fbb, self_node_id_));
@@ -980,31 +983,24 @@ void NodeManager::ProcessRegisterClientRequestMessage(
}
});

auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
Language language = static_cast<Language>(message->language());
WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
pid_t pid = message->worker_pid();
auto worker = std::make_shared<Worker>(worker_id, language, message->port(), client,
client_call_manager_);
if (message->is_worker()) {
// Register the new worker.
if (worker_pool_.RegisterWorker(worker, pid).ok()) {
if (worker_pool_.RegisterWorker(worker).ok()) {
HandleWorkerAvailable(worker->Connection());
}
} else {
// Register the new driver.
worker->SetProcess(ProcessHandle::FromPid(pid));
const JobID job_id = from_flatbuf<JobID>(*message->job_id());
// Compute a dummy driver task id from a given driver.
const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id);
worker->AssignTaskId(driver_task_id);
worker->AssignJobId(job_id);
Status status = worker_pool_.RegisterDriver(worker);
status = worker_pool_.RegisterDriver(worker);
if (status.ok()) {
local_queues_.AddDriverTaskId(driver_task_id);
auto job_data_ptr =
gcs::CreateJobTableData(job_id, /*is_dead*/ false, std::time(nullptr),
initial_config_.node_manager_address, pid);
auto job_data_ptr = gcs::CreateJobTableData(
job_id, /*is_dead*/ false, std::time(nullptr),
initial_config_.node_manager_address, message->worker_pid());
RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(job_data_ptr, nullptr));
}
}
@@ -1200,8 +1196,7 @@ void NodeManager::ProcessDisconnectClientMessage(
cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet());
worker->ResetLifetimeResourceIds();

RAY_LOG(DEBUG) << "Worker (pid=" << worker->Process().get()->id()
<< ") is disconnected. "
RAY_LOG(DEBUG) << "Worker (pid=" << worker->Pid() << ") is disconnected. "
<< "job_id: " << worker->GetAssignedJobId();

// Since some resources may have been released, we can try to dispatch more tasks.
@@ -1215,8 +1210,7 @@
local_queues_.RemoveDriverTaskId(TaskID::ComputeDriverTaskId(driver_id));
worker_pool_.DisconnectDriver(worker);

RAY_LOG(DEBUG) << "Driver (pid=" << worker->Process().get()->id()
<< ") is disconnected. "
RAY_LOG(DEBUG) << "Driver (pid=" << worker->Pid() << ") is disconnected. "
<< "job_id: " << job_id;
}

@@ -2296,8 +2290,7 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
}

RAY_LOG(DEBUG) << "Assigning task " << spec.TaskId() << " to worker with pid "
<< worker->Process().get()->id()
<< ", worker id: " << worker->WorkerId();
<< worker->Pid() << ", worker id: " << worker->WorkerId();
flatbuffers::FlatBufferBuilder fbb;

// Resource accounting: acquire resources for the assigned task.
@@ -3128,7 +3121,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request,
rpc::SendReplyCallback send_reply_callback) {
for (const auto &driver : worker_pool_.GetAllDrivers()) {
auto worker_stats = reply->add_workers_stats();
worker_stats->set_pid(driver->Process().get()->id());
worker_stats->set_pid(driver->Pid());
worker_stats->set_is_driver(true);
}
for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) {
@@ -3191,7 +3184,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request,
<< status.ToString();
} else {
auto worker_stats = reply->add_workers_stats();
worker_stats->set_pid(worker->Process().get()->id());
worker_stats->set_pid(worker->Pid());
worker_stats->set_is_driver(false);
reply->set_num_workers(reply->num_workers() + 1);
worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats());
10 changes: 3 additions & 7 deletions src/ray/raylet/worker.cc
@@ -12,10 +12,11 @@ namespace ray {
namespace raylet {

/// A constructor responsible for initializing the state of a worker.
Worker::Worker(const WorkerID &worker_id, const Language &language, int port,
Worker::Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port,
std::shared_ptr<LocalClientConnection> connection,
rpc::ClientCallManager &client_call_manager)
: worker_id_(worker_id),
pid_(pid),
language_(language),
port_(port),
connection_(connection),
@@ -41,12 +42,7 @@ bool Worker::IsBlocked() const { return blocked_; }

WorkerID Worker::WorkerId() const { return worker_id_; }

ProcessHandle Worker::Process() const { return proc_; }

void Worker::SetProcess(const ProcessHandle &proc) {
RAY_CHECK(!proc_); // this procedure should not be called multiple times
proc_ = proc;
}
pid_t Worker::Pid() const { return pid_; }

Language Worker::GetLanguage() const { return language_; }

15 changes: 7 additions & 8 deletions src/ray/raylet/worker.h
@@ -9,7 +9,8 @@
#include "ray/common/task/task.h"
#include "ray/common/task/task_common.h"
#include "ray/rpc/worker/core_worker_client.h"
#include "ray/util/process.h"

#include <unistd.h> // pid_t

namespace ray {

@@ -21,8 +22,7 @@ namespace raylet {
class Worker {
public:
/// A constructor that initializes a worker object.
/// NOTE: You MUST manually set the worker process.
Worker(const WorkerID &worker_id, const Language &language, int port,
Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port,
std::shared_ptr<LocalClientConnection> connection,
rpc::ClientCallManager &client_call_manager);
/// A destructor responsible for freeing all worker state.
@@ -34,9 +34,8 @@ class Worker {
bool IsBlocked() const;
/// Return the worker's ID.
WorkerID WorkerId() const;
/// Return the worker process.
ProcessHandle Process() const;
void SetProcess(const ProcessHandle &proc);
/// Return the worker's PID.
pid_t Pid() const;
Language GetLanguage() const;
int Port() const;
void AssignTaskId(const TaskID &task_id);
@@ -80,8 +79,8 @@
private:
/// The worker's ID.
WorkerID worker_id_;
/// The worker's process.
ProcessHandle proc_;
/// The worker's PID.
pid_t pid_;
/// The language type of this worker.
Language language_;
/// Port that this worker listens on.
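Because the reverted Worker stores only a raw pid_t rather than a process handle, callers can probe liveness only through the PID, which is exactly the ambiguity the TODO in KillWorker calls out. A common POSIX idiom for that probe is sketched below purely as an illustration; the helper name is hypothetical and does not appear anywhere in this diff.

// Illustrative liveness probe for a raw pid_t. Signal 0 performs existence
// and permission checks without actually delivering a signal.
#include <signal.h>
#include <sys/types.h>

#include <cerrno>

bool PidAppearsAlive(pid_t pid) {
  if (kill(pid, 0) == 0) {
    return true;  // Some process owns this PID (possibly a reused PID).
  }
  // ESRCH means no such process; EPERM means it exists but we may not signal it.
  return errno == EPERM;
}

Even a successful probe only says that some process currently owns the PID, not that it is still the original worker, which is presumably why ray-project#6510 moved to Boost.Process handles in the first place.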
