Skip to content

Commit

Permalink
Introduce local scheduler heartbeats which carry load information. (r…
Browse files Browse the repository at this point in the history
…ay-project#155)

* Introduce local scheduler heartbeats which carry load information.
  • Loading branch information
robertnishihara authored and atumanov committed Dec 25, 2016
1 parent 9bb9f8c commit 3d697c7
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/common/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ BUILD = build

all: hiredis redis redismodule $(BUILD)/libcommon.a

$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o net.o state/redis.o state/table.o state/object_table.o state/task_table.o state/db_client_table.o thirdparty/ae/ae.o thirdparty/sha256.o
$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o net.o state/redis.o state/table.o state/object_table.o state/task_table.o state/db_client_table.o state/local_scheduler_table.o thirdparty/ae/ae.o thirdparty/sha256.o
ar rcs $@ $^

$(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a
Expand Down
27 changes: 27 additions & 0 deletions src/common/state/local_scheduler_table.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "local_scheduler_table.h"
#include "redis.h"

/* Package the subscriber's callback and context into a heap-allocated
 * local_scheduler_table_subscribe_data record and pass it, together with the
 * redis-level subscribe routine, to init_table_callback. */
void local_scheduler_table_subscribe(
    db_handle *db_handle,
    local_scheduler_table_subscribe_callback subscribe_callback,
    void *subscribe_context,
    retry_info *retry) {
  local_scheduler_table_subscribe_data *request = malloc(sizeof(*request));
  *request = (local_scheduler_table_subscribe_data){
      .subscribe_callback = subscribe_callback,
      .subscribe_context = subscribe_context,
  };

  /* No done callback, retry-failure callback or extra user context is used
   * for this request, hence the NULL arguments. */
  init_table_callback(db_handle, NIL_ID, __func__, request, retry, NULL,
                      redis_local_scheduler_table_subscribe, NULL);
}

/* Copy the caller's load information into a heap-allocated
 * local_scheduler_table_send_info_data record and pass it, together with the
 * redis-level publish routine, to init_table_callback. Because the info is
 * copied by value, the caller's struct does not need to outlive this call. */
void local_scheduler_table_send_info(db_handle *db_handle,
                                     local_scheduler_info *info,
                                     retry_info *retry) {
  local_scheduler_table_send_info_data *heartbeat =
      malloc(sizeof(*heartbeat));
  *heartbeat = (local_scheduler_table_send_info_data){.info = *info};

  init_table_callback(db_handle, NIL_ID, __func__, heartbeat, retry, NULL,
                      redis_local_scheduler_table_send_info, NULL);
}
65 changes: 65 additions & 0 deletions src/common/state/local_scheduler_table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#ifndef LOCAL_SCHEDULER_TABLE_H
#define LOCAL_SCHEDULER_TABLE_H

#include "db.h"
#include "table.h"

/* Load information about a local scheduler. This struct is the payload carried
 * by local scheduler heartbeats; its raw bytes are published after the
 * sender's db client ID. */
typedef struct {
/* Number of entries in the scheduling algorithm's task queue. */
int task_queue_length;
/* Number of entries in the scheduling algorithm's available workers array. */
int available_workers;
} local_scheduler_info;

/*
* ==== Subscribing to the local scheduler table ====
*/

/* Callback for subscribing to the local scheduler table. It is invoked once
 * per received heartbeat message with:
 *   client_id    - the db client ID parsed from the front of the payload,
 *   info         - the local_scheduler_info parsed from the rest of the
 *                  payload (passed by value),
 *   user_context - the subscribe_context supplied at subscription time. */
typedef void (*local_scheduler_table_subscribe_callback)(
db_client_id client_id,
local_scheduler_info info,
void *user_context);

/**
 * Register a callback that is invoked for local scheduler heartbeats.
 *
 * @param db_handle Database handle.
 * @param subscribe_callback Callback that will be called with the sender's db
 *        client ID and its local_scheduler_info each time a heartbeat message
 *        is received. If NULL, heartbeat messages are still parsed but no
 *        callback is invoked.
 * @param subscribe_context Context that will be passed into the
 *        subscribe_callback.
 * @param retry Information about retrying the request to the database.
 * @return Void.
 */
void local_scheduler_table_subscribe(
db_handle *db_handle,
local_scheduler_table_subscribe_callback subscribe_callback,
void *subscribe_context,
retry_info *retry);

/* Data that is needed to register local scheduler table subscribe callbacks
 * with the state database. One instance is allocated per call to
 * local_scheduler_table_subscribe. */
typedef struct {
/* Function to call for each heartbeat; checked against NULL before use. */
local_scheduler_table_subscribe_callback subscribe_callback;
/* Opaque pointer handed back to subscribe_callback as its user_context. */
void *subscribe_context;
} local_scheduler_table_subscribe_data;

/**
 * Send a heartbeat to all subscribers to the local scheduler table. This
 * heartbeat contains some information about the load on the local scheduler.
 *
 * @param db_handle Database handle.
 * @param info Information about the local scheduler, including the load on the
 *        local scheduler. The struct is copied by value during this call, so
 *        it may live on the caller's stack.
 * @param retry Information about retrying the request to the database.
 * @return Void.
 */
void local_scheduler_table_send_info(db_handle *db_handle,
local_scheduler_info *info,
retry_info *retry);

/* Data that is needed to publish local scheduler heartbeats to the local
 * scheduler table. */
typedef struct {
/* By-value copy of the load information supplied by the caller. */
local_scheduler_info info;
} local_scheduler_table_send_info_data;

#endif /* LOCAL_SCHEDULER_TABLE_H */
77 changes: 77 additions & 0 deletions src/common/state/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "common.h"
#include "db.h"
#include "db_client_table.h"
#include "local_scheduler_table.h"
#include "object_table.h"
#include "object_info.h"
#include "task.h"
Expand Down Expand Up @@ -1081,6 +1082,82 @@ void redis_db_client_table_subscribe(table_callback_data *callback_data) {
}
}

/* Reply handler for the subscription to the "local_schedulers" channel.
 *
 * Every reply must be a three-element array whose first element names the
 * reply kind:
 *   "subscribe" - acknowledgement of the initial SUBSCRIBE command,
 *   "message"   - a heartbeat; the third element holds a db_client_id
 *                 immediately followed by a local_scheduler_info.
 * Any other kind is reported through LOG_FATAL.
 *
 * REDIS_CALLBACK_HEADER introduces the db and callback_data variables used
 * below. */
void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c,
                                                    void *r,
                                                    void *privdata) {
  REDIS_CALLBACK_HEADER(db, callback_data, r);

  redisReply *reply = r;
  CHECK(reply->type == REDIS_REPLY_ARRAY);
  CHECK(reply->elements == 3);
  const char *kind = reply->element[0]->str;
  LOG_DEBUG("Local scheduer table subscribe callback, message %s", kind);

  if (strcmp(kind, "subscribe") == 0) {
    /* The initial SUBSCRIBE succeeded. Only the timer is removed; the callback
     * data is deliberately kept. */
    CHECK(callback_data->done_callback == NULL);
    event_loop_remove_timer(db->loop, callback_data->timer_id);
    return;
  }

  if (strcmp(kind, "message") != 0) {
    LOG_FATAL("Unexpected reply type from local scheduler subscribe.");
    return;
  }

  /* A heartbeat arrived: split the payload into its two fixed-size parts and
   * forward them to the subscriber. */
  redisReply *payload = reply->element[2];
  local_scheduler_table_subscribe_data *data = callback_data->data;
  db_client_id client_id;
  local_scheduler_info info;
  /* The payload should be the concatenation of these two structs. */
  CHECK(sizeof(client_id) + sizeof(info) == payload->len);
  const char *cursor = payload->str;
  memcpy(&client_id, cursor, sizeof(client_id));
  cursor += sizeof(client_id);
  memcpy(&info, cursor, sizeof(info));
  if (data->subscribe_callback != NULL) {
    data->subscribe_callback(client_id, info, data->subscribe_context);
  }
}

/* Issue the SUBSCRIBE command for the "local_schedulers" channel on the
 * database's subscription context. The callback data's timer ID is passed as
 * the privdata pointer of the asynchronous command. A failure to issue the
 * command is only logged. */
void redis_local_scheduler_table_subscribe(table_callback_data *callback_data) {
  db_handle *db = callback_data->db_handle;
  void *callback_key = (void *) callback_data->timer_id;

  int status = redisAsyncCommand(
      db->sub_context, redis_local_scheduler_table_subscribe_callback,
      callback_key, "SUBSCRIBE local_schedulers");

  int failed = (status == REDIS_ERR) || db->sub_context->err;
  if (failed) {
    LOG_REDIS_DEBUG(db->sub_context,
                    "error in redis_local_scheduler_table_subscribe");
  }
}

/* Reply handler for the PUBLISH command that sends a local scheduler
 * heartbeat.
 *
 * The reply must be an integer, which is logged as the number of subscribers
 * that received the publish. Afterwards the timer and callback data for this
 * request are destroyed.
 *
 * REDIS_CALLBACK_HEADER introduces the db and callback_data variables used
 * below. */
void redis_local_scheduler_table_send_info_callback(redisAsyncContext *c,
                                                    void *r,
                                                    void *privdata) {
  REDIS_CALLBACK_HEADER(db, callback_data, r);

  redisReply *reply = r;
  CHECK(reply->type == REDIS_REPLY_INTEGER);
  /* hiredis stores integer replies as long long, which is not the same type as
   * int64_t on every platform. Cast so the argument matches PRId64 exactly. */
  LOG_DEBUG("%" PRId64 " subscribers received this publish.\n",
            (int64_t) reply->integer);

  CHECK(callback_data->done_callback == NULL);
  /* Clean up the timer and callback. */
  destroy_timer_callback(db->loop, callback_data);
}

/* Issue the PUBLISH command that sends a heartbeat on the "local_schedulers"
 * channel. The payload is two binary chunks back to back: the db handle's
 * client ID followed by the raw bytes of the local_scheduler_info struct. The
 * callback data's timer ID is passed as the privdata pointer of the
 * asynchronous command. A failure to issue the command is only logged. */
void redis_local_scheduler_table_send_info(table_callback_data *callback_data) {
  db_handle *db = callback_data->db_handle;
  local_scheduler_table_send_info_data *heartbeat = callback_data->data;
  void *callback_key = (void *) callback_data->timer_id;

  int status = redisAsyncCommand(
      db->context, redis_local_scheduler_table_send_info_callback,
      callback_key, "PUBLISH local_schedulers %b%b", db->client.id,
      sizeof(db->client.id), &heartbeat->info, sizeof(heartbeat->info));

  int failed = (status == REDIS_ERR) || db->context->err;
  if (failed) {
    LOG_REDIS_DEBUG(db->context,
                    "error in redis_local_scheduler_table_send_info");
  }
}

void redis_object_info_subscribe_callback(redisAsyncContext *c,
void *r,
void *privdata) {
Expand Down
18 changes: 18 additions & 0 deletions src/common/state/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,24 @@ void redis_task_table_subscribe(table_callback_data *callback_data);
*/
void redis_db_client_table_subscribe(table_callback_data *callback_data);

/**
 * Subscribe to updates from the local scheduler table. This issues a SUBSCRIBE
 * command for the "local_schedulers" channel on the database's subscription
 * context.
 *
 * @param callback_data Data structure containing redis connection and timeout
 *        information.
 * @return Void.
 */
void redis_local_scheduler_table_subscribe(table_callback_data *callback_data);

/**
 * Publish an update to the local scheduler table. This issues a PUBLISH
 * command on the "local_schedulers" channel whose payload is the db handle's
 * client ID followed by the raw bytes of the local_scheduler_info struct
 * stored in the callback data.
 *
 * @param callback_data Data structure containing redis connection and timeout
 *        information.
 * @return Void.
 */
void redis_local_scheduler_table_send_info(table_callback_data *callback_data);

void redis_object_info_subscribe(table_callback_data *callback_data);

#endif /* REDIS_H */
21 changes: 21 additions & 0 deletions src/global_scheduler/global_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "net.h"
#include "object_info.h"
#include "state/db_client_table.h"
#include "state/local_scheduler_table.h"
#include "state/object_table.h"
#include "state/table.h"
#include "state/task_table.h"
Expand Down Expand Up @@ -192,6 +193,21 @@ void object_table_subscribe_callback(object_id object_id,
}
}

/* Heartbeat callback registered by the global scheduler. For now it only logs
 * the sender's ID and the reported load at debug level; the global scheduler
 * state is recovered from the context but not used yet. */
void local_scheduler_table_handler(db_client_id client_id,
                                   local_scheduler_info info,
                                   void *user_context) {
  /* The context registered with the subscription is the global scheduler
   * state. */
  global_scheduler_state *state = user_context;
  UNUSED(state);

  char sender[ID_STRING_SIZE];
  LOG_DEBUG("Local scheduler heartbeat from db_client_id %s",
            object_id_to_string((object_id) client_id, sender, ID_STRING_SIZE));
  /* Referenced explicitly so the buffer counts as used even if LOG_DEBUG does
   * not evaluate its arguments. */
  UNUSED(sender);

  LOG_DEBUG("Task queue length is %d", info.task_queue_length);
  LOG_DEBUG("Num available workers is %d", info.available_workers);
}

void start_server(const char *redis_addr, int redis_port) {
event_loop *loop = event_loop_create();
g_state = init_global_scheduler(loop, redis_addr, redis_port);
Expand All @@ -214,6 +230,11 @@ void start_server(const char *redis_addr, int redis_port) {
object_table_subscribe_to_notifications(g_state->db, true,
object_table_subscribe_callback,
g_state, &retry, NULL, NULL);
/* Subscribe to notifications from local schedulers. These notifications serve
* as heartbeats and contain information about the load on the local
* schedulers. */
local_scheduler_table_subscribe(g_state->db, local_scheduler_table_handler,
g_state, NULL);
/* Start the event loop. */
event_loop_run(loop);
}
Expand Down
9 changes: 9 additions & 0 deletions src/photon/photon_algorithm.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "utlist.h"

#include "state/task_table.h"
#include "state/local_scheduler_table.h"
#include "state/object_table.h"
#include "photon.h"
#include "photon_scheduler.h"
Expand Down Expand Up @@ -90,6 +91,14 @@ void free_scheduling_algorithm_state(
free(algorithm_state);
}

/* Report the scheduling algorithm's current load through *info: the number of
 * entries in the task queue and the number of available workers. Both fields
 * of *info are overwritten. The state argument is not consulted. */
void provide_scheduler_info(local_scheduler_state *state,
                            scheduling_algorithm_state *algorithm_state,
                            local_scheduler_info *info) {
  /* Walk the doubly-linked task queue once to determine its length. */
  int queued = 0;
  task_queue_entry *entry;
  DL_FOREACH(algorithm_state->task_queue, entry) {
    ++queued;
  }
  info->task_queue_length = queued;

  info->available_workers = utarray_len(algorithm_state->available_workers);
}

/**
* Check if all of the remote object arguments for a task are available in the
* local object store.
Expand Down
8 changes: 8 additions & 0 deletions src/photon/photon_algorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "photon.h"
#include "common/task.h"
#include "state/local_scheduler_table.h"

/* The duration that the local scheduler will wait before reinitiating a fetch
* request for a missing task dependency. TODO(rkn): We may want this to be
Expand Down Expand Up @@ -33,6 +34,13 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void);
void free_scheduling_algorithm_state(
scheduling_algorithm_state *algorithm_state);

/**
 * Fill out a local_scheduler_info struct with the current load on the local
 * scheduler: the length of the scheduling algorithm's task queue and the
 * number of available workers.
 *
 * @param state The state of the local scheduler.
 * @param algorithm_state State maintained by the scheduling algorithm. The
 *        task queue and the available workers are read from here.
 * @param info Output parameter. Both of its fields are overwritten.
 * @return Void.
 */
void provide_scheduler_info(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_info *info);

/**
* This function will be called when a new task is submitted by a worker for
* execution.
Expand Down
19 changes: 19 additions & 0 deletions src/photon/photon_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,18 @@ void handle_task_scheduled_callback(task *original_task, void *user_context) {
task_task_spec(original_task));
}

/* Timer handler that publishes one local scheduler heartbeat.
 *
 * The context is the local scheduler state. The scheduling algorithm fills in
 * the load information, which is then sent to the local scheduler table. The
 * return value is the heartbeat period, which resets the timer. */
int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
  local_scheduler_state *state = context;

  /* Collect the current load from the scheduling algorithm. */
  local_scheduler_info load;
  provide_scheduler_info(state, state->algorithm_state, &load);

  /* Broadcast it to every subscriber of the local scheduler table. No retry
   * information is supplied. */
  local_scheduler_table_send_info(state->db, &load, NULL);

  return LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS;
}

void start_server(const char *node_ip_address,
const char *socket_name,
const char *redis_addr,
Expand Down Expand Up @@ -323,6 +335,13 @@ void start_server(const char *node_ip_address,
TASK_STATUS_SCHEDULED, handle_task_scheduled_callback,
NULL, &retry, NULL, NULL);
}
/* Create a timer for publishing information about the load on the local
* scheduler to the local scheduler table. This message also serves as a
* heartbeat. */
if (g_state->db != NULL) {
event_loop_add_timer(loop, LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS,
heartbeat_handler, g_state);
}
/* Run event loop. */
event_loop_run(loop);
}
Expand Down
3 changes: 3 additions & 0 deletions src/photon/photon_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "task.h"
#include "event_loop.h"

/* The duration between local scheduler heartbeats, in milliseconds. It is
 * passed to event_loop_add_timer when the heartbeat timer is created and is
 * also the value the heartbeat handler returns to reset the timer. */
#define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 1000

/**
* Establish a connection to a new client.
*
Expand Down

0 comments on commit 3d697c7

Please sign in to comment.