Skip to content

Commit

Permalink
[Core] Make PythonGcsClient thread safe (ray-project#41777)
Browse files Browse the repository at this point in the history
Currently, the Python GCS client allows Connect to be called from multiple threads, but PythonGcsClient::Connect is not thread safe, which can result in a race condition that causes a segfault in PythonGcsClient::Connect. Our solution is to make PythonGcsClient entirely thread safe by adding a mutex: Connect takes a writer lock, and the RPC methods take reader locks.

Signed-off-by: Jonathan Nitisastro <[email protected]>
  • Loading branch information
jonathan-anyscale authored Dec 14, 2023
1 parent 263a250 commit d9b7b33
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Status HandleGcsError(rpc::GcsStatus status) {
Status PythonGcsClient::Connect(const ClusterID &cluster_id,
int64_t timeout_ms,
size_t num_retries) {
absl::WriterMutexLock lock(&mutex_);
channel_ =
rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_);
node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_);
Expand Down Expand Up @@ -216,6 +217,7 @@ Status PythonGcsClient::CheckAlive(const std::vector<std::string> &raylet_addres
request.add_raylet_address(address);
}

absl::ReaderMutexLock lock(&mutex_);
rpc::CheckAliveReply reply;
grpc::Status status = node_info_stub_->CheckAlive(&context, request, &reply);

Expand All @@ -241,6 +243,7 @@ Status PythonGcsClient::InternalKVGet(const std::string &ns,
request.set_namespace_(ns);
request.set_key(key);

absl::ReaderMutexLock lock(&mutex_);
rpc::InternalKVGetReply reply;

grpc::Status status = kv_stub_->InternalKVGet(&context, request, &reply);
Expand Down Expand Up @@ -268,6 +271,7 @@ Status PythonGcsClient::InternalKVMultiGet(
request.set_namespace_(ns);
request.mutable_keys()->Add(keys.begin(), keys.end());

absl::ReaderMutexLock lock(&mutex_);
rpc::InternalKVMultiGetReply reply;

grpc::Status status = kv_stub_->InternalKVMultiGet(&context, request, &reply);
Expand Down Expand Up @@ -302,6 +306,7 @@ Status PythonGcsClient::InternalKVPut(const std::string &ns,
request.set_value(value);
request.set_overwrite(overwrite);

absl::ReaderMutexLock lock(&mutex_);
rpc::InternalKVPutReply reply;

grpc::Status status = kv_stub_->InternalKVPut(&context, request, &reply);
Expand All @@ -328,6 +333,7 @@ Status PythonGcsClient::InternalKVDel(const std::string &ns,
request.set_key(key);
request.set_del_by_prefix(del_by_prefix);

absl::ReaderMutexLock lock(&mutex_);
rpc::InternalKVDelReply reply;

grpc::Status status = kv_stub_->InternalKVDel(&context, request, &reply);
Expand All @@ -352,6 +358,7 @@ Status PythonGcsClient::InternalKVKeys(const std::string &ns,
request.set_namespace_(ns);
request.set_prefix(prefix);

absl::ReaderMutexLock lock(&mutex_);
rpc::InternalKVKeysReply reply;

grpc::Status status = kv_stub_->InternalKVKeys(&context, request, &reply);
Expand All @@ -376,6 +383,7 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns,
request.set_namespace_(ns);
request.set_key(key);

absl::ReaderMutexLock lock(&mutex_);
rpc::InternalKVExistsReply reply;

grpc::Status status = kv_stub_->InternalKVExists(&context, request, &reply);
Expand All @@ -399,6 +407,7 @@ Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri,
request.set_uri(uri);
request.set_expiration_s(expiration_s);

absl::ReaderMutexLock lock(&mutex_);
rpc::PinRuntimeEnvURIReply reply;

grpc::Status status = runtime_env_stub_->PinRuntimeEnvURI(&context, request, &reply);
Expand All @@ -423,6 +432,7 @@ Status PythonGcsClient::GetAllNodeInfo(int64_t timeout_ms,
grpc::ClientContext context;
PrepareContext(context, timeout_ms);

absl::ReaderMutexLock lock(&mutex_);
rpc::GetAllNodeInfoRequest request;
rpc::GetAllNodeInfoReply reply;

Expand All @@ -443,6 +453,7 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms,
grpc::ClientContext context;
PrepareContext(context, timeout_ms);

absl::ReaderMutexLock lock(&mutex_);
rpc::GetAllJobInfoRequest request;
rpc::GetAllJobInfoReply reply;

Expand All @@ -463,6 +474,7 @@ Status PythonGcsClient::GetAllResourceUsage(int64_t timeout_ms,
grpc::ClientContext context;
PrepareContext(context, timeout_ms);

absl::ReaderMutexLock lock(&mutex_);
rpc::GetAllResourceUsageRequest request;
rpc::GetAllResourceUsageReply reply;

Expand All @@ -485,6 +497,7 @@ Status PythonGcsClient::RequestClusterResourceConstraint(
grpc::ClientContext context;
PrepareContext(context, timeout_ms);

absl::ReaderMutexLock lock(&mutex_);
rpc::autoscaler::RequestClusterResourceConstraintRequest request;
rpc::autoscaler::RequestClusterResourceConstraintReply reply;
RAY_CHECK(bundles.size() == count_array.size());
Expand Down Expand Up @@ -516,6 +529,7 @@ Status PythonGcsClient::GetClusterStatus(int64_t timeout_ms,
grpc::ClientContext context;
PrepareContext(context, timeout_ms);

absl::ReaderMutexLock lock(&mutex_);
grpc::Status status = autoscaler_stub_->GetClusterStatus(&context, request, &reply);

if (status.ok()) {
Expand All @@ -542,6 +556,7 @@ Status PythonGcsClient::DrainNode(const std::string &node_id,
grpc::ClientContext context;
PrepareContext(context, timeout_ms);

absl::ReaderMutexLock lock(&mutex_);
grpc::Status status = autoscaler_stub_->DrainNode(&context, request, &reply);

if (status.ok()) {
Expand All @@ -562,6 +577,7 @@ Status PythonGcsClient::DrainNodes(const std::vector<std::string> &node_ids,
request.add_drain_node_data()->set_node_id(node_id);
}

absl::ReaderMutexLock lock(&mutex_);
rpc::DrainNodeReply reply;

grpc::Status status = node_info_stub_->DrainNode(&context, request, &reply);
Expand Down
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ class RAY_EXPORT PythonGcsClient {
std::unique_ptr<rpc::JobInfoGcsService::Stub> job_info_stub_;
std::unique_ptr<rpc::autoscaler::AutoscalerStateService::Stub> autoscaler_stub_;
std::shared_ptr<grpc::Channel> channel_;
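// Makes PythonGcsClient thread safe: Connect() holds this mutex as a writer while
// it (re)creates the channel and stubs; the RPC methods hold it as readers while
// they use the stubs.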
absl::Mutex mutex_;
};

std::unordered_map<std::string, double> PythonGetResourcesTotal(
Expand Down

0 comments on commit d9b7b33

Please sign in to comment.