Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* also update first allocation when using max mode

* consolidate category_{bucketing,}_dynamic_task_* to category_task

* round up to histogram bucket size for "classical modes"

* add test

* add factory to PATH

* add worker to PATH

* fix max -> min

* set timeout on test

* disable on mac because the monitor does not work there
  • Loading branch information
btovar authored Apr 5, 2023
1 parent 084c64f commit 4150934
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 67 deletions.
64 changes: 14 additions & 50 deletions dttools/src/category.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ int64_t category_first_allocation_min_waste(struct histogram *h, int64_t top_res
free(times_accum);
free(keys);

/* round up to bucket size */
a_1 = histogram_round_up(h, a_1);

return a_1;
}

Expand Down Expand Up @@ -440,6 +443,9 @@ int64_t category_first_allocation_max_throughput(struct histogram *h, int64_t to
free(times_accum);
free(keys);

/* round up to bucket size */
a_1 = histogram_round_up(h, a_1);

return a_1;
}

Expand Down Expand Up @@ -832,51 +838,9 @@ category_allocation_t category_next_label(struct category *c, category_allocatio
return current_label;
}

/*
 * Build the maximum resource label for a task of category c and the given
 * request kind. Values are layered in this order, each layer applied on top
 * of the previous ones:
 *
 *   1. when the category is in steady state and uses the min-waste or
 *      max-throughput mode: the maximum resources seen, merged (max) with
 *      the computed first allocation;
 *   2. the explicit maximum allocation of the category;
 *   3. for CATEGORY_ALLOCATION_FIRST requests, in steady state and with the
 *      min-waste or max-throughput mode: the computed first allocation;
 *   4. the values explicitly given by the user.
 *
 * The returned summary is a static object that is deleted and recreated on
 * every call, so the caller does not have to free it, and must not keep the
 * pointer across calls.
 */
const struct rmsummary *category_dynamic_task_max_resources(struct category *c, struct rmsummary *user, category_allocation_t request) {
	static struct rmsummary *internal = NULL;

	/* recycle the label handed out by the previous call */
	if(internal) {
		rmsummary_delete(internal);
	}
	internal = rmsummary_create(-1);

	/* Load max seen values, but only if not in fixed or max mode. In max
	 * mode, max seen is the first allocation, and the next allocation is to
	 * use whole workers. */
	if(c->allocation_mode != CATEGORY_ALLOCATION_MODE_FIXED
			&& c->allocation_mode != CATEGORY_ALLOCATION_MODE_MAX
			&& category_in_steady_state(c)
			&& (c->allocation_mode == CATEGORY_ALLOCATION_MODE_MIN_WASTE
				|| c->allocation_mode == CATEGORY_ALLOCATION_MODE_MAX_THROUGHPUT)) {
		rmsummary_merge_override(internal, c->max_resources_seen);

		/* never go below what first_allocation computed */
		rmsummary_merge_max(internal, c->first_allocation);
	}

	/* explicit category max values */
	rmsummary_merge_override(internal, c->max_allocation);

	/* first allocations of the automatic modes use the computed label */
	if(category_in_steady_state(c)) {
		if(c->allocation_mode == CATEGORY_ALLOCATION_MODE_MIN_WASTE
				|| c->allocation_mode == CATEGORY_ALLOCATION_MODE_MAX_THROUGHPUT) {
			if(request == CATEGORY_ALLOCATION_FIRST) {
				rmsummary_merge_override(internal, c->first_allocation);
			}
		}
	}

	/* user values, if explicitly given, are applied last */
	rmsummary_merge_override(internal, user);

	return internal;
}

//taskid >=0 means real task needs prediction, -1 means function called for other purposes
const struct rmsummary *category_bucketing_dynamic_task_max_resources(struct category *c, struct rmsummary *user, category_allocation_t request, int taskid) {
const struct rmsummary *category_task_max_resources(struct category *c, struct rmsummary *user, category_allocation_t request, int taskid) {
/* we keep an internal label so that the caller does not have to worry
* about memory leaks. */
static struct rmsummary *internal = NULL;
Expand All @@ -898,7 +862,7 @@ const struct rmsummary *category_bucketing_dynamic_task_max_resources(struct cat
* is to use whole workers. */
rmsummary_merge_override(internal, c->max_resources_seen);

/* Never go below what first_allocation computer */
/* Never go below what first_allocation computed */
rmsummary_merge_max(internal, c->first_allocation);
}
else if (taskid >= 0 && category_in_bucketing_mode(c))
Expand All @@ -912,10 +876,10 @@ const struct rmsummary *category_bucketing_dynamic_task_max_resources(struct cat
/* load explicit category max values */
rmsummary_merge_override(internal, c->max_allocation);

if(category_in_steady_state(c) &&
(c->allocation_mode == CATEGORY_ALLOCATION_MODE_MIN_WASTE ||
c->allocation_mode ==CATEGORY_ALLOCATION_MODE_MAX_THROUGHPUT) &&
request == CATEGORY_ALLOCATION_FIRST) {
if(category_in_steady_state(c) && request == CATEGORY_ALLOCATION_FIRST &&
(c->allocation_mode == CATEGORY_ALLOCATION_MODE_MIN_WASTE ||
c->allocation_mode ==CATEGORY_ALLOCATION_MODE_MAX_THROUGHPUT ||
c->allocation_mode ==CATEGORY_ALLOCATION_MODE_MAX)) {
rmsummary_merge_override(internal, c->first_allocation);
}

Expand All @@ -925,9 +889,9 @@ const struct rmsummary *category_bucketing_dynamic_task_max_resources(struct cat
return internal;
}

const struct rmsummary *category_dynamic_task_min_resources(struct category *c, struct rmsummary *user, category_allocation_t request) {
const struct rmsummary *category_task_min_resources(struct category *c, struct rmsummary *user, category_allocation_t request, int taskid) {
static struct rmsummary *internal = NULL;
const struct rmsummary *allocation = category_dynamic_task_max_resources(c, user, request);
const struct rmsummary *allocation = category_task_max_resources(c, user, request, taskid);

if(internal) {
rmsummary_delete(internal);
Expand Down
6 changes: 2 additions & 4 deletions dttools/src/category.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,8 @@ int category_in_steady_state(struct category *c);

category_allocation_t category_next_label(struct category *c, category_allocation_t current_label, int resource_overflow, struct rmsummary *user, struct rmsummary *measured);

const struct rmsummary *category_dynamic_task_max_resources(struct category *c, struct rmsummary *user, category_allocation_t request);
const struct rmsummary *category_task_max_resources(struct category *c, struct rmsummary *user, category_allocation_t request, int taskid);

const struct rmsummary *category_bucketing_dynamic_task_max_resources(struct category *c, struct rmsummary *user, category_allocation_t request, int taskid);

const struct rmsummary *category_dynamic_task_min_resources(struct category *c, struct rmsummary *user, category_allocation_t request);
const struct rmsummary *category_task_min_resources(struct category *c, struct rmsummary *user, category_allocation_t request, int taskid);

#endif
2 changes: 1 addition & 1 deletion makeflow/src/dag_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ struct jx * dag_node_env_create( struct dag *d, struct dag_node *n, int should_s
/* Return resources according to request. */

/*
 * Return the resource label for node n according to its current resource
 * request, as computed by its category.
 *
 * The pointer is whatever category_task_max_resources() returns; this
 * function does not copy it.
 */
const struct rmsummary *dag_node_dynamic_label(const struct dag_node *n) {
	/* A taskid of -1 is passed because there is no real task id here: per
	 * category.c, taskid >= 0 means a real task that needs a prediction, and
	 * -1 means the function is being called for other purposes. */
	return category_task_max_resources(n->category, n->resources_requested, n->resource_request, -1);
}

/* Return JX object containing cmd, inputs, outputs, env, and resources. */
Expand Down
6 changes: 3 additions & 3 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -4964,13 +4964,13 @@ const struct rmsummary *vine_manager_task_resources_max(struct vine_manager *q,

struct category *c = vine_category_lookup_or_create(q, t->category);

return category_bucketing_dynamic_task_max_resources(c, t->resources_requested, t->resource_request, t->task_id);
return category_task_max_resources(c, t->resources_requested, t->resource_request, t->task_id);
}

const struct rmsummary *vine_manager_task_resources_min(struct vine_manager *q, struct vine_task *t) {
struct category *c = vine_category_lookup_or_create(q, t->category);

const struct rmsummary *s = category_dynamic_task_min_resources(c, t->resources_requested, t->resource_request);
const struct rmsummary *s = category_task_min_resources(c, t->resources_requested, t->resource_request, t->task_id);

if(t->resource_request != CATEGORY_ALLOCATION_FIRST || !q->current_max_worker) {
return s;
Expand All @@ -4989,7 +4989,7 @@ const struct rmsummary *vine_manager_task_resources_min(struct vine_manager *q,
rmsummary_merge_override(r, q->current_max_worker);
rmsummary_merge_override(r, t->resources_requested);

s = category_dynamic_task_min_resources(c, r, t->resource_request);
s = category_task_min_resources(c, r, t->resource_request, t->task_id);
rmsummary_delete(r);
}

Expand Down
6 changes: 3 additions & 3 deletions taskvine/src/manager/vine_txn_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ void vine_txn_log_write_category(struct vine_manager *q, struct category *c)
buffer_init(&B);

buffer_printf(&B, "CATEGORY %s MAX ", c->name);
rmsummary_print_buffer(&B, category_bucketing_dynamic_task_max_resources(c, NULL, CATEGORY_ALLOCATION_MAX, -1), 1);
rmsummary_print_buffer(&B, category_task_max_resources(c, NULL, CATEGORY_ALLOCATION_MAX, -1), 1);
vine_txn_log_write(q, buffer_tostring(&B));
buffer_rewind(&B, 0);

buffer_printf(&B, "CATEGORY %s MIN ", c->name);
rmsummary_print_buffer(&B, category_dynamic_task_min_resources(c, NULL, CATEGORY_ALLOCATION_FIRST), 1);
rmsummary_print_buffer(&B, category_task_min_resources(c, NULL, CATEGORY_ALLOCATION_FIRST, -1), 1);
vine_txn_log_write(q, buffer_tostring(&B));
buffer_rewind(&B, 0);

Expand Down Expand Up @@ -194,7 +194,7 @@ void vine_txn_log_write_category(struct vine_manager *q, struct category *c)
}

buffer_printf(&B, "CATEGORY %s FIRST %s ", c->name, mode);
rmsummary_print_buffer(&B, category_bucketing_dynamic_task_max_resources(c, NULL, CATEGORY_ALLOCATION_FIRST, -1), 1);
rmsummary_print_buffer(&B, category_task_max_resources(c, NULL, CATEGORY_ALLOCATION_FIRST, -1), 1);
vine_txn_log_write(q, buffer_tostring(&B));

buffer_free(&B);
Expand Down
65 changes: 65 additions & 0 deletions taskvine/test/TR_vine_auto_modes.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/bin/sh

set -e

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR
import_config_val CCTOOLS_OPSYS

# The values are quoted so that a working directory containing whitespace is
# not field-split by /bin/sh implementations that treat the arguments of
# `export` as ordinary words.
export PYTHONPATH="$(pwd)/../src/bindings/${CCTOOLS_PYTHON_TEST_DIR}:${PYTHONPATH}"

# NOTE(review): batch_job/src and src/worker are presumably where the factory
# and worker executables used by the test live -- confirm against the build tree.
export PATH="$(pwd)/../../batch_job/src:${PATH}"
export PATH="$(pwd)/../src/worker:${PATH}"

# file in which run() records the exit status of the python test
STATUS_FILE=vine.status

# Decide whether this test should run.
# Returns 0 when the test is applicable, 1 when it must be skipped.
check_needed()
{
	# the test is a python script, so a python interpreter must be configured
	[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1

	# disable on mac because the resource_monitor does not work there
	if [ "${CCTOOLS_OPSYS}" = DARWIN ]
	then
		return 1
	fi

	# Explicit success: without it the function would return the status of
	# the last command executed, i.e. the failed DARWIN comparison, and the
	# test would be reported as not needed on every other platform too.
	return 0
}

# Set up for a run: discard any status file left behind by a previous run.
# Always reports success.
prepare()
{
	rm -f "${STATUS_FILE}"
	return 0
}

# Run auto_modes.py and translate its result into the exit status of the test.
# Exits 0 when the python script succeeded, 1 otherwise (after printing the
# manager debug log, when one can be found).
run()
{
	# Run the python test in the background; its exit status is written to
	# the status file once it finishes.
	(${CCTOOLS_PYTHON_TEST_EXEC} auto_modes.py; echo $? > "${STATUS_FILE}") &

	wait_for_file_creation "${STATUS_FILE}" 30

	# retrieve exit status
	status=$(cat "${STATUS_FILE}")

	# String comparison on purpose: an empty or malformed status counts as a
	# failure, instead of making `[ ... -ne 0 ]` error out and the test pass.
	if [ "${status}" != 0 ]
	then
		# display log files in case of failure.
		logfile=$(latest_vine_debug_log)

		# Quoted and checked for emptiness: with an empty value, an unquoted
		# `[ -f ]` is true and a bare `cat` would block reading stdin.
		if [ -n "${logfile}" ] && [ -f "${logfile}" ]
		then
			echo "manager log:"
			cat "${logfile}"
		fi

		exit 1
	fi

	exit 0
}

# Remove what a run leaves behind: the status file and the vine-run-info
# directory. Always exits with status 0.
clean()
{
	rm -f "${STATUS_FILE}"
	rm -rf vine-run-info

	exit 0
}


dispatch "$@"
82 changes: 82 additions & 0 deletions taskvine/test/auto_modes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#! /usr/bin/env python

import taskvine as vine
import sys
import time

# Manager under test, with ssl and resource monitoring enabled.
# NOTE(review): port=0 presumably lets the manager choose a free port -- confirm.
m = vine.Manager(port=0, ssl=True)
m.enable_monitoring()

# NOTE(review): presumably a category is considered steady after a single
# completed task with this setting -- confirm against the tune documentation.
m.tune("category-steady-n-tasks", 1)

bucket_size = 250

# Resources of the single worker; memory and disk are multiples of bucket_size.
worker = dict(
    cores=4,
    memory=bucket_size * 4,
    disk=bucket_size * 8,
)

# Factory that keeps exactly one worker with the resources above.
factory = vine.Factory(manager=m)
factory.max_workers = 1
factory.min_workers = 1
factory.cores = worker["cores"]
factory.memory = worker["memory"]
factory.disk = worker["disk"]

# category name -> allocation mode exercised under that category
modes = dict(
    max=vine.VINE_ALLOCATION_MODE_MAX,
    thr=vine.VINE_ALLOCATION_MODE_MAX_THROUGHPUT,
    wst=vine.VINE_ALLOCATION_MODE_MIN_WASTE,
)

# category name -> fraction of each worker resource the checked task is
# expected to be allocated
one_core_share = 1/worker["cores"]
expected_proportions = dict(
    max=0.5,  # half of the disk, so half of the resources
    thr=one_core_share,
    wst=one_core_share,
)

# set to True as soon as one allocation differs from its expected value
error_found = False

# time at which a task was last returned; used to detect a stalled run
last_returned_time = time.time()

# For each allocation mode: submit one large task followed by ten small ones,
# wait for all of them, and compare the resources allocated to the last task
# returned against the expected fraction of the worker.
with factory:
    for (category, mode) in modes.items():
        m.set_category_mode(category, mode)

        # first task needs little less than half of the disk, which should round up to half the disk
        t = vine.Task(
            f"dd count=0 bs={int(worker['disk']/2.5)}M seek=1 of=nulls")
        t.set_category(category)
        m.submit(t)

        for i in range(10):
            t = vine.Task("dd count=0 bs=1M seek=1 of=nulls")
            t.set_category(category)
            m.submit(t)

        print(f"\n{category}: ", end="")

        # Remember the last task actually returned. m.wait() yields a false
        # value when nothing finished in time, so the loop variable alone is
        # not guaranteed to hold a task once the loop ends.
        last_task = None
        while not m.empty():
            t = m.wait(5)
            if t:
                print(".", end="")
                last_task = t
                last_returned_time = time.time()

            # if no task for 15s, something went wrong with the test
            if time.time() - last_returned_time > 15:
                print("\nno task finished recently")
                sys.exit(1)

        if last_task is None:
            print("\nno task was returned")
            sys.exit(1)

        rs = "cores memory disk".split()
        mr = {r: getattr(last_task.resources_measured, r) for r in rs}
        ar = {r: getattr(last_task.resources_allocated, r) for r in rs}

        print("")
        for r in rs:
            sign = "="
            expected = expected_proportions[category] * worker[r]
            if ar[r] != expected:
                error_found = True
                sign = "!"
            print(f"{r} measured {mr[r]}, allocated {ar[r]} {sign}= {expected}")

# exit status 1 when any allocation differed from its expected value, 0 otherwise
sys.exit(1 if error_found else 0)
12 changes: 6 additions & 6 deletions work_queue/src/work_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -7808,12 +7808,12 @@ static void write_transaction_category(struct work_queue *q, struct category *c)
buffer_init(&B);

buffer_printf(&B, "CATEGORY %s MAX ", c->name);
rmsummary_print_buffer(&B, category_bucketing_dynamic_task_max_resources(c, NULL, CATEGORY_ALLOCATION_MAX, -1), 1);
rmsummary_print_buffer(&B, category_task_max_resources(c, NULL, CATEGORY_ALLOCATION_MAX, -1), 1);
write_transaction(q, buffer_tostring(&B));
buffer_rewind(&B, 0);

buffer_printf(&B, "CATEGORY %s MIN ", c->name);
rmsummary_print_buffer(&B, category_dynamic_task_min_resources(c, NULL, CATEGORY_ALLOCATION_FIRST), 1);
rmsummary_print_buffer(&B, category_task_min_resources(c, NULL, CATEGORY_ALLOCATION_FIRST, -1), 1);
write_transaction(q, buffer_tostring(&B));
buffer_rewind(&B, 0);

Expand Down Expand Up @@ -7842,7 +7842,7 @@ static void write_transaction_category(struct work_queue *q, struct category *c)
}

buffer_printf(&B, "CATEGORY %s FIRST %s ", c->name, mode);
rmsummary_print_buffer(&B, category_bucketing_dynamic_task_max_resources(c, NULL, CATEGORY_ALLOCATION_FIRST, -1), 1);
rmsummary_print_buffer(&B, category_task_max_resources(c, NULL, CATEGORY_ALLOCATION_FIRST, -1), 1);
write_transaction(q, buffer_tostring(&B));

buffer_free(&B);
Expand Down Expand Up @@ -8098,13 +8098,13 @@ const struct rmsummary *task_max_resources(struct work_queue *q, struct work_que

struct category *c = work_queue_category_lookup_or_create(q, t->category);

return category_bucketing_dynamic_task_max_resources(c, t->resources_requested, t->resource_request, t->taskid);
return category_task_max_resources(c, t->resources_requested, t->resource_request, t->taskid);
}

const struct rmsummary *task_min_resources(struct work_queue *q, struct work_queue_task *t) {
struct category *c = work_queue_category_lookup_or_create(q, t->category);

const struct rmsummary *s = category_dynamic_task_min_resources(c, t->resources_requested, t->resource_request);
const struct rmsummary *s = category_task_min_resources(c, t->resources_requested, t->resource_request, t->taskid);

if(t->resource_request != CATEGORY_ALLOCATION_FIRST || !q->current_max_worker) {
return s;
Expand All @@ -8123,7 +8123,7 @@ const struct rmsummary *task_min_resources(struct work_queue *q, struct work_que
rmsummary_merge_override(r, q->current_max_worker);
rmsummary_merge_override(r, t->resources_requested);

s = category_dynamic_task_min_resources(c, r, t->resource_request);
s = category_task_min_resources(c, r, t->resource_request, t->taskid);
rmsummary_delete(r);
}

Expand Down

0 comments on commit 4150934

Please sign in to comment.