Skip to content

Commit

Permalink
[GCS] reply to the owner only after the actor has been successfully c…
Browse files Browse the repository at this point in the history
…reated. (ray-project#8079)

* reply to the owner only after the actor is successfully created.

* reply immediately if the actor is already created

* fix comment

* add test_actor_creation_task provided by @stephanie Wang

Co-authored-by: senlin.zsl <[email protected]>
  • Loading branch information
wumuzi520 and senlin.zsl authored Apr 19, 2020
1 parent da296bf commit 3f28a8a
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 33 deletions.
23 changes: 23 additions & 0 deletions python/ray/tests/test_reference_counting.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,29 @@ def one_dep_large(dep, signal=None):
check_refcounts({})


def test_actor_creation_task(ray_start_regular):
    """Check that a call on an actor only completes once the actor can start.

    The actor class requests the custom resource "init" and takes the result
    of a remote task as its constructor argument. A `ping` call submitted
    right away must not be ready within one second; after the "init" resource
    is set to 1, the same call must be retrievable with `ray.get`.

    NOTE(review): presumably the `ray_start_regular` fixture starts a cluster
    without any "init" resource, which is what keeps the actor pending --
    confirm against the fixture definition.
    """

    @ray.remote
    def large_object():
        # This will be spilled to plasma.
        return np.zeros(10 * 1024 * 1024, dtype=np.uint8)

    @ray.remote(resources={"init": 1})
    class Actor:
        def __init__(self, dependency):
            return

        def ping(self):
            return

    # Create the actor with a task result as its constructor dependency and
    # submit a method call on it immediately.
    actor_handle = Actor.remote(large_object.remote())
    ping_ref = actor_handle.ping.remote()

    # The call must still be outstanding after the one-second timeout.
    ready_refs, _pending_refs = ray.wait([ping_ref], timeout=1)
    assert not ready_refs

    # Provide the resource the actor asked for, then block on the call.
    ray.experimental.set_resource("init", 1)
    ray.get(ping_ref)


def test_basic_pinning(one_worker_100MiB):
@ray.remote
def f(array):
Expand Down
45 changes: 19 additions & 26 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,48 +111,31 @@ void GcsActorManager::RegisterActor(
auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id());

auto iter = registered_actors_.find(actor_id);
if (iter != registered_actors_.end()) {
if (iter != registered_actors_.end() &&
iter->second->GetState() == rpc::ActorTableData::ALIVE) {
// When the network fails, Driver/Worker is not sure whether GcsServer has received
// the request of actor creation task, so Driver/Worker will try again and again until
// receiving the reply from GcsServer. If the actor is already recorded on the GCS
// Server side, the GCS Server will be responsible for creating or reconstructing the
// actor regardless of whether the Driver/Worker sends the request to create the actor
// again, so we just need fast reply OK to the Driver/Worker that the actor is already
// recorded by GCS Server.
// receiving the reply from GcsServer. If the actor has been created successfully then
// just reply to the caller.
callback(iter->second);
return;
}

auto pending_register_iter = actor_to_register_callbacks_.find(actor_id);
if (pending_register_iter != actor_to_register_callbacks_.end()) {
// It is a duplicate message, just mark the callback as pending and invoke it after
// the related actor is flushed.
// the actor has been successfully created.
pending_register_iter->second.emplace_back(std::move(callback));
return;
}

// Mark the callback as pending and invoke it after the related actor is flushed.
// Mark the callback as pending and invoke it after the actor has been successfully
// created.
actor_to_register_callbacks_[actor_id].emplace_back(std::move(callback));

auto actor = std::make_shared<GcsActor>(request);
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(actor->GetActorTableData());
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(
actor_id, actor_table_data, [this, actor](Status status) {
RAY_CHECK_OK(status);
RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second);
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_register_callbacks_.
auto iter = actor_to_register_callbacks_.find(actor->GetActorID());
RAY_CHECK(iter != actor_to_register_callbacks_.end() && !iter->second.empty());
for (auto &callback : iter->second) {
callback(actor);
}
actor_to_register_callbacks_.erase(iter);
gcs_actor_scheduler_->Schedule(actor);
}));
RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second);
gcs_actor_scheduler_->Schedule(actor);
}

void GcsActorManager::ReconstructActorOnWorker(const ray::ClientID &node_id,
Expand Down Expand Up @@ -265,6 +248,16 @@ void GcsActorManager::OnActorCreateSuccess(std::shared_ptr<GcsActor> actor) {
std::make_shared<rpc::ActorTableData>(actor->GetActorTableData());
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor_id, actor_table_data, nullptr));

// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from actor_to_register_callbacks_.
auto iter = actor_to_register_callbacks_.find(actor->GetActorID());
if (iter != actor_to_register_callbacks_.end()) {
for (auto &callback : iter->second) {
callback(actor);
}
actor_to_register_callbacks_.erase(iter);
}
}

void GcsActorManager::SchedulePendingActors() {
Expand Down
5 changes: 3 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ class GcsActorManager {
/// Register actor asynchronously.
///
/// \param request Contains the meta info to create the actor.
/// \param callback Will be invoked after the meta info is flushed to the storage or be
/// invoked immediately if the meta info already exists.
/// \param callback Will be invoked after the actor is created successfully, or
/// invoked immediately if the actor is already registered in `registered_actors_` and
/// its state is `ALIVE`.
void RegisterActor(const rpc::CreateActorRequest &request,
RegisterActorCallback callback);

Expand Down
11 changes: 6 additions & 5 deletions src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,24 @@ TEST_F(GcsActorManagerTest, TestNormalFlow) {
auto job_id = JobID::FromInt(1);
auto create_actor_request =
Mocker::GenCreateActorRequest(job_id, /*max_reconstructions=*/2);
std::vector<std::shared_ptr<gcs::GcsActor>> registered_actors;
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
gcs_actor_manager_->RegisterActor(
create_actor_request, [&registered_actors](std::shared_ptr<gcs::GcsActor> actor) {
registered_actors.emplace_back(actor);
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
});

ASSERT_EQ(1, registered_actors.size());
ASSERT_EQ(0, finished_actors.size());
ASSERT_EQ(1, gcs_actor_manager_->GetAllRegisteredActors().size());
ASSERT_EQ(1, gcs_actor_manager_->GetAllPendingActors().size());

auto actor = registered_actors.front();
auto actor = gcs_actor_manager_->GetAllRegisteredActors().begin()->second;
ASSERT_EQ(rpc::ActorTableData::PENDING, actor->GetState());

// Add node_1 and then check if the actor is in state `ALIVE`
auto node_1 = Mocker::GenNodeInfo();
auto node_id_1 = ClientID::FromBinary(node_1->node_id());
gcs_node_manager_->AddNode(node_1);
ASSERT_EQ(1, finished_actors.size());
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
ASSERT_EQ(0, gcs_actor_manager_->GetAllPendingActors().size());
ASSERT_EQ(rpc::ActorTableData::ALIVE, actor->GetState());
Expand Down

0 comments on commit 3f28a8a

Please sign in to comment.