Skip to content

Commit

Permalink
Multithreading refactor for ObjectManager. (ray-project#1911)
Browse files Browse the repository at this point in the history
* Removes the transfer service and adds separate thread pools for sends and receives.

* Removes the send/receive transfer counts.

* update comment.

* remove clang formatting.

* clang formatting.
  • Loading branch information
elibol authored and pcmoritz committed Apr 16, 2018
1 parent 3c817ad commit ddfc875
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 420 deletions.
1 change: 0 additions & 1 deletion src/ray/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ set(RAY_SRCS
object_manager/object_buffer_pool.cc
object_manager/object_store_notification_manager.cc
object_manager/object_directory.cc
object_manager/transfer_queue.cc
object_manager/object_manager.cc
raylet/monitor.cc
raylet/mock_gcs_client.cc
Expand Down
211 changes: 67 additions & 144 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace object_manager_protocol = ray::object_manager::protocol;
namespace ray {

ObjectManager::ObjectManager(asio::io_service &main_service,
std::unique_ptr<asio::io_service> object_manager_service,
const ObjectManagerConfig &config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
// TODO(hme): Eliminate knowledge of GCS.
Expand All @@ -19,13 +18,9 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
/*release_delay=*/2 * config_.max_sends),
object_manager_service_(std::move(object_manager_service)),
work_(*object_manager_service_),
connection_pool_(),
transfer_queue_(),
num_transfers_send_(0),
num_transfers_receive_(0),
num_threads_(config_.max_sends + config_.max_receives) {
send_work_(send_service_),
receive_work_(receive_service_),
connection_pool_() {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
main_service_ = &main_service;
Expand All @@ -37,7 +32,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
}

ObjectManager::ObjectManager(asio::io_service &main_service,
std::unique_ptr<asio::io_service> object_manager_service,
const ObjectManagerConfig &config,
std::unique_ptr<ObjectDirectoryInterface> od)
: config_(config),
Expand All @@ -47,13 +41,9 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
/*release_delay=*/2 * config_.max_sends),
object_manager_service_(std::move(object_manager_service)),
work_(*object_manager_service_),
connection_pool_(),
transfer_queue_(),
num_transfers_send_(0),
num_transfers_receive_(0),
num_threads_(config_.max_sends + config_.max_receives) {
send_work_(send_service_),
receive_work_(receive_service_),
connection_pool_() {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
// TODO(hme) Client ID is never set with this constructor.
Expand All @@ -68,17 +58,26 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// Destructor: the only teardown performed here is a call to StopIOService().
ObjectManager::~ObjectManager() {
  StopIOService();
}

// Spawns the object manager's worker threads. Every std::thread created here is
// bound to a member function of this object (IOServiceLoop, RunSendService or
// RunReceiveService) and its handle is appended to a member vector, which
// StopIOService() later joins.
// NOTE(review): this span renders a diff hunk, so the single-pool loop
// (num_threads_ / io_threads_) and the split send/receive loops appear together;
// presumably only the send/receive loops exist after the change - confirm
// against the repository before relying on this text.
void ObjectManager::StartIOService() {
for (int i = 0; i < num_threads_; ++i) {
io_threads_.emplace_back(std::thread(&ObjectManager::IOServiceLoop, this));
for (int i = 0; i < config_.max_sends; ++i) {
send_threads_.emplace_back(std::thread(&ObjectManager::RunSendService, this));
}
for (int i = 0; i < config_.max_receives; ++i) {
receive_threads_.emplace_back(std::thread(&ObjectManager::RunReceiveService, this));
}
}

// Thread entry point used by StartIOService(): runs the shared
// object_manager_service_ event loop on the calling thread.
void ObjectManager::IOServiceLoop() {
  object_manager_service_->run();
}
// Thread entry point for the send threads started in StartIOService(): runs
// send_service_ on the calling thread.
void ObjectManager::RunSendService() {
  send_service_.run();
}

// Thread entry point for the receive threads started in StartIOService(): runs
// receive_service_ on the calling thread.
void ObjectManager::RunReceiveService() {
  receive_service_.run();
}

// Shuts down the worker threads: calls stop() on an io_service and then joins
// the threads stored for it. Invoked from the destructor.
// NOTE(review): the join loops index the thread vectors by a configured count
// (num_threads_, config_.max_sends, config_.max_receives) rather than by the
// vectors' own sizes, and join() is called without a joinable() check. This
// assumes StartIOService() has already created exactly that many threads and
// that StopIOService() runs only once - confirm, since the destructor calls it
// unconditionally.
// NOTE(review): this span renders a diff hunk, so the single-service shutdown
// (object_manager_service_ / io_threads_) and the split send/receive shutdown
// appear together; presumably only the latter exists after the change - confirm.
void ObjectManager::StopIOService() {
object_manager_service_->stop();
for (int i = 0; i < num_threads_; ++i) {
io_threads_[i].join();
send_service_.stop();
for (int i = 0; i < config_.max_sends; ++i) {
send_threads_[i].join();
}
receive_service_.stop();
for (int i = 0; i < config_.max_receives; ++i) {
receive_threads_[i].join();
}
}

Expand Down Expand Up @@ -107,9 +106,7 @@ ray::Status ObjectManager::SubscribeObjDeleted(
}

// Starts a pull of `object_id` when the caller does not name a source client:
// the request is handed to PullGetLocations(object_id).
// NOTE(review): this span renders a diff hunk, so two forms appear together -
// one that dispatches the lookup onto main_service_ and returns Status::OK(),
// and one that calls PullGetLocations directly and returns its status.
// Presumably only the direct call remains after the change - confirm.
ray::Status ObjectManager::Pull(const ObjectID &object_id) {
main_service_->dispatch(
[this, object_id]() { RAY_CHECK_OK(PullGetLocations(object_id)); });
return Status::OK();
return PullGetLocations(object_id);
}

void ObjectManager::SchedulePull(const ObjectID &object_id, int wait_ms) {
Expand All @@ -118,8 +115,7 @@ void ObjectManager::SchedulePull(const ObjectID &object_id, int wait_ms) {
pull_requests_[object_id]->async_wait(
[this, object_id](const boost::system::error_code &error_code) {
pull_requests_.erase(object_id);
main_service_->dispatch(
[this, object_id]() { RAY_CHECK_OK(PullGetLocations(object_id)); });
RAY_CHECK_OK(PullGetLocations(object_id));
});
}

Expand All @@ -137,7 +133,6 @@ void ObjectManager::GetLocationsSuccess(const std::vector<ray::ClientID> &client
const ray::ObjectID &object_id) {
RAY_CHECK(!client_ids.empty());
ClientID client_id = client_ids.front();
pull_requests_.erase(object_id);
ray::Status status_code = Pull(object_id, client_id);
}

Expand All @@ -146,10 +141,7 @@ void ObjectManager::GetLocationsFailed(const ObjectID &object_id) {
}

// Starts a pull of `object_id` from the given `client_id` by handing the pair
// to PullEstablishConnection(object_id, client_id).
// NOTE(review): this span renders a diff hunk, so two forms appear together -
// one that dispatches the call onto main_service_ and returns Status::OK(), and
// one that calls PullEstablishConnection directly and returns its status.
// Presumably only the direct call remains after the change - confirm.
ray::Status ObjectManager::Pull(const ObjectID &object_id, const ClientID &client_id) {
main_service_->dispatch([this, object_id, client_id]() {
RAY_CHECK_OK(PullEstablishConnection(object_id, client_id));
});
return Status::OK();
return PullEstablishConnection(object_id, client_id);
};

ray::Status ObjectManager::PullEstablishConnection(const ObjectID &object_id,
Expand Down Expand Up @@ -212,95 +204,34 @@ ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &clien
return ray::Status::OK();
}

main_service_->dispatch([this, object_id, client_id]() {
// TODO(hme): Cache this data in ObjectDirectory.
// Okay for now since the GCS client caches this data.
Status status = object_directory_->GetInformation(
client_id,
[this, object_id, client_id](const RemoteConnectionInfo &info) {
ObjectInfoT object_info = local_objects_[object_id];
uint64_t data_size =
static_cast<uint64_t>(object_info.data_size + object_info.metadata_size);
uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size);
uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size);
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
transfer_queue_.QueueSend(client_id, object_id, data_size, metadata_size,
chunk_index, info);
}
RAY_CHECK_OK(DequeueTransfers());
},
[](const Status &status) {
// Push is best effort, so do nothing here.
});
RAY_CHECK_OK(status);
});
return ray::Status::OK();
}

ray::Status ObjectManager::DequeueTransfers() {
ray::Status status = ray::Status::OK();
// Dequeue sends.
while (true) {
int num_transfers_send = std::atomic_fetch_add(&num_transfers_send_, 1);
if (num_transfers_send < config_.max_sends) {
TransferQueue::SendRequest req;
bool exists = transfer_queue_.DequeueSendIfPresent(&req);
if (exists) {
object_manager_service_->dispatch([this, req]() {
RAY_LOG(DEBUG) << "DequeueSend " << client_id_ << " " << req.object_id << " "
<< num_transfers_send_ << "/" << config_.max_sends;
RAY_CHECK_OK(ExecuteSendObject(req.client_id, req.object_id, req.data_size,
req.metadata_size, req.chunk_index,
req.connection_info));
});
} else {
std::atomic_fetch_sub(&num_transfers_send_, 1);
break;
}
} else {
std::atomic_fetch_sub(&num_transfers_send_, 1);
break;
}
}
// Dequeue receives.
while (true) {
int num_transfers_receive = std::atomic_fetch_add(&num_transfers_receive_, 1);
if (num_transfers_receive < config_.max_receives) {
TransferQueue::ReceiveRequest req;
bool exists = transfer_queue_.DequeueReceiveIfPresent(&req);
if (exists) {
object_manager_service_->dispatch([this, req]() {
RAY_LOG(DEBUG) << "DequeueReceive " << client_id_ << " " << req.object_id << " "
<< num_transfers_receive_ << "/" << config_.max_receives;
RAY_CHECK_OK(ExecuteReceiveObject(req.client_id, req.object_id, req.data_size,
req.metadata_size, req.chunk_index,
req.conn));
});
} else {
std::atomic_fetch_sub(&num_transfers_receive_, 1);
break;
}
} else {
std::atomic_fetch_sub(&num_transfers_receive_, 1);
break;
}
}
// TODO(hme): Cache this data in ObjectDirectory.
// Okay for now since the GCS client caches this data.
Status status = object_directory_->GetInformation(
client_id,
[this, object_id, client_id](const RemoteConnectionInfo &info) {
ObjectInfoT object_info = local_objects_[object_id];
uint64_t data_size =
static_cast<uint64_t>(object_info.data_size + object_info.metadata_size);
uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size);
uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size);
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
send_service_.post([this, client_id, object_id, data_size, metadata_size,
chunk_index, info]() {
ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index,
info);
});
}
},
[](const Status &status) {
// Push is best effort, so do nothing here.
});
return status;
}

// Bookkeeping for a finished transfer: decrements the in-flight counter that
// matches `type` (send counter for SEND, receive counter for anything else),
// then calls DequeueTransfers() and returns its status.
ray::Status ObjectManager::TransferCompleted(TransferQueue::TransferType type) {
  const bool finished_send = (type == TransferQueue::TransferType::SEND);
  if (!finished_send) {
    std::atomic_fetch_sub(&num_transfers_receive_, 1);
  } else {
    std::atomic_fetch_sub(&num_transfers_send_, 1);
  }
  return DequeueTransfers();
}

ray::Status ObjectManager::ExecuteSendObject(
const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &connection_info) {
void ObjectManager::ExecuteSendObject(const ClientID &client_id,
const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &connection_info) {
RAY_LOG(DEBUG) << "ExecuteSendObject " << client_id << " " << object_id << " "
<< chunk_index;
ray::Status status;
Expand All @@ -314,7 +245,7 @@ ray::Status ObjectManager::ExecuteSendObject(
conn);
}
status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn);
return Status::OK();
RAY_CHECK_OK(status);
}

ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id,
Expand All @@ -325,14 +256,11 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id,
buffer_pool_.GetChunk(object_id, data_size, metadata_size, chunk_index);
ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first;

if (!chunk_status.second.ok()) {
// This is the first thread to invoke GetChunk => Get failed on the
// plasma client.
// No reference is acquired for this chunk, so no need to release the chunk.
// TODO(hme): Retry send here? If so, store RemoteConnectionInfo in SenderConnection.
RAY_CHECK_OK(TransferCompleted(TransferQueue::TransferType::SEND));
return chunk_status.second;
}
// If status is not okay, then return immediately because
// plasma_client.Get failed.
// No reference is acquired for this chunk, so no need to release the chunk.
RAY_RETURN_NOT_OK(chunk_status.second);

// Create buffer.
flatbuffers::FlatBufferBuilder fbb;
// TODO(hme): use to_flatbuf
Expand All @@ -349,7 +277,6 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id,
ray::Status ObjectManager::SendObjectData(const ObjectID &object_id,
const ObjectBufferPool::ChunkInfo &chunk_info,
std::shared_ptr<SenderConnection> conn) {
// TransferQueue::SendContext context = transfer_queue_.GetContext(context_id);
boost::system::error_code ec;
std::vector<asio::const_buffer> buffer;
buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length));
Expand All @@ -367,8 +294,7 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id,
RAY_CHECK_OK(
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::TRANSFER, conn));
RAY_LOG(DEBUG) << "SendCompleted " << client_id_ << " " << object_id << " "
<< num_transfers_send_ << "/" << config_.max_sends;
RAY_CHECK_OK(TransferCompleted(TransferQueue::TransferType::SEND));
<< config_.max_sends;
return status;
}

Expand All @@ -387,8 +313,8 @@ ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,

std::shared_ptr<SenderConnection> ObjectManager::CreateSenderConnection(
ConnectionPool::ConnectionType type, RemoteConnectionInfo info) {
std::shared_ptr<SenderConnection> conn = SenderConnection::Create(
*object_manager_service_, info.client_id, info.ip, info.port);
std::shared_ptr<SenderConnection> conn =
SenderConnection::Create(*main_service_, info.client_id, info.ip, info.port);
// Prepare client connection info buffer
flatbuffers::FlatBufferBuilder fbb;
bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER);
Expand Down Expand Up @@ -472,17 +398,16 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr<TcpClientConnection> conn
uint64_t chunk_index = object_header->chunk_index();
uint64_t data_size = object_header->data_size();
uint64_t metadata_size = object_header->metadata_size();
transfer_queue_.QueueReceive(conn->GetClientID(), object_id, data_size, metadata_size,
chunk_index, conn);
RAY_LOG(DEBUG) << "ReceivePushRequest " << conn->GetClientID() << " " << object_id
<< " " << chunk_index;
RAY_CHECK_OK(DequeueTransfers());
receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() {
ExecuteReceiveObject(conn->GetClientID(), object_id, data_size, metadata_size,
chunk_index, conn);
});
}

ray::Status ObjectManager::ExecuteReceiveObject(
const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
std::shared_ptr<TcpClientConnection> conn) {
void ObjectManager::ExecuteReceiveObject(const ClientID &client_id,
const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
std::shared_ptr<TcpClientConnection> conn) {
RAY_LOG(DEBUG) << "ExecuteReceiveObject " << client_id << " " << object_id << " "
<< chunk_index;

Expand Down Expand Up @@ -518,9 +443,7 @@ ray::Status ObjectManager::ExecuteReceiveObject(
}
conn->ProcessMessages();
RAY_LOG(DEBUG) << "ReceiveCompleted " << client_id_ << " " << object_id << " "
<< num_transfers_receive_ << "/" << config_.max_receives;
RAY_CHECK_OK(TransferCompleted(TransferQueue::TransferType::RECEIVE));
return Status::OK();
<< "/" << config_.max_receives;
}

} // namespace ray
Loading

0 comments on commit ddfc875

Please sign in to comment.