Convert actor data structures to C++. (ray-project#454)
robertnishihara authored and pcmoritz committed Apr 12, 2017
1 parent 94f32db commit dad57e3
Showing 3 changed files with 77 additions and 115 deletions.
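
The change is mechanical throughout: intrusive uthash tables of malloc'd entries (HASH_ADD / HASH_FIND / HASH_DELETE plus free) become std::unordered_map with value semantics. Because Ray's IDs are plain byte structs rather than integral types, the maps need a hash functor, which the new code names UniqueIDHasher; its definition is not part of the diff shown here, so the self-contained sketch below substitutes a stand-in ID type and hasher to illustrate the idiom being adopted:

    #include <cstring>
    #include <iostream>
    #include <unordered_map>

    /* Stand-in for Ray's 20-byte ID structs (ActorID, DBClientID, ...). */
    struct FakeID {
      unsigned char id[20];
      bool operator==(const FakeID &other) const {
        return std::memcmp(id, other.id, sizeof(id)) == 0;
      }
    };

    /* Stand-in for the UniqueIDHasher the new code assumes: since the IDs
     * are already uniformly random, reinterpreting the leading bytes is a
     * plausible hash. */
    struct FakeIDHasher {
      size_t operator()(const FakeID &key) const {
        size_t h = 0;
        std::memcpy(&h, key.id, sizeof(h));
        return h;
      }
    };

    int main() {
      std::unordered_map<FakeID, int, FakeIDHasher> table;
      FakeID a = {};
      a.id[0] = 42;
      table[a] = 1;                         /* Replaces HASH_ADD. */
      std::cout << table.count(a) << "\n";  /* Replaces HASH_FIND; prints 1. */
      table.erase(a);                       /* Replaces HASH_DELETE + free. */
      return 0;
    }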
30 changes: 7 additions & 23 deletions src/local_scheduler/local_scheduler.cc
@@ -198,15 +198,6 @@ void LocalSchedulerState_free(LocalSchedulerState *state) {
state->config.start_worker_command = NULL;
}

- /* Free the mapping from the actor ID to the ID of the local scheduler
-  * responsible for that actor. */
- actor_map_entry *current_actor_map_entry, *temp_actor_map_entry;
- HASH_ITER(hh, state->actor_mapping, current_actor_map_entry,
-           temp_actor_map_entry) {
-   HASH_DEL(state->actor_mapping, current_actor_map_entry);
-   free(current_actor_map_entry);
- }
-
/* Free the algorithm state. */
SchedulingAlgorithmState_free(state->algorithm_state);
state->algorithm_state = NULL;
@@ -334,9 +325,6 @@ LocalSchedulerState *LocalSchedulerState_init(

state->loop = loop;

- /* Initialize the hash table mapping actor ID to the ID of the local scheduler
-  * that is responsible for that actor. */
- state->actor_mapping = NULL;
/* Connect to Redis if a Redis address is provided. */
if (redis_addr != NULL) {
int num_args;
@@ -670,10 +658,8 @@ void handle_client_register(LocalSchedulerState *state,
if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) {
/* Make sure that the local scheduler is aware that it is responsible for
* this actor. */
- actor_map_entry *entry;
- HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
- CHECK(entry != NULL);
- CHECK(DBClientID_equal(entry->local_scheduler_id,
+ CHECK(state->actor_mapping.count(actor_id) == 1);
+ CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db)));
/* Update the worker struct with this actor ID. */
CHECK(ActorID_equal(worker->actor_id, NIL_ACTOR_ID));
@@ -906,16 +892,14 @@ void handle_actor_creation_callback(ActorInfo info, void *context) {
* TODO(rkn): We will need to remove this check to handle the case where the
* corresponding publish is retried and the case in which a task that creates
* an actor is resubmitted due to fault tolerance. */
- actor_map_entry *entry;
- HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
- CHECK(entry == NULL);
+ CHECK(state->actor_mapping.count(actor_id) == 0);
/* Create a new entry and add it to the actor mapping table. TODO(rkn):
* Currently this is never removed (except when the local scheduler state is
* deleted). */
- entry = (actor_map_entry *) malloc(sizeof(actor_map_entry));
- entry->actor_id = actor_id;
- entry->local_scheduler_id = local_scheduler_id;
- HASH_ADD(hh, state->actor_mapping, actor_id, sizeof(entry->actor_id), entry);
+ ActorMapEntry entry;
+ entry.actor_id = actor_id;
+ entry.local_scheduler_id = local_scheduler_id;
+ state->actor_mapping[actor_id] = entry;
/* If this local scheduler is responsible for the actor, then start a new
* worker for the actor. */
if (DBClientID_equal(local_scheduler_id, get_db_client_id(state->db))) {
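In the new handle_actor_creation_callback code above, ActorMapEntry is stored by value: state->actor_mapping[actor_id] = entry first default-constructs the slot and then copy-assigns into it, so the entry type must be default-constructible and copyable, and nothing has to be freed by hand. A minimal sketch of that requirement (Entry is a stand-in, not the real struct):

    #include <unordered_map>

    struct Entry {
      int actor_id = 0;            /* Stand-in fields; the real ActorMapEntry */
      int local_scheduler_id = 0;  /* holds Ray's ID structs instead. */
    };

    int main() {
      std::unordered_map<int, Entry> mapping;
      Entry entry;
      entry.actor_id = 7;
      entry.local_scheduler_id = 3;
      /* operator[] default-constructs Entry{} for key 7, then copy-assigns
       * `entry` over it; the map owns the copy. */
      mapping[7] = entry;
      return 0;
    }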
153 changes: 66 additions & 87 deletions src/local_scheduler/local_scheduler_algorithm.cc
@@ -77,7 +77,7 @@ struct SchedulingAlgorithmState {
/** This is a hash table from actor ID to information about that actor. In
* particular, a queue of tasks that are waiting to execute on that actor.
* This is only used for actors that exist locally. */
- LocalActorInfo *local_actor_infos;
+ std::unordered_map<ActorID, LocalActorInfo, UniqueIDHasher> local_actor_infos;
/** An array of actor tasks that have been submitted but this local scheduler
* doesn't know which local scheduler is responsible for them, so cannot
* assign them to the correct local scheduler yet. Whenever a notification
@@ -132,8 +132,6 @@ SchedulingAlgorithmState *SchedulingAlgorithmState_init(void) {
utarray_new(algorithm_state->cached_submitted_actor_task_sizes,
&task_spec_size_icd);

- algorithm_state->local_actor_infos = NULL;
-
return algorithm_state;
}

@@ -151,12 +149,10 @@ void SchedulingAlgorithmState_free(SchedulingAlgorithmState *algorithm_state) {
algorithm_state->dispatch_task_queue->clear();
delete algorithm_state->dispatch_task_queue;
/* Remove all of the remaining actors. */
- LocalActorInfo *actor_entry, *tmp_actor_entry;
- HASH_ITER(hh, algorithm_state->local_actor_infos, actor_entry,
-           tmp_actor_entry) {
-   /* We do not call HASH_DELETE here because it will be called inside of
-    * remove_actor. */
-   remove_actor(algorithm_state, actor_entry->actor_id);
+ while (algorithm_state->local_actor_infos.size() != 0) {
+   auto it = algorithm_state->local_actor_infos.begin();
+   ActorID actor_id = it->first;
+   remove_actor(algorithm_state, actor_id);
}
/* Free the list of cached actor task specs and the task specs themselves. */
for (int i = 0;
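
The destruction loop above re-fetches begin() on every pass instead of iterating, because remove_actor erases the current element and an unordered_map iterator is invalidated by the erase. A self-contained sketch of the same pattern, with erase_one standing in for remove_actor:

    #include <unordered_map>

    /* Stand-in for remove_actor: erases the entry for `key`. */
    static void erase_one(std::unordered_map<int, int> &infos, int key) {
      infos.erase(key);
    }

    int main() {
      std::unordered_map<int, int> infos = {{1, 10}, {2, 20}, {3, 30}};
      /* Re-fetch begin() each pass; a range-for here would advance an
       * iterator that the erase has already invalidated. */
      while (!infos.empty()) {
        int key = infos.begin()->first;
        erase_one(infos, key);
      }
      return 0;
    }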
@@ -245,16 +241,14 @@ void provide_scheduler_info(LocalSchedulerState *state,
void create_actor(SchedulingAlgorithmState *algorithm_state,
ActorID actor_id,
LocalSchedulerClient *worker) {
- /* This will be freed when the actor is removed in remove_actor. */
- LocalActorInfo *entry = (LocalActorInfo *) malloc(sizeof(LocalActorInfo));
- entry->actor_id = actor_id;
- entry->task_counter = 0;
- /* Initialize the doubly-linked list to NULL. */
- entry->task_queue = new std::list<TaskQueueEntry>();
- entry->worker = worker;
- entry->worker_available = false;
- HASH_ADD(hh, algorithm_state->local_actor_infos, actor_id, sizeof(actor_id),
-          entry);
+ LocalActorInfo entry;
+ entry.actor_id = actor_id;
+ entry.task_counter = 0;
+ entry.task_queue = new std::list<TaskQueueEntry>();
+ entry.worker = worker;
+ entry.worker_available = false;
+ CHECK(algorithm_state->local_actor_infos.count(actor_id) == 0);
+ algorithm_state->local_actor_infos[actor_id] = entry;

/* Log some useful information about the actor that we created. */
char id_string[ID_STRING_SIZE];
@@ -264,15 +258,13 @@ void create_actor(SchedulingAlgorithmState *algorithm_state,
}

void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) {
- LocalActorInfo *entry;
- HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
-           entry);
- /* Make sure the actor actually exists. */
- CHECK(entry != NULL);
+ CHECK(algorithm_state->local_actor_infos.count(actor_id) == 1);
+ LocalActorInfo &entry =
+     algorithm_state->local_actor_infos.find(actor_id)->second;

/* Log some useful information about the actor that we're removing. */
char id_string[ID_STRING_SIZE];
- size_t count = entry->task_queue->size();
+ size_t count = entry.task_queue->size();
if (count > 0) {
LOG_WARN("Removing actor with ID %s and %lld remaining tasks.",
ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE),
@@ -281,30 +273,26 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) {
UNUSED(id_string);

/* Free all remaining tasks in the actor queue. */
- for (auto &task : *entry->task_queue) {
+ for (auto &task : *entry.task_queue) {
TaskQueueEntry_free(&task);
}
- entry->task_queue->clear();
- delete entry->task_queue;
- /* Remove the entry from the hash table and free it. */
- HASH_DELETE(hh, algorithm_state->local_actor_infos, entry);
- free(entry);
+ entry.task_queue->clear();
+ delete entry.task_queue;
+ /* Remove the entry from the hash table. */
+ algorithm_state->local_actor_infos.erase(actor_id);
}

void handle_actor_worker_connect(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id,
LocalSchedulerClient *worker) {
- LocalActorInfo *entry;
- HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
-           entry);
- if (entry == NULL) {
+ if (algorithm_state->local_actor_infos.count(actor_id) == 0) {
create_actor(algorithm_state, actor_id, worker);
} else {
    /* In this case, the LocalActorInfo struct had already been created by the
     * first call to add_task_to_actor_queue. However, the worker field was not
     * filled out, so fill out the correct worker field now. */
-   entry->worker = worker;
+   algorithm_state->local_actor_infos[actor_id].worker = worker;
}
}
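
One subtlety in remove_actor above: LocalActorInfo holds its task queue as a raw std::list pointer, so map.erase() alone would destroy the struct but leak the list; the explicit delete before the erase is what pairs with the new in create_actor. A sketch of the hazard, with Holder standing in for LocalActorInfo:

    #include <list>
    #include <unordered_map>

    struct Holder {
      std::list<int> *task_queue;  /* Raw pointer, as in LocalActorInfo. */
    };

    int main() {
      std::unordered_map<int, Holder> infos;
      infos[1] = Holder{new std::list<int>()};
      delete infos[1].task_queue;  /* Without this, erase() leaks the list. */
      infos.erase(1);
      return 0;
    }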

@@ -341,29 +329,27 @@ void add_task_to_actor_queue(LocalSchedulerState *state,
char tmp[ID_STRING_SIZE];
ObjectID_to_string(actor_id, tmp, ID_STRING_SIZE);
DCHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
- /* Get the local actor entry for this actor. */
- LocalActorInfo *entry;
- HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
-           entry);

/* Handle the case in which there is no LocalActorInfo struct yet. */
- if (entry == NULL) {
+ if (algorithm_state->local_actor_infos.count(actor_id) == 0) {
/* Create the actor struct with a NULL worker because the worker struct has
* not been created yet. The correct worker struct will be inserted when the
* actor worker connects to the local scheduler. */
create_actor(algorithm_state, actor_id, NULL);
-   HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id,
-             sizeof(actor_id), entry);
-   CHECK(entry != NULL);
+   CHECK(algorithm_state->local_actor_infos.count(actor_id) == 1);
}

+ /* Get the local actor entry for this actor. */
+ LocalActorInfo &entry =
+     algorithm_state->local_actor_infos.find(actor_id)->second;

int64_t task_counter = TaskSpec_actor_counter(spec);
/* As a sanity check, the counter of the new task should be greater than the
* number of tasks that have executed on this actor so far (since we are
* guaranteeing in-order execution of the tasks on the actor). TODO(rkn): This
* check will fail if the fault-tolerance mechanism resubmits a task on an
* actor. */
- CHECK(task_counter >= entry->task_counter);
+ CHECK(task_counter >= entry.task_counter);

/* Create a new task queue entry. */
TaskQueueEntry elt = TaskQueueEntry_init(spec, task_spec_size);
@@ -372,12 +358,12 @@
* to find the right place to insert the task queue entry. TODO(pcm): This
* makes submitting multiple actor tasks take quadratic time, which needs to
* be optimized. */
- auto it = entry->task_queue->begin();
- while (it != entry->task_queue->end() &&
+ auto it = entry.task_queue->begin();
+ while (it != entry.task_queue->end() &&
(task_counter > TaskSpec_actor_counter(it->spec))) {
++it;
}
- entry->task_queue->insert(it, elt);
+ entry.task_queue->insert(it, elt);

/* Update the task table. */
if (state->db != NULL) {
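
The insertion above keeps the actor's queue sorted by task counter with a linear scan (the TODO notes this makes submitting many actor tasks quadratic). The same idiom in miniature, using std::find_if over a list of counters; the stopping condition mirrors the while loop in the diff:

    #include <algorithm>
    #include <iostream>
    #include <list>

    int main() {
      std::list<int> queue = {0, 1, 3, 4};  /* Counters already queued. */
      int task_counter = 2;                 /* Newly submitted task. */
      /* Stop at the first element whose counter is >= the new one,
       * i.e. where the diff's `task_counter > counter(it)` turns false. */
      auto it = std::find_if(queue.begin(), queue.end(),
                             [&](int c) { return c >= task_counter; });
      queue.insert(it, task_counter);
      for (int c : queue) {
        std::cout << c << " ";  /* Prints: 0 1 2 3 4 */
      }
      std::cout << "\n";
      return 0;
    }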
@@ -409,45 +395,42 @@ bool dispatch_actor_task(LocalSchedulerState *state,
/* Make sure this worker actually is an actor. */
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
/* Make sure this actor belongs to this local scheduler. */
- actor_map_entry *actor_entry;
- HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), actor_entry);
- CHECK(actor_entry != NULL);
- CHECK(DBClientID_equal(actor_entry->local_scheduler_id,
-                        get_db_client_id(state->db)));
+ CHECK(state->actor_mapping.count(actor_id) == 1);
+ CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
+                        get_db_client_id(state->db)));

/* Get the local actor entry for this actor. */
- LocalActorInfo *entry;
- HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
-           entry);
- CHECK(entry != NULL);
+ CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
+ LocalActorInfo &entry =
+     algorithm_state->local_actor_infos.find(actor_id)->second;

- if (entry->task_queue->empty()) {
+ if (entry.task_queue->empty()) {
/* There are no queued tasks for this actor, so we cannot dispatch a task to
* the actor. */
return false;
}
- TaskQueueEntry first_task = entry->task_queue->front();
+ TaskQueueEntry first_task = entry.task_queue->front();
int64_t next_task_counter = TaskSpec_actor_counter(first_task.spec);
- if (next_task_counter != entry->task_counter) {
+ if (next_task_counter != entry.task_counter) {
/* We cannot execute the next task on this actor without violating the
* in-order execution guarantee for actor tasks. */
-   CHECK(next_task_counter > entry->task_counter);
+   CHECK(next_task_counter > entry.task_counter);
return false;
}
/* If the worker is not available, we cannot assign a task to it. */
- if (!entry->worker_available) {
+ if (!entry.worker_available) {
return false;
}
/* Assign the first task in the task queue to the worker and mark the worker
* as unavailable. */
- entry->task_counter += 1;
+ entry.task_counter += 1;
assign_task_to_worker(state, first_task.spec, first_task.task_spec_size,
-                       entry->worker);
- entry->worker_available = false;
+                       entry.worker);
+ entry.worker_available = false;
/* Free the task queue entry. */
TaskQueueEntry_free(&first_task);
/* Remove the task from the actor's task queue. */
- entry->task_queue->pop_front();
+ entry.task_queue->pop_front();
return true;
}
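
dispatch_actor_task gates execution on the head of the queue: a task is dispatched only when its counter equals entry.task_counter, the number of tasks the actor has already executed, so a gap in counters stalls the queue until the missing task arrives. In miniature:

    #include <iostream>
    #include <list>

    int main() {
      int task_counter = 2;           /* Tasks 0 and 1 have already run. */
      std::list<int> queue = {3, 4};  /* Task 2 has not arrived yet. */
      if (!queue.empty() && queue.front() == task_counter) {
        std::cout << "dispatch task " << queue.front() << "\n";
      } else {
        std::cout << "hold: head counter " << queue.front()
                  << " != expected " << task_counter << "\n";
      }
      return 0;
    }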

@@ -854,11 +837,7 @@ void handle_actor_task_submitted(LocalSchedulerState *state,
ActorID actor_id = TaskSpec_actor_id(spec);
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));

- /* Find the local scheduler responsible for this actor. */
- actor_map_entry *entry;
- HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
-
- if (entry == NULL) {
+ if (state->actor_mapping.count(actor_id) == 0) {
/* Add this task to a queue of tasks that have been submitted but the local
* scheduler doesn't know which actor is responsible for them. These tasks
* will be resubmitted (internally by the local scheduler) whenever a new
Expand All @@ -869,7 +848,7 @@ void handle_actor_task_submitted(LocalSchedulerState *state,
return;
}

- if (DBClientID_equal(entry->local_scheduler_id,
+ if (DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db))) {
/* This local scheduler is responsible for the actor, so handle the task
* locally. */
@@ -878,10 +857,12 @@
/* Attempt to dispatch tasks to this actor. */
dispatch_actor_task(state, algorithm_state, actor_id);
} else {
-   /* This local scheduler is not responsible for the task, so assign the task
-    * directly to the actor that is responsible. */
-   give_task_to_local_scheduler(state, algorithm_state, spec, task_spec_size,
-                                entry->local_scheduler_id);
+   /* This local scheduler is not responsible for the task, so find the local
+    * scheduler that is responsible for this actor and assign the task directly
+    * to that local scheduler. */
+   give_task_to_local_scheduler(
+       state, algorithm_state, spec, task_spec_size,
+       state->actor_mapping[actor_id].local_scheduler_id);
}
}

Expand Down Expand Up @@ -937,15 +918,13 @@ void handle_actor_task_scheduled(LocalSchedulerState *state,
* is responsible for. */
ActorID actor_id = TaskSpec_actor_id(spec);
DCHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
- actor_map_entry *entry;
- HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
- if (entry != NULL) {
+ if (state->actor_mapping.count(actor_id) == 1) {
/* This means that an actor has been assigned to this local scheduler, and a
* task for that actor has been received by this local scheduler, but this
* local scheduler has not yet processed the notification about the actor
* creation. This may be possible though should be very uncommon. If it does
* happen, it's ok. */
-   DCHECK(DBClientID_equal(entry->local_scheduler_id,
+   DCHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db)));
} else {
LOG_INFO(
@@ -1019,13 +998,13 @@ void handle_actor_worker_available(LocalSchedulerState *state,
ActorID actor_id = worker->actor_id;
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
/* Get the actor info for this worker. */
- LocalActorInfo *entry;
- HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
-           entry);
- CHECK(entry != NULL);
- CHECK(worker == entry->worker);
- CHECK(!entry->worker_available);
- entry->worker_available = true;
+ CHECK(algorithm_state->local_actor_infos.count(actor_id) == 1);
+ LocalActorInfo &entry =
+     algorithm_state->local_actor_infos.find(actor_id)->second;
+
+ CHECK(worker == entry.worker);
+ CHECK(!entry.worker_available);
+ entry.worker_available = true;
/* Assign a task to this actor if possible. */
dispatch_actor_task(state, algorithm_state, actor_id);
}