Skip to content

Commit

Permalink
[GCS] introduce new gcs client and refactor actor table (ray-project#…
Browse files Browse the repository at this point in the history
  • Loading branch information
micafan authored and raulchen committed Jul 19, 2019
1 parent 0af07bd commit b5b8c1d
Show file tree
Hide file tree
Showing 30 changed files with 874 additions and 420 deletions.
15 changes: 13 additions & 2 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,20 @@ cc_library(
)

cc_binary(
name = "gcs_client_test",
name = "redis_gcs_client_test",
testonly = 1,
srcs = ["src/ray/gcs/client_test.cc"],
srcs = ["src/ray/gcs/redis_gcs_client_test.cc"],
copts = COPTS,
deps = [
":gcs",
"@com_google_googletest//:gtest_main",
],
)

cc_binary(
name = "actor_state_accessor_test",
testonly = 1,
srcs = ["src/ray/gcs/actor_state_accessor_test.cc"],
copts = COPTS,
deps = [
":gcs",
Expand Down
114 changes: 114 additions & 0 deletions src/ray/gcs/actor_state_accessor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include "ray/gcs/actor_state_accessor.h"
#include <boost/none.hpp>
#include "ray/gcs/redis_gcs_client.h"
#include "ray/util/logging.h"

namespace ray {

namespace gcs {

ActorStateAccessor::ActorStateAccessor(RedisGcsClient &client_impl)
: client_impl_(client_impl) {}

Status ActorStateAccessor::AsyncGet(const ActorID &actor_id,
const MultiItemCallback<ActorTableData> &callback) {
RAY_CHECK(callback != nullptr);
auto on_done = [callback](RedisGcsClient *client, const ActorID &actor_id,
const std::vector<ActorTableData> &data) {
callback(Status::OK(), data);
};

ActorTable &actor_table = client_impl_.actor_table();
return actor_table.Lookup(JobID::Nil(), actor_id, on_done);
}

Status ActorStateAccessor::AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) {
auto on_success = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
if (callback != nullptr) {
callback(Status::OK());
}
};

auto on_failure = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
if (callback != nullptr) {
callback(Status::Invalid("Adding actor failed."));
}
};

ActorID actor_id = ActorID::FromBinary(data_ptr->actor_id());
ActorTable &actor_table = client_impl_.actor_table();
return actor_table.AppendAt(JobID::Nil(), actor_id, data_ptr, on_success, on_failure,
/*log_length*/ 0);
}

Status ActorStateAccessor::AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) {
// The actor log starts with an ALIVE entry. This is followed by 0 to N pairs
// of (RECONSTRUCTING, ALIVE) entries, where N is the maximum number of
// reconstructions. This is followed optionally by a DEAD entry.
int log_length =
2 * (data_ptr->max_reconstructions() - data_ptr->remaining_reconstructions());
if (data_ptr->state() != ActorTableData::ALIVE) {
// RECONSTRUCTING or DEAD entries have an odd index.
log_length += 1;
}

auto on_success = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
// If we successfully appended a record to the GCS table of the actor that
// has died, signal this to anyone receiving signals from this actor.
if (data.state() == ActorTableData::DEAD ||
data.state() == ActorTableData::RECONSTRUCTING) {
std::vector<std::string> args = {"XADD", actor_id.Hex(), "*", "signal",
"ACTOR_DIED_SIGNAL"};
auto redis_context = client->primary_context();
RAY_CHECK_OK(redis_context->RunArgvAsync(args));
}

if (callback != nullptr) {
callback(Status::OK());
}
};

auto on_failure = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
if (callback != nullptr) {
callback(Status::Invalid("Updating actor failed."));
}
};

ActorTable &actor_table = client_impl_.actor_table();
return actor_table.AppendAt(JobID::Nil(), actor_id, data_ptr, on_success, on_failure,
log_length);
}

Status ActorStateAccessor::AsyncSubscribe(
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](RedisGcsClient *client, const ActorID &actor_id,
const std::vector<ActorTableData> &data) {
if (!data.empty()) {
// We only need the last entry, because it represents the latest state of
// this actor.
subscribe(actor_id, data.back());
}
};

auto on_done = [done](RedisGcsClient *client) {
if (done != nullptr) {
done(Status::OK());
}
};

ActorTable &actor_table = client_impl_.actor_table();
return actor_table.Subscribe(JobID::Nil(), ClientID::Nil(), on_subscribe, on_done);
}

} // namespace gcs

} // namespace ray
71 changes: 71 additions & 0 deletions src/ray/gcs/actor_state_accessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#ifndef RAY_GCS_ACTOR_STATE_ACCESSOR_H
#define RAY_GCS_ACTOR_STATE_ACCESSOR_H

#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/tables.h"

namespace ray {

namespace gcs {

class RedisGcsClient;

/// \class ActorStateAccessor
/// ActorStateAccessor class encapsulates the implementation details of
/// reading or writing or subscribing of actor's specification (immutable fields which
/// determined at submission time, and mutable fields which are determined at runtime).
class ActorStateAccessor {
public:
explicit ActorStateAccessor(RedisGcsClient &client_impl);

~ActorStateAccessor() {}

/// Get actor specification from GCS asynchronously.
///
/// \param actor_id The ID of actor to look up in the GCS.
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
Status AsyncGet(const ActorID &actor_id,
const MultiItemCallback<ActorTableData> &callback);

/// Register an actor to GCS asynchronously.
///
/// \param data_ptr The actor that will be registered to the GCS.
/// \param callback Callback that will be called after actor has been registered
/// to the GCS.
/// \return Status
Status AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback);

/// Update dynamic states of actor in GCS asynchronously.
///
/// \param actor_id ID of the actor to update.
/// \param data_ptr Data of the actor to update.
/// \param callback Callback that will be called after update finishes.
/// \return Status
/// TODO(micafan) Don't expose the whole `ActorTableData` and only allow
/// updating dynamic states.
Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback);

/// Subscribe to any register operations of actors.
///
/// \param subscribe Callback that will be called each time when an actor is registered
/// or updated.
/// \param done Callback that will be called when subscription is complete and we
/// are ready to receive notification.
/// \return Status
Status AsyncSubscribe(const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done);

private:
RedisGcsClient &client_impl_;
};

} // namespace gcs

} // namespace ray

#endif // RAY_GCS_ACTOR_STATE_ACCESSOR_H
161 changes: 161 additions & 0 deletions src/ray/gcs/actor_state_accessor_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#include <atomic>
#include <chrono>
#include <string>
#include <thread>
#include <vector>
#include "gtest/gtest.h"
#include "ray/gcs/redis_gcs_client.h"

namespace ray {

namespace gcs {

class ActorStateAccessorTest : public ::testing::Test {
public:
ActorStateAccessorTest() : options_("127.0.0.1", 6379, "", true) {}

virtual void SetUp() {
GenTestData();

gcs_client_.reset(new RedisGcsClient(options_));
RAY_CHECK_OK(gcs_client_->Connect(io_service_));

work_thread.reset(new std::thread([this] {
std::auto_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(io_service_));
io_service_.run();
}));
}

virtual void TearDown() {
gcs_client_->Disconnect();

io_service_.stop();
work_thread->join();
work_thread.reset();

gcs_client_.reset();

ClearTestData();
}

protected:
void GenTestData() { GenActorData(); }

void GenActorData() {
for (size_t i = 0; i < 2; ++i) {
std::shared_ptr<ActorTableData> actor = std::make_shared<ActorTableData>();
ActorID actor_id = ActorID::FromRandom();
actor->set_actor_id(actor_id.Binary());
actor->set_max_reconstructions(1);
actor->set_remaining_reconstructions(1);
JobID job_id = JobID::FromInt(i);
actor->set_job_id(job_id.Binary());
actor->set_state(ActorTableData::ALIVE);
actor_datas_[actor_id] = actor;
}
}

void ClearTestData() { actor_datas_.clear(); }

void WaitPendingDone(std::chrono::milliseconds timeout) {
WaitPendingDone(pending_count_, timeout);
}

void WaitPendingDone(std::atomic<int> &pending_count,
std::chrono::milliseconds timeout) {
while (pending_count != 0 && timeout.count() > 0) {
std::chrono::milliseconds interval(10);
std::this_thread::sleep_for(interval);
timeout -= interval;
}
EXPECT_EQ(pending_count, 0);
}

protected:
GcsClientOptions options_;
std::unique_ptr<RedisGcsClient> gcs_client_;

boost::asio::io_service io_service_;
std::unique_ptr<std::thread> work_thread;

std::unordered_map<ActorID, std::shared_ptr<ActorTableData>> actor_datas_;

std::atomic<int> pending_count_{0};
};

TEST_F(ActorStateAccessorTest, RegisterAndGet) {
ActorStateAccessor &actor_accessor = gcs_client_->Actors();
// register
for (const auto &elem : actor_datas_) {
const auto &actor = elem.second;
++pending_count_;
actor_accessor.AsyncRegister(actor, [this](Status status) {
RAY_CHECK_OK(status);
--pending_count_;
});
}

std::chrono::milliseconds timeout(10000);
WaitPendingDone(timeout);

// get
for (const auto &elem : actor_datas_) {
const auto &actor = elem.second;
++pending_count_;
actor_accessor.AsyncGet(elem.first,
[this](Status status, std::vector<ActorTableData> datas) {
ASSERT_EQ(datas.size(), 1U);
ActorID actor_id = ActorID::FromBinary(datas[0].actor_id());
auto it = actor_datas_.find(actor_id);
ASSERT_TRUE(it != actor_datas_.end());
--pending_count_;
});
}

WaitPendingDone(timeout);
}

TEST_F(ActorStateAccessorTest, Subscribe) {
ActorStateAccessor &actor_accessor = gcs_client_->Actors();
std::chrono::milliseconds timeout(10000);
// subscribe
std::atomic<int> sub_pending_count(0);
std::atomic<int> do_sub_pending_count(0);
auto subscribe = [this, &sub_pending_count](const ActorID &actor_id,
const ActorTableData &data) {
const auto it = actor_datas_.find(actor_id);
ASSERT_TRUE(it != actor_datas_.end());
--sub_pending_count;
};
auto done = [&do_sub_pending_count](Status status) {
RAY_CHECK_OK(status);
--do_sub_pending_count;
};

++do_sub_pending_count;
actor_accessor.AsyncSubscribe(subscribe, done);
// Wait until subscribe finishes.
WaitPendingDone(do_sub_pending_count, timeout);

// register
std::atomic<int> register_pending_count(0);
for (const auto &elem : actor_datas_) {
const auto &actor = elem.second;
++sub_pending_count;
++register_pending_count;
actor_accessor.AsyncRegister(actor, [&register_pending_count](Status status) {
RAY_CHECK_OK(status);
--register_pending_count;
});
}
// Wait until register finishes.
WaitPendingDone(register_pending_count, timeout);

// Wait for all subscribe notifications.
WaitPendingDone(sub_pending_count, timeout);
}

} // namespace gcs

} // namespace ray
Loading

0 comments on commit b5b8c1d

Please sign in to comment.