Skip to content

Commit

Permalink
Fault tolerance race (ray-project#894)
Browse files Browse the repository at this point in the history
* Remove race between local scheduler disconnecting and global scheduler
assigning a task

* Fix number of workers started in component failures test

* Fix race between global scheduler retrying a task assignment and monitor
cleaning up task table. The global scheduler should only retry the task
assignment if the local scheduler is still alive.

* Clean up the task_table_update callback on failure

* Look up current local scheduler mapping when retrying actor task submission

* Log warning if no subscribers received a task table update

* Clean up database handle memory in local scheduler
  • Loading branch information
stephanie-wang authored and robertnishihara committed Aug 31, 2017
1 parent deca29a commit 7496c98
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 85 deletions.
4 changes: 0 additions & 4 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,10 +971,6 @@ int TaskTableWrite(RedisModuleCtx *ctx,
}

if (num_clients == 0) {
LOG_WARN(
"No subscribers received this publish. This most likely means that "
"either the intended recipient has not subscribed yet or that the "
"pubsub connection to the intended recipient has been broken.");
/* This reply will be received by redis_task_table_update_callback or
* redis_task_table_add_task_callback in redis.cc, which will then reissue
* the command. */
Expand Down
8 changes: 8 additions & 0 deletions src/common/state/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ void db_attach(DBHandle *db, event_loop *loop, bool reattach);
*/
void db_disconnect(DBHandle *db);

/**
 * Free the database handle and the Redis connection state it owns.
 *
 * Unlike db_disconnect, this does not send a disconnect notification to
 * Redis before tearing the handle down; db_disconnect sends that notification
 * and then calls this function. The handle is deleted, so it must not be used
 * after this call returns.
 *
 * @param db The database connection to clean up.
 * @return Void.
 */
void DBHandle_free(DBHandle *db);

/**
* Returns the db client ID.
*
Expand Down
123 changes: 48 additions & 75 deletions src/common/state/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,7 @@ DBHandle *db_connect(const std::string &db_primary_address,
return db;
}

void db_disconnect(DBHandle *db) {
/* Notify others that this client is disconnecting from Redis. If a client of
* the same type on the same node wants to reconnect again, they must
* reconnect and get assigned a different client ID. */
redisReply *reply =
(redisReply *) redisCommand(db->sync_context, "RAY.DISCONNECT %b",
db->client.id, sizeof(db->client.id));
CHECK(strcmp(reply->str, "OK") == 0);
freeReplyObject(reply);

void DBHandle_free(DBHandle *db) {
/* Clean up the primary Redis connection state. */
redisFree(db->sync_context);
redisAsyncFree(db->context);
Expand All @@ -343,6 +334,19 @@ void db_disconnect(DBHandle *db) {
delete db;
}

/* Announce to Redis that this client is going away, then release the handle.
 * The handle is freed by DBHandle_free, so it must not be used after this
 * call returns. */
void db_disconnect(DBHandle *db) {
  /* Notify others that this client is disconnecting from Redis. If a client of
   * the same type on the same node wants to reconnect again, they must
   * reconnect and get assigned a different client ID. */
  redisReply *reply =
      (redisReply *) redisCommand(db->sync_context, "RAY.DISCONNECT %b",
                                  db->client.id, sizeof(db->client.id));
  /* redisCommand returns NULL when the command could not be executed (for
   * example because the connection is broken). Fail the check explicitly
   * instead of dereferencing a null reply below. */
  CHECK(reply != NULL);
  CHECK(strcmp(reply->str, "OK") == 0);
  freeReplyObject(reply);

  /* Tear down the connection state and free the handle itself. */
  DBHandle_free(db);
}

void db_attach(DBHandle *db, event_loop *loop, bool reattach) {
db->loop = loop;
/* Attach primary redis instance to the event loop. */
Expand Down Expand Up @@ -872,45 +876,28 @@ void redis_task_table_add_task_callback(redisAsyncContext *c,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);

/* Do some minimal checking. */
redisReply *reply = (redisReply *) r;

/* If the publish which happens inside of the call to RAY.TASK_TABLE_ADD was
* not received by any subscribers, then reissue the command. TODO(rkn): This
* entire if block should be temporary. Once we address the problem where in
* which a global scheduler may publish a task to a local scheduler before the
* local scheduler has subscribed to the relevant channel, we shouldn't need
* this block any more. */
// If no subscribers received the message, call the failure callback. The
// caller should decide whether to retry the add. NOTE(swang): The caller
// should check whether the receiving subscriber is still alive in the
// db_client table before retrying the add.
if (reply->type == REDIS_REPLY_ERROR &&
strcmp(reply->str, "No subscribers received message.") == 0) {
Task *task = (Task *) callback_data->data;
TaskID task_id = Task_task_id(task);
DBClientID local_scheduler_id = Task_local_scheduler(task);
redisAsyncContext *context = get_redis_context(db, task_id);
int state = Task_state(task);
TaskSpec *spec = Task_task_spec(task);
/* Reissue the command. */
CHECKM(task != NULL, "NULL task passed to redis_task_table_add_task.");
int status = redisAsyncCommand(
context, redis_task_table_add_task_callback,
(void *) callback_data->timer_id, "RAY.TASK_TABLE_ADD %b %d %b %b",
task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
sizeof(local_scheduler_id.id), spec, Task_task_spec_size(task));
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error in redis_task_table_add_task");
LOG_WARN("No subscribers received the task_table_add message.");
if (callback_data->retry.fail_callback != NULL) {
callback_data->retry.fail_callback(
callback_data->id, callback_data->user_context, callback_data->data);
}
} else {
CHECKM(strcmp(reply->str, "OK") == 0, "reply->str is %s", reply->str);
/* Call the done callback if there is one. */
if (callback_data->done_callback != NULL) {
task_table_done_callback done_callback =
(task_table_done_callback) callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
/* Since we are reissuing the same command with the same callback data,
* return early to avoid freeing the callback data. */
return;
}

CHECKM(strcmp(reply->str, "OK") == 0, "reply->str is %s", reply->str);
/* Call the done callback if there is one. */
if (callback_data->done_callback != NULL) {
task_table_done_callback done_callback =
(task_table_done_callback) callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
/* Clean up the timer and callback. */
destroy_timer_callback(db->loop, callback_data);
}
Expand Down Expand Up @@ -940,44 +927,30 @@ void redis_task_table_update_callback(redisAsyncContext *c,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);

/* Do some minimal checking. */
redisReply *reply = (redisReply *) r;

/* If the publish which happens inside of the call to RAY.TASK_TABLE_UPDATE
* was not received by any subscribers, then reissue the command. TODO(rkn):
* This entire if block should be temporary. Once we address the problem where
* in which a global scheduler may publish a task to a local scheduler before
* the local scheduler has subscribed to the relevant channel, we shouldn't
* need this block any more. */
// If no subscribers received the message, call the failure callback. The
// caller should decide whether to retry the update. NOTE(swang): Retrying a
// task table update can race with the liveness monitor. Do not retry the
// update unless the caller is sure that the receiving subscriber is still
// alive in the db_client table.
if (reply->type == REDIS_REPLY_ERROR &&
strcmp(reply->str, "No subscribers received message.") == 0) {
Task *task = (Task *) callback_data->data;
TaskID task_id = Task_task_id(task);
redisAsyncContext *context = get_redis_context(db, task_id);
DBClientID local_scheduler_id = Task_local_scheduler(task);
int state = Task_state(task);
/* Reissue the command. */
CHECKM(task != NULL, "NULL task passed to redis_task_table_update.");
int status = redisAsyncCommand(
context, redis_task_table_update_callback,
(void *) callback_data->timer_id, "RAY.TASK_TABLE_UPDATE %b %d %b",
task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
sizeof(local_scheduler_id.id));
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error in redis_task_table_update");
LOG_WARN("No subscribers received the task_table_update message.");
if (callback_data->retry.fail_callback != NULL) {
callback_data->retry.fail_callback(
callback_data->id, callback_data->user_context, callback_data->data);
}
/* Since we are reissuing the same command with the same callback data,
* return early to avoid freeing the callback data. */
return;
}
} else {
CHECKM(strcmp(reply->str, "OK") == 0, "reply->str is %s", reply->str);

CHECKM(strcmp(reply->str, "OK") == 0, "reply->str is %s", reply->str);
/* Call the done callback if there is one. */
if (callback_data->done_callback != NULL) {
task_table_done_callback done_callback =
(task_table_done_callback) callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
/* Call the done callback if there is one. */
if (callback_data->done_callback != NULL) {
task_table_done_callback done_callback =
(task_table_done_callback) callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
}

/* Clean up the timer and callback. */
destroy_timer_callback(db->loop, callback_data);
}
Expand Down
46 changes: 45 additions & 1 deletion src/global_scheduler/global_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,45 @@
#include "state/table.h"
#include "state/task_table.h"

/**
 * Failure callback for a task assignment: reissue the task table update, but
 * only while the target local scheduler is still present in the global
 * scheduler's set of known local schedulers. If it is gone, the assignment is
 * dropped without retrying.
 * TODO(rkn): The only case we currently expect to retry is the global
 * scheduler publishing a task before the local scheduler has subscribed to
 * the channel. Once that ordering is enforced, this retry path can go away.
 *
 * @param id The task ID.
 * @param user_context The global scheduler state.
 * @param user_data The Task that failed to be assigned.
 * @return Void.
 */
void assign_task_to_local_scheduler_retry(UniqueID id,
                                          void *user_context,
                                          void *user_data) {
  GlobalSchedulerState *scheduler_state = (GlobalSchedulerState *) user_context;
  Task *failed_task = (Task *) user_data;
  CHECK(Task_state(failed_task) == TASK_STATUS_SCHEDULED);

  // Only retry while the assigned local scheduler is still known to us. If it
  // has died since the assignment was requested, give up.
  DBClientID target_id = Task_local_scheduler(failed_task);
  const bool target_alive =
      scheduler_state->local_schedulers.find(target_id) !=
      scheduler_state->local_schedulers.end();
  if (target_alive) {
    // Most likely the assignment was published before the local scheduler
    // subscribed to the channel, so issue the update again with this same
    // function registered as the failure callback.
    RetryInfo retry_info = {
        .num_retries = 0,  // This value is unused.
        .timeout = 0,      // This value is unused.
        .fail_callback = assign_task_to_local_scheduler_retry,
    };
    task_table_update(scheduler_state->db, Task_copy(failed_task), &retry_info,
                      NULL, user_context);
  }
}

/**
* Assign the given task to the local scheduler, update Redis and scheduler data
* structures.
Expand All @@ -34,7 +73,12 @@ void assign_task_to_local_scheduler(GlobalSchedulerState *state,
LOG_DEBUG("Issuing a task table update for task = %s",
ObjectID_to_string(Task_task_id(task), id_string, ID_STRING_SIZE));
UNUSED(id_string);
task_table_update(state->db, Task_copy(task), NULL, NULL, NULL);
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
.timeout = 0, // This value is unused.
.fail_callback = assign_task_to_local_scheduler_retry,
};
task_table_update(state->db, Task_copy(task), &retryInfo, NULL, state);

/* Update the object table info to reflect the fact that the results of this
* task will be created on the machine that the task was assigned to. This can
Expand Down
11 changes: 8 additions & 3 deletions src/local_scheduler/local_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,15 @@ void LocalSchedulerState_free(LocalSchedulerState *state) {
delete state->plasma_conn;
state->plasma_conn = NULL;

/* Disconnect from the database. */
/* Clean up the database connection. NOTE(swang): The global scheduler is
* responsible for deleting our entry from the db_client table, so do not
* delete it here. */
if (state->db != NULL) {
db_disconnect(state->db);
state->db = NULL;
/* TODO(swang): Add a null heartbeat that tells the global scheduler that
* we are dead. This avoids having to wait for the timeout before marking
* us as dead in the db_client table, in cases where we can do a clean
* exit. */
DBHandle_free(state->db);
}

/* Free the command for starting new workers. */
Expand Down
19 changes: 18 additions & 1 deletion src/local_scheduler/local_scheduler_algorithm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,18 @@ void queue_task_locally(LocalSchedulerState *state,
}
}

/**
 * Failure callback for handing a task directly to another local scheduler.
 * Instead of resending to the same recipient, the task's spec is passed back
 * through handle_actor_task_submitted (presumably so the current local
 * scheduler for the actor is looked up again; confirm against that function).
 *
 * @param id The task ID.
 * @param user_context The local scheduler state.
 * @param user_data The Task that failed to be handed off.
 * @return Void.
 */
void give_task_to_local_scheduler_retry(UniqueID id,
                                        void *user_context,
                                        void *user_data) {
  LocalSchedulerState *scheduler_state = (LocalSchedulerState *) user_context;
  Task *failed_task = (Task *) user_data;
  CHECK(Task_state(failed_task) == TASK_STATUS_SCHEDULED);

  /* Resubmit the original spec as if the actor task had just been submitted. */
  TaskSpec *failed_spec = Task_task_spec(failed_task);
  handle_actor_task_submitted(scheduler_state, scheduler_state->algorithm_state,
                              failed_spec, Task_task_spec_size(failed_task));
}

/**
* Give a task directly to another local scheduler. This is currently only used
* for assigning actor tasks to the local scheduler responsible for that actor.
Expand All @@ -896,7 +908,12 @@ void give_task_to_local_scheduler(LocalSchedulerState *state,
DCHECK(state->config.global_scheduler_exists);
Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_SCHEDULED,
local_scheduler_id);
task_table_add_task(state->db, task, NULL, NULL, NULL);
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
.timeout = 0, // This value is unused.
.fail_callback = give_task_to_local_scheduler_retry,
};
task_table_add_task(state->db, task, &retryInfo, NULL, state);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion test/component_failures_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def f(x, j):
num_local_schedulers = 4
num_workers_per_scheduler = 8
ray.worker._init(
num_workers=num_local_schedulers * num_workers_per_scheduler,
num_workers=num_workers_per_scheduler,
num_local_schedulers=num_local_schedulers,
start_ray_local=True,
num_cpus=[num_workers_per_scheduler] * num_local_schedulers,
Expand Down

0 comments on commit 7496c98

Please sign in to comment.