diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index a650928e3a7e5..8f9b8a461e64c 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -1,6 +1,7 @@ #include "ray/object_manager/object_buffer_pool.h" -#include "arrow/util/logging.h" +#include "ray/status.h" +#include "ray/util/logging.h" namespace ray { @@ -8,7 +9,7 @@ ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name, uint64_t chunk_size) : default_chunk_size_(chunk_size) { store_socket_name_ = store_socket_name; - ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str())); + RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str())); } ObjectBufferPool::~ObjectBufferPool() { @@ -23,7 +24,7 @@ ObjectBufferPool::~ObjectBufferPool() { } RAY_CHECK(get_buffer_state_.empty()); RAY_CHECK(create_buffer_state_.empty()); - ARROW_CHECK_OK(store_client_.Disconnect()); + RAY_ARROW_CHECK_OK(store_client_.Disconnect()); } uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) { @@ -44,7 +45,7 @@ std::pair ObjectBufferPool::Ge if (get_buffer_state_.count(object_id) == 0) { plasma::ObjectBuffer object_buffer; plasma::ObjectID plasma_id = object_id.to_plasma_id(); - ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer)); + RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer)); if (object_buffer.data == nullptr) { RAY_LOG(ERROR) << "Failed to get object"; return std::pair( @@ -73,14 +74,14 @@ void ObjectBufferPool::ReleaseGetChunk(const ObjectID &object_id, uint64_t chunk buffer_state.references--; RAY_LOG(DEBUG) << "ReleaseBuffer " << object_id << " " << buffer_state.references; if (buffer_state.references == 0) { - ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id())); get_buffer_state_.erase(object_id); } } void ObjectBufferPool::AbortGet(const ObjectID &object_id) { std::lock_guard lock(pool_mutex_); - ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id())); get_buffer_state_.erase(object_id); } @@ -156,16 +157,16 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk << create_buffer_state_[object_id].num_seals_remaining; if (create_buffer_state_[object_id].num_seals_remaining == 0) { const plasma::ObjectID plasma_id = object_id.to_plasma_id(); - ARROW_CHECK_OK(store_client_.Seal(plasma_id)); - ARROW_CHECK_OK(store_client_.Release(plasma_id)); + RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id)); + RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id)); create_buffer_state_.erase(object_id); } } void ObjectBufferPool::AbortCreate(const ObjectID &object_id) { const plasma::ObjectID plasma_id = object_id.to_plasma_id(); - ARROW_CHECK_OK(store_client_.Release(plasma_id)); - ARROW_CHECK_OK(store_client_.Abort(plasma_id)); + RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id)); + RAY_ARROW_CHECK_OK(store_client_.Abort(plasma_id)); create_buffer_state_.erase(object_id); } @@ -194,7 +195,7 @@ void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { plasma_ids.push_back(id.to_plasma_id()); } std::lock_guard lock(pool_mutex_); - ARROW_CHECK_OK(store_client_.Delete(plasma_ids)); + RAY_ARROW_CHECK_OK(store_client_.Delete(plasma_ids)); } std::string ObjectBufferPool::DebugString() const { diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index 7c327ebbdbad1..efa0c211d51df 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -5,7 +5,7 @@ #include #include -#include "arrow/util/logging.h" +#include "ray/status.h" #include "ray/common/common_protocol.h" #include "ray/object_manager/object_store_notification_manager.h" @@ -20,9 +20,9 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager( num_adds_processed_(0), num_removes_processed_(0), socket_(io_service) { - ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str())); + RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str())); - ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_)); + RAY_ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_)); boost::system::error_code ec; socket_.assign(boost::asio::local::stream_protocol(), c_socket_, ec); assert(!ec.value()); @@ -30,7 +30,7 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager( } ObjectStoreNotificationManager::~ObjectStoreNotificationManager() { - ARROW_CHECK_OK(store_client_.Disconnect()); + RAY_ARROW_CHECK_OK(store_client_.Disconnect()); } void ObjectStoreNotificationManager::NotificationWait() { diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index a1f792727685c..3f2d50ead52af 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -5,7 +5,7 @@ #include "gtest/gtest.h" -#include "arrow/util/logging.h" +#include "ray/status.h" #include "ray/object_manager/object_manager.h" @@ -157,8 +157,8 @@ class TestObjectManagerBase : public ::testing::Test { server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. - ARROW_CHECK_OK(client1.Connect(store_id_1)); - ARROW_CHECK_OK(client2.Connect(store_id_2)); + RAY_ARROW_CHECK_OK(client1.Connect(store_id_1)); + RAY_ARROW_CHECK_OK(client2.Connect(store_id_2)); } void TearDown() { @@ -179,9 +179,9 @@ class TestObjectManagerBase : public ::testing::Test { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, - metadata_size, &data)); - ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, + metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); return object_id; } @@ -291,14 +291,14 @@ class StressTestObjectManager : public TestObjectManagerBase { plasma::ObjectBuffer GetObject(plasma::PlasmaClient &client, ObjectID &object_id) { plasma::ObjectBuffer object_buffer; plasma::ObjectID plasma_id = object_id.to_plasma_id(); - ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer)); + RAY_ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer)); return object_buffer; } static unsigned char *GetDigest(plasma::PlasmaClient &client, ObjectID &object_id) { const int64_t size = sizeof(uint64_t); static unsigned char digest_1[size]; - ARROW_CHECK_OK(client.Hash(object_id.to_plasma_id(), &digest_1[0])); + RAY_ARROW_CHECK_OK(client.Hash(object_id.to_plasma_id(), &digest_1[0])); return digest_1; } diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index a956c1a0aa3ec..a71d7636ae2d2 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -3,7 +3,7 @@ #include "gtest/gtest.h" -#include "arrow/util/logging.h" +#include "ray/status.h" #include "ray/object_manager/object_manager.h" @@ -142,8 +142,8 @@ class TestObjectManagerBase : public ::testing::Test { server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. - ARROW_CHECK_OK(client1.Connect(store_id_1)); - ARROW_CHECK_OK(client2.Connect(store_id_2)); + RAY_ARROW_CHECK_OK(client1.Connect(store_id_1)); + RAY_ARROW_CHECK_OK(client2.Connect(store_id_2)); } void TearDown() { @@ -168,9 +168,9 @@ class TestObjectManagerBase : public ::testing::Test { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, - metadata_size, &data)); - ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, + metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); return object_id; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index afee3369a4c3f..e20173cb8e6a2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2,7 +2,7 @@ #include -#include "arrow/util/logging.h" +#include "ray/status.h" #include "ray/common/common_protocol.h" #include "ray/id.h" @@ -90,7 +90,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); - ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); + RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); } ray::Status NodeManager::RegisterGcs() { @@ -1148,8 +1148,8 @@ void NodeManager::TreatTaskAsFailed(const Task &task) { if (!status.IsPlasmaObjectExists()) { // TODO(rkn): We probably don't want this checks. E.g., if the object // store is full, we don't want to kill the raylet. - ARROW_CHECK_OK(status); - ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK(status); + RAY_ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id())); } } // A task failing is equivalent to assigning and finishing the task, so clean diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index e1570dc9be185..8610205954483 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -3,7 +3,7 @@ #include "gtest/gtest.h" -#include "arrow/util/logging.h" +#include "ray/status.h" #include "ray/raylet/raylet.h" @@ -76,8 +76,8 @@ class TestObjectManagerBase : public ::testing::Test { GetNodeManagerConfig("raylet_2", store_sock_2), om_config_2, gcs_client_2)); // connect to stores. - ARROW_CHECK_OK(client1.Connect(store_sock_1)); - ARROW_CHECK_OK(client2.Connect(store_sock_2)); + RAY_ARROW_CHECK_OK(client1.Connect(store_sock_1)); + RAY_ARROW_CHECK_OK(client2.Connect(store_sock_2)); } void TearDown() { @@ -104,9 +104,9 @@ class TestObjectManagerBase : public ::testing::Test { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, - metadata_size, &data)); - ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, + metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); return object_id; } diff --git a/src/ray/status.h b/src/ray/status.h index 47629f32ad877..fb6252b346670 100644 --- a/src/ray/status.h +++ b/src/ray/status.h @@ -44,6 +44,16 @@ // logged message. #define RAY_CHECK_OK(s) RAY_CHECK_OK_PREPEND(s, "Bad status") +// This macro is used to replace the "ARROW_CHECK_OK_PREPEND" macro. +#define RAY_ARROW_CHECK_OK_PREPEND(to_call, msg) \ + do { \ + ::arrow::Status _s = (to_call); \ + RAY_CHECK(_s.ok()) << (msg) << ": " << _s.ToString(); \ + } while (0) + +// This macro is used to replace the "ARROW_CHECK_OK" macro. +#define RAY_ARROW_CHECK_OK(s) RAY_ARROW_CHECK_OK_PREPEND(s, "Bad status") + namespace ray { enum class StatusCode : char {