Skip to content

Commit

Permalink
rename ActorTable to LogBasedActorTable and add new ActorTable (ray-p…
Browse files Browse the repository at this point in the history
  • Loading branch information
wumuzi520 authored Mar 23, 2020
1 parent 79767fe commit 039961b
Show file tree
Hide file tree
Showing 18 changed files with 391 additions and 53 deletions.
10 changes: 10 additions & 0 deletions src/ray/common/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ void RedisServiceManagerForTest::TearDownTestCase() {
usleep(100 * 1000);
}

void RedisServiceManagerForTest::FlushAll() {
std::string flush_all_redis_command =
REDIS_CLIENT_EXEC_PATH + " -p " + std::to_string(REDIS_SERVER_PORT) + " flushall";
RAY_LOG(INFO) << "Cleaning up redis with command: " << flush_all_redis_command;
if (system(flush_all_redis_command.c_str()) != 0) {
RAY_LOG(WARNING) << "Failed to flush redis. The redis process may no longer exist.";
}
usleep(100 * 1000);
}

bool WaitForCondition(std::function<bool()> condition, int timeout_ms) {
int wait_time = 0;
while (true) {
Expand Down
1 change: 1 addition & 0 deletions src/ray/common/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class RedisServiceManagerForTest : public ::testing::Test {
public:
static void SetUpTestCase();
static void TearDownTestCase();
static void FlushAll();
};

} // namespace ray
Expand Down
6 changes: 6 additions & 0 deletions src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class ActorInfoAccessor {
public:
virtual ~ActorInfoAccessor() = default;

/// Get all actor specification from GCS synchronously.
///
/// \param actor_table_data_list The container to hold the actor specification.
/// \return Status
virtual Status GetAll(std::vector<rpc::ActorTableData> *actor_table_data_list) = 0;

/// Get actor specification from GCS asynchronously.
///
/// \param actor_id The ID of actor to look up in the GCS.
Expand Down
11 changes: 8 additions & 3 deletions src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,14 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeToFinishedJobs(

ServiceBasedActorInfoAccessor::ServiceBasedActorInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl),
subscribe_id_(ClientID::FromRandom()),
actor_sub_executor_(client_impl->GetRedisGcsClient().actor_table()) {}
: subscribe_id_(ClientID::FromRandom()),
client_impl_(client_impl),
actor_sub_executor_(client_impl->GetRedisGcsClient().log_based_actor_table()) {}

Status ServiceBasedActorInfoAccessor::GetAll(
std::vector<ActorTableData> *actor_table_data_list) {
return Status::Invalid("Not implemented");
}

Status ServiceBasedActorInfoAccessor::AsyncGet(
const ActorID &actor_id, const OptionalItemCallback<rpc::ActorTableData> &callback) {
Expand Down
9 changes: 6 additions & 3 deletions src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {

virtual ~ServiceBasedActorInfoAccessor() = default;

Status GetAll(std::vector<ActorTableData> *actor_table_data_list) override;

Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) override;

Expand Down Expand Up @@ -89,12 +91,13 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorCheckpointIdData> &callback) override;

protected:
ClientID subscribe_id_;

private:
ServiceBasedGcsClient *client_impl_;

ClientID subscribe_id_;

typedef SubscriptionExecutor<ActorID, ActorTableData, ActorTable>
typedef SubscriptionExecutor<ActorID, ActorTableData, LogBasedActorTable>
ActorSubscriptionExecutor;
ActorSubscriptionExecutor actor_sub_executor_;

Expand Down
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest {
thread_io_service_->join();
thread_gcs_server_->join();
gcs_client_->Disconnect();
FlushAll();
}

bool AddJob(const std::shared_ptr<rpc::JobTableData> &job_table_data) {
Expand Down Expand Up @@ -313,6 +314,7 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest {
rpc::GcsNodeInfo gcs_node_info;
gcs_node_info.set_node_id(node_id);
gcs_node_info.set_state(rpc::GcsNodeInfo_GcsNodeState_ALIVE);
gcs_node_info.set_node_manager_address("127.0.0.1");
return gcs_node_info;
}

Expand Down
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ class GcsServerTest : public RedisServiceManagerForTest {
rpc::GcsNodeInfo GenGcsNodeInfo(const std::string &node_id) {
rpc::GcsNodeInfo gcs_node_info;
gcs_node_info.set_node_id(node_id);
gcs_node_info.set_node_manager_address("127.0.0.1");
gcs_node_info.set_node_manager_port(6000);
gcs_node_info.set_state(rpc::GcsNodeInfo_GcsNodeState_ALIVE);
return gcs_node_info;
}
Expand Down
142 changes: 120 additions & 22 deletions src/ray/gcs/redis_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,32 @@ namespace ray {

namespace gcs {

RedisActorInfoAccessor::RedisActorInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), actor_sub_executor_(client_impl_->actor_table()) {}
RedisLogBasedActorInfoAccessor::RedisLogBasedActorInfoAccessor(
RedisGcsClient *client_impl)
: client_impl_(client_impl),
log_based_actor_sub_executor_(client_impl_->log_based_actor_table()) {}

Status RedisActorInfoAccessor::AsyncGet(
std::vector<ActorID> RedisLogBasedActorInfoAccessor::GetAllActorID() const {
return client_impl_->log_based_actor_table().GetAllActorID();
}

Status RedisLogBasedActorInfoAccessor::Get(const ActorID &actor_id,
ActorTableData *actor_table_data) const {
return client_impl_->log_based_actor_table().Get(actor_id, actor_table_data);
}

Status RedisLogBasedActorInfoAccessor::GetAll(
std::vector<ActorTableData> *actor_table_data_list) {
RAY_CHECK(actor_table_data_list);
auto actor_id_list = GetAllActorID();
actor_table_data_list->resize(actor_id_list.size());
for (size_t i = 0; i < actor_id_list.size(); ++i) {
RAY_CHECK_OK(Get(actor_id_list[i], &(*actor_table_data_list)[i]));
}
return Status::OK();
}

Status RedisLogBasedActorInfoAccessor::AsyncGet(
const ActorID &actor_id, const OptionalItemCallback<ActorTableData> &callback) {
RAY_CHECK(callback != nullptr);
auto on_done = [callback](RedisGcsClient *client, const ActorID &actor_id,
Expand All @@ -37,10 +59,11 @@ Status RedisActorInfoAccessor::AsyncGet(
callback(Status::OK(), result);
};

return client_impl_->actor_table().Lookup(actor_id.JobId(), actor_id, on_done);
return client_impl_->log_based_actor_table().Lookup(actor_id.JobId(), actor_id,
on_done);
}

Status RedisActorInfoAccessor::AsyncRegister(
Status RedisLogBasedActorInfoAccessor::AsyncRegister(
const std::shared_ptr<ActorTableData> &data_ptr, const StatusCallback &callback) {
auto on_success = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
Expand All @@ -57,12 +80,12 @@ Status RedisActorInfoAccessor::AsyncRegister(
};

ActorID actor_id = ActorID::FromBinary(data_ptr->actor_id());
return client_impl_->actor_table().AppendAt(actor_id.JobId(), actor_id, data_ptr,
on_success, on_failure,
/*log_length*/ 0);
return client_impl_->log_based_actor_table().AppendAt(actor_id.JobId(), actor_id,
data_ptr, on_success, on_failure,
/*log_length*/ 0);
}

Status RedisActorInfoAccessor::AsyncUpdate(
Status RedisLogBasedActorInfoAccessor::AsyncUpdate(
const ActorID &actor_id, const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) {
// The actor log starts with an ALIVE entry. This is followed by 0 to N pairs
Expand Down Expand Up @@ -100,30 +123,32 @@ Status RedisActorInfoAccessor::AsyncUpdate(
}
};

return client_impl_->actor_table().AppendAt(actor_id.JobId(), actor_id, data_ptr,
on_success, on_failure, log_length);
return client_impl_->log_based_actor_table().AppendAt(
actor_id.JobId(), actor_id, data_ptr, on_success, on_failure, log_length);
}

Status RedisActorInfoAccessor::AsyncSubscribeAll(
Status RedisLogBasedActorInfoAccessor::AsyncSubscribeAll(
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return actor_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done);
return log_based_actor_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe,
done);
}

Status RedisActorInfoAccessor::AsyncSubscribe(
Status RedisLogBasedActorInfoAccessor::AsyncSubscribe(
const ActorID &actor_id, const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return actor_sub_executor_.AsyncSubscribe(subscribe_id_, actor_id, subscribe, done);
return log_based_actor_sub_executor_.AsyncSubscribe(subscribe_id_, actor_id, subscribe,
done);
}

Status RedisActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) {
return actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id, done);
Status RedisLogBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) {
return log_based_actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id, done);
}

Status RedisActorInfoAccessor::AsyncAddCheckpoint(
Status RedisLogBasedActorInfoAccessor::AsyncAddCheckpoint(
const std::shared_ptr<ActorCheckpointData> &data_ptr,
const StatusCallback &callback) {
ActorID actor_id = ActorID::FromBinary(data_ptr->actor_id());
Expand All @@ -143,7 +168,7 @@ Status RedisActorInfoAccessor::AsyncAddCheckpoint(
return actor_cp_table.Add(actor_id.JobId(), checkpoint_id, data_ptr, on_add_data_done);
}

Status RedisActorInfoAccessor::AsyncGetCheckpoint(
Status RedisLogBasedActorInfoAccessor::AsyncGetCheckpoint(
const ActorCheckpointID &checkpoint_id, const ActorID &actor_id,
const OptionalItemCallback<ActorCheckpointData> &callback) {
RAY_CHECK(callback != nullptr);
Expand All @@ -164,7 +189,7 @@ Status RedisActorInfoAccessor::AsyncGetCheckpoint(
return actor_cp_table.Lookup(actor_id.JobId(), checkpoint_id, on_success, on_failure);
}

Status RedisActorInfoAccessor::AsyncGetCheckpointID(
Status RedisLogBasedActorInfoAccessor::AsyncGetCheckpointID(
const ActorID &actor_id,
const OptionalItemCallback<ActorCheckpointIdData> &callback) {
RAY_CHECK(callback != nullptr);
Expand All @@ -183,7 +208,7 @@ Status RedisActorInfoAccessor::AsyncGetCheckpointID(
return cp_id_table.Lookup(actor_id.JobId(), actor_id, on_success, on_failure);
}

Status RedisActorInfoAccessor::AsyncAddCheckpointID(
Status RedisLogBasedActorInfoAccessor::AsyncAddCheckpointID(
const ActorID &actor_id, const ActorCheckpointID &checkpoint_id,
const StatusCallback &callback) {
ActorCheckpointIdTable::WriteCallback on_done = nullptr;
Expand All @@ -196,6 +221,79 @@ Status RedisActorInfoAccessor::AsyncAddCheckpointID(
return cp_id_table.AddCheckpointId(actor_id.JobId(), actor_id, checkpoint_id, on_done);
}

RedisActorInfoAccessor::RedisActorInfoAccessor(RedisGcsClient *client_impl)
: RedisLogBasedActorInfoAccessor(client_impl),
actor_sub_executor_(client_impl_->actor_table()) {}

std::vector<ActorID> RedisActorInfoAccessor::GetAllActorID() const {
return client_impl_->actor_table().GetAllActorID();
}

Status RedisActorInfoAccessor::Get(const ActorID &actor_id,
ActorTableData *actor_table_data) const {
return client_impl_->actor_table().Get(actor_id, actor_table_data);
}

Status RedisActorInfoAccessor::AsyncGet(
const ActorID &actor_id, const OptionalItemCallback<ActorTableData> &callback) {
RAY_CHECK(callback != nullptr);
auto on_done = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) { callback(Status::OK(), data); };

auto on_failure = [callback](RedisGcsClient *client, const ActorID &actor_id) {
if (callback != nullptr) {
callback(Status::Invalid("Get actor failed."), boost::none);
}
};

return client_impl_->actor_table().Lookup(JobID::Nil(), actor_id, on_done, on_failure);
}

Status RedisActorInfoAccessor::AsyncRegister(
const std::shared_ptr<ActorTableData> &data_ptr, const StatusCallback &callback) {
auto on_register_done = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
if (callback != nullptr) {
callback(Status::OK());
}
};
ActorID actor_id = ActorID::FromBinary(data_ptr->actor_id());
return client_impl_->actor_table().Add(JobID::Nil(), actor_id, data_ptr,
on_register_done);
}

Status RedisActorInfoAccessor::AsyncUpdate(
const ActorID &actor_id, const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) {
auto on_update_done = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
if (callback != nullptr) {
callback(Status::OK());
}
};
return client_impl_->actor_table().Add(JobID::Nil(), actor_id, data_ptr,
on_update_done);
}

Status RedisActorInfoAccessor::AsyncSubscribeAll(
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return actor_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done);
}

Status RedisActorInfoAccessor::AsyncSubscribe(
const ActorID &actor_id, const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return actor_sub_executor_.AsyncSubscribe(subscribe_id_, actor_id, subscribe, done);
}

Status RedisActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) {
return actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id, done);
}

RedisJobInfoAccessor::RedisJobInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), job_sub_executor_(client_impl->job_table()) {}

Expand Down
Loading

0 comments on commit 039961b

Please sign in to comment.