Skip to content

Commit

Permalink
[Core] Out of Disk prevention (ray-project#25370)
Browse files Browse the repository at this point in the history
Ray (on K8s) fails silently when running out of disk space.
Today, when running a script that has a large amount of object spilling, if the disk runs out of space then Kubernetes will silently terminate the node. Autoscaling will kick in and replace the dead node. There is no indication that there was a failure due to disk space.
Instead, we should fail tasks with a good error message when the disk is full.

We monitor the disk usage, when node disk usage grows over the predefined capacity (like 90%), we fail new task/actor/object put that allocates new objects.
  • Loading branch information
scv119 authored Jun 22, 2022
1 parent d6e8b90 commit afb092a
Show file tree
Hide file tree
Showing 32 changed files with 726 additions and 15 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,7 @@ cc_test(
copts = COPTS,
tags = ["team:core"],
deps = [
":ray_common",
":ray_util",
"@com_google_googletest//:gtest_main",
],
Expand Down
5 changes: 5 additions & 0 deletions python/ray/_private/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
ObjectReconstructionFailedError,
ObjectReconstructionFailedLineageEvictedError,
ObjectReconstructionFailedMaxAttemptsExceededError,
OutOfDiskError,
OwnerDiedError,
PlasmaObjectNotAvailable,
RayActorError,
Expand Down Expand Up @@ -276,6 +277,10 @@ def _deserialize_object(self, data, metadata, object_ref):
return ObjectFetchTimedOutError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
)
elif error_type == ErrorType.Value("OUT_OF_DISK_ERROR"):
return OutOfDiskError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
)
elif error_type == ErrorType.Value("OBJECT_DELETED"):
return ReferenceCountingAssertionError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
Expand Down
3 changes: 3 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ from ray.exceptions import (
RaySystemError,
RayTaskError,
ObjectStoreFullError,
OutOfDiskError,
GetTimeoutError,
TaskCancelledError,
AsyncioActorExit,
Expand Down Expand Up @@ -166,6 +167,8 @@ cdef int check_status(const CRayStatus& status) nogil except -1:

if status.IsObjectStoreFull():
raise ObjectStoreFullError(message)
elif status.IsOutOfDisk():
raise OutOfDiskError(message)
elif status.IsInterrupted():
raise KeyboardInterrupt()
elif status.IsTimedOut():
Expand Down
20 changes: 20 additions & 0 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,26 @@ def __str__(self):
)


@PublicAPI
class OutOfDiskError(RayError):
"""Indicates that the local disk is full.
This is raised if the attempt to store the object fails
because both the object store and disk are full.
"""

def __str__(self):
# TODO(scv119): expose more disk usage information and link to a doc.
return super(OutOfDiskError, self).__str__() + (
"\n"
"The object cannot be created because the local object store"
" is full and the local disk's utilization is over capacity"
" (95% by default)."
"Tip: Use `df` on this node to check disk usage and "
"`ray memory` to check object store memory usage."
)


@PublicAPI
class ObjectLostError(RayError):
"""Indicates that the object is lost from distributed memory, due to
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
c_bool IsUnknownError()
c_bool IsNotImplemented()
c_bool IsObjectStoreFull()
c_bool IsOutOfDisk()
c_bool IsRedisError()
c_bool IsTimedOut()
c_bool IsInterrupted()
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ py_test_module_list(
files = [
"test_basic_3.py",
"test_output.py",
"test_out_of_disk_space.py",
"test_failure_4.py",
"test_object_spilling.py",
"test_object_spilling_no_asan.py",
Expand Down
221 changes: 221 additions & 0 deletions python/ray/tests/test_out_of_disk_space.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import os
import platform
import shutil
import sys
import tempfile
import time
from contextlib import contextmanager

import numpy as np
import pytest

import ray
from ray.cluster_utils import Cluster


def calculate_capacity_threshold(disk_capacity_in_bytes):
usage = shutil.disk_usage("/tmp")
threshold = min(1, 1.0 - 1.0 * (usage.free - disk_capacity_in_bytes) / usage.total)
return threshold


def get_current_usage():
usage = shutil.disk_usage("/tmp")
print(f"free: {usage.free} ")
print(f"current usage: {1.0 - 1.0 * usage.free / usage.total}")
return 1.0 - 1.0 * usage.free / usage.total


@contextmanager
def create_tmp_file(bytes):
tmp_dir = tempfile.mkdtemp(dir="/tmp")
tmp_path = os.path.join(tmp_dir, "test.txt")
with open(tmp_path, "wb") as f:
f.write(os.urandom(bytes))
try:
yield tmp_path
finally:
os.remove(tmp_path)


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_put_out_of_disk(shutdown_only):
local_fs_capacity_threshold = calculate_capacity_threshold(200 * 1024 * 1024)
ray.init(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": local_fs_capacity_threshold,
"local_fs_monitor_interval_ms": 10,
},
)
assert get_current_usage() < local_fs_capacity_threshold
ref = ray.put(np.random.rand(20 * 1024 * 1024))
del ref
# create a temp file so that the disk size is over the threshold.
# ray.put doesn't work is that fallback allocation uses mmaped file
# that doesn't neccssary allocate disk spaces.
with create_tmp_file(250 * 1024 * 1024):
assert get_current_usage() > local_fs_capacity_threshold
time.sleep(1)
with pytest.raises(ray.exceptions.OutOfDiskError):
ray.put(np.random.rand(20 * 1024 * 1024))
# delete tmp file to reclaim space back.

assert get_current_usage() < local_fs_capacity_threshold
time.sleep(1)
ray.put(np.random.rand(20 * 1024 * 1024))


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_task_returns(shutdown_only):
local_fs_capacity_threshold = calculate_capacity_threshold(10 * 1024 * 1024)
ray.init(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": local_fs_capacity_threshold,
"local_fs_monitor_interval_ms": 10,
},
)

# create a temp file so that the disk size is over the threshold.
# ray.put doesn't work is that fallback allocation uses mmaped file
# that doesn't neccssary allocate disk spaces.
with create_tmp_file(250 * 1024 * 1024):
assert get_current_usage() > local_fs_capacity_threshold
time.sleep(1)

@ray.remote
def foo():
time.sleep(1)
return np.random.rand(20 * 1024 * 1024) # 160 MB data

try:
ray.get(foo.remote())
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_task_put(shutdown_only):
local_fs_capacity_threshold = calculate_capacity_threshold(1 * 1024 * 1024)
ray.init(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": local_fs_capacity_threshold,
"local_fs_monitor_interval_ms": 10,
},
)

# create a temp file so that the disk size is over the threshold.
# ray.put doesn't work is that fallback allocation uses mmaped file
# that doesn't neccssary allocate disk spaces.
with create_tmp_file(250 * 1024 * 1024):
assert get_current_usage() > local_fs_capacity_threshold
time.sleep(1)

@ray.remote
def foo():
ref = ray.put(np.random.rand(20 * 1024 * 1024)) # 160 MB data
return ref

try:
ray.get(foo.remote())
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_task_args(shutdown_only):
cluster = Cluster()
cluster.add_node(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": 0,
},
resources={"out_of_memory": 1},
)
cluster.add_node(
num_cpus=1,
object_store_memory=200 * 1024 * 1024,
resources={"sufficient_memory": 1},
)
cluster.wait_for_nodes()
ray.init(address=cluster.address)

@ray.remote
def foo():
return np.random.rand(20 * 1024 * 1024) # 160 MB data

@ray.remote
def bar(obj):
print(obj)

ref = foo.options(resources={"sufficient_memory": 1}).remote()
try:
ray.get(bar.options(resources={"out_of_memory": 1}).remote(ref))
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_actor(shutdown_only):
cluster = Cluster()
cluster.add_node(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": 0,
},
resources={"out_of_memory": 1},
)
cluster.add_node(
num_cpus=1,
object_store_memory=200 * 1024 * 1024,
resources={"sufficient_memory": 1},
)
cluster.wait_for_nodes()
ray.init(address=cluster.address)

@ray.remote
def foo():
return np.random.rand(20 * 1024 * 1024) # 160 MB data

@ray.remote
class Actor:
def __init__(self, obj):
self._obj = obj

def foo(self):
print(self._obj)

def args_ood(self, obj):
print(obj)

def return_ood(self):
return np.random.rand(20 * 1024 * 1024)

ref = foo.options(resources={"sufficient_memory": 1}).remote()
with pytest.raises(ray.exceptions.RayActorError):
a = Actor.options(resources={"out_of_memory": 0.001}).remote(ref)
ray.get(a.foo.remote())

a = Actor.options(resources={"out_of_memory": 1}).remote(1)
ray.get(a.foo.remote())
try:
ray.get(a.args_ood.remote(ref))
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)

ray.get(a.foo.remote())
try:
ray.get(a.return_ood.remote())
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 2 additions & 0 deletions python/ray/tests/test_plasma_unlimited.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def test_fallback_allocation_failure(shutdown_only):
_temp_dir="/dev/shm",
_system_config={
"object_spilling_config": json.dumps(file_system_config),
# set local fs capacity to 100% so it never errors with out of disk.
"local_fs_capacity_threshold": 1,
},
)
shm_size = shutil.disk_usage("/dev/shm").total
Expand Down
Loading

0 comments on commit afb092a

Please sign in to comment.