Commit
Batch heartbeats from node manager together in the monitor. (ray-proj…
ujvl authored and robertnishihara committed Nov 20, 2018
1 parent abdc3b5 commit b0bfd10
Showing 13 changed files with 134 additions and 36 deletions.
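What this commit does, in brief: raylets still publish their individual heartbeats to the GCS, but the monitor now buffers them (heartbeat_buffer_) and republishes the whole set once per Tick() as a single HeartbeatBatchTableData message, and node managers subscribe to that batch channel instead of to every per-raylet heartbeat. With N raylets, each heartbeat period therefore fans out one batch to N subscribers instead of N individual messages to N subscribers each. A condensed view of the consumer-side change, abridged from the node_manager.cc hunk below (done stands in for the subscription-done callback; not compilable on its own):

// Before: every raylet subscribed to every individual heartbeat.
RAY_RETURN_NOT_OK(gcs_client_->heartbeat_table().Subscribe(
    UniqueID::nil(), UniqueID::nil(), heartbeat_added, nullptr, done));
// After: every raylet receives one batched message per monitor tick.
RAY_RETURN_NOT_OK(gcs_client_->heartbeat_batch_table().Subscribe(
    UniqueID::nil(), UniqueID::nil(), heartbeat_batch_added, nullptr, done));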
5 changes: 5 additions & 0 deletions src/ray/gcs/client.cc
@@ -109,6 +109,7 @@ AsyncGcsClient::AsyncGcsClient(const std::string &address, int port,
client_table_.reset(new ClientTable({primary_context_}, this, client_id));
error_table_.reset(new ErrorTable({primary_context_}, this));
driver_table_.reset(new DriverTable({primary_context_}, this));
heartbeat_batch_table_.reset(new HeartbeatBatchTable({primary_context_}, this));
// Tables below are sharded.
object_table_.reset(new ObjectTable(shard_contexts_, this, command_type));
actor_table_.reset(new ActorTable(shard_contexts_, this));
@@ -214,6 +215,10 @@ ClassTable &AsyncGcsClient::class_table() { return *class_table_; }

HeartbeatTable &AsyncGcsClient::heartbeat_table() { return *heartbeat_table_; }

HeartbeatBatchTable &AsyncGcsClient::heartbeat_batch_table() {
return *heartbeat_batch_table_;
}

ErrorTable &AsyncGcsClient::error_table() { return *error_table_; }

DriverTable &AsyncGcsClient::driver_table() { return *driver_table_; }
2 changes: 2 additions & 0 deletions src/ray/gcs/client.h
@@ -60,6 +60,7 @@ class RAY_EXPORT AsyncGcsClient {
TaskLeaseTable &task_lease_table();
ClientTable &client_table();
HeartbeatTable &heartbeat_table();
HeartbeatBatchTable &heartbeat_batch_table();
ErrorTable &error_table();
DriverTable &driver_table();
ProfileTable &profile_table();
@@ -89,6 +90,7 @@ class RAY_EXPORT AsyncGcsClient {
std::unique_ptr<TaskReconstructionLog> task_reconstruction_log_;
std::unique_ptr<TaskLeaseTable> task_lease_table_;
std::unique_ptr<HeartbeatTable> heartbeat_table_;
std::unique_ptr<HeartbeatBatchTable> heartbeat_batch_table_;
std::unique_ptr<ErrorTable> error_table_;
std::unique_ptr<ProfileTable> profile_table_;
std::unique_ptr<ClientTable> client_table_;
6 changes: 6 additions & 0 deletions src/ray/gcs/format/gcs.fbs
@@ -15,6 +15,7 @@ enum TablePrefix:int {
FUNCTION,
TASK_RECONSTRUCTION,
HEARTBEAT,
HEARTBEAT_BATCH,
ERROR_INFO,
DRIVER,
PROFILE,
@@ -30,6 +31,7 @@ enum TablePubsub:int {
OBJECT,
ACTOR,
HEARTBEAT,
HEARTBEAT_BATCH,
ERROR_INFO,
TASK_LEASE,
DRIVER,
@@ -262,6 +264,10 @@ table HeartbeatTableData {
resource_load_capacity: [double];
}

table HeartbeatBatchTableData {
batch: [HeartbeatTableData];
}

// Data for a lease on task execution.
table TaskLeaseData {
// Node manager client ID.
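For readers unfamiliar with the FlatBuffers object API that the C++ changes rely on: flatc (with the object API enabled) generates a mutable ...T struct per table, and a vector-of-tables field becomes a vector of unique_ptr. A rough sketch of the shape, not the actual generated code; only client_id and resource_load_capacity are taken from the schema, the rest of the heartbeat fields are elided:

// Sketch of the generated object-API types; this is why monitor.cc below
// can push std::unique_ptr<HeartbeatTableDataT> elements into batch->batch.
struct HeartbeatTableDataT {
  std::string client_id;
  std::vector<double> resource_load_capacity;
  // ... remaining heartbeat fields elided ...
};
struct HeartbeatBatchTableDataT {
  std::vector<std::unique_ptr<HeartbeatTableDataT>> batch;
};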
1 change: 1 addition & 0 deletions src/ray/gcs/tables.cc
@@ -479,6 +479,7 @@ template class Log<ActorID, ActorTableData>;
template class Log<TaskID, TaskReconstructionData>;
template class Table<TaskID, TaskLeaseData>;
template class Table<ClientID, HeartbeatTableData>;
template class Table<ClientID, HeartbeatBatchTableData>;
template class Log<JobID, ErrorTableData>;
template class Log<UniqueID, ClientTableData>;
template class Log<JobID, DriverTableData>;
12 changes: 12 additions & 0 deletions src/ray/gcs/tables.h
@@ -351,6 +351,17 @@ class HeartbeatTable : public Table<ClientID, HeartbeatTableData> {
virtual ~HeartbeatTable() {}
};

class HeartbeatBatchTable : public Table<ClientID, HeartbeatBatchTableData> {
public:
HeartbeatBatchTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
AsyncGcsClient *client)
: Table(contexts, client) {
pubsub_channel_ = TablePubsub::HEARTBEAT_BATCH;
prefix_ = TablePrefix::HEARTBEAT_BATCH;
}
virtual ~HeartbeatBatchTable() {}
};

class DriverTable : public Log<JobID, DriverTableData> {
public:
DriverTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
@@ -359,6 +370,7 @@ class DriverTable : public Log<JobID, DriverTableData> {
pubsub_channel_ = TablePubsub::DRIVER;
prefix_ = TablePrefix::DRIVER;
};

virtual ~DriverTable() {}

/// Appends driver data to the driver table.
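Publishing into the new table uses the same Add pattern as the other GCS tables. A condensed view of the call the monitor makes (taken from the monitor.cc hunk below; the nil IDs mirror the nil IDs the subscribers pass to Subscribe):

auto batch = std::make_shared<HeartbeatBatchTableDataT>();
RAY_CHECK_OK(gcs_client.heartbeat_batch_table().Add(
    /*job_id=*/UniqueID::nil(), /*id=*/UniqueID::nil(), batch,
    /*done=*/nullptr));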
18 changes: 16 additions & 2 deletions src/ray/raylet/monitor.cc
@@ -23,14 +23,16 @@ Monitor::Monitor(boost::asio::io_service &io_service, const std::string &redis_a
RAY_CHECK_OK(gcs_client_.Attach(io_service));
}

void Monitor::HandleHeartbeat(const ClientID &client_id) {
void Monitor::HandleHeartbeat(const ClientID &client_id,
const HeartbeatTableDataT &heartbeat_data) {
heartbeats_[client_id] = num_heartbeats_timeout_;
heartbeat_buffer_[client_id] = heartbeat_data;
}

void Monitor::Start() {
const auto heartbeat_callback = [this](gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &heartbeat_data) {
HandleHeartbeat(id);
HandleHeartbeat(id, heartbeat_data);
};
RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(
UniqueID::nil(), UniqueID::nil(), heartbeat_callback, nullptr, nullptr));
@@ -66,6 +68,18 @@ void Monitor::Tick() {
}
}

// Send any buffered heartbeats as a single publish.
if (!heartbeat_buffer_.empty()) {
auto batch = std::make_shared<HeartbeatBatchTableDataT>();
for (const auto &heartbeat : heartbeat_buffer_) {
batch->batch.push_back(std::unique_ptr<HeartbeatTableDataT>(
new HeartbeatTableDataT(heartbeat.second)));
}
RAY_CHECK_OK(gcs_client_.heartbeat_batch_table().Add(UniqueID::nil(), UniqueID::nil(),
batch, nullptr));
heartbeat_buffer_.clear();
}

auto heartbeat_period = boost::posix_time::milliseconds(
RayConfig::instance().heartbeat_timeout_milliseconds());
heartbeat_timer_.expires_from_now(heartbeat_period);
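One consequence of buffering into an std::unordered_map keyed by ClientID is worth spelling out: if a raylet heartbeats more than once within a single monitor tick, only the most recent heartbeat survives into the batch, so a batch holds at most one entry per raylet. A toy illustration with standard-library stand-ins for the Ray types:

#include <cassert>
#include <string>
#include <unordered_map>

int main() {
  // std::string stands in for ClientID, int for HeartbeatTableDataT.
  std::unordered_map<std::string, int> heartbeat_buffer;
  heartbeat_buffer["raylet-A"] = 1;
  heartbeat_buffer["raylet-A"] = 2;  // same tick: last write wins
  heartbeat_buffer["raylet-B"] = 7;
  assert(heartbeat_buffer.size() == 2);  // one entry per raylet, not per heartbeat
  return 0;
}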
6 changes: 5 additions & 1 deletion src/ray/raylet/monitor.h
@@ -33,7 +33,9 @@ class Monitor {
/// Handle a heartbeat from a Raylet.
///
/// \param client_id The client ID of the Raylet that sent the heartbeat.
void HandleHeartbeat(const ClientID &client_id);
/// \param heartbeat_data The heartbeat sent by the client.
void HandleHeartbeat(const ClientID &client_id,
const HeartbeatTableDataT &heartbeat_data);

private:
/// A client to the GCS, through which heartbeats are received.
@@ -47,6 +49,8 @@ class Monitor {
std::unordered_map<ClientID, int64_t> heartbeats_;
/// The Raylets that have been marked as dead in the client table.
std::unordered_set<ClientID> dead_clients_;
/// A buffer containing heartbeats received from node managers in the last tick.
std::unordered_map<ClientID, HeartbeatTableDataT> heartbeat_buffer_;
};

} // namespace raylet
38 changes: 23 additions & 15 deletions src/ray/raylet/node_manager.cc
@@ -156,15 +156,16 @@ ray::Status NodeManager::RegisterGcs() {
};
gcs_client_->client_table().RegisterClientRemovedCallback(node_manager_client_removed);

// Subscribe to node manager heartbeats.
const auto heartbeat_added = [this](gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &heartbeat_data) {
HeartbeatAdded(client, id, heartbeat_data);
// Subscribe to heartbeat batches from the monitor.
const auto &heartbeat_batch_added = [this](
gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatBatchTableDataT &heartbeat_batch) {
HeartbeatBatchAdded(heartbeat_batch);
};
RAY_RETURN_NOT_OK(gcs_client_->heartbeat_table().Subscribe(
UniqueID::nil(), UniqueID::nil(), heartbeat_added, nullptr,
RAY_RETURN_NOT_OK(gcs_client_->heartbeat_batch_table().Subscribe(
UniqueID::nil(), UniqueID::nil(), heartbeat_batch_added, nullptr,
[](gcs::AsyncGcsClient *client) {
RAY_LOG(DEBUG) << "heartbeat table subscription done callback called.";
RAY_LOG(DEBUG) << "Heartbeat batch table subscription done.";
}));

// Subscribe to driver table updates.
@@ -399,14 +400,9 @@ void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {
remote_server_connections_.erase(client_id);
}

void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &client_id,
void NodeManager::HeartbeatAdded(const ClientID &client_id,
const HeartbeatTableDataT &heartbeat_data) {
RAY_LOG(DEBUG) << "[HeartbeatAdded]: received heartbeat from client id " << client_id;
const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId();
if (client_id == local_client_id) {
// Skip heartbeats from self.
return;
}
// Locate the client id in remote client table and update available resources based on
// the received heartbeat information.
auto it = cluster_resource_map_.find(client_id);
@@ -427,9 +423,8 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl
remote_resources.SetAvailableResources(std::move(remote_available));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));

auto decision = scheduling_policy_.SpillOver(remote_resources);
// Extract decision for this local scheduler.
auto decision = scheduling_policy_.SpillOver(remote_resources);
std::unordered_set<TaskID> local_task_ids;
for (const auto &task_id : decision) {
// (See design_docs/task_states.rst for the state transition diagram.)
@@ -448,6 +443,19 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl
}
}

void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableDataT &heartbeat_batch) {
const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId();
// Update load information provided by each heartbeat.
for (const auto &heartbeat_data : heartbeat_batch.batch) {
const ClientID &client_id = ClientID::from_binary(heartbeat_data->client_id);
if (client_id == local_client_id) {
// Skip heartbeats from self.
continue;
}
HeartbeatAdded(client_id, *heartbeat_data);
}
}

void NodeManager::HandleActorCreation(const ActorID &actor_id,
const std::vector<ActorTableDataT> &data) {
RAY_LOG(DEBUG) << "Actor creation notification received: " << actor_id;
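Note that the skip-self check moved rather than disappeared: HeartbeatAdded no longer compares against the local client ID because HeartbeatBatchAdded performs that filter while unpacking the batch, in which the local raylet's own heartbeat now arrives alongside everyone else's.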
8 changes: 5 additions & 3 deletions src/ray/raylet/node_manager.h
@@ -127,12 +127,14 @@ class NodeManager {

/// Handler for a heartbeat notification from the GCS.
///
/// \param client The GCS client.
/// \param id The ID of the node manager that sent the heartbeat.
/// \param data The heartbeat data including load information.
/// \return Void.
void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);
void HeartbeatAdded(const ClientID &id, const HeartbeatTableDataT &data);

/// Handler for a heartbeat batch notification from the GCS.
///
/// \param heartbeat_batch The batch of heartbeat data.
void HeartbeatBatchAdded(const HeartbeatBatchTableDataT &heartbeat_batch);

/// Methods for task scheduling.

16 changes: 10 additions & 6 deletions src/ray/raylet/scheduling_policy.cc
@@ -1,6 +1,8 @@
#include "scheduling_policy.h"

#include <algorithm>
#include <chrono>
#include <random>

#include "scheduling_policy.h"

#include "ray/util/logging.h"

@@ -123,21 +125,23 @@ std::vector<TaskID> SchedulingPolicy::SpillOver(

ResourceSet new_load(remote_scheduling_resources.GetLoadResources());

// Check if we can accommodate an infeasible task.
// Check if we can accommodate infeasible tasks.
for (const auto &task : scheduling_queue_.GetInfeasibleTasks()) {
const auto &spec = task.GetTaskSpecification();
if (spec.GetRequiredPlacementResources().IsSubset(
remote_scheduling_resources.GetTotalResources())) {
const auto &placement_resources = spec.GetRequiredPlacementResources();
if (placement_resources.IsSubset(remote_scheduling_resources.GetTotalResources())) {
decision.push_back(spec.TaskId());
new_load.AddResources(spec.GetRequiredResources());
}
}

// Try to accommodate up to a single ready task.
for (const auto &task : scheduling_queue_.GetReadyTasks()) {
const auto &spec = task.GetTaskSpecification();
if (!spec.IsActorTask()) {
// Make sure the node has enough available resources to prevent forwarding cycles.
if (spec.GetRequiredPlacementResources().IsSubset(
remote_scheduling_resources.GetTotalResources())) {
remote_scheduling_resources.GetAvailableResources())) {
decision.push_back(spec.TaskId());
new_load.AddResources(spec.GetRequiredResources());
break;
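Besides the include reordering, the behavioral change here is that the ready-task check now tests against GetAvailableResources() rather than GetTotalResources(), per the new comment about forwarding cycles. A self-contained toy example of why that matters; IsSubset below is an illustrative re-implementation, not the real ResourceSet method:

#include <map>
#include <string>

using ResourceMap = std::map<std::string, double>;

// Illustrative subset check: every required resource must be present in
// sufficient quantity.
bool IsSubset(const ResourceMap &need, const ResourceMap &have) {
  for (const auto &kv : need) {
    auto it = have.find(kv.first);
    if (it == have.end() || it->second < kv.second) return false;
  }
  return true;
}

int main() {
  ResourceMap task_need = {{"CPU", 4}};
  ResourceMap remote_total = {{"CPU", 8}};      // node could run the task in principle
  ResourceMap remote_available = {{"CPU", 0}};  // but it is saturated right now
  // Old check (against total): spills the task to a node with no room, so two
  // saturated nodes can keep forwarding the same task back and forth.
  bool old_decision = IsSubset(task_need, remote_total);       // true
  // New check (against available): only spill when the node has room now.
  bool new_decision = IsSubset(task_need, remote_available);   // false
  return (old_decision && !new_decision) ? 0 : 1;
}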
7 changes: 7 additions & 0 deletions src/ray/raylet/scheduling_policy.h
@@ -36,6 +36,13 @@ class SchedulingPolicy {
std::unordered_map<ClientID, SchedulingResources> &cluster_resources,
const ClientID &local_client_id);

/// \brief Given the resources of a remote node, perform a spill-over
/// scheduling operation.
///
/// \param remote_scheduling_resources The resource and load information of the
/// candidate remote node. Its SchedulingResources::resources_load_ is
/// incremented by the aggregate resource demand of the tasks chosen for
/// spill-over.
/// \return The tasks to spill over to the remote node.
std::vector<TaskID> SpillOver(SchedulingResources &remote_scheduling_resources) const;

/// \brief SchedulingPolicy destructor.
2 changes: 1 addition & 1 deletion src/ray/raylet/scheduling_resources.cc
@@ -145,8 +145,8 @@ const std::string ResourceSet::ToString() const {
// Convert the first element to a string.
if (it != resource_capacity_.end()) {
return_string += "{" + it->first + "," + std::to_string(it->second) + "}";
it++;
}
it++;

// Add the remaining elements to the string (along with a comma).
for (; it != resource_capacity_.end(); ++it) {
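The ToString() change is a real bug fix rather than a cleanup: with the increment outside the guard, an empty resource_capacity_ map meant incrementing a past-the-end iterator, which is undefined behavior. A minimal illustration of the fixed pattern:

#include <map>
#include <string>

int main() {
  std::map<std::string, double> resource_capacity;  // empty, like an empty ResourceSet
  auto it = resource_capacity.begin();              // equals end() on an empty map
  // it++;  // what the old code did unconditionally: UB when the map is empty
  if (it != resource_capacity.end()) {
    it++;  // the fix: advance only after confirming a first element exists
  }
  return 0;
}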
49 changes: 41 additions & 8 deletions test/actor_test.py
@@ -3,6 +3,7 @@
from __future__ import print_function

import collections
import json
import random
import numpy as np
import os
@@ -919,29 +920,61 @@ def test_actor_multiple_gpus_from_multiple_tasks(shutdown_only):
num_local_schedulers=num_local_schedulers,
redirect_output=True,
num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]),
num_gpus=(num_local_schedulers * [num_gpus_per_scheduler]))
num_gpus=(num_local_schedulers * [num_gpus_per_scheduler]),
_internal_config=json.dumps({
"num_heartbeats_timeout": 1000
}))

@ray.remote
def create_actors(n):
def create_actors(i, n):
@ray.remote(num_gpus=1)
class Actor(object):
def __init__(self):
def __init__(self, i, j):
self.gpu_ids = ray.get_gpu_ids()

def get_location_and_ids(self):
return ((
ray.worker.global_worker.plasma_client.store_socket_name),
tuple(self.gpu_ids))

def sleep(self):
time.sleep(100)

# Create n actors.
for _ in range(n):
Actor.remote()
actors = []
for j in range(n):
actors.append(Actor.remote(i, j))

locations = ray.get(
[actor.get_location_and_ids.remote() for actor in actors])

ray.get([
create_actors.remote(num_gpus_per_scheduler)
for _ in range(num_local_schedulers)
# Put each actor to sleep for a long time to prevent them from getting
# terminated.
for actor in actors:
actor.sleep.remote()

return locations

all_locations = ray.get([
create_actors.remote(i, num_gpus_per_scheduler)
for i in range(num_local_schedulers)
])

# Make sure that no two actors are assigned to the same GPU.
node_names = {
location
for locations in all_locations for location, gpu_id in locations
}
assert len(node_names) == num_local_schedulers

# Keep track of which GPU IDs are being used for each location.
gpus_in_use = {node_name: [] for node_name in node_names}
for locations in all_locations:
for location, gpu_ids in locations:
gpus_in_use[location].extend(gpu_ids)
for node_name in node_names:
assert len(set(gpus_in_use[node_name])) == num_gpus_per_scheduler

@ray.remote(num_gpus=1)
class Actor(object):
def __init__(self):
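The test changes follow the new heartbeat path: each batch of actors now sleeps to hold onto its GPUs so placements stay observable, the test asserts that the GPU IDs in use are disjoint across nodes, and num_heartbeats_timeout is raised via _internal_config, presumably so raylets are not marked dead while the long-sleeping actors occupy them on a slow test machine.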
