Skip to content

Commit

Permalink
Merge pull request #871 from somdoron/thread_safe
Browse files Browse the repository at this point in the history
problem: netmq is not thread safe
  • Loading branch information
somdoron authored May 19, 2020
2 parents a1c452f + 2a8f519 commit 4cd27a1
Show file tree
Hide file tree
Showing 25 changed files with 2,846 additions and 438 deletions.
50 changes: 0 additions & 50 deletions src/NetMQ-unix.sln

This file was deleted.

63 changes: 63 additions & 0 deletions src/NetMQ.Tests/ClientServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using NetMQ;
using NetMQ.Sockets;
using Xunit;

namespace NetMQ.Tests
{
public class ClientServer
{
[Fact]
public void Inproc()
{
using var server = new ServerSocket();
using var client = new ClientSocket();
server.Bind("inproc://client-server");
client.Connect("inproc://client-server");

client.Send("Hello");
var (routingId, clientMsg) = server.ReceiveString();
Assert.NotEqual<uint>(0, routingId);
Assert.Equal("Hello", clientMsg);

server.Send(routingId, "World");
var serverMsg = client.ReceiveString();
Assert.Equal("World", serverMsg);
}

[Fact]
public void Tcp()
{
using var server = new ServerSocket();
using var client = new ClientSocket();
int port = server.BindRandomPort("tcp://*");
client.Connect($"tcp://localhost:{port}");

client.Send("Hello");
var (routingId, clientMsg) = server.ReceiveString();
Assert.NotEqual<uint>(0, routingId);
Assert.Equal("Hello", clientMsg);

server.Send(routingId, "World");
var serverMsg = client.ReceiveString();
Assert.Equal("World", serverMsg);
}

[Fact]
public async void Async()
{
using var server = new ServerSocket();
using var client = new ClientSocket();
int port = server.BindRandomPort("tcp://*");
client.Connect($"tcp://localhost:{port}");

await client.SendAsync("Hello");
var (routingId, clientMsg) = await server.ReceiveStringAsync();
Assert.NotEqual<uint>(0, routingId);
Assert.Equal("Hello", clientMsg);

await server.SendAsync(routingId, "World");
var serverMsg = await client.ReceiveStringAsync();
Assert.Equal("World", serverMsg);
}
}
}
13 changes: 13 additions & 0 deletions src/NetMQ/Core/IMailbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using JetBrains.Annotations;

namespace NetMQ.Core
{
internal interface IMailbox
{
void Send([NotNull] Command command);

bool TryRecv(int timeout, out Command command);

void Close();
}
}
14 changes: 6 additions & 8 deletions src/NetMQ/Core/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ You should have received a copy of the GNU Lesser General Public License

namespace NetMQ.Core
{
internal interface IMailbox
{
void Send([NotNull] Command command);

void Close();
}

internal interface IMailboxEvent
{
void Ready();
Expand Down Expand Up @@ -92,11 +85,16 @@ public void Send(Command command)
}
}

public bool TryRecv(int timeout, out Command command)
{
throw new System.NotImplementedException();
}

public bool TryRecv(out Command command)
{
return m_commandPipe.TryRead(out command);
}

public void RaiseEvent()
{
if (!m_disposed)
Expand Down
120 changes: 120 additions & 0 deletions src/NetMQ/Core/MailboxSafe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using JetBrains.Annotations;
using NetMQ.Core.Utils;

namespace NetMQ.Core
{
internal class MailboxSafe : IMailbox
{
/// <summary>
/// The pipe to store actual commands.
/// </summary>
private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");

// Synchronize access to the mailbox from receivers and senders
private object m_sync;

private List<Signaler> m_signalers = new List<Signaler>();

#if DEBUG
/// <summary>Mailbox name. Only used for debugging.</summary>
[NotNull] private readonly string m_name;
#endif

/// <summary>
/// Create a new MailboxSafe with the given name.
/// </summary>
/// <param name="name">the name to give this new Mailbox</param>
/// <param name="sync">Synchronize access to the mailbox from receivers and senders</param>
public MailboxSafe([NotNull] string name, object sync)
{
m_sync = sync;

// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
// new command is posted.
bool ok = m_commandPipe.TryRead(out Command cmd);
Debug.Assert(!ok);

#if DEBUG
m_name = name;
#endif
}

public void AddSignaler(Signaler signaler)
{
m_signalers.Add(signaler);
}

public void RemoveSignaler(Signaler signaler)
{
m_signalers.Remove(signaler);
}

public void ClearSignalers()
{
m_signalers.Clear();
}

public void Send(Command cmd)
{
lock (m_sync)
{
m_commandPipe.Write(ref cmd, false);
bool ok = m_commandPipe.Flush();

if (!ok)
{
Monitor.PulseAll(m_sync);

foreach (var signaler in m_signalers)
{
signaler.Send();
}
}
}
}

public bool TryRecv(int timeout, out Command command)
{
// Try to get the command straight away.
if (m_commandPipe.TryRead(out command))
return true;

// If the timeout is zero, it will be quicker to release the lock, giving other a chance to send a command
// and immediately relock it.
if (timeout == 0)
{
Monitor.Exit(m_sync);
Monitor.Enter(m_sync);
}
else
{
// Wait for signal from the command sender.
Monitor.Wait(m_sync, timeout);
}

// Another thread may already fetch the command
return m_commandPipe.TryRead(out command);
}

public void Close()
{
Monitor.Enter(m_sync);
Monitor.Exit(m_sync);
}

#if DEBUG
/// <summary>
/// Override ToString to provide the type-name, plus the Mailbox name within brackets.
/// </summary>
/// <returns>a string of the form Mailbox[name]</returns>
public override string ToString()
{
return base.ToString() + "[" + m_name + "]";
}
#endif
}
}
10 changes: 10 additions & 0 deletions src/NetMQ/Core/Mechanisms/Mechanism.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ static class SocketNames
public const string Xsub = "XSUB";
public const string Stream = "STREAM";
public const string Peer = "PEER";
public const string Server = "SERVER";
public const string Client = "CLIENT";
}

const int NameLengthSize = sizeof(byte);
Expand Down Expand Up @@ -119,6 +121,10 @@ protected string GetSocketName(ZmqSocketType socketType)
return SocketNames.Stream;
case ZmqSocketType.Peer:
return SocketNames.Peer;
case ZmqSocketType.Server:
return SocketNames.Server;
case ZmqSocketType.Client:
return SocketNames.Client;
default:
throw new ArgumentOutOfRangeException(nameof(socketType), socketType, null);
}
Expand Down Expand Up @@ -300,6 +306,10 @@ private bool CheckSocketType(string type)
return type == SocketNames.Push;
case ZmqSocketType.Push:
return type == SocketNames.Pull;
case ZmqSocketType.Server:
return type == SocketNames.Client;
case ZmqSocketType.Client:
return type == SocketNames.Server;
case ZmqSocketType.Peer:
return type == SocketNames.Peer;
default:
Expand Down
Loading

0 comments on commit 4cd27a1

Please sign in to comment.