
Commit

Revert "Revert "[core] Fix gcs healthch manager crash when node is removed by node manager."" (ray-project#32019)

This reverts commit 51c5eda.

Reverts ray-project#31995

Skip the Windows test.

Signed-off-by: Yi Cheng <[email protected]>
fishbone authored Jan 30, 2023
1 parent 907e968 commit 664c844
Showing 6 changed files with 91 additions and 59 deletions.
1 change: 1 addition & 0 deletions .bazelrc
@@ -97,6 +97,7 @@ build:tsan --copt -g
build:tsan --copt -fno-omit-frame-pointer
build:tsan --copt -Wno-uninitialized
build:tsan --linkopt -fsanitize=thread
+ build:tsan --cxxopt="-D_RAY_TSAN_BUILD"
# This config is only for running TSAN with LLVM toolchain on Linux.
build:tsan-clang --config=tsan
build:tsan-clang --config=llvm
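The new define gives C++ code a compile-time signal that it is a ThreadSanitizer build; the stress test added in this commit uses it to skip itself under TSAN. A minimal sketch of the pattern, assuming GoogleTest (the suite and test names here are placeholders):

#include "gtest/gtest.h"

TEST(TsanGatingExample, SkipsUnderTsan) {
#ifdef _RAY_TSAN_BUILD
  GTEST_SKIP() << "Disabled in tsan because of performance";
#endif
  // Expensive work runs only in non-TSAN builds.
}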
2 changes: 1 addition & 1 deletion BUILD.bazel
@@ -1874,7 +1874,7 @@ cc_library(

cc_test(
name = "gcs_health_check_manager_test",
size = "small",
size = "medium",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc",
],
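Raising the size from "small" to "medium" lifts the test's default Bazel timeout from 60 seconds to 300 seconds, headroom the 20000-iteration stress test added below needs.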
1 change: 1 addition & 0 deletions ci/ci.sh
@@ -124,6 +124,7 @@ test_core() {
-//:event_test
-//:gcs_server_rpc_test
-//:ray_syncer_test # TODO (iycheng): it's flaky on windows. Add it back once we figure out the cause
+ -//:gcs_health_check_manager_test
-//:gcs_client_reconnection_test
)
;;
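This entry adds the suite to the Windows exclusion list in test_core(), matching the "Skip the Windows test" note in the commit message.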
48 changes: 22 additions & 26 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.cc
@@ -53,15 +53,19 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
if (iter == health_check_contexts_.end()) {
return;
}
+ iter->second->Stop();
health_check_contexts_.erase(iter);
},
"GcsHealthCheckManager::RemoveNode");
}

void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed.";
- on_node_death_callback_(node_id);
- health_check_contexts_.erase(node_id);
+ auto iter = health_check_contexts_.find(node_id);
+ if (iter != health_check_contexts_.end()) {
+   on_node_death_callback_(node_id);
+   health_check_contexts_.erase(iter);
+ }
}

std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
@@ -75,27 +79,23 @@ std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
using ::grpc::health::v1::HealthCheckResponse;

- context_ = std::make_shared<grpc::ClientContext>();
+ // Reset the context/request/response for the next request.
+ context_.~ClientContext();
+ new (&context_) grpc::ClientContext();
+ response_.Clear();

auto deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(manager_->timeout_ms_);
- context_->set_deadline(deadline);
+ context_.set_deadline(deadline);
stub_->async()->Check(
- context_.get(),
- &request_,
- &response_,
- [this, stopped = this->stopped_, context = this->context_, now = absl::Now()](
-     ::grpc::Status status) {
+ &context_, &request_, &response_, [this, now = absl::Now()](::grpc::Status status) {
// This callback is done in gRPC's thread pool.
STATS_health_check_rpc_latency_ms.Record(
absl::ToInt64Milliseconds(absl::Now() - now));
+ if (status.error_code() == ::grpc::StatusCode::CANCELLED) {
+   return;
+ }
manager_->io_service_.post(
- [this, stopped, status]() {
-   // Stopped has to be read in the same thread where it's updated.
-   if (*stopped) {
+ [this, status]() {
+   if (stopped_) {
+     delete this;
return;
}
RAY_LOG(DEBUG) << "Health check status: " << int(response_.status());
@@ -110,32 +110,28 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
}

if (health_check_remaining_ == 0) {
- manager_->io_service_.post([this]() { manager_->FailNode(node_id_); },
-                            "");
+ manager_->FailNode(node_id_);
+ delete this;
} else {
// Do another health check.
timer_.expires_from_now(
boost::posix_time::milliseconds(manager_->period_ms_));
- timer_.async_wait([this, stopped](auto ec) {
-   // We need to check stopped here as well since cancel
-   // won't impact the queued tasks.
-   if (ec != boost::asio::error::operation_aborted && !*stopped) {
-     StartHealthCheck();
-   }
- });
+ timer_.async_wait([this](auto) { StartHealthCheck(); });
}
},
"HealthCheck");
});
}

+ void GcsHealthCheckManager::HealthCheckContext::Stop() { stopped_ = true; }

void GcsHealthCheckManager::AddNode(const NodeID &node_id,
std::shared_ptr<grpc::Channel> channel) {
io_service_.dispatch(
[this, channel, node_id]() {
RAY_CHECK(health_check_contexts_.count(node_id) == 0);
- auto context = std::make_unique<HealthCheckContext>(this, channel, node_id);
- health_check_contexts_.emplace(std::make_pair(node_id, std::move(context)));
+ auto context = new HealthCheckContext(this, channel, node_id);
+ health_check_contexts_.emplace(std::make_pair(node_id, context));
},
"GcsHealthCheckManager::AddNode");
}
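The in-place reset at the top of StartHealthCheck is needed because a grpc::ClientContext can never be reused for a second call, and the class now holds it by value rather than through a shared_ptr. A minimal sketch of the idiom, with ReusableCall as a placeholder name:

#include <new>

#include <grpcpp/grpcpp.h>

class ReusableCall {
 public:
  // A ClientContext cannot survive across calls, so destroy the old one in
  // place and construct a fresh one at the same address before each RPC.
  void ResetForNextCall() {
    context_.~ClientContext();
    new (&context_) grpc::ClientContext();
  }

 private:
  grpc::ClientContext context_;
};

The reset must never run while an RPC that borrowed &context_ is still in flight; above, that is guaranteed by only restarting the health check after the previous callback has finished (or returning early on CANCELLED).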
23 changes: 5 additions & 18 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.h
@@ -91,27 +91,16 @@
NodeID node_id)
: manager_(manager),
node_id_(node_id),
- stopped_(std::make_shared<bool>(false)),
timer_(manager->io_service_),
health_check_remaining_(manager->failure_threshold_) {
request_.set_service(node_id.Hex());
stub_ = grpc::health::v1::Health::NewStub(channel);
timer_.expires_from_now(
boost::posix_time::milliseconds(manager_->initial_delay_ms_));
- timer_.async_wait([this](auto ec) {
-   if (ec != boost::asio::error::operation_aborted) {
-     StartHealthCheck();
-   }
- });
+ timer_.async_wait([this](auto) { StartHealthCheck(); });
}

- ~HealthCheckContext() {
-   timer_.cancel();
-   if (context_ != nullptr) {
-     context_->TryCancel();
-   }
-   *stopped_ = true;
- }
+ void Stop();

private:
void StartHealthCheck();
@@ -121,14 +110,12 @@
NodeID node_id_;

// Whether the health check has stopped.
- std::shared_ptr<bool> stopped_;
+ bool stopped_ = false;

/// gRPC related fields
std::unique_ptr<::grpc::health::v1::Health::Stub> stub_;

- // The context is used in the gRPC callback which is in another
- // thread, so we need it to be a shared_ptr.
- std::shared_ptr<grpc::ClientContext> context_;
+ grpc::ClientContext context_;
::grpc::health::v1::HealthCheckRequest request_;
::grpc::health::v1::HealthCheckResponse response_;

@@ -146,7 +133,7 @@
std::function<void(const NodeID &)> on_node_death_callback_;

/// The context of the health check for each nodes.
- absl::flat_hash_map<NodeID, std::unique_ptr<HealthCheckContext>> health_check_contexts_;
+ absl::flat_hash_map<NodeID, HealthCheckContext *> health_check_contexts_;

/// The delay for the first health check request.
const int64_t initial_delay_ms_;
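With these header changes the contexts are managed manually: the map stores raw pointers, Stop() merely sets a flag, and the context deletes itself from whichever io_service callback first observes the flag. A stripped-down model of the lifetime protocol (names are illustrative, not Ray's):

// All methods run on a single event-loop thread; external completions are
// re-posted onto that thread before touching the object.
class CheckContext {
 public:
  void Stop() { stopped_ = true; }  // the owner forgets its pointer after this

  void OnReply(bool healthy) {
    if (stopped_) {
      delete this;  // we held the last reference; nothing else touches *this
      return;
    }
    if (!healthy && --remaining_ == 0) {
      // Report the dead node, then self-destruct.
      delete this;
      return;
    }
    // Otherwise schedule the next probe...
  }

 private:
  bool stopped_ = false;
  int remaining_ = 5;
};

Because stopped_ is now a plain bool rather than a shared_ptr<bool>, every access must stay on the io_service thread, which is why the gRPC completion re-posts to io_service_ before checking it.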
75 changes: 61 additions & 14 deletions src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
@@ -19,9 +19,13 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
+ #include <cstdlib>
+ #include <unordered_map>

using namespace boost;
+ using namespace boost::asio;
+ using namespace boost::asio::ip;

#include <ray/rpc/grpc_server.h>

#include <chrono>
@@ -30,6 +34,20 @@ using namespace boost;
#include "gtest/gtest.h"
#include "ray/gcs/gcs_server/gcs_health_check_manager.h"

+ int GetFreePort() {
+   io_service io_service;
+   tcp::acceptor acceptor(io_service);
+   tcp::endpoint endpoint;
+
+   // try to bind to port 0 to find a free port
+   acceptor.open(tcp::v4());
+   acceptor.bind(tcp::endpoint(tcp::v4(), 0));
+   endpoint = acceptor.local_endpoint();
+   auto port = endpoint.port();
+   acceptor.close();
+   return port;
+ }

using namespace ray;
using namespace std::literals::chrono_literals;

@@ -46,7 +64,6 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
timeout_ms,
period_ms,
failure_threshold);
- port = 10000;
}

void TearDown() override {
@@ -65,7 +82,8 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
NodeID AddServer(bool alive = true) {
std::promise<int> port_promise;
auto node_id = NodeID::FromRandom();

+ auto port = GetFreePort();
+ RAY_LOG(INFO) << "Get port " << port;
auto server = std::make_shared<rpc::GrpcServer>(node_id.Hex(), port, true);

auto channel = grpc::CreateChannel("localhost:" + std::to_string(port),
@@ -76,7 +94,6 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
}
servers.emplace(node_id, server);
health_check->AddNode(node_id, channel);
- ++port;
return node_id;
}

@@ -115,14 +132,13 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
}
}

- int port;
instrumented_io_context io_service;
std::unique_ptr<gcs::GcsHealthCheckManager> health_check;
std::unordered_map<NodeID, std::shared_ptr<rpc::GrpcServer>> servers;
std::unordered_set<NodeID> dead_nodes;
- const int64_t initial_delay_ms = 1000;
- const int64_t timeout_ms = 1000;
- const int64_t period_ms = 1000;
+ const int64_t initial_delay_ms = 100;
+ const int64_t timeout_ms = 10;
+ const int64_t period_ms = 10;
const int64_t failure_threshold = 5;
};

@@ -143,8 +159,6 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) {
Run(2); // One for starting RPC and one for the RPC callback.
}

- Run(); // For failure callback.

ASSERT_EQ(1, dead_nodes.size());
ASSERT_TRUE(dead_nodes.count(node_id));
}
@@ -169,8 +183,6 @@ TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) {
}
}

- Run(); // For failure callback.

ASSERT_EQ(0, dead_nodes.size());
}

@@ -196,8 +208,6 @@ TEST_F(GcsHealthCheckManagerTest, Crashed) {
Run(2); // One for starting RPC and one for the RPC callback.
}

- Run(); // For failure callback.

ASSERT_EQ(1, dead_nodes.size());
ASSERT_TRUE(dead_nodes.count(node_id));
}
@@ -230,12 +240,49 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) {
Run(2); // One for starting RPC and one for the RPC callback.
}

- Run(2);
+ Run(1);
ASSERT_EQ(1, dead_nodes.size());
ASSERT_TRUE(dead_nodes.count(node_id));
}

+ TEST_F(GcsHealthCheckManagerTest, StressTest) {
+ #ifdef _RAY_TSAN_BUILD
+   GTEST_SKIP() << "Disabled in tsan because of performance";
+ #endif
+   boost::asio::io_service::work work(io_service);
+   std::srand(std::time(nullptr));
+   auto t = std::make_unique<std::thread>([this]() { this->io_service.run(); });
+
+   std::vector<NodeID> alive_nodes;
+
+   for (int i = 0; i < 200; ++i) {
+     alive_nodes.emplace_back(AddServer(true));
+     std::this_thread::sleep_for(10ms);
+   }
+
+   for (size_t i = 0; i < 20000UL; ++i) {
+     RAY_LOG(INFO) << "Progress: " << i << "/20000";
+     auto iter = alive_nodes.begin() + std::rand() % alive_nodes.size();
+     health_check->RemoveNode(*iter);
+     DeleteServer(*iter);
+     alive_nodes.erase(iter);
+     alive_nodes.emplace_back(AddServer(true));
+   }
+   RAY_LOG(INFO) << "Finished!";
+   io_service.stop();
+   t->join();
+ }
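The stress test exercises exactly the race the original PR fixed: it churns 200 live servers through 20000 random remove/add cycles while health checks are in flight, so a use-after-free between RemoveNode and a pending check callback would crash the process (or trip the sanitizers) almost immediately.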

int main(int argc, char **argv) {
+   InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
+                                          ray::RayLog::ShutDownRayLog,
+                                          argv[0],
+                                          ray::RayLogLevel::INFO,
+                                          /*log_dir=*/"");
+
+   ray::RayLog::InstallFailureSignalHandler(argv[0]);
+   ray::RayLog::InstallTerminateHandler();

::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
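The GetFreePort helper replaces the fixture's old fixed counter (port = 10000, ++port): binding a TCP acceptor to port 0 lets the OS pick any free port. There is an inherent, test-acceptable race, since the port is released again before the gRPC server binds it. Usage follows AddServer above (the credentials argument here is an assumption; the diff collapses that line):

auto port = GetFreePort();
auto server = std::make_shared<rpc::GrpcServer>(node_id.Hex(), port, true);
auto channel = grpc::CreateChannel("localhost:" + std::to_string(port),
                                   grpc::InsecureChannelCredentials());

With the BUILD change, the suite runs as bazel test //:gcs_health_check_manager_test under Bazel's 300-second "medium" timeout.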
