Skip to content

Commit

Permalink
Add timeout mechanism to Push function instead of retries (ray-projec…
Browse files Browse the repository at this point in the history
…t#2148)

Use timer instead of retries in Push when objects are not local.
  • Loading branch information
guoyuhong authored and elibol committed Jun 1, 2018
1 parent 74dc14d commit c1de03a
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 49 deletions.
19 changes: 11 additions & 8 deletions src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,16 @@ class RayConfig {
return object_manager_pull_timeout_ms_;
}

int object_manager_push_timeout_ms() const {
return object_manager_push_timeout_ms_;
}

int object_manager_max_sends() const { return object_manager_max_sends_; }

int object_manager_max_receives() const {
return object_manager_max_receives_;
}

int object_manager_max_push_retries() const {
return object_manager_max_push_retries_;
}

uint64_t object_manager_default_chunk_size() const {
return object_manager_default_chunk_size_;
}
Expand Down Expand Up @@ -139,9 +139,9 @@ class RayConfig {
// be addressed. This timeout is often on the critical path for object
// transfers.
object_manager_pull_timeout_ms_(20),
object_manager_push_timeout_ms_(10000),
object_manager_max_sends_(2),
object_manager_max_receives_(2),
object_manager_max_push_retries_(1000),
object_manager_default_chunk_size_(100000000) {}

~RayConfig() {}
Expand Down Expand Up @@ -235,15 +235,18 @@ class RayConfig {
/// ObjectManager.
int object_manager_pull_timeout_ms_;

/// Timeout, in milliseconds, to wait until the Push request fails.
/// Special value:
/// Negative: waiting infinitely.
/// 0: giving up retrying immediately.
int object_manager_push_timeout_ms_;

/// Maximum number of concurrent sends allowed by the object manager.
int object_manager_max_sends_;

/// Maximum number of concurrent receives allowed by the object manager.
int object_manager_max_receives_;

/// Maximum push retries allowed by the object manager.
int object_manager_max_push_retries_;

/// Default chunk size for multi-chunk transfers to use in the object manager.
/// In the object manager, no single thread is permitted to transfer more
/// data than what is specified by the chunk size.
Expand Down
69 changes: 56 additions & 13 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
connection_pool_() {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
RAY_CHECK(config_.max_push_retries > 0);
main_service_ = &main_service;
store_notification_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) { NotifyDirectoryObjectAdd(object_info); });
Expand All @@ -47,7 +46,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
connection_pool_() {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
RAY_CHECK(config_.max_push_retries > 0);
// TODO(hme) Client ID is never set with this constructor.
main_service_ = &main_service;
store_notification_.SubscribeObjAdded(
Expand Down Expand Up @@ -90,6 +88,21 @@ void ObjectManager::NotifyDirectoryObjectAdd(const ObjectInfoT &object_info) {
local_objects_[object_id] = object_info;
ray::Status status =
object_directory_->ReportObjectAdded(object_id, client_id_, object_info);
// Handle the unfulfilled_push_tasks_ which contains the push request that is not
// completed due to unsatisfied local objects.
auto iter = unfulfilled_push_tasks_.find(object_id);
if (iter != unfulfilled_push_tasks_.end()) {
for (auto &pair : iter->second) {
auto &client_id = pair.first;
main_service_->post(
[this, object_id, client_id]() { RAY_CHECK_OK(Push(object_id, client_id)); });
// When push timeout is set to -1, there will be an empty timer in pair.second.
if (pair.second != nullptr) {
pair.second->cancel();
}
}
unfulfilled_push_tasks_.erase(iter);
}
}

void ObjectManager::NotifyDirectoryObjectDeleted(const ObjectID &object_id) {
Expand Down Expand Up @@ -196,19 +209,49 @@ ray::Status ObjectManager::PullSendRequest(const ObjectID &object_id,
return ray::Status::OK();
}

ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id,
int retry) {
void ObjectManager::HandlePushTaskTimeout(const ObjectID &object_id,
const ClientID &client_id) {
RAY_LOG(WARNING) << "Invalid Push request ObjectID: " << object_id
<< " after waiting for " << config_.push_timeout_ms << " ms.";
auto iter = unfulfilled_push_tasks_.find(object_id);
RAY_CHECK(iter != unfulfilled_push_tasks_.end());
uint num_erased = iter->second.erase(client_id);
RAY_CHECK(num_erased == 1);
if (iter->second.size() == 0) {
unfulfilled_push_tasks_.erase(iter);
}
}

ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
if (local_objects_.count(object_id) == 0) {
if (retry < 0) {
retry = config_.max_push_retries;
} else if (retry == 0) {
RAY_LOG(ERROR) << "Invalid Push request ObjectID: " << object_id
<< " after retrying " << config_.max_push_retries << " times.";
return ray::Status::OK();
// Avoid setting duplicated timer for the same object and client pair.
auto &clients = unfulfilled_push_tasks_[object_id];
if (clients.count(client_id) == 0) {
// If config_.push_timeout_ms < 0, we give an empty timer
// and the task will be kept infinitely.
auto timer = std::unique_ptr<boost::asio::deadline_timer>();
if (config_.push_timeout_ms == 0) {
// The Push request fails directly when config_.push_timeout_ms == 0.
RAY_LOG(WARNING) << "Invalid Push request ObjectID " << object_id
<< " due to direct timeout setting. ";
} else if (config_.push_timeout_ms > 0) {
// Put the task into a queue and wait for the notification of Object added.
timer.reset(new boost::asio::deadline_timer(*main_service_));
auto clean_push_period = boost::posix_time::milliseconds(config_.push_timeout_ms);
timer->expires_from_now(clean_push_period);
timer->async_wait(
[this, object_id, client_id](const boost::system::error_code &error) {
// Timer killing will receive the boost::asio::error::operation_aborted,
// we only handle the timeout event.
if (!error) {
HandlePushTaskTimeout(object_id, client_id);
}
});
}
if (config_.push_timeout_ms != 0) {
clients.emplace(client_id, std::move(timer));
}
}
main_service_->post([this, object_id, client_id, retry]() {
RAY_CHECK_OK(Push(object_id, client_id, retry - 1));
});
return ray::Status::OK();
}

Expand Down
21 changes: 16 additions & 5 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace ray {

struct ObjectManagerConfig {
/// The time in milliseconds to wait before retrying a pull
/// that failed due to client id lookup.
/// that fails due to client id lookup.
uint pull_timeout_ms;
/// Maximum number of sends allowed.
int max_sends;
Expand All @@ -41,8 +41,11 @@ struct ObjectManagerConfig {
uint64_t object_chunk_size;
/// The stored socked name.
std::string store_socket_name;
/// Maximun number of push retries.
int max_push_retries;
/// The time in milliseconds to wait until a Push request
/// fails due to unsatisfied local object. Special value:
/// Negative: waiting infinitely.
/// 0: giving up retrying immediately.
int push_timeout_ms;
};

class ObjectManagerInterface {
Expand Down Expand Up @@ -99,9 +102,8 @@ class ObjectManager : public ObjectManagerInterface {
///
/// \param object_id The object's object id.
/// \param client_id The remote node's client id.
/// \param retry The count down retries, -1 means the default maximum retries.
/// \return Status of whether the push request successfully initiated.
ray::Status Push(const ObjectID &object_id, const ClientID &client_id, int retry = -1);
ray::Status Push(const ObjectID &object_id, const ClientID &client_id);

/// Pull an object from ClientID. Returns UniqueID asociated with
/// an invocation of this method.
Expand Down Expand Up @@ -194,6 +196,13 @@ class ObjectManager : public ObjectManagerInterface {
/// Cache of locally available objects.
std::unordered_map<ObjectID, ObjectInfoT> local_objects_;

/// Unfulfilled Push tasks.
/// The timer is for removing a push task due to unsatisfied local object.
std::unordered_map<
ObjectID,
std::unordered_map<ClientID, std::unique_ptr<boost::asio::deadline_timer>>>
unfulfilled_push_tasks_;

/// Handle starting, running, and stopping asio io_service.
void StartIOService();
void RunSendService();
Expand Down Expand Up @@ -261,6 +270,8 @@ class ObjectManager : public ObjectManagerInterface {
/// Handles disconnect message of an existing client connection.
void DisconnectClient(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message);
/// Handle Push task timeout.
void HandlePushTaskTimeout(const ObjectID &object_id, const ClientID &client_id);
};

} // namespace ray
Expand Down
6 changes: 3 additions & 3 deletions src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class TestObjectManagerBase : public ::testing::Test {
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
int max_push_retries = 1000;
int push_timeout_ms = 10000;

// start first server
gcs_client_1 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
Expand All @@ -134,7 +134,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_1.max_sends = max_sends;
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.max_push_retries = max_push_retries;
om_config_1.push_timeout_ms = push_timeout_ms;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));

// start second server
Expand All @@ -145,7 +145,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_2.max_sends = max_sends;
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.max_push_retries = max_push_retries;
om_config_2.push_timeout_ms = push_timeout_ms;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));

// connect to stores.
Expand Down
62 changes: 46 additions & 16 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class TestObjectManager : public ::testing::Test {
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
int max_push_retries = 1000;
push_timeout_ms = 1000;

// start first server
gcs_client_1 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
Expand All @@ -124,7 +124,8 @@ class TestObjectManager : public ::testing::Test {
om_config_1.max_sends = max_sends;
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.max_push_retries = max_push_retries;
// Push will stop immediately if local object is not satisfied.
om_config_1.push_timeout_ms = push_timeout_ms;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));

// start second server
Expand All @@ -135,7 +136,8 @@ class TestObjectManager : public ::testing::Test {
om_config_2.max_sends = max_sends;
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.max_push_retries = max_push_retries;
// Push will wait infinitely until local object is satisfied.
om_config_2.push_timeout_ms = push_timeout_ms;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));

// connect to stores.
Expand All @@ -155,8 +157,8 @@ class TestObjectManager : public ::testing::Test {
StopStore(store_id_2);
}

ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size) {
ObjectID object_id = ObjectID::from_random();
ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size,
ObjectID object_id) {
RAY_LOG(DEBUG) << "ObjectID Created: " << object_id;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
Expand Down Expand Up @@ -186,16 +188,20 @@ class TestObjectManager : public ::testing::Test {

std::string store_id_1;
std::string store_id_2;

uint push_timeout_ms;
};

class TestObjectManagerCommands : public TestObjectManager {
public:
int num_connected_clients = 0;
uint num_expected_objects;
ClientID client_id_1;
ClientID client_id_2;

ObjectID created_object_id;
ObjectID created_object_id1;
ObjectID created_object_id2;

std::unique_ptr<boost::asio::deadline_timer> timer;

void WaitConnections() {
client_id_1 = gcs_client_1->client_table().GetLocalClientId();
Expand All @@ -222,21 +228,45 @@ class TestObjectManagerCommands : public TestObjectManager {
status = server1->object_manager_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) {
object_added_handler_1(ObjectID::from_binary(object_info.object_id));
if (v1.size() == num_expected_objects) {
NotificationTestComplete(created_object_id,
ObjectID::from_binary(object_info.object_id));
}
NotificationTestCompleteIfSatisfied();
});
RAY_CHECK_OK(status);
status = server2->object_manager_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) {
object_added_handler_2(ObjectID::from_binary(object_info.object_id));
NotificationTestCompleteIfSatisfied();
});
RAY_CHECK_OK(status);

num_expected_objects = 1;
uint data_size = 1000000;
created_object_id = WriteDataToClient(client1, data_size);

// dummy_id is not local. The push function will timeout.
ObjectID dummy_id = ObjectID::from_random();
status = server1->object_manager_.Push(
dummy_id, gcs_client_2->client_table().GetLocalClientId());

created_object_id1 = ObjectID::from_random();
WriteDataToClient(client1, data_size, created_object_id1);
// Server1 holds Object1 so this Push call will success.
status = server1->object_manager_.Push(
created_object_id1, gcs_client_2->client_table().GetLocalClientId());

// This timer is used to guarantee that the Push function for dummy_id will timeout.
timer.reset(new boost::asio::deadline_timer(main_service));
auto period = boost::posix_time::milliseconds(push_timeout_ms + 10);
timer->expires_from_now(period);
created_object_id2 = ObjectID::from_random();
timer->async_wait([this, data_size](const boost::system::error_code &error) {
WriteDataToClient(client2, data_size, created_object_id2);
});
}

void NotificationTestComplete(ObjectID object_id_1, ObjectID object_id_2) {
ASSERT_EQ(object_id_1, object_id_2);
main_service.stop();
void NotificationTestCompleteIfSatisfied() {
uint num_expected_objects1 = 1;
uint num_expected_objects2 = 2;
if (v1.size() == num_expected_objects1 && v2.size() == num_expected_objects2) {
main_service.stop();
}
}

void TestConnections() {
Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ int main(int argc, char *argv[]) {
object_manager_config.max_sends = RayConfig::instance().object_manager_max_sends();
object_manager_config.max_receives =
RayConfig::instance().object_manager_max_receives();
object_manager_config.max_push_retries =
RayConfig::instance().object_manager_max_push_retries();
object_manager_config.push_timeout_ms =
RayConfig::instance().object_manager_push_timeout_ms();
object_manager_config.object_chunk_size =
RayConfig::instance().object_manager_default_chunk_size();

Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet/object_manager_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TestObjectManagerBase : public ::testing::Test {
gcs_client_1 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_sock_1;
om_config_1.max_push_retries = 1000;
om_config_1.push_timeout_ms = 10000;
server1.reset(new ray::raylet::Raylet(
main_service, "raylet_1", "0.0.0.0", "127.0.0.1", 6379,
GetNodeManagerConfig("raylet_1", store_sock_1), om_config_1, gcs_client_1));
Expand All @@ -64,7 +64,7 @@ class TestObjectManagerBase : public ::testing::Test {
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_sock_2;
om_config_2.max_push_retries = 1000;
om_config_2.push_timeout_ms = 10000;
server2.reset(new ray::raylet::Raylet(
main_service, "raylet_2", "0.0.0.0", "127.0.0.1", 6379,
GetNodeManagerConfig("raylet_2", store_sock_2), om_config_2, gcs_client_2));
Expand Down

0 comments on commit c1de03a

Please sign in to comment.