Skip to content

Commit

Permalink
Revert "Revert "[core] Move reconnection to RPC layer for GCS client.…
Browse files Browse the repository at this point in the history
…"" (ray-project#24764)

* Revert "Revert "[core] Move reconnection to RPC layer for GCS client. (ray-project#24330)" (ray-project#24762)"

This reverts commit 30f370b.
  • Loading branch information
fishbone authored May 15, 2022
1 parent 8e8deae commit 684e395
Show file tree
Hide file tree
Showing 14 changed files with 705 additions and 313 deletions.
23 changes: 23 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2064,6 +2064,29 @@ cc_test(
],
)

cc_test(
name = "gcs_client_reconnection_test",
srcs = [
"src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc",
],
args = [
"$(location redis-server)",
"$(location redis-cli)",
],
copts = COPTS,
data = [
"//:redis-cli",
"//:redis-server",
],
tags = ["team:core"],
deps = [
":gcs_client_lib",
":gcs_server_lib",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "object_manager",
srcs = glob([
Expand Down
1 change: 1 addition & 0 deletions ci/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ test_core() {
-//:gcs_server_test
-//:gcs_server_rpc_test
-//:ray_syncer_test # TODO (iycheng): it's flaky on windows. Add it back once we figure out the cause
-//:gcs_client_reconnection_test
)
;;
esac
Expand Down
8 changes: 4 additions & 4 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,17 +579,17 @@ def create_redis_client(redis_address, password=None):
try:
cli.ping()
return cli
except Exception:
except Exception as e:
create_redis_client.instances.pop(redis_address)
if i >= num_retries - 1:
break
raise RuntimeError(
f"Unable to connect to Redis at {redis_address}: {e}"
)
# Wait a little bit.
time.sleep(delay)
# Make sure the retry interval doesn't increase too large.
delay = min(1, delay * 2)

raise RuntimeError(f"Unable to connect to Redis at {redis_address}")


def start_ray_process(
command,
Expand Down
72 changes: 72 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import ray
import ray._private.gcs_utils as gcs_utils
import pytest
from time import sleep
from ray._private.test_utils import (
generate_system_config_map,
wait_for_condition,
Expand Down Expand Up @@ -214,6 +215,77 @@ def test_gcs_client_reconnect(ray_start_regular_with_external_redis, auto_reconn
assert gcs_client.internal_kv_get(b"a", None) == b"b"


@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
{
**generate_system_config_map(
num_heartbeats_timeout=20, gcs_rpc_server_reconnect_timeout_s=3600
),
"namespace": "actor",
}
],
indirect=True,
)
def test_actor_workloads(ray_start_regular_with_external_redis):
"""This test cover the case to create actor while gcs is down
and also make sure existing actor continue to work even when
GCS is down.
"""

@ray.remote
class Counter:
def r(self, v):
return v

c = Counter.remote()
r = ray.get(c.r.remote(10))
assert r == 10

print("GCS is killed")
ray.worker._global_node.kill_gcs_server()

print("Start to create a new actor")
cc = Counter.remote()
with pytest.raises(ray.exceptions.GetTimeoutError):
ray.get(cc.r.remote(10), timeout=5)

assert ray.get(c.r.remote(10)) == 10
ray.worker._global_node.start_gcs_server()

import threading

def f():
assert ray.get(cc.r.remote(10)) == 10

t = threading.Thread(target=f)
t.start()
t.join()

c = Counter.options(lifetime="detached", name="C").remote()

assert ray.get(c.r.remote(10)) == 10

ray.worker._global_node.kill_gcs_server()

sleep(2)

assert ray.get(c.r.remote(10)) == 10

ray.worker._global_node.start_gcs_server()

from ray._private.test_utils import run_string_as_driver

run_string_as_driver(
"""
import ray
ray.init('auto', namespace='actor')
a = ray.get_actor("C")
assert ray.get(a.r.remote(10)) == 10
"""
)


if __name__ == "__main__":
import pytest

Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MockGcsClient : public GcsClient {
public:
MOCK_METHOD(Status, Connect, (instrumented_io_context & io_service), (override));
MOCK_METHOD(void, Disconnect, (), (override));
MOCK_METHOD((std::pair<std::string, int>), GetGcsServerAddress, (), (override));
MOCK_METHOD((std::pair<std::string, int>), GetGcsServerAddress, (), (const, override));
MOCK_METHOD(std::string, DebugString, (), (const, override));

MockGcsClient() {
Expand Down
17 changes: 17 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,26 @@ RAY_CONFIG(bool, support_fork, false)
/// Each reconnection ping will be retried every 1 second.
RAY_CONFIG(int32_t, gcs_rpc_server_reconnect_timeout_s, 60)

/// The timeout for GCS connection in seconds
RAY_CONFIG(int32_t, gcs_rpc_server_connect_timeout_s, 5)

/// Minimum interval between reconnecting gcs rpc server when gcs server restarts.
RAY_CONFIG(int32_t, minimum_gcs_reconnect_interval_milliseconds, 5000)

/// gRPC channel reconnection related configs to GCS.
/// Check https://grpc.github.io/grpc/core/group__grpc__arg__keys.html for details
RAY_CONFIG(int32_t, gcs_grpc_max_reconnect_backoff_ms, 2000)
RAY_CONFIG(int32_t, gcs_grpc_min_reconnect_backoff_ms, 100)
RAY_CONFIG(int32_t, gcs_grpc_initial_reconnect_backoff_ms, 100)

/// Maximum bytes of request queued when RPC failed due to GCS is down.
/// If reach the limit, the core worker will hang until GCS is reconnected.
/// By default, the value if 5GB.
RAY_CONFIG(uint64_t, gcs_grpc_max_request_queued_max_bytes, 1024UL * 1024 * 1024 * 5)

/// The duration between two checks for grpc status.
RAY_CONFIG(int32_t, gcs_client_check_connection_status_interval_milliseconds, 1000)

/// Feature flag to use the ray syncer for resource synchronization
RAY_CONFIG(bool, use_ray_syncer, false)

Expand Down
11 changes: 1 addition & 10 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
gcs_server_address_.second = port;
});

gcs_client_ = std::make_shared<gcs::GcsClient>(
options_.gcs_options, [this](std::pair<std::string, int> *address) {
absl::MutexLock lock(&gcs_server_address_mutex_);
if (gcs_server_address_.second != 0) {
address->first = gcs_server_address_.first;
address->second = gcs_server_address_.second;
return true;
}
return false;
});
gcs_client_ = std::make_shared<gcs::GcsClient>(options_.gcs_options);

RAY_CHECK_OK(gcs_client_->Connect(io_service_));
RegisterToGcs();
Expand Down
Loading

0 comments on commit 684e395

Please sign in to comment.