Skip to content

Commit

Permalink
GCS-Based actor management implementation (ray-project#6763)
Browse files Browse the repository at this point in the history
* add gcs actor manager

* fix test_metrics.py

* fix TestTaskInfo

* fix comment

* fix comment

* fix comment

* fix comment

* fix comment

* fix comment

* fix compile error

* fix merge error

Co-authored-by: senlin.zsl <[email protected]>
  • Loading branch information
wumuzi520 and senlin.zsl authored Apr 13, 2020
1 parent 1b0f6fd commit 4a81793
Show file tree
Hide file tree
Showing 38 changed files with 2,636 additions and 116 deletions.
46 changes: 45 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ cc_library(
deps = [
":gcs",
":gcs_service_rpc",
":node_manager_rpc",
":raylet_lib",
":worker_rpc",
],
)

Expand Down Expand Up @@ -725,6 +728,45 @@ cc_test(
],
)

cc_test(
name = "gcs_node_manager_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc",
"src/ray/gcs/gcs_server/test/gcs_test_util.h",
],
copts = COPTS,
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "gcs_actor_scheduler_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc",
"src/ray/gcs/gcs_server/test/gcs_test_util.h",
],
copts = COPTS,
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "gcs_actor_manager_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc",
"src/ray/gcs/gcs_server/test/gcs_test_util.h",
],
copts = COPTS,
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "service_based_gcs_client_lib",
srcs = glob(
Expand All @@ -739,7 +781,8 @@ cc_library(
),
copts = COPTS,
deps = [
":gcs_server_lib",
":gcs",
":gcs_service_rpc",
],
)

Expand All @@ -754,6 +797,7 @@ cc_test(
"//:redis-server",
],
deps = [
":gcs_server_lib",
":service_based_gcs_client_lib",
"@com_google_googletest//:gtest_main",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public boolean wasCurrentActorReconstructed() {
return false;
}

return runtime.getGcsClient().actorExists(getCurrentActorId());
return runtime.getGcsClient().wasCurrentActorReconstructed(getCurrentActorId());
}

@Override
Expand Down
26 changes: 24 additions & 2 deletions java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.generated.Gcs;
import io.ray.runtime.generated.Gcs.ActorCheckpointIdData;
import io.ray.runtime.generated.Gcs.GcsNodeInfo;
Expand All @@ -27,9 +28,7 @@
* An implementation of GcsClient.
*/
public class GcsClient {

private static Logger LOGGER = LoggerFactory.getLogger(GcsClient.class);

private RedisClient primary;

private List<RedisClient> shards;
Expand Down Expand Up @@ -126,6 +125,29 @@ public boolean actorExists(ActorId actorId) {
return primary.exists(key);
}

public boolean wasCurrentActorReconstructed(ActorId actorId) {
byte[] key = ArrayUtils.addAll(TablePrefix.ACTOR.toString().getBytes(), actorId.getBytes());
if (!RayConfig.getInstance().gcsServiceEnabled) {
return primary.exists(key);
}

// TODO(ZhuSenlin): Get the actor table data from CoreWorker later.
byte[] value = primary.get(key);
if (value == null) {
return false;
}
Gcs.ActorTableData actorTableData = null;
try {
actorTableData = Gcs.ActorTableData.parseFrom(value);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Received invalid protobuf data from GCS.");
}

long maxReconstructions = actorTableData.getMaxReconstructions();
long remainingReconstructions = actorTableData.getRemainingReconstructions();
return maxReconstructions - remainingReconstructions != 0;
}

/**
* Query whether the raylet task exists in Gcs.
*/
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def getpid(self):
if child_actor_info["state"] == -1:
assert child_actor_info["requiredResources"]["CustomResource"] == 1
else:
assert child_actor_info["state"] == 0
assert child_actor_info["state"] == 1
assert len(child_actor_info["children"]) == 0
assert child_actor_info["usedResources"]["CPU"] == 1

Expand Down
4 changes: 4 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100)
/// The interval at which the gcs server will check if redis has gone down.
/// When this happens, gcs server will kill itself.
RAY_CONFIG(int64_t, gcs_redis_heartbeat_interval_milliseconds, 100)
/// Duration to wait between retries for leasing worker in gcs server.
RAY_CONFIG(uint32_t, gcs_lease_worker_retry_interval_ms, 200)
/// Duration to wait between retries for creating actor in gcs server.
RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200)

/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.
Expand Down
23 changes: 18 additions & 5 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
return std::shared_ptr<raylet::RayletClient>(
new raylet::RayletClient(std::move(grpc_client)));
};

std::function<Status(const TaskSpecification &, const gcs::StatusCallback &)>
actor_create_callback = nullptr;
if (RayConfig::instance().gcs_service_enabled()) {
actor_create_callback = [this](const TaskSpecification &task_spec,
const gcs::StatusCallback &callback) {
return gcs_client_->Actors().AsyncCreateActor(task_spec, callback);
};
}

direct_actor_submitter_ = std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(rpc_address_, client_factory, memory_store_,
task_manager_));
Expand All @@ -418,7 +428,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
std::unique_ptr<CoreWorkerDirectTaskSubmitter>(new CoreWorkerDirectTaskSubmitter(
rpc_address_, local_raylet_client_, client_factory, raylet_client_factory,
memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds()));
RayConfig::instance().worker_lease_timeout_milliseconds(),
std::move(actor_create_callback)));
future_resolver_.reset(new FutureResolver(memory_store_, client_factory));
// Unfortunately the raylet client has to be constructed after the receivers.
if (direct_task_receiver_ != nullptr) {
Expand Down Expand Up @@ -624,7 +635,6 @@ void CoreWorker::RegisterToGcs() {
RAY_CHECK_OK(gcs_client_->Workers().AsyncRegisterWorker(options_.worker_type, worker_id,
worker_info, nullptr));
}

void CoreWorker::CheckForRayletFailure(const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
Expand Down Expand Up @@ -1244,7 +1254,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
// Register a callback to handle actor notifications.
auto actor_notification_callback = [this](const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
if (actor_data.state() == gcs::ActorTableData::PENDING) {
// The actor is being created and not yet ready, just ignore!
} else if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
absl::MutexLock lock(&actor_handles_mutex_);
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
Expand All @@ -1265,8 +1277,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
direct_actor_submitter_->ConnectActor(actor_id, actor_data.address());
}

RAY_LOG(INFO) << "received notification on actor, state="
<< static_cast<int>(actor_data.state()) << ", actor_id: " << actor_id
const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state());
RAY_LOG(INFO) << "received notification on actor, state: " << actor_state
<< ", actor_id: " << actor_id
<< ", ip address: " << actor_data.address().ip_address()
<< ", port: " << actor_data.address().port() << ", worker_id: "
<< WorkerID::FromBinary(actor_data.address().worker_id())
Expand Down
25 changes: 24 additions & 1 deletion src/ray/core_worker/transport/direct_task_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,37 @@
#include "ray/core_worker/transport/direct_task_transport.h"

#include "ray/core_worker/transport/dependency_resolver.h"
#include "ray/core_worker/transport/direct_actor_transport.h"

namespace ray {

Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(DEBUG) << "Submit task " << task_spec.TaskId();
resolver_.ResolveDependencies(task_spec, [this, task_spec]() {
RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId();
if (actor_create_callback_ && task_spec.IsActorCreationTask()) {
// If gcs actor management is enabled, the actor creation task will be sent to gcs
// server directly after the in-memory dependent objects are resolved.
// For more details please see the protocol of actor management based on gcs.
// https://docs.google.com/document/d/1EAWide-jy05akJp6OMtDn58XOK7bUyruWMia4E-fV28/edit?usp=sharing
auto actor_id = task_spec.ActorCreationId();
auto task_id = task_spec.TaskId();
RAY_LOG(INFO) << "Submitting actor creation task to GCS: " << actor_id;
auto status =
actor_create_callback_(task_spec, [this, actor_id, task_id](Status status) {
// If GCS is failed, GcsRpcClient may receive IOError status but it will not
// trigger this callback, because GcsRpcClient has retry logic at the
// bottom. So if this callback is invoked with an error there must be
// something wrong with the protocol of gcs-based actor management.
// So just check `status.ok()` here.
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Actor creation task submitted to GCS: " << actor_id;
task_finisher_->CompletePendingTask(task_id, rpc::PushTaskReply(),
rpc::Address());
});
RAY_CHECK_OK(status);
return;
}

absl::MutexLock lock(&mu_);
// Note that the dependencies in the task spec are mutated to only contain
// plasma dependencies after ResolveDependencies finishes.
Expand Down
25 changes: 17 additions & 8 deletions src/ray/core_worker/transport/direct_task_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,23 @@ using SchedulingKey = std::tuple<SchedulingClass, std::vector<ObjectID>, ActorID
// This class is thread-safe.
class CoreWorkerDirectTaskSubmitter {
public:
CoreWorkerDirectTaskSubmitter(rpc::Address rpc_address,
std::shared_ptr<WorkerLeaseInterface> lease_client,
rpc::ClientFactoryFn client_factory,
LeaseClientFactoryFn lease_client_factory,
std::shared_ptr<CoreWorkerMemoryStore> store,
std::shared_ptr<TaskFinisherInterface> task_finisher,
ClientID local_raylet_id, int64_t lease_timeout_ms)
explicit CoreWorkerDirectTaskSubmitter(
rpc::Address rpc_address, std::shared_ptr<WorkerLeaseInterface> lease_client,
rpc::ClientFactoryFn client_factory, LeaseClientFactoryFn lease_client_factory,
std::shared_ptr<CoreWorkerMemoryStore> store,
std::shared_ptr<TaskFinisherInterface> task_finisher, ClientID local_raylet_id,
int64_t lease_timeout_ms,
std::function<Status(const TaskSpecification &, const gcs::StatusCallback &)>
actor_create_callback = nullptr)
: rpc_address_(rpc_address),
local_lease_client_(lease_client),
client_factory_(client_factory),
lease_client_factory_(lease_client_factory),
resolver_(store, task_finisher),
task_finisher_(task_finisher),
lease_timeout_ms_(lease_timeout_ms),
local_raylet_id_(local_raylet_id),
lease_timeout_ms_(lease_timeout_ms) {}
actor_create_callback_(std::move(actor_create_callback)) {}

/// Schedule a task for direct submission to a worker.
///
Expand Down Expand Up @@ -148,6 +150,13 @@ class CoreWorkerDirectTaskSubmitter {
/// if a remote raylet tells us to spill the task back to the local raylet.
const ClientID local_raylet_id_;

/// A function to override actor creation. The callback will be called once the actor
/// creation task has been accepted for submission, but the actor may not be created
/// yet.
std::function<Status(const TaskSpecification &task_spec,
const gcs::StatusCallback &callback)>
actor_create_callback_;

// Protects task submission state below.
absl::Mutex mu_;

Expand Down
9 changes: 9 additions & 0 deletions src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define RAY_GCS_ACCESSOR_H

#include "ray/common/id.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/entry_change_notification.h"
#include "ray/protobuf/gcs.pb.h"
Expand Down Expand Up @@ -46,6 +47,14 @@ class ActorInfoAccessor {
virtual Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) = 0;

/// Create an actor to GCS asynchronously.
///
/// \param task_spec The specification for the actor creation task.
/// \param callback Callback that will be called after the actor info is written to GCS.
/// \return Status
virtual Status AsyncCreateActor(const TaskSpecification &task_spec,
const StatusCallback &callback) = 0;

/// Register an actor to GCS asynchronously.
///
/// \param data_ptr The actor that will be registered to the GCS.
Expand Down
18 changes: 17 additions & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ ServiceBasedActorInfoAccessor::ServiceBasedActorInfoAccessor(
ServiceBasedGcsClient *client_impl)
: subscribe_id_(ClientID::FromRandom()),
client_impl_(client_impl),
actor_sub_executor_(client_impl->GetRedisGcsClient().log_based_actor_table()) {}
actor_sub_executor_(client_impl->GetRedisGcsClient().actor_table()) {}

Status ServiceBasedActorInfoAccessor::GetAll(
std::vector<ActorTableData> *actor_table_data_list) {
Expand All @@ -106,6 +106,22 @@ Status ServiceBasedActorInfoAccessor::AsyncGet(
return Status::OK();
}

Status ServiceBasedActorInfoAccessor::AsyncCreateActor(
const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) {
RAY_CHECK(task_spec.IsActorCreationTask() && callback);
rpc::CreateActorRequest request;
request.mutable_task_spec()->CopyFrom(task_spec.GetMessage());
client_impl_->GetGcsRpcClient().CreateActor(
request, [callback](const Status &, const rpc::CreateActorReply &reply) {
auto status =
reply.status().code() == (int)StatusCode::OK
? Status()
: Status(StatusCode(reply.status().code()), reply.status().message());
callback(status);
});
return Status::OK();
}

Status ServiceBasedActorInfoAccessor::AsyncRegister(
const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) {
Expand Down
6 changes: 5 additions & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RAY_GCS_SERVICE_BASED_ACCESSOR_H
#define RAY_GCS_SERVICE_BASED_ACCESSOR_H

#include <ray/common/task/task_spec.h>
#include "ray/gcs/accessor.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/util/sequencer.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) override;

Status AsyncCreateActor(const TaskSpecification &task_spec,
const StatusCallback &callback) override;

Status AsyncRegister(const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) override;

Expand Down Expand Up @@ -97,7 +101,7 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
private:
ServiceBasedGcsClient *client_impl_;

typedef SubscriptionExecutor<ActorID, ActorTableData, LogBasedActorTable>
typedef SubscriptionExecutor<ActorID, ActorTableData, ActorTable>
ActorSubscriptionExecutor;
ActorSubscriptionExecutor actor_sub_executor_;

Expand Down
15 changes: 15 additions & 0 deletions src/ray/gcs/gcs_server/actor_info_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@
namespace ray {
namespace rpc {

void DefaultActorInfoHandler::HandleCreateActor(
const ray::rpc::CreateActorRequest &request, ray::rpc::CreateActorReply *reply,
ray::rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(request.task_spec().type() == TaskType::ACTOR_CREATION_TASK);
auto actor_id =
ActorID::FromBinary(request.task_spec().actor_creation_task_spec().actor_id());

RAY_LOG(INFO) << "Registering actor, actor id = " << actor_id;
gcs_actor_manager_.RegisterActor(request, [reply, send_reply_callback, actor_id](
std::shared_ptr<gcs::GcsActor> actor) {
RAY_LOG(INFO) << "Registered actor, actor id = " << actor_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
});
}

void DefaultActorInfoHandler::HandleGetActorInfo(
const rpc::GetActorInfoRequest &request, rpc::GetActorInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) {
Expand Down
Loading

0 comments on commit 4a81793

Please sign in to comment.