Skip to content

Commit

Permalink
Fix timeout support in multithreading
Browse files Browse the repository at this point in the history
The clone wrapper was not used for thread creation.
Thus threads were not registering themselves.
Now a thread register at its first MPI call and the helper thread check
if they are alive or not for unregistration

Add a timeout test using another thread.
Fix the existing timeout test to compile without warning and be more
strict in checking the output
  • Loading branch information
kevin-juilly committed Feb 15, 2024
1 parent 9de05c9 commit a239ea9
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 53 deletions.
1 change: 1 addition & 0 deletions Testing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ add_subdirectory (mpirun)
add_subdirectory (mpi_init)
if(WI4MPI_TIMEOUT)
add_subdirectory (timeout)
add_subdirectory (timeout_thread)
endif()
add_subdirectory (mpiio)
add_subdirectory (mpi_max)
3 changes: 1 addition & 2 deletions Testing/timeout/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ file (COPY ${CMAKE_CURRENT_SOURCE_DIR}/run_timeout_tests.sh
add_test (NAME MPI_Reduce_timeout
COMMAND bash ${CMAKE_CURRENT_BINARY_DIR}/run_timeout_tests.sh
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
set_tests_properties (MPI_Reduce_timeout PROPERTIES PASS_REGULAR_EXPRESSION 0
TIMEOUT 30)
set_tests_properties (MPI_Reduce_timeout PROPERTIES PASS_REGULAR_EXPRESSION "^0\n$" TIMEOUT 30)
1 change: 1 addition & 0 deletions Testing/timeout/timeout_slow_add.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <mpi.h>
#include <unistd.h>
void slow_add(void *in, void *out, int *len, MPI_Datatype *dat) {
int i;
int *iin = (int *)in;
Expand Down
13 changes: 13 additions & 0 deletions Testing/timeout_thread/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
project (timeout_omp)
cmake_minimum_required (VERSION 3.0.0)

file (COPY ${CMAKE_CURRENT_SOURCE_DIR}/timeout_omp_slow_add.cpp
DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
file (COPY ${CMAKE_CURRENT_SOURCE_DIR}/run_timeout_omp_tests.sh
DESTINATION ${CMAKE_CURRENT_BINARY_DIR})

add_test (NAME MPI_Reduce_timeout_multithread
COMMAND bash ${CMAKE_CURRENT_BINARY_DIR}/run_timeout_omp_tests.sh
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
set_tests_properties (MPI_Reduce_timeout_multithread PROPERTIES PASS_REGULAR_EXPRESSION "^0\n$"
TIMEOUT 30)
13 changes: 13 additions & 0 deletions Testing/timeout_thread/run_timeout_omp_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

source ../etc/mpivars.sh || exit 1


mpicxx timeout_omp_slow_add.cpp -o timeout_omp_slow_add

export WI4MPI_Reduce_timeout=1
${WI4MPI_MPRUN} ${WI4MPI_NPROC} ${WI4MPI_NCORE} ${WI4MPI_PARTITION} ${WI4MPI_EXTRA_OPTS} ./timeout_omp_slow_add &> timeout_omp_slow_add.log

grep -q 'processus .* on host .* has reached a timeout' timeout_omp_slow_add.log

echo $?
29 changes: 29 additions & 0 deletions Testing/timeout_thread/timeout_omp_slow_add.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include <mpi.h>
#include <unistd.h>
#include <thread>

void caller_mpi(int rank, MPI_Op slow_op) {
int srank;
MPI_Reduce(&rank, &srank, 1, MPI_INT, slow_op, 0, MPI_COMM_WORLD);
}
void slow_add(void *in, void *out, int *len, MPI_Datatype *dat) {
int i;
int *iin = (int *)in;
int *iout = (int *)out;
(*iout) = 0;
for (i = 0; i < *len; i++) {
(*iout) += iin[i];
if (*iin == 0)
sleep(10);
}
}
int main(int argc, char **argv) {
int rank;
MPI_Op slow_op;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Op_create(&slow_add, 1, &slow_op);
std::thread t1{caller_mpi, rank, slow_op};
t1.join();
MPI_Finalize();
}
61 changes: 11 additions & 50 deletions src/common/helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <sched.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <errno.h>
int gettid(void) { return syscall(SYS_gettid); }
/* data structure to keep information on timeout for each thread, the linked
* list property is use only by controler thread*/
Expand All @@ -29,7 +30,6 @@ int timeout_thread_end;

void* wi4mpi_timeout_main_loop(void *);
void wi4mpi_timeout_thread_register(int th);
void wi4mpi_timeout_thread_unregister();
unsigned long long gettimestamp(void) {
#ifdef __aarch64__
int64_t t;
Expand All @@ -41,51 +41,6 @@ unsigned long long gettimestamp(void) {
return ((unsigned long long)a) | (((long long)d) << 32);
#endif
}
/*libc clone is a weak symbol of __clone, we use a wrapper function to clone in
* order to do the registration/unregistration of the threads*/
struct clone_arg {
int (*fn)(void *arg);
void *arg;
};
/*registation wrapper*/
int clone_wrap_fn(void *arg) {
int ret;
struct clone_arg *casted_arg = (struct clone_arg *)arg;
int (*fn)(void *arg) = casted_arg->fn;
void *rarg = casted_arg->arg;
/* can be freed here */
free(arg);
/* to avoid registation of timout control thread*/
if (fn != (int (*)(void *arg))wi4mpi_timeout_main_loop)
wi4mpi_timeout_thread_register(gettid());
ret = fn(rarg);
/* to avoid unregistation of timout control thread*/
if (fn != (int (*)(void *arg))wi4mpi_timeout_main_loop)
wi4mpi_timeout_thread_unregister();
return ret;
}
int __clone(int (*fn)(void *arg), void *child_stack, int flags, void *arg, ...);
int clone(int (*fn)(void *arg), void *child_stack, int flags, void *arg, ...) {
int ret;
pid_t *ptid;
void *tls;
pid_t *ctid;
va_list va;
va_start(va, arg);
ptid = va_arg(va, pid_t *);
tls = va_arg(va, void *);
ctid = va_arg(va, pid_t *);
/* will be freed in the created thread */
struct clone_arg *warg = malloc(sizeof(struct clone_arg));
warg->fn = fn;
warg->arg = arg;
ret = __clone(clone_wrap_fn, child_stack, flags, warg, ptid, tls, ctid);
if (ret == -1) {
/* clone didn't work so wrag hasn't be freed */
free(warg);
}
return ret;
}
th_reg_list *last_elt;
/* each thread has a pointer on is own control structure*/
__thread th_reg_list *my_elt;
Expand All @@ -96,10 +51,16 @@ void* wi4mpi_timeout_main_loop(void *felement) {
pthread_mutex_lock(&mutex_list_lock);
my_elt = (th_reg_list *)felement;
pthread_mutex_unlock(&mutex_list_lock);
unsigned long long next_check = gettimestamp() + 0xfffffu;
while (!timeout_thread_end) {
unsigned long long ts = gettimestamp();
th_reg_list *otmp;
for (th_reg_list *tmp = my_elt; tmp != NULL; tmp = tmp->next) {
// check if the thread still exist, for unregistration
int kret = kill(tmp->tid, 0);
if (kret == -1 && errno == ESRCH) {
tmp->active = 0;
}
if (tmp->active) {
if (ts >= tmp->timeout) {
/* effective timeout kill the responsible thread*/
Expand Down Expand Up @@ -132,6 +93,10 @@ void* wi4mpi_timeout_main_loop(void *felement) {
}

void wi4mpi_set_timeout(unsigned long long timeout_val) {
// First time the thread does a call to MPI, register it
if (!my_elt) {
wi4mpi_timeout_thread_register(gettid());
}
/* may need a memfence to ensure that compiler doen't do timeout =
ts;timeout+=timeout_val; or in the reverse order*/
unsigned long long ts = gettimestamp();
Expand All @@ -156,10 +121,6 @@ void wi4mpi_timeout_thread_register(int th) {
pthread_mutex_unlock(&mutex_list_lock);
tmp->next = my_elt;
}
void wi4mpi_timeout_thread_unregister() {
/*effective destruction is done in helper thread to avoid list corruption*/
my_elt->active = 0;
}
__attribute__((constructor)) void timeout_init(void) {
pthread_t timeout_thread;
/*register the main thread */
Expand Down
2 changes: 1 addition & 1 deletion src/common/helper.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#ifndef __TIMEOUT_HELPER_H__
#define __TIMEOUT_HELPER_H__ 1
#define WI4MPI_MAX_TIME ((unsigned long long)0xffffffffffffffff)
#define WI4MPI_MAX_TIME (0xffffffffffffffffull)
#define WI4MPI_TIME_PREC 100
#define helper_sleep usleep(WI4MPI_TIME_PREC)
void wi4mpi_set_timeout(unsigned long long);
Expand Down

0 comments on commit a239ea9

Please sign in to comment.