Skip to content

Commit

Permalink
[core][autoscaler][v2] Reconciler - ray install & terminating instanc…
Browse files Browse the repository at this point in the history
…es [9/x] (ray-project#43072)



---------

Signed-off-by: rickyyx <[email protected]>
Signed-off-by: Ricky Xu <[email protected]>
  • Loading branch information
rickyyx authored Feb 22, 2024
1 parent 7b0a6d9 commit 32e6c62
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 11 deletions.
198 changes: 198 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/reconciler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import math
import time
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple

Expand Down Expand Up @@ -42,6 +43,111 @@
logger = logging.getLogger(__name__)


class IInstanceUpdater(ABC):
    """
    Interface for objects that decide whether an instance should be updated.
    """

    @abstractmethod
    def make_update(self, instance: IMInstance) -> Optional[IMInstanceUpdateEvent]:
        """
        Build the update event, if any, for the given instance.

        Args:
            instance: The instance to evaluate.

        Returns:
            The update event to apply to the instance, or None when no
            update is needed.
        """
        # Concrete updaters must override this method.
        raise NotImplementedError()


class TimeoutInstanceUpdater(IInstanceUpdater):
    """
    Updater that moves an instance to a new status once it has timed out
    (stayed in its current status for too long).
    """

    def __init__(
        self,
        cur_status: IMInstance.InstanceStatus,
        timeout_s: int,
        new_status: Optional["IMInstance.InstanceStatus"] = None,
    ):
        """
        Args:
            cur_status: The status the instance is expected to be stuck in.
                Within this class it is only used to label the details
                message of the emitted update; make_update() does not compare
                it against the instance's actual status.
            timeout_s: The timeout in seconds.
            new_status: The status to transition to once the timeout is hit.
        """
        self.timeout_s = timeout_s
        self.new_status = new_status
        self.cur_status = cur_status

    def make_update(self, instance: IMInstance) -> Optional[IMInstanceUpdateEvent]:
        """
        Return a status-transition event if the instance has timed out.

        Args:
            instance: The instance to check.

        Returns:
            An update event targeting `new_status` when
            InstanceUtil.has_timeout() reports a timeout, None otherwise.
        """
        timed_out = InstanceUtil.has_timeout(instance, self.timeout_s)
        if not timed_out:
            return None

        return IMInstanceUpdateEvent(
            instance_id=instance.instance_id,
            new_instance_status=self.new_status,
            details="Timeout={}s at status {}".format(
                self.timeout_s,
                IMInstance.InstanceStatus.Name(self.cur_status),
            ),
        )


class StuckRequestedInstanceUpdater(IInstanceUpdater):
    """
    Updater for instances that have been sitting in the REQUESTED status for
    too long: either re-queue them for another allocation attempt or give up.
    """

    def __init__(
        self,
        timeout_s: int,
        max_num_request_to_allocate: int,
    ):
        """
        Args:
            timeout_s: The timeout in seconds.
            max_num_request_to_allocate: The maximum number of times an
                instance could be requested to allocate.
        """
        self.timeout_s = timeout_s
        self.max_num_request_to_allocate = max_num_request_to_allocate

    def make_update(self, instance: IMInstance) -> Optional[IMInstanceUpdateEvent]:
        """
        Decide what to do with an instance that may be stuck in REQUESTED.

        Args:
            instance: The instance to check.

        Returns:
            None if the instance has not timed out yet; an ALLOCATION_FAILED
            event once the number of recorded REQUESTED transitions reaches
            `max_num_request_to_allocate`; otherwise a QUEUED event so the
            allocation is retried.
        """
        timed_out = InstanceUtil.has_timeout(instance, self.timeout_s)
        if not timed_out:
            # Still within the timeout window, nothing to do yet.
            return None

        request_times_ns = InstanceUtil.get_status_transition_times_ns(
            instance, select_instance_status=IMInstance.REQUESTED
        )
        # Each recorded transition into REQUESTED counts as one attempt.
        num_attempts = len(sorted(request_times_ns))

        if num_attempts < self.max_num_request_to_allocate:
            # Attempts remain: go back to QUEUED so allocation is retried.
            return IMInstanceUpdateEvent(
                instance_id=instance.instance_id,
                new_instance_status=IMInstance.QUEUED,
                details=f"QUEUED again after timeout={self.timeout_s}s",
            )

        # Out of attempts: fail the allocation.
        return IMInstanceUpdateEvent(
            instance_id=instance.instance_id,
            new_instance_status=IMInstance.ALLOCATION_FAILED,
            details=f"Failed to allocate cloud instance after {num_attempts} attempts",
        )


class Reconciler:
"""
A singleton class that reconciles the instance states of the instance manager
Expand Down Expand Up @@ -233,6 +339,13 @@ def _step_next(
instance_manager=instance_manager, autoscaling_config=autoscaling_config
)

Reconciler._terminate_instances(instance_manager=instance_manager)
if not autoscaling_config.skip_ray_install():
Reconciler._install_ray(
instance_manager=instance_manager,
non_terminated_cloud_instances=non_terminated_cloud_instances,
)

#######################################################
# Utility methods for reconciling instance states.
#######################################################
Expand Down Expand Up @@ -900,6 +1013,91 @@ def _warn_stuck_instances(
)
)

@staticmethod
def _terminate_instances(instance_manager: InstanceManager):
    """
    Move instances in the following statuses to TERMINATING:
        - RAY_STOPPED: ray was stopped on the cloud instance.
        - RAY_INSTALL_FAILED: ray installation failed on the cloud instance,
            we will not retry.
        - TERMINATION_FAILED: cloud provider failed to terminate the instance
            or timeout for termination happened, we will retry again.

    The status changes are submitted as update events in a single call to
    Reconciler._update_instance_manager.

    Args:
        instance_manager: The instance manager to reconcile.
    """
    im_instances, version = Reconciler._get_im_instances(instance_manager)

    statuses_to_terminate = (
        IMInstance.RAY_STOPPED,
        IMInstance.RAY_INSTALL_FAILED,
        IMInstance.TERMINATION_FAILED,
    )

    updates = {}
    for instance in im_instances:
        if instance.status in statuses_to_terminate:
            status_name = IMInstance.InstanceStatus.Name(instance.status)
            logger.info(
                f"Terminating instance {instance.instance_id} with status {status_name}"
            )
            updates[instance.instance_id] = IMInstanceUpdateEvent(
                instance_id=instance.instance_id,
                new_instance_status=IMInstance.TERMINATING,
            )

    Reconciler._update_instance_manager(instance_manager, version, updates)

@staticmethod
def _install_ray(
    instance_manager: InstanceManager,
    non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
) -> None:
    """
    Move ALLOCATED instances to RAY_INSTALLING once their cloud instance is
    running.

    This is needed if ray installation needs to be performed by
    the instance manager.

    Args:
        instance_manager: The instance manager to reconcile.
        non_terminated_cloud_instances: The non-terminated cloud instances,
            keyed by cloud instance id. Every ALLOCATED instance is expected
            to have an entry here (enforced with an assertion).
    """
    im_instances, version = Reconciler._get_im_instances(instance_manager)
    updates = {}

    for instance in im_instances:
        if instance.status == IMInstance.ALLOCATED:
            cloud_instance = non_terminated_cloud_instances.get(
                instance.cloud_instance_id
            )

            assert cloud_instance, (
                f"Cloud instance {instance.cloud_instance_id} is not found "
                "in non_terminated_cloud_instances."
            )

            # A cloud instance that is not running yet might still be
            # pending (e.g. setting up ssh); leave it for now.
            if cloud_instance.is_running:
                event = IMInstanceUpdateEvent(
                    instance_id=instance.instance_id,
                    new_instance_status=IMInstance.RAY_INSTALLING,
                )
                updates[instance.instance_id] = event

                status_name = IMInstance.InstanceStatus.Name(instance.status)
                logger.info(
                    f"Updating {instance.instance_id}({status_name}) "
                    f"with {message_to_dict(event)}"
                )

    Reconciler._update_instance_manager(instance_manager, version, updates)

@staticmethod
def _handle_stuck_requested_instance(
instance: IMInstance, timeout_s: int, max_num_retry_request_to_allocate: int
Expand Down
Loading

0 comments on commit 32e6c62

Please sign in to comment.