Skip to content

Commit

Permalink
[threading] Switch to a native implementation of LowLevelLifoSemaphore (
Browse files Browse the repository at this point in the history
dotnet#2098)

This change is motivated on two main fronts. The first is maintainability - the current managed implementation is difficult to understand and I worry diagnosing any potential issues would be a massive time sink. The second is performance - the current implementation appears to suffer from significant lock contention when running the TechEmpower plaintext benchmark. My hope is that the simpler, cleaner native implementation here will avoid both problems, but I don't want to merge it until we have benchmarking data. However, even if the numbers are similar, I still think it's worth merging just from a maintainability perspective.

The native LifoSemaphore implementation has only ever been tested on posix-like platforms, so Windows behavior is unknown. Currently the Windows implementation of LowLevelLifoSemaphore is very different, so unless we have need for the LifoSemaphore elsewhere in the runtime this isn't a concern.

Many thanks to @filipnavara for the initial implementation.
  • Loading branch information
CoffeeFlux authored Feb 12, 2020
1 parent 084dd1a commit 3eaff60
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 104 deletions.
7 changes: 7 additions & 0 deletions src/mono/mono/metadata/icall-decl.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,11 @@ ICALL_EXPORT gpointer ves_icall_System_Threading_Semaphore_CreateSemaphore_ic
ICALL_EXPORT gpointer ves_icall_System_Threading_Semaphore_OpenSemaphore_icall (const gunichar2 *name, gint32 name_length, gint32 rights, gint32 *win32error);
ICALL_EXPORT MonoBoolean ves_icall_System_Threading_Semaphore_ReleaseSemaphore_internal (gpointer handle, gint32 releaseCount, gint32 *prevcount);

#ifdef ENABLE_NETCORE
ICALL_EXPORT gpointer ves_icall_System_Threading_LowLevelLifoSemaphore_InitInternal (void);
ICALL_EXPORT void ves_icall_System_Threading_LowLevelLifoSemaphore_DeleteInternal (gpointer sem_ptr);
ICALL_EXPORT gint32 ves_icall_System_Threading_LowLevelLifoSemaphore_TimedWaitInternal (gpointer sem_ptr, gint32 timeout_ms);
ICALL_EXPORT void ves_icall_System_Threading_LowLevelLifoSemaphore_ReleaseInternal (gpointer sem_ptr, gint32 count);
#endif

#endif // __MONO_METADATA_ICALL_DECL_H__
6 changes: 6 additions & 0 deletions src/mono/mono/metadata/icall-def-netcore.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,12 @@ NOHANDLES(ICALL(ILOCK_21, "Increment(long&)", ves_icall_System_Threading_Interlo
NOHANDLES(ICALL(ILOCK_22, "MemoryBarrierProcessWide", ves_icall_System_Threading_Interlocked_MemoryBarrierProcessWide))
NOHANDLES(ICALL(ILOCK_23, "Read(long&)", ves_icall_System_Threading_Interlocked_Read_Long))

ICALL_TYPE(LIFOSEM, "System.Threading.LowLevelLifoSemaphore", LIFOSEM_1)
NOHANDLES(ICALL(LIFOSEM_1, "DeleteInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_DeleteInternal))
NOHANDLES(ICALL(LIFOSEM_2, "InitInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_InitInternal))
NOHANDLES(ICALL(LIFOSEM_3, "ReleaseInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_ReleaseInternal))
NOHANDLES(ICALL(LIFOSEM_4, "TimedWaitInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_TimedWaitInternal))

ICALL_TYPE(MONIT, "System.Threading.Monitor", MONIT_0)
HANDLES(MONIT_0, "Enter", ves_icall_System_Threading_Monitor_Monitor_Enter, void, 1, (MonoObject))
HANDLES(MONIT_1, "Exit", mono_monitor_exit_icall, void, 1, (MonoObject))
Expand Down
29 changes: 28 additions & 1 deletion src/mono/mono/metadata/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
#include <mono/utils/mono-state.h>
#include <mono/metadata/w32subset.h>
#include <mono/metadata/mono-config.h>
#include "mono/utils/mono-tls-inline.h"
#include <mono/utils/mono-tls-inline.h>
#include <mono/utils/lifo-semaphore.h>

#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
Expand Down Expand Up @@ -6774,4 +6775,30 @@ ves_icall_System_Threading_Thread_GetCurrentProcessorNumber (MonoError *error)
return mono_native_thread_processor_id_get ();
}

gpointer
ves_icall_System_Threading_LowLevelLifoSemaphore_InitInternal (void)
{
return (gpointer)mono_lifo_semaphore_init ();
}

void
ves_icall_System_Threading_LowLevelLifoSemaphore_DeleteInternal (gpointer sem_ptr)
{
LifoSemaphore *sem = (LifoSemaphore *)sem_ptr;
mono_lifo_semaphore_delete (sem);
}

gint32
ves_icall_System_Threading_LowLevelLifoSemaphore_TimedWaitInternal (gpointer sem_ptr, gint32 timeout_ms)
{
LifoSemaphore *sem = (LifoSemaphore *)sem_ptr;
return mono_lifo_semaphore_timed_wait (sem, timeout_ms);
}

void
ves_icall_System_Threading_LowLevelLifoSemaphore_ReleaseInternal (gpointer sem_ptr, gint32 count)
{
LifoSemaphore *sem = (LifoSemaphore *)sem_ptr;
mono_lifo_semaphore_release (sem, count);
}
#endif
2 changes: 2 additions & 0 deletions src/mono/mono/utils/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ monoutils_sources = \
mono-stack-unwinding.h \
hazard-pointer.c \
hazard-pointer.h \
lifo-semaphore.c \
lifo-semaphore.h \
lock-free-queue.c \
lock-free-queue.h \
lock-free-alloc.c \
Expand Down
89 changes: 89 additions & 0 deletions src/mono/mono/utils/lifo-semaphore.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#include <mono/utils/lifo-semaphore.h>

LifoSemaphore *
mono_lifo_semaphore_init (void)
{
LifoSemaphore *semaphore = g_new0 (LifoSemaphore, 1);
if (semaphore == NULL)
return NULL;

mono_coop_mutex_init (&semaphore->mutex);

return semaphore;
}

void
mono_lifo_semaphore_delete (LifoSemaphore *semaphore)
{
g_assert (semaphore->head == NULL);
mono_coop_mutex_destroy (&semaphore->mutex);
g_free (semaphore);
}

int32_t
mono_lifo_semaphore_timed_wait (LifoSemaphore *semaphore, int32_t timeout_ms)
{
LifoSemaphoreWaitEntry wait_entry = { 0 };

mono_coop_cond_init (&wait_entry.condition);
mono_coop_mutex_lock (&semaphore->mutex);

if (semaphore->pending_signals > 0) {
--semaphore->pending_signals;
mono_coop_cond_destroy (&wait_entry.condition);
mono_coop_mutex_unlock (&semaphore->mutex);
return 1;
}

// Enqueue out entry into the LIFO wait list
wait_entry.previous = NULL;
wait_entry.next = semaphore->head;
if (semaphore->head != NULL)
semaphore->head->previous = &wait_entry;
semaphore->head = &wait_entry;

// Wait for a signal or timeout
int wait_error = 0;
do {
wait_error = mono_coop_cond_timedwait (&wait_entry.condition, &semaphore->mutex, timeout_ms);
} while (wait_error == 0 && !wait_entry.signaled);

if (wait_error == -1) {
if (semaphore->head == &wait_entry)
semaphore->head = wait_entry.next;
if (wait_entry.next != NULL)
wait_entry.next->previous = wait_entry.previous;
if (wait_entry.previous != NULL)
wait_entry.previous->next = wait_entry.next;
}

mono_coop_cond_destroy (&wait_entry.condition);
mono_coop_mutex_unlock (&semaphore->mutex);

return wait_entry.signaled;
}

void
mono_lifo_semaphore_release (LifoSemaphore *semaphore, uint32_t count)
{
mono_coop_mutex_lock (&semaphore->mutex);

while (count > 0) {
LifoSemaphoreWaitEntry *wait_entry = semaphore->head;
if (wait_entry != NULL) {
semaphore->head = wait_entry->next;
if (semaphore->head != NULL)
semaphore->head->previous = NULL;
wait_entry->previous = NULL;
wait_entry->next = NULL;
wait_entry->signaled = 1;
mono_coop_cond_signal (&wait_entry->condition);
--count;
} else {
semaphore->pending_signals += count;
count = 0;
}
}

mono_coop_mutex_unlock (&semaphore->mutex);
}
34 changes: 34 additions & 0 deletions src/mono/mono/utils/lifo-semaphore.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#ifndef __MONO_LIFO_SEMAPHORE_H__
#define __MONO_LIFO_SEMAPHORE_H__

#include <mono/utils/mono-coop-mutex.h>

typedef struct _LifoSemaphore LifoSemaphore;
typedef struct _LifoSemaphoreWaitEntry LifoSemaphoreWaitEntry;

struct _LifoSemaphoreWaitEntry {
MonoCoopCond condition;
int signaled;
LifoSemaphoreWaitEntry *previous;
LifoSemaphoreWaitEntry *next;
};

struct _LifoSemaphore {
MonoCoopMutex mutex;
LifoSemaphoreWaitEntry *head;
uint32_t pending_signals;
};

LifoSemaphore *
mono_lifo_semaphore_init (void);

void
mono_lifo_semaphore_delete (LifoSemaphore *semaphore);

int32_t
mono_lifo_semaphore_timed_wait (LifoSemaphore *semaphore, int32_t timeout_ms);

void
mono_lifo_semaphore_release (LifoSemaphore *semaphore, uint32_t count);

#endif // __MONO_LIFO_SEMAPHORE_H__
2 changes: 2 additions & 0 deletions src/mono/msvc/libmonoutils-common.targets
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@
<ClInclude Include="$(MonoSourceLocation)\mono\utils\mono-stack-unwinding.h" />
<ClCompile Include="$(MonoSourceLocation)\mono\utils\hazard-pointer.c" />
<ClInclude Include="$(MonoSourceLocation)\mono\utils\hazard-pointer.h" />
<ClCompile Include="$(MonoSourceLocation)\mono\utils\lifo-semaphore.c" />
<ClInclude Include="$(MonoSourceLocation)\mono\utils\lifo-semaphore.h" />
<ClCompile Include="$(MonoSourceLocation)\mono\utils\lock-free-queue.c" />
<ClInclude Include="$(MonoSourceLocation)\mono\utils\lock-free-queue.h" />
<ClCompile Include="$(MonoSourceLocation)\mono\utils\lock-free-alloc.c" />
Expand Down
6 changes: 6 additions & 0 deletions src/mono/msvc/libmonoutils-common.targets.filters
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@
<ClInclude Include="$(MonoSourceLocation)\mono\utils\hazard-pointer.h">
<Filter>Header Files$(MonoUtilsFilterSubFolder)\common</Filter>
</ClInclude>
<ClCompile Include="$(MonoSourceLocation)\mono\utils\lifo-semaphore.c">
<Filter>Source Files$(MonoUtilsFilterSubFolder)\common</Filter>
</ClCompile>
<ClInclude Include="$(MonoSourceLocation)\mono\utils\lifo-semaphore.h">
<Filter>Header Files$(MonoUtilsFilterSubFolder)\common</Filter>
</ClInclude>
<ClCompile Include="$(MonoSourceLocation)\mono\utils\lock-free-queue.c">
<Filter>Source Files$(MonoUtilsFilterSubFolder)\common</Filter>
</ClCompile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,130 +2,45 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Internal.Runtime.CompilerServices;
using System.Runtime.CompilerServices;

namespace System.Threading
{
internal unsafe sealed partial class LowLevelLifoSemaphore : IDisposable
{
struct WaitEntry
{
public object condition;
public bool signaled;
public void* previous;
public void* next;
}
IntPtr lifo_semaphore;

private object mutex;
private void* head;
private uint pending_signals;
[MethodImplAttribute (MethodImplOptions.InternalCall)]
extern static IntPtr InitInternal ();

private void Create (int maximumSignalCount)
{
mutex = new object();
head = null;
pending_signals = 0;
lifo_semaphore = InitInternal ();
}

[MethodImplAttribute (MethodImplOptions.InternalCall)]
extern static void DeleteInternal (IntPtr semaphore);

public void Dispose ()
{
DeleteInternal (lifo_semaphore);
lifo_semaphore = IntPtr.Zero;
}

[MethodImplAttribute (MethodImplOptions.InternalCall)]
extern static int TimedWaitInternal (IntPtr semaphore, int timeoutMs);

private bool WaitCore (int timeoutMs)
{
WaitEntry wait_entry = new WaitEntry ();
bool mutexLocked = false;
bool waitEntryLocked = false;

try {
Monitor.try_enter_with_atomic_var (mutex, Timeout.Infinite, false, ref mutexLocked);

if (pending_signals > 0) {
--pending_signals;
return true;
}

wait_entry.condition = new object();
wait_entry.previous = null;
wait_entry.next = head;
if (head != null) {
Unsafe.AsRef<WaitEntry> (head).previous = Unsafe.AsPointer<WaitEntry> (ref wait_entry);
}
head = Unsafe.AsPointer<WaitEntry> (ref wait_entry);
}
finally {
if (mutexLocked) {
Monitor.Exit (mutex);
}
}

try {
Monitor.try_enter_with_atomic_var (wait_entry.condition, Timeout.Infinite, false, ref waitEntryLocked);
if (!wait_entry.signaled) {
Monitor.Monitor_wait (wait_entry.condition, timeoutMs, false);
}
}
finally {
if (waitEntryLocked)
Monitor.Exit (wait_entry.condition);
}

mutexLocked = false;
try {
Monitor.try_enter_with_atomic_var (mutex, Timeout.Infinite, false, ref mutexLocked);

if (!wait_entry.signaled) {
if (head == Unsafe.AsPointer<WaitEntry> (ref wait_entry)) {
head = wait_entry.next;
}
if (wait_entry.next != null) {
Unsafe.AsRef<WaitEntry> (wait_entry.next).previous = wait_entry.previous;
}
if (wait_entry.previous != null) {
Unsafe.AsRef<WaitEntry> (wait_entry.previous).next = wait_entry.next;
}
}
}
finally {
if (mutexLocked) {
Monitor.Exit (mutex);
}
}

return wait_entry.signaled;
return TimedWaitInternal (lifo_semaphore, timeoutMs) != 0;
}

[MethodImplAttribute (MethodImplOptions.InternalCall)]
extern static void ReleaseInternal (IntPtr semaphore, int count);

private void ReleaseCore (int count)
{
bool mutexLocked = false;
try {
Monitor.try_enter_with_atomic_var (mutex, Timeout.Infinite, false, ref mutexLocked);
while (count > 0) {
if (head != null) {
ref WaitEntry wait_entry = ref Unsafe.AsRef<WaitEntry> (head);
head = wait_entry.next;
if (head != null) {
Unsafe.AsRef<WaitEntry> (head).previous = null;
}
wait_entry.previous = null;
wait_entry.next = null;
lock (wait_entry.condition)
{
wait_entry.signaled = true;
Monitor.Pulse (wait_entry.condition);
}
--count;
} else {
pending_signals += (uint)count;
count = 0;
}
}
}
finally {
if (mutexLocked) {
Monitor.Exit (mutex);
}
}
ReleaseInternal (lifo_semaphore, count);
}
}
}

0 comments on commit 3eaff60

Please sign in to comment.