Skip to content

Commit

Permalink
[core] Pass owner address from the workers to the raylet (ray-project…
Browse files Browse the repository at this point in the history
…#9299)

* Add intended worker ID to GetObjectStatus, tests

* Remove TaskID owner_id

* lint

* Add owner address to task args

* Make TaskArg a virtual class, remove multi args

* Set owner address for task args

* merge

* Fix tests

* Add ObjectRefs to task dependency manager, pass from task spec args

* tmp

* tmp

* Fix

* Add ownership info for task arguments

* Convert WaitForDirectActorCallArgs

* lint

* build

* update

* build

* java

* Move code

* build

* Revert "Fix Google log directory again (ray-project#9063)"

This reverts commit 275da2e.

* Fix free

* fix tests

* Fix tests

* build

* build

* fix

* Change assertion to warning to fix java
  • Loading branch information
stephanie-wang authored Jul 9, 2020
1 parent 4687b80 commit 0389735
Show file tree
Hide file tree
Showing 28 changed files with 409 additions and 199 deletions.
8 changes: 5 additions & 3 deletions python/ray/tests/test_reference_counting.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ def _fill_object_store_and_get(oid, succeed=True, object_MiB=40,
oid = ray.ObjectID(oid)

if succeed:
ray.get(oid)
wait_for_condition(
lambda: ray.worker.global_worker.core_worker.object_exists(oid))
else:
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(oid, timeout=0.1)
wait_for_condition(
lambda: not ray.worker.global_worker.core_worker.object_exists(oid)
)


def _check_refcounts(expected):
Expand Down
8 changes: 5 additions & 3 deletions python/ray/tests/test_reference_counting_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ def _fill_object_store_and_get(oid, succeed=True, object_MiB=40,
oid = ray.ObjectID(oid)

if succeed:
ray.get(oid)
wait_for_condition(
lambda: ray.worker.global_worker.core_worker.object_exists(oid))
else:
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(oid, timeout=0.1)
wait_for_condition(
lambda: not ray.worker.global_worker.core_worker.object_exists(oid)
)


# Test that an object containing object IDs within it pins the inner IDs
Expand Down
4 changes: 3 additions & 1 deletion src/ray/common/task/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ const TaskSpecification &Task::GetTaskSpecification() const { return task_spec_;

/// Record one more forward of this task by delegating to the task execution
/// spec.
void Task::IncrementNumForwards() {
  task_execution_spec_.IncrementNumForwards();
}

const std::vector<ObjectID> &Task::GetDependencies() const { return dependencies_; }
/// Accessor for the task's cached dependency list.
///
/// \return A const reference to the stored object references; no copy is made.
const std::vector<rpc::ObjectReference> &Task::GetDependencies() const {
  const std::vector<rpc::ObjectReference> &cached = dependencies_;
  return cached;
}

/// Refresh the cached dependency list from the task specification.
void Task::ComputeDependencies() {
  // TaskSpecification::GetDependencies() recomputes the list on every call,
  // so the result is stored here for later lookups.
  dependencies_ = task_spec_.GetDependencies();
}

Expand Down
4 changes: 2 additions & 2 deletions src/ray/common/task/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class Task {
/// arguments and the mutable execution dependencies.
///
/// \return The object dependencies.
const std::vector<ObjectID> &GetDependencies() const;
const std::vector<rpc::ObjectReference> &GetDependencies() const;

/// Update the dynamic/mutable information for this task.
/// \param task Task structure with updated dynamic information.
Expand Down Expand Up @@ -110,7 +110,7 @@ class Task {
/// A cached copy of the task's object dependencies, including arguments from
/// the TaskSpecification and execution dependencies from the
/// TaskExecutionSpecification.
std::vector<ObjectID> dependencies_;
std::vector<rpc::ObjectReference> dependencies_;

/// For direct task calls, overrides the dispatch behaviour to send an RPC
/// back to the submitting worker.
Expand Down
22 changes: 21 additions & 1 deletion src/ray/common/task/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ ObjectID TaskSpecification::ArgId(size_t arg_index) const {
return ObjectID::FromBinary(message_->args(arg_index).object_ref().object_id());
}

/// Get the object reference stored for a pass-by-reference argument.
///
/// \param arg_index Index of the argument in the task spec.
/// \return A copy of the argument's ObjectReference message.
rpc::ObjectReference TaskSpecification::ArgRef(size_t arg_index) const {
  // Only arguments passed by reference carry an object reference.
  RAY_CHECK(ArgByRef(arg_index));
  const auto &arg = message_->args(arg_index);
  return arg.object_ref();
}

/// Get a pointer to the raw data bytes of an argument.
///
/// \param arg_index Index of the argument in the task spec.
/// \return Pointer to the first byte of the argument's `data` field.
const uint8_t *TaskSpecification::ArgData(size_t arg_index) const {
  const auto &payload = message_->args(arg_index).data();
  return reinterpret_cast<const uint8_t *>(payload.data());
}
Expand All @@ -141,7 +146,7 @@ const ResourceSet &TaskSpecification::GetRequiredResources() const {
return *required_resources_;
}

std::vector<ObjectID> TaskSpecification::GetDependencies() const {
std::vector<ObjectID> TaskSpecification::GetDependencyIds() const {
std::vector<ObjectID> dependencies;
for (size_t i = 0; i < NumArgs(); ++i) {
if (ArgByRef(i)) {
Expand All @@ -154,6 +159,21 @@ std::vector<ObjectID> TaskSpecification::GetDependencies() const {
return dependencies;
}

std::vector<rpc::ObjectReference> TaskSpecification::GetDependencies() const {
std::vector<rpc::ObjectReference> dependencies;
for (size_t i = 0; i < NumArgs(); ++i) {
if (ArgByRef(i)) {
dependencies.push_back(message_->args(i).object_ref());
}
}
if (IsActorTask()) {
const auto &dummy_ref =
GetReferenceForActorDummyObject(PreviousActorTaskDummyObjectId());
dependencies.push_back(dummy_ref);
}
return dependencies;
}

/// Accessor for the task's placement resource requirements.
///
/// \return A const reference to the stored placement ResourceSet.
const ResourceSet &TaskSpecification::GetRequiredPlacementResources() const {
  const ResourceSet &placement_resources = *required_placement_resources_;
  return placement_resources;
}
Expand Down
18 changes: 17 additions & 1 deletion src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ namespace ray {
typedef ResourceSet SchedulingClassDescriptor;
typedef int SchedulingClass;

/// Build an ObjectReference for an actor's dummy object.
///
/// Only the object ID is filled in; the remaining fields of the reference are
/// left at their protobuf defaults.
///
/// \param object_id The ID of the dummy object.
/// \return A reference carrying only `object_id`.
static inline rpc::ObjectReference GetReferenceForActorDummyObject(
    const ObjectID &object_id) {
  rpc::ObjectReference ref;
  ref.set_object_id(object_id.Binary());
  return ref;
}

/// Wrapper class of protobuf `TaskSpec`, see `common.proto` for details.
/// TODO(ekl) we should consider passing around std::unique_ptr<TaskSpecification>
/// instead of `const TaskSpecification`, since this class is actually mutable.
Expand Down Expand Up @@ -71,6 +78,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

ObjectID ArgId(size_t arg_index) const;

rpc::ObjectReference ArgRef(size_t arg_index) const;

ObjectID ReturnId(size_t return_index) const;

const uint8_t *ArgData(size_t arg_index) const;
Expand Down Expand Up @@ -109,11 +118,18 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
/// \return The resources that are required to place a task on a node.
const ResourceSet &GetRequiredPlacementResources() const;

/// Return the ObjectIDs of any dependencies passed by reference to this
/// task. This is recomputed each time, so it can be used if the task spec is
/// mutated.
///
/// \return The recomputed IDs of the dependencies for the task.
std::vector<ObjectID> GetDependencyIds() const;

/// Return the dependencies of this task. This is recomputed each time, so it can
/// be used if the task spec is mutated.
///
/// \return The recomputed dependencies for the task.
std::vector<ObjectID> GetDependencies() const;
std::vector<rpc::ObjectReference> GetDependencies() const;

bool IsDriverTask() const;

Expand Down
12 changes: 12 additions & 0 deletions src/ray/common/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,22 @@

#include "gtest/gtest.h"
#include "ray/common/id.h"
#include "ray/protobuf/common.pb.h"
#include "ray/util/util.h"

namespace ray {

static inline std::vector<rpc::ObjectReference> ObjectIdsToRefs(
std::vector<ObjectID> object_ids) {
std::vector<rpc::ObjectReference> refs;
for (const auto &object_id : object_ids) {
rpc::ObjectReference ref;
ref.set_object_id(object_id.Binary());
refs.push_back(ref);
}
return refs;
}

class Buffer;
class RayObject;

Expand Down
57 changes: 36 additions & 21 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
actor_reporter_ = std::unique_ptr<ActorReporter>(new ActorReporter(gcs_client_));

plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider(
options_.store_socket, local_raylet_client_, options_.check_signals,
options_.store_socket, local_raylet_client_, reference_counter_,
options_.check_signals,
/*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
boost::bind(&CoreWorker::TriggerGlobalGC, this),
boost::bind(&CoreWorker::CurrentCallSite, this)));
Expand All @@ -368,7 +369,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_

auto check_node_alive_fn = [this](const ClientID &node_id) {
auto node = gcs_client_->Nodes().Get(node_id);
RAY_CHECK(node.has_value());
if (!node) {
return false;
}
return node->state() == rpc::GcsNodeInfo::ALIVE;
};
auto reconstruct_object_callback = [this](const ObjectID &object_id) {
Expand Down Expand Up @@ -447,7 +450,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
future_resolver_.reset(new FutureResolver(memory_store_, client_factory, rpc_address_));
// Unfortunately the raylet client has to be constructed after the receivers.
if (direct_task_receiver_ != nullptr) {
direct_task_receiver_->Init(client_factory, rpc_address_, local_raylet_client_);
task_argument_waiter_.reset(new DependencyWaiterImpl(*local_raylet_client_));
direct_task_receiver_->Init(client_factory, rpc_address_, task_argument_waiter_);
}

actor_manager_ = std::unique_ptr<ActorManager>(
Expand Down Expand Up @@ -1415,12 +1419,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
// execution and unpinned once the task completes. We will notify the caller
// about any IDs that we are still borrowing by the time the task completes.
std::vector<ObjectID> borrowed_ids;
RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args, &arg_reference_ids, &borrowed_ids));
// Pin the borrowed IDs for the duration of the task.
for (const auto &borrowed_id : borrowed_ids) {
RAY_LOG(DEBUG) << "Incrementing ref for borrowed ID " << borrowed_id;
reference_counter_->AddLocalReference(borrowed_id, task_spec.CallSiteString());
}
RAY_CHECK_OK(
GetAndPinArgsForExecutor(task_spec, &args, &arg_reference_ids, &borrowed_ids));

std::vector<ObjectID> return_ids;
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
Expand Down Expand Up @@ -1468,8 +1468,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,

// Get the reference counts for any IDs that we borrowed during this task and
// return them to the caller. This will notify the caller of any IDs that we
// (or a nested task) are still borrowing. It will also any new IDs that were
// contained in a borrowed ID that we (or a nested task) are now borrowing.
// (or a nested task) are still borrowing. It will also notify the caller of
// any new IDs that were contained in a borrowed ID that we (or a nested
// task) are now borrowing.
if (!borrowed_ids.empty()) {
reference_counter_->GetAndClearLocalBorrowers(borrowed_ids, borrowed_refs);
}
Expand Down Expand Up @@ -1532,10 +1533,10 @@ void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
SetActorId(old_id);
}

Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids,
std::vector<ObjectID> *borrowed_ids) {
Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids,
std::vector<ObjectID> *borrowed_ids) {
auto num_args = task.NumArgs();
args->resize(num_args);
arg_reference_ids->resize(num_args);
Expand All @@ -1560,10 +1561,15 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task,
it->second.push_back(i);
}
arg_reference_ids->at(i) = arg_id;
// The task borrows all args passed by reference. Because the task does
// not have a reference to the argument ID in the frontend, it is not
// possible for the task to still be borrowing the argument by the time
// it finishes.
// Pin all args passed by reference for the duration of the task. This
// ensures that when the task completes, we can retrieve metadata about
// any borrowed ObjectIDs that were serialized in the argument's value.
RAY_LOG(DEBUG) << "Incrementing ref for argument ID " << arg_id;
reference_counter_->AddLocalReference(arg_id, task.CallSiteString());
// Attach the argument's owner's address. This is needed to retrieve the
// value from plasma.
reference_counter_->AddBorrowedObject(arg_id, ObjectID::Nil(),
task.ArgRef(i).owner_address());
borrowed_ids->push_back(arg_id);
} else {
// A pass-by-value argument.
Expand All @@ -1585,6 +1591,11 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task,
// possible for the task to continue borrowing these arguments by the
// time it finishes.
for (const auto &inlined_id : task.ArgInlinedIds(i)) {
RAY_LOG(DEBUG) << "Incrementing ref for borrowed ID " << inlined_id;
// We do not need to add the ownership information here because it will
// get added once the language frontend deserializes the value, before
// the ObjectID can be used.
reference_counter_->AddLocalReference(inlined_id, task.CallSiteString());
borrowed_ids->push_back(inlined_id);
}
}
Expand Down Expand Up @@ -1649,10 +1660,14 @@ void CoreWorker::HandleDirectActorCallArgWaitComplete(
return;
}

// Post on the task execution event loop since this may trigger the
// execution of a task that is now ready to run.
task_execution_service_.post([=] {
direct_task_receiver_->HandleDirectActorCallArgWaitComplete(request, reply,
send_reply_callback);
RAY_LOG(DEBUG) << "Arg wait complete for tag " << request.tag();
task_argument_waiter_->OnWaitComplete(request.tag());
});

send_reply_callback(Status::OK(), nullptr, nullptr);
}

void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &request,
Expand Down
39 changes: 25 additions & 14 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -849,10 +849,17 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void ExecuteTaskLocalMode(const TaskSpecification &task_spec,
const ActorID &actor_id = ActorID::Nil());

/// Build arguments for task executor. This would loop through all the arguments
/// in task spec, and for each of them that's passed by reference (ObjectID),
/// fetch its content from store and; for arguments that are passed by value,
/// just copy their content.
/// Get the values of the task arguments for the executor. Values are
/// retrieved from the local plasma store or, if the value is inlined, from
/// the task spec.
///
/// This also pins all plasma arguments and ObjectIDs that were contained in
/// an inlined argument by adding a local reference in the reference counter.
/// This is to ensure that we have the address of the object's owner, which
/// is needed to retrieve the value. It also ensures that when the task
/// completes, we can retrieve any metadata about objects that are still
/// being borrowed by this process. The IDs should be unpinned once the task
/// completes.
///
/// \param spec[in] task Task specification.
/// \param args[out] args Argument data as RayObjects.
Expand All @@ -863,16 +870,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// // TODO(edoakes): this is a bit of a hack that's necessary because
/// we have separate serialization paths for by-value and by-reference
/// arguments in Python. This should ideally be handled better there.
/// \param args[out] borrowed_ids ObjectIDs that we are borrowing from the
/// task caller for the duration of the task execution. This
/// vector will be populated with all argument IDs that were
/// passed by reference and any ObjectIDs that were included
/// in the task spec's inlined arguments.
/// \return The arguments for passing to task executor.
Status BuildArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids,
std::vector<ObjectID> *borrowed_ids);
/// \param args[out] pinned_ids ObjectIDs that should be unpinned once the
/// task completes execution. This vector will be populated
/// with all argument IDs that were passed by reference and
/// any ObjectIDs that were included in the task spec's
/// inlined arguments.
/// \return Error if the values could not be retrieved.
Status GetAndPinArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids,
std::vector<ObjectID> *pinned_ids);

/// Returns whether the message was sent to the wrong worker. The right error reply
/// is sent automatically. Messages end up on the wrong worker when a worker dies
Expand Down Expand Up @@ -1048,6 +1055,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Common rpc service for all worker modules.
rpc::CoreWorkerGrpcService grpc_service_;

/// Used to notify the task receiver when the arguments of a queued
/// actor task are ready.
std::shared_ptr<DependencyWaiterImpl> task_argument_waiter_;

// Interface that receives tasks from direct actor calls.
std::unique_ptr<CoreWorkerDirectTaskReceiver> direct_task_receiver_;

Expand Down
30 changes: 30 additions & 0 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ void ReferenceCounter::RemoveSubmittedTaskReferences(
/// Look up the owner of an object while holding the reference counter's mutex.
///
/// \param object_id The object to look up.
/// \param owner_address Output location handed to GetOwnerInternal.
/// \return The result of GetOwnerInternal; false when the ID has no entry.
bool ReferenceCounter::GetOwner(const ObjectID &object_id,
                                rpc::Address *owner_address) const {
  // Take the lock here; the internal helper does the actual lookup.
  absl::MutexLock guard(&mutex_);
  const bool found = GetOwnerInternal(object_id, owner_address);
  return found;
}

bool ReferenceCounter::GetOwnerInternal(const ObjectID &object_id,
rpc::Address *owner_address) const {
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
return false;
Expand All @@ -338,6 +343,31 @@ bool ReferenceCounter::GetOwner(const ObjectID &object_id,
}
}

std::vector<rpc::Address> ReferenceCounter::GetOwnerAddresses(
const std::vector<ObjectID> object_ids) const {
absl::MutexLock lock(&mutex_);
std::vector<rpc::Address> owner_addresses;
for (const auto &object_id : object_ids) {
rpc::Address owner_addr;
bool has_owner = GetOwnerInternal(object_id, &owner_addr);
if (!has_owner) {
RAY_LOG(WARNING)
<< " Object IDs generated randomly (ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed to ray.get(), ray.wait(), or "
"as "
"a task argument because Ray does not know which task will create them. "
"If this was not how your object ID was generated, please file an issue "
"at https://github.com/ray-project/ray/issues/";
// TODO(swang): Java does not seem to keep the ref count properly, so the
// entry may get deleted.
owner_addresses.push_back(rpc::Address());
} else {
owner_addresses.push_back(owner_addr);
}
}
return owner_addresses;
}

void ReferenceCounter::FreePlasmaObjects(const std::vector<ObjectID> &object_ids) {
absl::MutexLock lock(&mutex_);
for (const ObjectID &object_id : object_ids) {
Expand Down
Loading

0 comments on commit 0389735

Please sign in to comment.