Skip to content

Commit

Permalink
Get rid of Arrow test utils (ray-project#3734)
Browse files Browse the repository at this point in the history
* convert code to proper C++

* revert changes to "id.h" because ray-project#3765 has been merged.

* revert changes to Python bindings because they will be removed in ray-project#3541

* remove dependencies of Arrow logging

* revert changes to Arrow logging

* lint
  • Loading branch information
suquark authored and pcmoritz committed Jan 18, 2019
1 parent 319c134 commit 16a3b99
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 39 deletions.
23 changes: 12 additions & 11 deletions src/ray/object_manager/object_buffer_pool.cc
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#include "ray/object_manager/object_buffer_pool.h"

#include "arrow/util/logging.h"
#include "ray/status.h"
#include "ray/util/logging.h"

namespace ray {

ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
uint64_t chunk_size)
: default_chunk_size_(chunk_size) {
store_socket_name_ = store_socket_name;
ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str()));
RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str()));
}

ObjectBufferPool::~ObjectBufferPool() {
Expand All @@ -23,7 +24,7 @@ ObjectBufferPool::~ObjectBufferPool() {
}
RAY_CHECK(get_buffer_state_.empty());
RAY_CHECK(create_buffer_state_.empty());
ARROW_CHECK_OK(store_client_.Disconnect());
RAY_ARROW_CHECK_OK(store_client_.Disconnect());
}

uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
Expand All @@ -44,7 +45,7 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Ge
if (get_buffer_state_.count(object_id) == 0) {
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data == nullptr) {
RAY_LOG(ERROR) << "Failed to get object";
return std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status>(
Expand Down Expand Up @@ -73,14 +74,14 @@ void ObjectBufferPool::ReleaseGetChunk(const ObjectID &object_id, uint64_t chunk
buffer_state.references--;
RAY_LOG(DEBUG) << "ReleaseBuffer " << object_id << " " << buffer_state.references;
if (buffer_state.references == 0) {
ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
get_buffer_state_.erase(object_id);
}
}

void ObjectBufferPool::AbortGet(const ObjectID &object_id) {
std::lock_guard<std::mutex> lock(pool_mutex_);
ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
get_buffer_state_.erase(object_id);
}

Expand Down Expand Up @@ -156,16 +157,16 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
<< create_buffer_state_[object_id].num_seals_remaining;
if (create_buffer_state_[object_id].num_seals_remaining == 0) {
const plasma::ObjectID plasma_id = object_id.to_plasma_id();
ARROW_CHECK_OK(store_client_.Seal(plasma_id));
ARROW_CHECK_OK(store_client_.Release(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id));
create_buffer_state_.erase(object_id);
}
}

void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
const plasma::ObjectID plasma_id = object_id.to_plasma_id();
ARROW_CHECK_OK(store_client_.Release(plasma_id));
ARROW_CHECK_OK(store_client_.Abort(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Abort(plasma_id));
create_buffer_state_.erase(object_id);
}

Expand Down Expand Up @@ -194,7 +195,7 @@ void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
plasma_ids.push_back(id.to_plasma_id());
}
std::lock_guard<std::mutex> lock(pool_mutex_);
ARROW_CHECK_OK(store_client_.Delete(plasma_ids));
RAY_ARROW_CHECK_OK(store_client_.Delete(plasma_ids));
}

std::string ObjectBufferPool::DebugString() const {
Expand Down
8 changes: 4 additions & 4 deletions src/ray/object_manager/object_store_notification_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <boost/bind.hpp>
#include <boost/function.hpp>

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/common/common_protocol.h"
#include "ray/object_manager/object_store_notification_manager.h"
Expand All @@ -20,17 +20,17 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager(
num_adds_processed_(0),
num_removes_processed_(0),
socket_(io_service) {
ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str()));
RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str()));

ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_));
RAY_ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_));
boost::system::error_code ec;
socket_.assign(boost::asio::local::stream_protocol(), c_socket_, ec);
assert(!ec.value());
NotificationWait();
}

ObjectStoreNotificationManager::~ObjectStoreNotificationManager() {
ARROW_CHECK_OK(store_client_.Disconnect());
RAY_ARROW_CHECK_OK(store_client_.Disconnect());
}

void ObjectStoreNotificationManager::NotificationWait() {
Expand Down
16 changes: 8 additions & 8 deletions src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include "gtest/gtest.h"

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/object_manager/object_manager.h"

Expand Down Expand Up @@ -157,8 +157,8 @@ class TestObjectManagerBase : public ::testing::Test {
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));

// connect to stores.
ARROW_CHECK_OK(client1.Connect(store_id_1));
ARROW_CHECK_OK(client2.Connect(store_id_2));
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
RAY_ARROW_CHECK_OK(client2.Connect(store_id_2));
}

void TearDown() {
Expand All @@ -179,9 +179,9 @@ class TestObjectManagerBase : public ::testing::Test {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
return object_id;
}

Expand Down Expand Up @@ -291,14 +291,14 @@ class StressTestObjectManager : public TestObjectManagerBase {
plasma::ObjectBuffer GetObject(plasma::PlasmaClient &client, ObjectID &object_id) {
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer));
RAY_ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer));
return object_buffer;
}

static unsigned char *GetDigest(plasma::PlasmaClient &client, ObjectID &object_id) {
const int64_t size = sizeof(uint64_t);
static unsigned char digest_1[size];
ARROW_CHECK_OK(client.Hash(object_id.to_plasma_id(), &digest_1[0]));
RAY_ARROW_CHECK_OK(client.Hash(object_id.to_plasma_id(), &digest_1[0]));
return digest_1;
}

Expand Down
12 changes: 6 additions & 6 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "gtest/gtest.h"

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/object_manager/object_manager.h"

Expand Down Expand Up @@ -142,8 +142,8 @@ class TestObjectManagerBase : public ::testing::Test {
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));

// connect to stores.
ARROW_CHECK_OK(client1.Connect(store_id_1));
ARROW_CHECK_OK(client2.Connect(store_id_2));
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
RAY_ARROW_CHECK_OK(client2.Connect(store_id_2));
}

void TearDown() {
Expand All @@ -168,9 +168,9 @@ class TestObjectManagerBase : public ::testing::Test {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
return object_id;
}

Expand Down
8 changes: 4 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <fstream>

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/common/common_protocol.h"
#include "ray/id.h"
Expand Down Expand Up @@ -90,7 +90,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));

ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
}

ray::Status NodeManager::RegisterGcs() {
Expand Down Expand Up @@ -1148,8 +1148,8 @@ void NodeManager::TreatTaskAsFailed(const Task &task) {
if (!status.IsPlasmaObjectExists()) {
// TODO(rkn): We probably don't want this checks. E.g., if the object
// store is full, we don't want to kill the raylet.
ARROW_CHECK_OK(status);
ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(status);
RAY_ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id()));
}
}
// A task failing is equivalent to assigning and finishing the task, so clean
Expand Down
12 changes: 6 additions & 6 deletions src/ray/raylet/object_manager_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "gtest/gtest.h"

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/raylet/raylet.h"

Expand Down Expand Up @@ -76,8 +76,8 @@ class TestObjectManagerBase : public ::testing::Test {
GetNodeManagerConfig("raylet_2", store_sock_2), om_config_2, gcs_client_2));

// connect to stores.
ARROW_CHECK_OK(client1.Connect(store_sock_1));
ARROW_CHECK_OK(client2.Connect(store_sock_2));
RAY_ARROW_CHECK_OK(client1.Connect(store_sock_1));
RAY_ARROW_CHECK_OK(client2.Connect(store_sock_2));
}

void TearDown() {
Expand All @@ -104,9 +104,9 @@ class TestObjectManagerBase : public ::testing::Test {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
return object_id;
}

Expand Down
10 changes: 10 additions & 0 deletions src/ray/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@
// logged message.
#define RAY_CHECK_OK(s) RAY_CHECK_OK_PREPEND(s, "Bad status")

// This macro is used to replace the "ARROW_CHECK_OK_PREPEND" macro.
#define RAY_ARROW_CHECK_OK_PREPEND(to_call, msg) \
do { \
::arrow::Status _s = (to_call); \
RAY_CHECK(_s.ok()) << (msg) << ": " << _s.ToString(); \
} while (0)

// This macro is used to replace the "ARROW_CHECK_OK" macro.
#define RAY_ARROW_CHECK_OK(s) RAY_ARROW_CHECK_OK_PREPEND(s, "Bad status")

namespace ray {

enum class StatusCode : char {
Expand Down

0 comments on commit 16a3b99

Please sign in to comment.