Revert "Revert "[serve] Add initial health check before marking a rep… (
Browse files Browse the repository at this point in the history
ray-project#31554)

ray-project#31189 broke the Java code path. This PR fixes that and also adds the initial health check to the Java replica's startup path.
zcin authored Jan 10, 2023
1 parent 2182af1 commit 6a7edce
Showing 4 changed files with 113 additions and 16 deletions.
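The effect of this change is easiest to see end to end. Below is a minimal sketch, adapted from the new test_health_check_failure_makes_deployment_unhealthy test added in this commit (the deployment name and return value here are illustrative): a replica that fails its first check_health() call is treated as failing initialization, so the deployment never becomes healthy and serve.run() raises.

from ray import serve

@serve.deployment
class AlwaysUnhealthy:
    def check_health(self):
        # Fails the initial health check, so the replica is never marked ready.
        raise Exception("intended to fail")

    def __call__(self, *args):
        return "hello"

# With this change, serve.run() raises once the deployment is marked UNHEALTHY
# because its replicas repeatedly fail the initial health check.
try:
    serve.run(AlwaysUnhealthy.bind())
except RuntimeError:
    print("Deployment never became healthy.")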
@@ -184,6 +184,17 @@ public boolean isAllocated() {
     return true;
   }
 
+  /**
+   * Tell the caller this replica is successfully initialized.
+   *
+   * @return the deployment version returned by reconfigure(userConfig).
+   */
+  public Object isInitialized(Object userConfig) {
+    Object deploymentVersion = reconfigure(userConfig);
+    checkHealth();
+    return deploymentVersion;
+  }
+
   /**
    * Wait until there is no request in processing. It is used for stopping replica gracefully.
    *
24 changes: 13 additions & 11 deletions python/ray/serve/_private/deployment_state.py
@@ -375,10 +375,10 @@ def start(self, deployment_info: DeploymentInfo, version: DeploymentVersion):
         if self._is_cross_language:
             self._actor_handle = JavaActorHandleProxy(self._actor_handle)
             self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
-            self._ready_obj_ref = self._actor_handle.reconfigure.remote(user_config)
+            self._ready_obj_ref = self._actor_handle.is_initialized.remote(user_config)
         else:
             self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
-            self._ready_obj_ref = self._actor_handle.reconfigure.remote(
+            self._ready_obj_ref = self._actor_handle.is_initialized.remote(
                 user_config,
                 # Ensure that `is_allocated` will execute before `reconfigure`,
                 # because `reconfigure` runs user code that could block the replica
@@ -431,22 +431,23 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion]
         Check if current replica has started by making ray API calls on
         relevant actor / object ref.
+        Replica initialization calls __init__(), reconfigure(), and check_health().
         Returns:
             state (ReplicaStartupStatus):
                 PENDING_ALLOCATION:
                     - replica is waiting for a worker to start
                 PENDING_INITIALIZATION
-                    - replica reconfigure() haven't returned.
+                    - replica initialization hasn't finished.
                 FAILED:
-                    - replica __init__() failed.
+                    - replica initialization failed.
                 SUCCEEDED:
-                    - replica __init__() and reconfigure() succeeded.
+                    - replica initialization succeeded.
             version:
                 None:
-                    - replica reconfigure() haven't returned OR
-                    - replica __init__() failed.
+                    - for PENDING_ALLOCATION, PENDING_INITIALIZATION, or FAILED states
                 version:
-                    - replica __init__() and reconfigure() succeeded.
+                    - for SUCCEEDED state
         """
 
         # Check whether the replica has been allocated.
@@ -1406,9 +1407,10 @@ def _check_curr_status(self) -> bool:
                     name=self._name,
                     status=DeploymentStatus.UNHEALTHY,
                     message=(
-                        "The Deployment constructor failed "
-                        f"{failed_to_start_count} times in a row. See "
-                        "logs for details."
+                        f"The Deployment failed to start {failed_to_start_count} "
+                        "times in a row. This may be due to a problem with the "
+                        "deployment constructor or the initial health check failing. "
+                        "See logs for details."
                     ),
                 )
                 return False
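For reference, the check_ready() docstring above defines a small (status, version) contract. The sketch below is illustrative only: the enum is a stand-in for the real ReplicaStartupStatus (which lives in Serve's private modules), and the helper name is invented.

from enum import Enum, auto
from typing import Optional, Tuple

class ReplicaStartupStatus(Enum):
    # Stand-in mirroring the states documented in check_ready().
    PENDING_ALLOCATION = auto()
    PENDING_INITIALIZATION = auto()
    FAILED = auto()
    SUCCEEDED = auto()

def describe_startup(result: Tuple[ReplicaStartupStatus, Optional[str]]) -> str:
    status, version = result
    if status == ReplicaStartupStatus.PENDING_ALLOCATION:
        return "waiting for a worker to start"
    if status == ReplicaStartupStatus.PENDING_INITIALIZATION:
        return "__init__(), reconfigure(), or check_health() still running"
    if status == ReplicaStartupStatus.FAILED:
        return "initialization or the initial health check failed"
    return f"ready, running version {version}"

print(describe_startup((ReplicaStartupStatus.SUCCEEDED, "v1")))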
19 changes: 15 additions & 4 deletions python/ray/serve/_private/replica.py
@@ -214,13 +214,24 @@ async def is_allocated(self) -> str:
         """
         return ray.get_runtime_context().node_id
 
-    async def reconfigure(
+    async def is_initialized(
         self, user_config: Optional[Any] = None, _after: Optional[Any] = None
-    ) -> Tuple[DeploymentConfig, DeploymentVersion]:
+    ):
         # Unused `_after` argument is for scheduling: passing an ObjectRef
         # allows delaying reconfiguration until after this call has returned.
-        if self.replica is None:
-            await self._initialize_replica()
+        await self._initialize_replica()
+
+        metadata = await self.reconfigure(user_config)
+
+        # A new replica should not be considered healthy until it passes an
+        # initial health check. If an initial health check fails, consider
+        # it an initialization failure.
+        await self.check_health()
+        return metadata
+
+    async def reconfigure(
+        self, user_config: Optional[Any] = None
+    ) -> Tuple[DeploymentConfig, DeploymentVersion]:
         if user_config is not None:
             await self.replica.reconfigure(user_config)
 
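The `_after` parameter kept on is_initialized() relies on standard Ray behavior: ObjectRef arguments are resolved before a task runs, so passing the ref returned by is_allocated() orders the two calls. A standalone sketch under that assumption (ReplicaSketch is not Serve's actual class):

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
class ReplicaSketch:
    async def is_allocated(self):
        # Mirrors the real method: reports which node the actor landed on.
        return ray.get_runtime_context().node_id

    async def is_initialized(self, user_config=None, _after=None):
        # `_after` is never read; passing is_allocated()'s ObjectRef here
        # guarantees that call has finished before this one starts.
        return "deployment-version"

replica = ReplicaSketch.remote()
allocated_ref = replica.is_allocated.remote()
ready_ref = replica.is_initialized.remote(None, _after=allocated_ref)
print(ray.get(ready_ref))  # -> "deployment-version"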
75 changes: 74 additions & 1 deletion python/ray/serve/tests/test_healthcheck.py
@@ -4,6 +4,7 @@
 from ray.exceptions import RayError
 from ray._private.test_utils import wait_for_condition
 from ray import serve
+from ray.serve._private.common import DeploymentStatus
 from ray.serve._private.constants import REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD


@@ -162,7 +163,7 @@ def __call__(self, *args):
 
 
 def test_consecutive_failures(serve_instance):
-    # Test that the health check must fail N times before being marked unhealthy.
+    # Test that the health check must fail N times before being restarted.
 
     counter = ray.remote(Counter).remote()

@@ -202,6 +203,78 @@ def check_fails_3_times():
     check_fails_3_times()
 
 
+def test_health_check_failure_makes_deployment_unhealthy(serve_instance):
+    """If a deployment always fails health check, the deployment should be unhealthy."""
+
+    @serve.deployment
+    class AlwaysUnhealthy:
+        def check_health(self):
+            raise Exception("intended to fail")
+
+        def __call__(self, *args):
+            return ray.get_runtime_context().current_actor
+
+    with pytest.raises(RuntimeError):
+        serve.run(AlwaysUnhealthy.bind())
+
+    app_status = serve_instance.get_serve_status()
+    assert (
+        app_status.deployment_statuses[0].name == "AlwaysUnhealthy"
+        and app_status.deployment_statuses[0].status == DeploymentStatus.UNHEALTHY
+    )
+
+
+def test_health_check_failure_makes_deployment_unhealthy_transition(serve_instance):
+    """
+    If a deployment transitions to unhealthy, then continues to fail health check after
+    being restarted, the deployment should be unhealthy.
+    """
+
+    class Toggle:
+        def __init__(self):
+            self._should_fail = False
+
+        def set_should_fail(self):
+            self._should_fail = True
+
+        def should_fail(self):
+            return self._should_fail
+
+    @serve.deployment(health_check_period_s=1, health_check_timeout_s=1)
+    class WillBeUnhealthy:
+        def __init__(self, toggle):
+            self._toggle = toggle
+
+        def check_health(self):
+            if ray.get(self._toggle.should_fail.remote()):
+                raise Exception("intended to fail")
+
+        def __call__(self, *args):
+            return ray.get_runtime_context().current_actor
+
+    def check_status(expected_status: DeploymentStatus):
+        app_status = serve_instance.get_serve_status()
+        return (
+            app_status.deployment_statuses[0].name == "WillBeUnhealthy"
+            and app_status.deployment_statuses[0].status == expected_status
+        )
+
+    toggle = ray.remote(Toggle).remote()
+    serve.run(WillBeUnhealthy.bind(toggle))
+
+    # Check that deployment is healthy initially
+    assert check_status(DeploymentStatus.HEALTHY)
+
+    ray.get(toggle.set_should_fail.remote())
+
+    # Check that deployment is now unhealthy
+    wait_for_condition(check_status, expected_status=DeploymentStatus.UNHEALTHY)
+
+    # Check that deployment stays unhealthy
+    for _ in range(5):
+        assert check_status(DeploymentStatus.UNHEALTHY)
+
+
 if __name__ == "__main__":
     import sys
 
