Move plasma retry logic into plasma store provider (ray-project#7328)
edoakes authored Feb 27, 2020
1 parent aec0365 commit 2ad9bc5
Showing 6 changed files with 67 additions and 63 deletions.
41 changes: 11 additions & 30 deletions python/ray/_raylet.pyx
@@ -93,10 +93,6 @@ from ray.exceptions import (
)
from ray.experimental.no_return import NoReturn
from ray.utils import decode
from ray.ray_constants import (
DEFAULT_PUT_OBJECT_DELAY,
DEFAULT_PUT_OBJECT_RETRIES,
)

cimport cpython

@@ -673,32 +669,17 @@ cdef class CoreWorker:
size_t data_size, ObjectID object_id,
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data):
delay = ray_constants.DEFAULT_PUT_OBJECT_DELAY
for attempt in reversed(
range(ray_constants.DEFAULT_PUT_OBJECT_RETRIES)):
try:
if object_id is None:
with nogil:
check_status(self.core_worker.get().Create(
metadata, data_size, contained_ids,
c_object_id, data))
else:
c_object_id[0] = object_id.native()
with nogil:
check_status(self.core_worker.get().Create(
metadata, data_size,
c_object_id[0], data))
break
except ObjectStoreFullError as e:
if attempt:
logger.warning("Waiting {} seconds for space to free up "
"in the object store.".format(delay))
gc.collect()
time.sleep(delay)
delay *= 2
else:
self.dump_object_store_memory_usage()
raise e
if object_id is None:
with nogil:
check_status(self.core_worker.get().Create(
metadata, data_size, contained_ids,
c_object_id, data))
else:
c_object_id[0] = object_id.native()
with nogil:
check_status(self.core_worker.get().Create(
metadata, data_size,
c_object_id[0], data))

# If data is nullptr, that means the ObjectID already existed,
# which we ignore.
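
For reference, the Cython loop deleted above wrapped the plasma Create call in a retry with exponential backoff. A rough standalone Python sketch of that behavior (try_create is a hypothetical stand-in for the CoreWorker Create call and is not part of this diff):

import gc
import time

from ray.exceptions import ObjectStoreFullError  # the error the old loop caught

def create_with_retries(try_create, retries=5, delay_s=1.0):
    # Attempt the create up to `retries` times, doubling the delay between
    # attempts (1s, 2s, 4s, ...) and collecting garbage to free plasma refs.
    for attempt in reversed(range(retries)):
        try:
            return try_create()
        except ObjectStoreFullError:
            if attempt:
                gc.collect()
                time.sleep(delay_s)
                delay_s *= 2
            else:
                raise

After this commit that loop is gone from Python entirely; the equivalent backoff lives in CoreWorkerPlasmaStoreProvider::Create (see the C++ diff below).
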
6 changes: 0 additions & 6 deletions python/ray/ray_constants.py
@@ -22,12 +22,6 @@ def direct_call_enabled():
# The default maximum number of bytes to allocate to the object store unless
# overridden by the user.
DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = 20 * 10**9
# The default number of retries to call `put` when the object store is full.
DEFAULT_PUT_OBJECT_RETRIES = 5
# The default seconds for delay between calls to retry `put` when
# the object store is full. This delay is exponentially doubled up to
# DEFAULT_PUT_OBJECT_RETRIES times.
DEFAULT_PUT_OBJECT_DELAY = 1
# The smallest cap on the memory used by the object store that we allow.
# This must be greater than MEMORY_RESOURCE_UNIT_BYTES * 0.7
OBJECT_STORE_MINIMUM_MEMORY_BYTES = 75 * 1024 * 1024
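
The two deleted constants are superseded by C++ configuration added later in this commit (see src/ray/common/ray_config_def.h below); roughly:

# DEFAULT_PUT_OBJECT_RETRIES (5)  -> object_store_full_max_retries (default 5)
# DEFAULT_PUT_OBJECT_DELAY (1 s)  -> object_store_full_initial_delay_ms (default 1000 ms)
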
25 changes: 14 additions & 11 deletions python/ray/tests/test_failure.py
@@ -892,30 +892,33 @@ def test_connect_with_disconnected_node(shutdown_only):
@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"num_cpus": 5,
"object_store_memory": 10**8
"object_store_memory": 10**8,
"_internal_config": json.dumps({
"object_store_full_max_retries": 0
})
}],
indirect=True)
@pytest.mark.parametrize("num_actors", [1, 2, 5])
def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors):
def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head):
@ray.remote
class LargeMemoryActor:
def some_expensive_task(self):
return np.zeros(10**8 // 2, dtype=np.uint8)

actors = [LargeMemoryActor.remote() for _ in range(num_actors)]
actors = [LargeMemoryActor.remote() for _ in range(5)]
for _ in range(10):
pending = [a.some_expensive_task.remote() for a in actors]
while pending:
[done], pending = ray.wait(pending, num_returns=1)


@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"num_cpus": 2,
"object_store_memory": 10**8
}],
indirect=True)
def test_fill_object_store_exception(ray_start_cluster_head):
def test_fill_object_store_exception(shutdown_only):
ray.init(
num_cpus=2,
object_store_memory=10**8,
_internal_config=json.dumps({
"object_store_full_max_retries": 0
}))

@ray.remote
def expensive_task():
return np.zeros((10**8) // 10, dtype=np.uint8)
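
The updated tests disable the new store-provider retries through _internal_config so that an over-full store fails fast instead of backing off. A minimal sketch of the same pattern outside the test fixtures, assuming the error surfaced to the driver is ObjectStoreFullError (the exception the old Cython loop caught); the object sizes are illustrative:

import json
import numpy as np
import ray
from ray.exceptions import ObjectStoreFullError

# Small object store, and zero plasma-full retries (the knob these tests use).
ray.init(
    object_store_memory=10**8,
    _internal_config=json.dumps({"object_store_full_max_retries": 0}))

try:
    # 200 MB can never fit in a 100 MB store, so with retries disabled the
    # put should fail immediately rather than sleeping and retrying.
    ray.put(np.zeros(2 * 10**8, dtype=np.uint8))
except ObjectStoreFullError as err:
    print("object store full:", err)
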
1 change: 1 addition & 0 deletions python/ray/tests/test_reference_counting.py
@@ -25,6 +25,7 @@
def one_worker_100MiB(request):
config = json.dumps({
"distributed_ref_counting_enabled": 1,
"object_store_full_max_retries": 1,
})
yield ray.init(
num_cpus=1,
7 changes: 7 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -238,3 +238,10 @@ RAY_CONFIG(uint32_t, object_store_get_max_ids_to_print_in_warning, 20)
/// Note: this only takes effect when gcs service is enabled.
RAY_CONFIG(int64_t, gcs_service_connect_retries, 50)
RAY_CONFIG(int64_t, gcs_service_connect_wait_milliseconds, 100)

/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.
RAY_CONFIG(int32_t, object_store_full_max_retries, 5)
/// Duration to sleep after failing to put an object in plasma because it is full.
/// This will be exponentially increased for each retry.
RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000)
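
Like other entries in ray_config_def.h, these can be overridden per process; the tests in this commit do so by passing a JSON blob through ray.init's _internal_config. A sketch using the two new keys (the values here are arbitrary examples):

import json
import ray

ray.init(
    object_store_memory=10**8,
    _internal_config=json.dumps({
        "object_store_full_max_retries": -1,        # -1 = retry forever
        "object_store_full_initial_delay_ms": 500,  # first backoff of 500 ms
    }))
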
50 changes: 34 additions & 16 deletions src/ray/core_worker/store_provider/plasma_store_provider.cc
@@ -55,28 +55,46 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
const size_t data_size,
const ObjectID &object_id,
std::shared_ptr<Buffer> *data) {
auto plasma_id = object_id.ToPlasmaId();
std::shared_ptr<arrow::Buffer> arrow_buffer;
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
arrow::Status status =
store_client_.Create(plasma_id, data_size, metadata ? metadata->Data() : nullptr,
metadata ? metadata->Size() : 0, &arrow_buffer);
if (plasma::IsPlasmaObjectExists(status)) {
RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: "
<< object_id << ".";
return Status::OK();
int32_t retries = 0;
int32_t max_retries = RayConfig::instance().object_store_full_max_retries();
uint32_t delay = RayConfig::instance().object_store_full_initial_delay_ms();
Status status;
bool should_retry = true;
while (should_retry) {
should_retry = false;
arrow::Status plasma_status;
std::shared_ptr<arrow::Buffer> arrow_buffer;
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
plasma_status = store_client_.Create(
object_id.ToPlasmaId(), data_size, metadata ? metadata->Data() : nullptr,
metadata ? metadata->Size() : 0, &arrow_buffer);
}
if (plasma::IsPlasmaStoreFull(status)) {
if (plasma::IsPlasmaStoreFull(plasma_status)) {
std::ostringstream message;
message << "Failed to put object " << object_id << " in object store because it "
<< "is full. Object size is " << data_size << " bytes.";
return Status::ObjectStoreFull(message.str());
status = Status::ObjectStoreFull(message.str());
if (max_retries < 0 || retries < max_retries) {
RAY_LOG(ERROR) << message.str() << " Plasma store status:\n"
<< MemoryUsageString() << "\nWaiting " << delay
<< "ms for space to free up...";
usleep(1000 * delay);
delay *= 2;
retries += 1;
should_retry = true;
}
} else if (plasma::IsPlasmaObjectExists(plasma_status)) {
RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: "
<< object_id << ".";
status = Status::OK();
} else {
RAY_ARROW_RETURN_NOT_OK(plasma_status);
*data = std::make_shared<PlasmaBuffer>(PlasmaBuffer(arrow_buffer));
status = Status::OK();
}
RAY_ARROW_RETURN_NOT_OK(status);
}
*data = std::make_shared<PlasmaBuffer>(PlasmaBuffer(arrow_buffer));
return Status::OK();
return status;
}

Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) {
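
With the defaults declared in ray_config_def.h (5 retries, 1000 ms initial delay, doubled after each failed attempt), a persistently full store accumulates roughly 31 seconds of backoff before Status::ObjectStoreFull is finally returned. A quick check of that arithmetic:

retries = 5        # object_store_full_max_retries default
delay_ms = 1000    # object_store_full_initial_delay_ms default

total_ms = 0
for _ in range(retries):
    total_ms += delay_ms   # the provider sleeps, then doubles the delay
    delay_ms *= 2

print(total_ms)  # 31000 ms = 1s + 2s + 4s + 8s + 16s
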
