Skip to content

Commit

Permalink
[Part 4] Support passing metadata to Ray error object. (ray-project#2…
Browse files Browse the repository at this point in the history
…0714)

This will allow us to pass protobuf-defined metadata to the error object. It will allow us to propagate meaningful metadata (e.g., function names for ObjectLostError, ip address for ObjectLostError within raylet, or many useful metadata for ActorDiedError).

### Impl
We will allow the error object to include "payload". The payload will be the protobuf message that includes metadata.
```
# Prev 
ACTOR_DIED (metadata) | (empty)

# New
ACTOR_DIED (metadata) | Serialized protobuf message (body)
```

Note that currently, the body is 

serialized message pack that contains serialized protobuf. This needs to be cleaned up in the future.
  • Loading branch information
rkooo567 authored Dec 1, 2021
1 parent 5e1692e commit 4b9524e
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 109 deletions.
91 changes: 76 additions & 15 deletions src/ray/common/ray_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,97 @@

#include "ray/common/ray_object.h"

namespace ray {
#include "msgpack.hpp"

namespace {

std::shared_ptr<LocalMemoryBuffer> MakeBufferFromString(const uint8_t *data,
size_t data_size) {
std::shared_ptr<ray::LocalMemoryBuffer> MakeBufferFromString(const uint8_t *data,
size_t data_size) {
auto metadata = const_cast<uint8_t *>(data);
auto meta_buffer =
std::make_shared<LocalMemoryBuffer>(metadata, data_size, /*copy_data=*/true);
std::make_shared<ray::LocalMemoryBuffer>(metadata, data_size, /*copy_data=*/true);
return meta_buffer;
}

std::shared_ptr<LocalMemoryBuffer> MakeBufferFromString(const std::string &str) {
std::shared_ptr<ray::LocalMemoryBuffer> MakeBufferFromString(const std::string &str) {
return MakeBufferFromString(reinterpret_cast<const uint8_t *>(str.data()), str.size());
}

std::shared_ptr<LocalMemoryBuffer> MakeErrorMetadataBuffer(rpc::ErrorType error_type) {
std::shared_ptr<ray::LocalMemoryBuffer> MakeErrorMetadataBuffer(
ray::rpc::ErrorType error_type) {
std::string meta = std::to_string(static_cast<int>(error_type));
return MakeBufferFromString(meta);
}

RayObject::RayObject(rpc::ErrorType error_type)
: RayObject(nullptr, MakeErrorMetadataBuffer(error_type), {}) {}
/// Serialize the protobuf message to msg pack.
///
/// Ray uses Msgpack for cross-language object serialization.
/// This method creates a msgpack serialized buffer that contains
/// serialized protobuf message.
///
/// Language frontend can deseiralize this object to obtain
/// data stored in a given protobuf. Check `serialization.py` to see
/// how this works.
///
/// NOTE: The function guarantees that the returned buffer contains data.
///
/// \param protobuf_message The protobuf message to serialize.
/// \return The buffer that contains serialized msgpack message.
template <class ProtobufMessage>
std::shared_ptr<ray::LocalMemoryBuffer> MakeSerializeErrorBuffer(
const ProtobufMessage &protobuf_message) {
// Structure of bytes stored in object store:

// First serialize RayException by the following steps:
// PB's RayException
// --(PB Serialization)-->
// --(msgpack Serialization)-->
// msgpack_serialized_exception(MSE)

// Then add it's length to the head(for coross-language deserialization):
// [MSE's length(9 bytes)] [MSE]

std::string pb_serialized_exception;
protobuf_message.SerializeToString(&pb_serialized_exception);
msgpack::sbuffer msgpack_serialized_exception;
msgpack::packer<msgpack::sbuffer> packer(msgpack_serialized_exception);
packer.pack_bin(pb_serialized_exception.size());
packer.pack_bin_body(pb_serialized_exception.data(), pb_serialized_exception.size());
std::unique_ptr<ray::LocalMemoryBuffer> final_buffer =
std::make_unique<ray::LocalMemoryBuffer>(msgpack_serialized_exception.size() +
kMessagePackOffset);
// copy msgpack-serialized bytes
std::memcpy(final_buffer->Data() + kMessagePackOffset,
msgpack_serialized_exception.data(), msgpack_serialized_exception.size());
// copy offset
msgpack::sbuffer msgpack_int;
msgpack::pack(msgpack_int, msgpack_serialized_exception.size());
std::memcpy(final_buffer->Data(), msgpack_int.data(), msgpack_int.size());
RAY_CHECK(final_buffer->Data() != nullptr);
RAY_CHECK(final_buffer->Size() != 0);

return final_buffer;
}

} // namespace

namespace ray {

RayObject::RayObject(rpc::ErrorType error_type, const std::string &append_data)
: RayObject(MakeBufferFromString(append_data), MakeErrorMetadataBuffer(error_type),
{}) {}
RayObject::RayObject(rpc::ErrorType error_type, const rpc::RayErrorInfo *ray_error_info) {
if (ray_error_info == nullptr) {
Init(nullptr, MakeErrorMetadataBuffer(error_type), {});
return;
}

RayObject::RayObject(rpc::ErrorType error_type, const uint8_t *append_data,
size_t append_data_size)
: RayObject(MakeBufferFromString(append_data, append_data_size),
MakeErrorMetadataBuffer(error_type), {}) {}
RAY_CHECK(ray_error_info->has_actor_init_failure());
// This is temporarily here because changing this requires changes in all language
// frontend.
// TODO(sang, lixin): Remove it.
const auto error_buffer =
MakeSerializeErrorBuffer<rpc::RayException>(ray_error_info->actor_init_failure());
Init(std::move(error_buffer), MakeErrorMetadataBuffer(error_type), {});
return;
}

bool RayObject::IsException(rpc::ErrorType *error_type) const {
if (metadata_ == nullptr) {
Expand Down
73 changes: 45 additions & 28 deletions src/ray/common/ray_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,26 @@ class RayObject {
/// \param[in] nested_rfs ObjectRefs that were serialized in data.
/// \param[in] copy_data Whether this class should hold a copy of data.
RayObject(const std::shared_ptr<Buffer> &data, const std::shared_ptr<Buffer> &metadata,
const std::vector<rpc::ObjectReference> &nested_refs, bool copy_data = false)
: data_(data),
metadata_(metadata),
nested_refs_(nested_refs),
has_data_copy_(copy_data),
creation_time_nanos_(absl::GetCurrentTimeNanos()) {
if (has_data_copy_) {
// If this object is required to hold a copy of the data,
// make a copy if the passed in buffers don't already have a copy.
if (data_ && !data_->OwnsData()) {
data_ = std::make_shared<LocalMemoryBuffer>(data_->Data(), data_->Size(),
/*copy_data=*/true);
}

if (metadata_ && !metadata_->OwnsData()) {
metadata_ = std::make_shared<LocalMemoryBuffer>(
metadata_->Data(), metadata_->Size(), /*copy_data=*/true);
}
}

RAY_CHECK(data_ || metadata_) << "Data and metadata cannot both be empty.";
const std::vector<rpc::ObjectReference> &nested_refs,
bool copy_data = false) {
Init(data, metadata, nested_refs, copy_data);
}

RayObject(rpc::ErrorType error_type);

RayObject(rpc::ErrorType error_type, const std::string &append_data);

RayObject(rpc::ErrorType error_type, const uint8_t *append_data,
size_t append_data_size);
/// Create an Ray error object. It uses msgpack for the serialization format now.
/// Ray error objects consist of metadata that indicates the error code (see
/// rpc::ErrorType) and the serialized message pack that contains serialized
/// rpc::RayErrorInfo as a body.
///
/// NOTE: There's no reason to have outer-side message pack for serialization (it is
/// tech debt).
/// TODO(sang): Clean this up.
///
/// NOTE: The deserialization side needs to have its own corresponding deserializer.
/// See `serialization.py's def _deserialize_msgpack_data`.
///
/// \param[in] error_type Error type.
/// \param[in] ray_error_info The error information that this object body contains.
RayObject(rpc::ErrorType error_type, const rpc::RayErrorInfo *ray_error_info = nullptr);

/// Return the data of the ray object.
const std::shared_ptr<Buffer> &GetData() const { return data_; }
Expand Down Expand Up @@ -109,9 +100,35 @@ class RayObject {
int64_t CreationTimeNanos() const { return creation_time_nanos_; }

private:
void Init(const std::shared_ptr<Buffer> &data, const std::shared_ptr<Buffer> &metadata,
const std::vector<rpc::ObjectReference> &nested_refs,
bool copy_data = false) {
data_ = data;
metadata_ = metadata;
nested_refs_ = nested_refs;
has_data_copy_ = copy_data;
creation_time_nanos_ = absl::GetCurrentTimeNanos();

if (has_data_copy_) {
// If this object is required to hold a copy of the data,
// make a copy if the passed in buffers don't already have a copy.
if (data_ && !data_->OwnsData()) {
data_ = std::make_shared<LocalMemoryBuffer>(data_->Data(), data_->Size(),
/*copy_data=*/true);
}

if (metadata_ && !metadata_->OwnsData()) {
metadata_ = std::make_shared<LocalMemoryBuffer>(
metadata_->Data(), metadata_->Size(), /*copy_data=*/true);
}
}

RAY_CHECK(data_ || metadata_) << "Data and metadata cannot both be empty.";
}

std::shared_ptr<Buffer> data_;
std::shared_ptr<Buffer> metadata_;
const std::vector<rpc::ObjectReference> nested_refs_;
std::vector<rpc::ObjectReference> nested_refs_;
/// Whether this class holds a data copy.
bool has_data_copy_;
/// Whether this object was accessed.
Expand Down
68 changes: 2 additions & 66 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,68 +14,11 @@

#include "ray/core_worker/task_manager.h"

#include "msgpack.hpp"
#include "ray/common/buffer.h"
#include "ray/common/common_protocol.h"
#include "ray/common/constants.h"
#include "ray/util/util.h"

namespace {

///
/// Serialize the protobuf message to msg pack.
///
/// Ray uses Msgpack for cross-language object serialization.
/// This method creates a msgpack serialized buffer that contains
/// serialized protobuf message.
///
/// Language frontend can deseiralize this object to obtain
/// data stored in a given protobuf. Check `serialization.py` to see
/// how this works.
///
/// NOTE: The function guarantees that the returned buffer contains data.
///
/// \param protobuf_message The protobuf message to serialize.
/// \return The buffer that contains serialized msgpack message.
template <class ProtobufMessage>
std::unique_ptr<ray::LocalMemoryBuffer> SerializePBToMsgPack(
const ProtobufMessage *protobuf_message) {
RAY_CHECK(protobuf_message != nullptr);
// Structure of bytes stored in object store:

// First serialize RayException by the following steps:
// PB's RayException
// --(PB Serialization)-->
// --(msgpack Serialization)-->
// msgpack_serialized_exception(MSE)

// Then add it's length to the head(for coross-language deserialization):
// [MSE's length(9 bytes)] [MSE]

std::string pb_serialized_exception;
protobuf_message->SerializeToString(&pb_serialized_exception);
msgpack::sbuffer msgpack_serialized_exception;
msgpack::packer<msgpack::sbuffer> packer(msgpack_serialized_exception);
packer.pack_bin(pb_serialized_exception.size());
packer.pack_bin_body(pb_serialized_exception.data(), pb_serialized_exception.size());
std::unique_ptr<ray::LocalMemoryBuffer> final_buffer =
std::make_unique<ray::LocalMemoryBuffer>(msgpack_serialized_exception.size() +
kMessagePackOffset);
// copy msgpack-serialized bytes
std::memcpy(final_buffer->Data() + kMessagePackOffset,
msgpack_serialized_exception.data(), msgpack_serialized_exception.size());
// copy offset
msgpack::sbuffer msgpack_int;
msgpack::pack(msgpack_int, msgpack_serialized_exception.size());
std::memcpy(final_buffer->Data(), msgpack_int.data(), msgpack_int.size());
RAY_CHECK(final_buffer->Data() != nullptr);
RAY_CHECK(final_buffer->Size() != 0);

return final_buffer;
}

} // namespace

namespace ray {
namespace core {

Expand Down Expand Up @@ -622,15 +565,8 @@ void TaskManager::MarkTaskReturnObjectsFailed(const TaskSpecification &spec,
const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
if (ray_error_info == nullptr) {
RAY_UNUSED(in_memory_store_->Put(RayObject(error_type), object_id));
continue;
}

if (ray_error_info->has_actor_init_failure()) {
auto creation_task_exception = ray_error_info->actor_init_failure();
const auto final_buffer =
SerializePBToMsgPack<rpc::RayException>(&creation_task_exception);
RAY_UNUSED(in_memory_store_->Put(
RayObject(error_type, final_buffer->Data(), final_buffer->Size()), object_id));
} else {
RAY_UNUSED(in_memory_store_->Put(RayObject(error_type, ray_error_info), object_id));
}
}
}
Expand Down

0 comments on commit 4b9524e

Please sign in to comment.