Skip to content

Commit

Permalink
Support functions to count tasks in the ready list.
Browse files Browse the repository at this point in the history
  • Loading branch information
btovar committed Apr 12, 2016
1 parent 0af8049 commit f6663dd
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions work_queue/src/work_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ static struct work_queue_task *task_state_any(struct work_queue *q, work_queue_t
/* number of tasks with state */
static int task_state_count( struct work_queue *q, const char *category, work_queue_task_state_t state);

/* number of tasks in the ready list with a proper label for the category. */
static int task_ready_count(struct work_queue *q, const char *category);

static work_queue_result_code_t get_result(struct work_queue *q, struct work_queue_worker *w, const char *line);
static work_queue_result_code_t get_available_results(struct work_queue *q, struct work_queue_worker *w);

Expand Down Expand Up @@ -1803,6 +1806,50 @@ static char *blacklisted_to_string( struct work_queue *q ) {
return result;
}

static struct rmsummary *largest_waiting_task(struct work_queue *q) {
struct rmsummary *max_resources_waiting = rmsummary_create(-1);
struct work_queue_task *t;

list_first_item(q->ready_list);
while((t = list_next_item(q->ready_list))) {
if(!t->resources_requested || t->resource_request != CATEGORY_ALLOCATION_USER) {
continue;
}
rmsummary_merge_max(max_resources_waiting, t->resources_requested);
}

return max_resources_waiting;
}

static int check_worker_fit(struct work_queue_worker *w, struct rmsummary *s) {
if(s->cores > w->resources->cores.total)
return 0;
if(s->memory > w->resources->memory.total)
return 0;
if(s->disk > w->resources->disk.total)
return 0;
if(s->gpus > w->resources->gpus.total)
return 0;

return 1;
}

static int count_workers_for_waiting_tasks(struct work_queue *q, struct rmsummary *s) {

int count = 0;

char *key;
struct work_queue_worker *w;
hash_table_firstkey(q->worker_table);
while(hash_table_nextkey(q->worker_table, &key, (void**)&w)) {
if( !s || check_worker_fit(w, s) ) {
count++;
}
}

return count;
}

/* category_to_jx creates a jx expression with category statistics that can be
sent to the catalog.
*/
Expand Down Expand Up @@ -4903,6 +4950,26 @@ static int task_state_count(struct work_queue *q, const char *category, work_que
return count;
}

/* count ready tasks with a proper label for a category, or for the 'manual-label' category. */
static int task_ready_count(struct work_queue *q, const char *category) {
struct work_queue_task *t;

int count = 0;

list_first_item(q->ready_list);
while((t = list_next_item(q->ready_list))) {
if(category) {
if(t->resource_request != CATEGORY_ALLOCATION_USER && t->category && !strcmp(category, t->category)) {
count++;
}
} else if(t->resource_request == CATEGORY_ALLOCATION_USER) {
count++;
}
}

return count;
}

int work_queue_submit_internal(struct work_queue *q, struct work_queue_task *t)
{
itable_insert(q->tasks, t->taskid, t);
Expand Down

0 comments on commit f6663dd

Please sign in to comment.