Skip to content

Commit

Permalink
problem: NetMQ doesn't support Scatter-Gather pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
somdoron committed May 26, 2020
1 parent 1e47aa0 commit 7d44a87
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 1 deletion.
70 changes: 70 additions & 0 deletions src/NetMQ.Tests/ScatterGather.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System.Threading;
using NetMQ.Sockets;
using Xunit;

namespace NetMQ.Tests
{
public class ScatterGather
{
[Fact]
public void TestTcp()
{
using var scatter = new ScatterSocket();
using var gather = new GatherSocket();

int port = scatter.BindRandomPort("tcp://*");
gather.Connect($"tcp://127.0.0.1:{port}");

scatter.Send("1");
scatter.Send("2");

var m1 = gather.ReceiveString();
Assert.Equal("1", m1);

var m2 = gather.ReceiveString();
Assert.Equal("2", m2);
}

[Fact]
public void TestBlocking()
{
using var scatter = new ScatterSocket();
using var gather = new GatherSocket();
using var gather2 = new GatherSocket();

scatter.Bind("inproc://test-scatter-gather");
gather.Connect("inproc://test-scatter-gather");
gather2.Connect("inproc://test-scatter-gather");

scatter.Send("1");
scatter.Send("2");

var m1 = gather.ReceiveString();
Assert.Equal("1", m1);

var m2 = gather2.ReceiveString();
Assert.Equal("2", m2);
}

[Fact]
public async void TestAsync()
{
using var scatter = new ScatterSocket();
using var gather = new GatherSocket();
using var gather2 = new GatherSocket();

scatter.Bind("inproc://test-scatter-gather");
gather.Connect("inproc://test-scatter-gather");
gather2.Connect("inproc://test-scatter-gather");

await scatter.SendAsync("1");
await scatter.SendAsync("2");

var m1 = await gather.ReceiveStringAsync();
Assert.Equal("1", m1);

var m2 = await gather2.ReceiveStringAsync();
Assert.Equal("2", m2);
}
}
}
10 changes: 10 additions & 0 deletions src/NetMQ/Core/Mechanisms/Mechanism.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ static class SocketNames
public const string Peer = "PEER";
public const string Server = "SERVER";
public const string Client = "CLIENT";
public const string Gather = "GATHER";
public const string Scatter = "SCATTER";
}

const int NameLengthSize = sizeof(byte);
Expand Down Expand Up @@ -125,6 +127,10 @@ protected string GetSocketName(ZmqSocketType socketType)
return SocketNames.Server;
case ZmqSocketType.Client:
return SocketNames.Client;
case ZmqSocketType.Gather:
return SocketNames.Gather;
case ZmqSocketType.Scatter:
return SocketNames.Scatter;
default:
throw new ArgumentOutOfRangeException(nameof(socketType), socketType, null);
}
Expand Down Expand Up @@ -312,6 +318,10 @@ private bool CheckSocketType(string type)
return type == SocketNames.Server;
case ZmqSocketType.Peer:
return type == SocketNames.Peer;
case ZmqSocketType.Gather:
return type == SocketNames.Scatter;
case ZmqSocketType.Scatter:
return type == SocketNames.Gather;
default:
return false;
}
Expand Down
86 changes: 86 additions & 0 deletions src/NetMQ/Core/Patterns/Gather.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#nullable enable

using System.Diagnostics;
using NetMQ.Core.Patterns.Utils;

namespace NetMQ.Core.Patterns
{
internal sealed class Gather : SocketBase
{
/// <summary>
/// Fair queueing object for inbound pipes.
/// </summary>
private readonly FairQueueing m_fairQueueing;

public Gather(Ctx parent, int threadId, int socketId)
: base(parent, threadId, socketId)
{
m_options.SocketType = ZmqSocketType.Gather;

m_fairQueueing = new FairQueueing();
}

/// <summary>
/// Register the pipe with this socket.
/// </summary>
/// <param name="pipe">the Pipe to attach</param>
/// <param name="icanhasall">not used</param>
protected override void XAttachPipe(Pipe pipe, bool icanhasall)
{
Debug.Assert(pipe != null);
m_fairQueueing.Attach(pipe);
}

/// <summary>
/// Indicate the given pipe as being ready for reading by this socket.
/// </summary>
/// <param name="pipe">the <c>Pipe</c> that is now becoming available for reading</param>
protected override void XReadActivated(Pipe pipe)
{
m_fairQueueing.Activated(pipe);
}

/// <summary>
/// This is an override of the abstract method that gets called to signal that the given pipe is to be removed from this socket.
/// </summary>
/// <param name="pipe">the Pipe that is being removed</param>
protected override void XTerminated(Pipe pipe)
{
m_fairQueueing.Terminated(pipe);
}

/// <summary>
/// Receive a message. The <c>Recv</c> method calls this lower-level method to do the actual receiving.
/// </summary>
/// <param name="msg">the <c>Msg</c> to receive the message into</param>
/// <returns><c>true</c> if the message was received successfully, <c>false</c> if there were no messages to receive</returns>
protected override bool XRecv(ref Msg msg)
{
bool received = m_fairQueueing.Recv(ref msg);

// Drop any messages with more flag
while (received && msg.HasMore)
{
// drop all frames of the current multi-frame message
received = m_fairQueueing.Recv(ref msg);

while (received && msg.HasMore)
received = m_fairQueueing.Recv(ref msg);

// get the new message
if (received)
received = m_fairQueueing.Recv(ref msg);
}

if (!received)
return false;

return true;
}

protected override bool XHasIn()
{
return m_fairQueueing.HasIn();
}
}
}
76 changes: 76 additions & 0 deletions src/NetMQ/Core/Patterns/Scatter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#nullable enable

using System.Diagnostics;
using NetMQ.Core.Patterns.Utils;

namespace NetMQ.Core.Patterns
{
internal sealed class Scatter : SocketBase
{
/// <summary>
/// Load balancer managing the outbound pipes.
/// </summary>
private readonly LoadBalancer m_loadBalancer;

public Scatter(Ctx parent, int threadId, int socketId)
: base(parent, threadId, socketId, true)
{
m_options.SocketType = ZmqSocketType.Scatter;

m_loadBalancer = new LoadBalancer();
}

/// <summary>
/// Register the pipe with this socket.
/// </summary>
/// <param name="pipe">the Pipe to attach</param>
/// <param name="icanhasall">not used</param>
protected override void XAttachPipe(Pipe pipe, bool icanhasall)
{
Debug.Assert(pipe != null);

// Don't delay pipe termination as there is no one
// to receive the delimiter.
pipe.SetNoDelay();

m_loadBalancer.Attach(pipe);
}

/// <summary>
/// Indicate the given pipe as being ready for writing to by this socket.
/// This gets called by the WriteActivated method.
/// </summary>
/// <param name="pipe">the <c>Pipe</c> that is now becoming available for writing</param>
protected override void XWriteActivated(Pipe pipe)
{
m_loadBalancer.Activated(pipe);
}

/// <summary>
/// This is an override of the abstract method that gets called to signal that the given pipe is to be removed from this socket.
/// </summary>
/// <param name="pipe">the Pipe that is being removed</param>
protected override void XTerminated(Pipe pipe)
{
m_loadBalancer.Terminated(pipe);
}

/// <summary>
/// Transmit the given message. The <c>Send</c> method calls this to do the actual sending.
/// </summary>
/// <param name="msg">the message to transmit</param>
/// <returns><c>true</c> if the message was sent successfully</returns>
protected override bool XSend(ref Msg msg)
{
if (msg.HasMore)
throw new InvalidException("SCATTER sockets do not allow multipart data (ZMQ_SNDMORE)");

return m_loadBalancer.Send(ref msg);
}

protected override bool XHasOut()
{
return m_loadBalancer.HasOut();
}
}
}
4 changes: 3 additions & 1 deletion src/NetMQ/Core/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public static SessionBase Create([NotNull] IOThread ioThread, bool connect, [Not
case ZmqSocketType.Stream:
case ZmqSocketType.Peer:
case ZmqSocketType.Server:
case ZmqSocketType.Client:
case ZmqSocketType.Client:
case ZmqSocketType.Gather:
case ZmqSocketType.Scatter:
if (options.CanSendHelloMsg && options.HelloMsg != null)
return new HelloMsgSession(ioThread, connect, socket, options, addr);
else
Expand Down
4 changes: 4 additions & 0 deletions src/NetMQ/Core/SocketBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int th
return new Server(parent, threadId, socketId);
case ZmqSocketType.Client:
return new Client(parent, threadId, socketId);
case ZmqSocketType.Gather:
return new Gather(parent, threadId, socketId);
case ZmqSocketType.Scatter:
return new Scatter(parent, threadId, socketId);
default:
throw new InvalidException("SocketBase.Create called with invalid type of " + type);
}
Expand Down
17 changes: 17 additions & 0 deletions src/NetMQ/Sockets/GatherSocket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#nullable enable

namespace NetMQ.Sockets
{
/// <summary>
/// Gather socket, thread-safe alternative for Pull socket
/// </summary>
public class GatherSocket : ThreadSafeSocket, IThreadSafeInSocket
{
/// <summary>
/// Create a new Gather Socket.
/// </summary>
public GatherSocket() : base(ZmqSocketType.Gather)
{
}
}
}
17 changes: 17 additions & 0 deletions src/NetMQ/Sockets/ScatterSocket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#nullable enable

namespace NetMQ.Sockets
{
/// <summary>
/// Scatter socket, thread-safe alternative for Push socket
/// </summary>
public class ScatterSocket : ThreadSafeSocket, IThreadSafeOutSocket
{
/// <summary>
/// Create a new Scatter Socket.
/// </summary>
public ScatterSocket() : base(ZmqSocketType.Scatter)
{
}
}
}
10 changes: 10 additions & 0 deletions src/NetMQ/ZmqSocketType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ public enum ZmqSocketType
/// </summary>
Client = 13,

/// <summary>
/// This denotes an Gather socket.
/// </summary>
Gather = 16,

/// <summary>
/// This denotes an Scatter socket.
/// </summary>
Scatter = 17,

/// <summary>
/// This denotes a Peer socket.
/// </summary>
Expand Down

0 comments on commit 7d44a87

Please sign in to comment.