Skip to content

Commit

Permalink
Performance fix (ray-project#2110)
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-jj authored and robertnishihara committed May 21, 2018
1 parent f795173 commit eb07876
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/common/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ TaskExecutionSpec::TaskExecutionSpec(TaskExecutionSpec *other)
spec_ = std::unique_ptr<TaskSpec[]>(spec_copy);
}

std::vector<ObjectID> TaskExecutionSpec::ExecutionDependencies() const {
const std::vector<ObjectID> &TaskExecutionSpec::ExecutionDependencies() const {
return execution_dependencies_;
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TaskExecutionSpec {
///
/// @return A vector of object IDs representing this task's execution
/// dependencies.
std::vector<ObjectID> ExecutionDependencies() const;
const std::vector<ObjectID> &ExecutionDependencies() const;

/// Set the task's execution dependencies.
///
Expand Down
8 changes: 4 additions & 4 deletions src/ray/object_manager/object_store_notification_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,25 @@ void ObjectStoreNotificationManager::ProcessStoreNotification(
}

void ObjectStoreNotificationManager::ProcessStoreAdd(const ObjectInfoT &object_info) {
for (auto handler : add_handlers_) {
for (auto &handler : add_handlers_) {
handler(object_info);
}
}

void ObjectStoreNotificationManager::ProcessStoreRemove(const ObjectID &object_id) {
for (auto handler : rem_handlers_) {
for (auto &handler : rem_handlers_) {
handler(object_id);
}
}

void ObjectStoreNotificationManager::SubscribeObjAdded(
std::function<void(const ObjectInfoT &)> callback) {
add_handlers_.push_back(callback);
add_handlers_.push_back(std::move(callback));
}

void ObjectStoreNotificationManager::SubscribeObjDeleted(
std::function<void(const ObjectID &)> callback) {
rem_handlers_.push_back(callback);
rem_handlers_.push_back(std::move(callback));
}

} // namespace ray
10 changes: 6 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,14 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl
}
// Locate the client id in remote client table and update available resources based on
// the received heartbeat information.
if (this->cluster_resource_map_.count(client_id) == 0) {
auto it = this->cluster_resource_map_.find(client_id);
if (it == cluster_resource_map_.end()) {
// Haven't received the client registration for this client yet, skip this heartbeat.
RAY_LOG(INFO) << "[HeartbeatAdded]: received heartbeat from unknown client id "
<< client_id;
return;
}
SchedulingResources &resources = this->cluster_resource_map_[client_id];
SchedulingResources &resources = it->second;
ResourceSet heartbeat_resource_available(heartbeat_data.resources_available_label,
heartbeat_data.resources_available_capacity);
resources.SetAvailableResources(
Expand Down Expand Up @@ -786,13 +787,14 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id)
auto client_info = gcs_client_->client_table().GetClient(node_id);

// Lookup remote server connection for this node_id and use it to send the request.
if (remote_server_connections_.count(node_id) == 0) {
auto it = remote_server_connections_.find(node_id);
if (it == remote_server_connections_.end()) {
// TODO(atumanov): caller must handle failure to ensure tasks are not lost.
RAY_LOG(INFO) << "No NodeManager connection found for GCS client id " << node_id;
return ray::Status::IOError("NodeManager connection not found");
}

auto &server_conn = remote_server_connections_.at(node_id);
auto &server_conn = it->second;
auto status = server_conn.WriteMessage(protocol::MessageType_ForwardTaskRequest,
fbb.GetSize(), fbb.GetBufferPointer());
if (status.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/scheduling_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ std::unordered_map<TaskID, ClientID> SchedulingPolicy::Schedule(
for (const auto &client_resource_pair : cluster_resources) {
// pair = ClientID, SchedulingResources
ClientID node_client_id = client_resource_pair.first;
SchedulingResources node_resources = client_resource_pair.second;
const auto &node_resources = client_resource_pair.second;
RAY_LOG(DEBUG) << "client_id " << node_client_id << " resources: "
<< node_resources.GetAvailableResources().ToString();
if (resource_demand.IsSubset(node_resources.GetTotalResources())) {
Expand Down
6 changes: 2 additions & 4 deletions src/ray/raylet/scheduling_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ void removeTasksFromQueue(std::list<Task> &queue, std::unordered_set<TaskID> &ta
}

// Helper function to queue the given tasks to the given queue.
void queueTasks(std::list<Task> &queue, const std::vector<Task> &tasks) {
for (auto &task : tasks) {
queue.push_back(task);
}
inline void queueTasks(std::list<Task> &queue, const std::vector<Task> &tasks) {
queue.insert(queue.end(), tasks.begin(), tasks.end());
}

std::vector<Task> SchedulingQueue::RemoveTasks(std::unordered_set<TaskID> task_ids) {
Expand Down
5 changes: 3 additions & 2 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ void WorkerPool::RegisterWorker(std::shared_ptr<Worker> worker) {
auto pid = worker->Pid();
RAY_LOG(DEBUG) << "Registering worker with pid " << pid;
registered_workers_.push_back(std::move(worker));
RAY_CHECK(started_worker_pids_.count(pid) > 0);
started_worker_pids_.erase(pid);
auto it = started_worker_pids_.find(pid);
RAY_CHECK(it != started_worker_pids_.end());
started_worker_pids_.erase(it);
}

std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(
Expand Down

0 comments on commit eb07876

Please sign in to comment.