Skip to content

Commit

Permalink
thread: add support for affinity (libuv#3774)
Browse files Browse the repository at this point in the history
Backported thread affinity feature and related dependency commits
from master. It will add support for those APIs: uv_cpumask_size,
uv_thread_setaffinity, uv_thread_getaffinity.
The supported platforms are Linux, Freebsd, and Windows.
Empty implementations (returning UV_ENOTSUP) on non-supported platforms
(such as OS X and AIX).
  • Loading branch information
qdaoming authored Oct 21, 2022
1 parent 357d28a commit e900006
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ if(LIBUV_BUILD_TESTS)
test/test-tcp-write-to-half-open-connection.c
test/test-tcp-writealot.c
test/test-test-macros.c
test/test-thread-affinity.c
test/test-thread-equal.c
test/test-thread.c
test/test-threadpool-cancel.c
Expand Down
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-test-macros.c \
test/test-thread-equal.c \
test/test-thread.c \
test/test-thread-affinity.c \
test/test-threadpool-cancel.c \
test/test-threadpool.c \
test/test-timer-again.c \
Expand Down
7 changes: 7 additions & 0 deletions docs/src/misc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,13 @@ API
Frees the `cpu_infos` array previously allocated with :c:func:`uv_cpu_info`.
.. c:function:: int uv_cpumask_size(void)
Returns the maximum size of the mask used for process/thread affinities,
or ``UV_ENOTSUP`` if affinities are not supported on the current platform.
.. versionadded:: 1.45.0
.. c:function:: int uv_interface_addresses(uv_interface_address_t** addresses, int* count)
Gets address information about the network interfaces on the system. An
Expand Down
31 changes: 31 additions & 0 deletions docs/src/threading.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,37 @@ Threads
.. versionadded:: 1.26.0
.. c:function:: int uv_thread_setaffinity(uv_thread_t* tid, char* cpumask, char* oldmask, size_t mask_size)
Sets the specified thread's affinity to cpumask, which is specified in
bytes. Optionally returning the previous affinity setting in oldmask.
On Unix, uses :man:`pthread_getaffinity_np(3)` to get the affinity setting
and maps the cpu_set_t to bytes in oldmask. Then maps the bytes in cpumask
to a cpu_set_t and uses :man:`pthread_setaffinity_np(3)`. On Windows, maps
the bytes in cpumask to a bitmask and uses SetThreadAffinityMask() which
returns the previous affinity setting.
The mask_size specifies the number of entries (bytes) in cpumask / oldmask,
and must be greater-than-or-equal-to :c:func:`uv_cpumask_size`.
.. note::
Thread affinity setting is not atomic on Windows. Unsupported on macOS.
.. versionadded:: 1.45.0
.. c:function:: int uv_thread_getaffinity(uv_thread_t* tid, char* cpumask, size_t mask_size)
Gets the specified thread's affinity setting. On Unix, this maps the
cpu_set_t returned by :man:`pthread_getaffinity_np(3)` to bytes in cpumask.
The mask_size specifies the number of entries (bytes) in cpumask,
and must be greater-than-or-equal-to :c:func:`uv_cpumask_size`.
.. note::
Thread affinity getting is not atomic on Windows. Unsupported on macOS.
.. versionadded:: 1.45.0
.. c:function:: uv_thread_t uv_thread_self(void)
.. c:function:: int uv_thread_join(uv_thread_t *tid)
.. c:function:: int uv_thread_equal(const uv_thread_t* t1, const uv_thread_t* t2)
Expand Down
8 changes: 8 additions & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,7 @@ UV_EXTERN int uv_os_setpriority(uv_pid_t pid, int priority);
UV_EXTERN unsigned int uv_available_parallelism(void);
UV_EXTERN int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count);
UV_EXTERN void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count);
UV_EXTERN int uv_cpumask_size(void);

UV_EXTERN int uv_interface_addresses(uv_interface_address_t** addresses,
int* count);
Expand Down Expand Up @@ -1782,6 +1783,13 @@ UV_EXTERN int uv_thread_create_ex(uv_thread_t* tid,
const uv_thread_options_t* params,
uv_thread_cb entry,
void* arg);
UV_EXTERN int uv_thread_setaffinity(uv_thread_t* tid,
char* cpumask,
char* oldmask,
size_t mask_size);
UV_EXTERN int uv_thread_getaffinity(uv_thread_t* tid,
char* cpumask,
size_t mask_size);
UV_EXTERN uv_thread_t uv_thread_self(void);
UV_EXTERN int uv_thread_join(uv_thread_t *tid);
UV_EXTERN int uv_thread_equal(const uv_thread_t* t1, const uv_thread_t* t2);
Expand Down
9 changes: 9 additions & 0 deletions src/unix/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ extern char** environ;
# include <sys/sysctl.h>
# include <sys/filio.h>
# include <sys/wait.h>
# include <sys/param.h>
# include <sys/cpuset.h>
# if defined(__FreeBSD__)
# define uv__accept4 accept4
# endif
Expand Down Expand Up @@ -1420,6 +1422,13 @@ uv_pid_t uv_os_getppid(void) {
return getppid();
}

int uv_cpumask_size(void) {
#if defined(__linux__) || defined(__FreeBSD__)
return CPU_SETSIZE;
#else
return UV_ENOTSUP;
#endif
}

int uv_os_getpriority(uv_pid_t pid, int* priority) {
int r;
Expand Down
98 changes: 98 additions & 0 deletions src/unix/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@
#include <gnu/libc-version.h> /* gnu_get_libc_version() */
#endif

#if defined(__linux__)
# include <sched.h>
# define uv__cpu_set_t cpu_set_t
#elif defined(__FreeBSD__)
# include <sys/param.h>
# include <sys/cpuset.h>
# include <pthread_np.h>
# define uv__cpu_set_t cpuset_t
#endif


#undef NANOSEC
#define NANOSEC ((uint64_t) 1e9)

Expand Down Expand Up @@ -284,6 +295,93 @@ int uv_thread_create_ex(uv_thread_t* tid,
return UV__ERR(err);
}

#if defined(__linux__) || defined(__FreeBSD__)

int uv_thread_setaffinity(uv_thread_t* tid,
char* cpumask,
char* oldmask,
size_t mask_size) {
int i;
int r;
uv__cpu_set_t cpuset;
int cpumasksize;

cpumasksize = uv_cpumask_size();
if (cpumasksize < 0)
return cpumasksize;
if (mask_size < (size_t)cpumasksize)
return UV_EINVAL;

if (oldmask != NULL) {
r = uv_thread_getaffinity(tid, oldmask, mask_size);
if (r < 0)
return r;
}

CPU_ZERO(&cpuset);
for (i = 0; i < cpumasksize; i++)
if (cpumask[i])
CPU_SET(i, &cpuset);

#if defined(__ANDROID__)
if (sched_setaffinity(pthread_gettid_np(*tid), sizeof(cpuset), &cpuset))
r = errno;
else
r = 0;
#else
r = pthread_setaffinity_np(*tid, sizeof(cpuset), &cpuset);
#endif

return UV__ERR(r);
}


int uv_thread_getaffinity(uv_thread_t* tid,
char* cpumask,
size_t mask_size) {
int r;
int i;
uv__cpu_set_t cpuset;
int cpumasksize;

cpumasksize = uv_cpumask_size();
if (cpumasksize < 0)
return cpumasksize;
if (mask_size < (size_t)cpumasksize)
return UV_EINVAL;

CPU_ZERO(&cpuset);
#if defined(__ANDROID__)
if (sched_getaffinity(pthread_gettid_np(*tid), sizeof(cpuset), &cpuset))
r = errno;
else
r = 0;
#else
r = pthread_getaffinity_np(*tid, sizeof(cpuset), &cpuset);
#endif
if (r)
return UV__ERR(r);
for (i = 0; i < cpumasksize; i++)
cpumask[i] = !!CPU_ISSET(i, &cpuset);

return 0;
}
#else
int uv_thread_setaffinity(uv_thread_t* tid,
char* cpumask,
char* oldmask,
size_t mask_size) {
return UV_ENOTSUP;
}


int uv_thread_getaffinity(uv_thread_t* tid,
char* cpumask,
size_t mask_size) {
return UV_ENOTSUP;
}
#endif /* defined(__linux__) || defined(UV_BSD_H) */


uv_thread_t uv_thread_self(void) {
return pthread_self();
Expand Down
72 changes: 72 additions & 0 deletions src/win/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,78 @@ int uv_thread_create_ex(uv_thread_t* tid,
return UV_EIO;
}

int uv_thread_setaffinity(uv_thread_t* tid,
char* cpumask,
char* oldmask,
size_t mask_size) {
int i;
HANDLE hproc;
DWORD_PTR procmask;
DWORD_PTR sysmask;
DWORD_PTR threadmask;
DWORD_PTR oldthreadmask;
int cpumasksize;

cpumasksize = uv_cpumask_size();
assert(cpumasksize > 0);
if (mask_size < (size_t)cpumasksize)
return UV_EINVAL;

hproc = GetCurrentProcess();
if (!GetProcessAffinityMask(hproc, &procmask, &sysmask))
return uv_translate_sys_error(GetLastError());

threadmask = 0;
for (i = 0; i < cpumasksize; i++) {
if (cpumask[i]) {
if (procmask & (1 << i))
threadmask |= 1 << i;
else
return UV_EINVAL;
}
}

oldthreadmask = SetThreadAffinityMask(*tid, threadmask);
if (oldthreadmask == 0)
return uv_translate_sys_error(GetLastError());

if (oldmask != NULL) {
for (i = 0; i < cpumasksize; i++)
oldmask[i] = (oldthreadmask >> i) & 1;
}

return 0;
}

int uv_thread_getaffinity(uv_thread_t* tid,
char* cpumask,
size_t mask_size) {
int i;
HANDLE hproc;
DWORD_PTR procmask;
DWORD_PTR sysmask;
DWORD_PTR threadmask;
int cpumasksize;

cpumasksize = uv_cpumask_size();
assert(cpumasksize > 0);
if (mask_size < (size_t)cpumasksize)
return UV_EINVAL;

hproc = GetCurrentProcess();
if (!GetProcessAffinityMask(hproc, &procmask, &sysmask))
return uv_translate_sys_error(GetLastError());

threadmask = SetThreadAffinityMask(*tid, procmask);
if (threadmask == 0 || SetThreadAffinityMask(*tid, threadmask) == 0)
return uv_translate_sys_error(GetLastError());

for (i = 0; i < cpumasksize; i++)
cpumask[i] = (threadmask >> i) & 1;

return 0;
}


uv_thread_t uv_thread_self(void) {
uv_thread_t key;
Expand Down
7 changes: 7 additions & 0 deletions test/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,11 @@ UNUSED static int can_ipv6(void) {
"Cygwin runtime hangs on listen+connect in same process."
#endif

#if !defined(__linux__) && \
!defined(__FreeBSD__) && \
!defined(_WIN32)
# define NO_CPU_AFFINITY \
"affinity not supported on this platform."
#endif

#endif /* TASK_H_ */
2 changes: 2 additions & 0 deletions test/test-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ TEST_DECLARE (thread_rwlock)
TEST_DECLARE (thread_rwlock_trylock)
TEST_DECLARE (thread_create)
TEST_DECLARE (thread_equal)
TEST_DECLARE (thread_affinity)
TEST_DECLARE (dlerror)
#if (defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))) && \
!defined(__sun)
Expand Down Expand Up @@ -1122,6 +1123,7 @@ TASK_LIST_START
TEST_ENTRY (thread_rwlock_trylock)
TEST_ENTRY (thread_create)
TEST_ENTRY (thread_equal)
TEST_ENTRY (thread_affinity)
TEST_ENTRY (dlerror)
TEST_ENTRY (ip4_addr)
TEST_ENTRY (ip6_addr_link_local)
Expand Down
Loading

0 comments on commit e900006

Please sign in to comment.