Skip to content

Commit

Permalink
[Core] QuickExit CoreWorker when GetCoreWorker is called after shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
scv119 authored Oct 6, 2021
1 parent 0f91582 commit 1ed5f62
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 26 deletions.
66 changes: 40 additions & 26 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

namespace ray {
namespace core {
namespace {

// Duration between internal book-keeping heartbeats.
const uint64_t kInternalHeartbeatMillis = 1000;
Expand Down Expand Up @@ -89,6 +90,16 @@ ObjectLocation CreateObjectLocation(const rpc::GetObjectLocationsOwnerReply &rep
/// The global instance of `CoreWorkerProcess`.
std::unique_ptr<CoreWorkerProcess> core_worker_process;

/// Teriminate the process without cleaning up the resources.
/// It will flush the log if logging_enabled is set to true.
void QuickExit(bool logging_enabled) {
if (logging_enabled) {
RayLog::ShutDownRayLog();
}
_Exit(1);
}
} // namespace

thread_local std::weak_ptr<CoreWorker> CoreWorkerProcess::current_core_worker_;

void CoreWorkerProcess::Initialize(const CoreWorkerOptions &options) {
Expand Down Expand Up @@ -148,18 +159,8 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options)
// NOTE(kfstorm): any initialization depending on RayConfig must happen after this line.
InitializeSystemConfig();

if (options_.num_workers == 1) {
// We need to create the worker instance here if:
// 1. This is a driver process. In this case, the driver is ready to use right after
// the CoreWorkerProcess::Initialize.
// 2. This is a Python worker process. In this case, Python will invoke some core
// worker APIs before `CoreWorkerProcess::RunTaskExecutionLoop` is called. So we need
// to create the worker instance here. One example of invocations is
// https://github.com/ray-project/ray/blob/45ce40e5d44801193220d2c546be8de0feeef988/python/ray/worker.py#L1281.
if (options_.worker_type == WorkerType::DRIVER ||
options_.language == Language::PYTHON) {
CreateWorker();
}
if (ShouldCreateGlobalWorkerOnConstruction()) {
CreateWorker();
}

// Assume stats module will be initialized exactly once in once process.
Expand Down Expand Up @@ -257,6 +258,18 @@ void CoreWorkerProcess::InitializeSystemConfig() {
RayConfig::instance().initialize(promise.get_future().get());
}

bool CoreWorkerProcess::ShouldCreateGlobalWorkerOnConstruction() const {
// We need to create the worker instance here if:
// 1. This is a driver process. In this case, the driver is ready to use right after
// the CoreWorkerProcess::Initialize.
// 2. This is a Python worker process. In this case, Python will invoke some core
// worker APIs before `CoreWorkerProcess::RunTaskExecutionLoop` is called. So we need
// to create the worker instance here. One example of invocations is
// https://github.com/ray-project/ray/blob/45ce40e5d44801193220d2c546be8de0feeef988/python/ray/worker.py#L1281.
return options_.num_workers == 1 && (options_.worker_type == WorkerType::DRIVER ||
options_.language == Language::PYTHON);
}

std::shared_ptr<CoreWorker> CoreWorkerProcess::TryGetWorker(const WorkerID &worker_id) {
if (!core_worker_process) {
return nullptr;
Expand All @@ -273,6 +286,16 @@ CoreWorker &CoreWorkerProcess::GetCoreWorker() {
EnsureInitialized();
if (core_worker_process->options_.num_workers == 1) {
auto global_worker = core_worker_process->GetGlobalWorker();
if (core_worker_process->ShouldCreateGlobalWorkerOnConstruction() && !global_worker) {
// This could only happen when the worker has already been shutdown.
// In this case, we should exit without crashing.
// TODO (scv119): A better solution could be returning error code
// and handling it at language frontend.
RAY_LOG(ERROR) << "The global worker has already been shutdown. This happens when "
"the language frontend accesses the Ray's worker after it is "
"shutdown. The process will exit";
QuickExit(core_worker_process->options_.enable_logging);
}
RAY_CHECK(global_worker) << "global_worker_ must not be NULL";
return *global_worker;
}
Expand Down Expand Up @@ -417,11 +440,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.
RAY_LOG(ERROR) << "Failed to register worker " << worker_id << " to Raylet. "
<< raylet_client_status;
if (options_.enable_logging) {
RayLog::ShutDownRayLog();
}
// Quit the process immediately.
_Exit(1);
QuickExit(options_.enable_logging);
}

connected_ = true;
Expand Down Expand Up @@ -2882,13 +2902,10 @@ void CoreWorker::HandleCancelTask(const rpc::CancelTaskRequest &request,
<< " has received a force kill request after the cancellation. Killing "
"a worker...";
Disconnect();
if (options_.enable_logging) {
RayLog::ShutDownRayLog();
}
// NOTE(hchen): Use `_Exit()` to force-exit this process without doing cleanup.
// NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
// `exit()` will destruct static objects in an incorrect order, which will lead to
// core dumps.
_Exit(1);
QuickExit(options_.enable_logging);
}
}

Expand Down Expand Up @@ -2921,13 +2938,10 @@ void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request,
"please create the Java actor with some dynamic options to make it being "
"hosted in a dedicated worker process.";
}
if (options_.enable_logging) {
RayLog::ShutDownRayLog();
}
// NOTE(hchen): Use `_Exit()` to force-exit this process without doing cleanup.
// NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
// `exit()` will destruct static objects in an incorrect order, which will lead to
// core dumps.
_Exit(1);
QuickExit(options_.enable_logging);
} else {
Exit(rpc::WorkerExitType::INTENDED_EXIT);
}
Expand Down
3 changes: 3 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ class CoreWorkerProcess {

void InitializeSystemConfig();

/// Check that if the global worker should be created on construction.
bool ShouldCreateGlobalWorkerOnConstruction() const;

/// Get the `CoreWorker` instance by worker ID.
///
/// \param[in] workerId The worker ID.
Expand Down

0 comments on commit 1ed5f62

Please sign in to comment.