Skip to content

Commit

Permalink
Merge pull request zeromq#599 from somdoron/master
Browse files Browse the repository at this point in the history
problem: Polling once with NetMQPoller is complicated
  • Loading branch information
c-rack authored Aug 6, 2016
2 parents d4386f4 + f90d675 commit 91de4a0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 44 deletions.
14 changes: 7 additions & 7 deletions src/NetMQ/NetMQPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ public sealed class NetMQPoller :
private readonly List<NetMQTimer> m_timers = new List<NetMQTimer>();
private readonly Dictionary<Socket, Action<Socket>> m_pollinSockets = new Dictionary<Socket, Action<Socket>>();
private readonly Switch m_switch = new Switch(false);
private readonly Selector m_selector = new Selector();
private readonly NetMQSelector m_netMqSelector = new NetMQSelector();
private readonly StopSignaler m_stopSignaler = new StopSignaler();

private SelectItem[] m_pollSet;
private NetMQSelector.Item[] m_pollSet;
private NetMQSocket[] m_pollact;

private volatile bool m_isPollSetDirty = true;
Expand Down Expand Up @@ -340,7 +340,7 @@ public void Run()

if (m_pollSet.Length != 0)
{
isItemAvailable = m_selector.Select(m_pollSet, m_pollSet.Length, timeout);
isItemAvailable = m_netMqSelector.Select(m_pollSet, m_pollSet.Length, timeout);
}
else if (timeout > 0)
{
Expand Down Expand Up @@ -372,7 +372,7 @@ public void Run()

for (var i = 0; i < m_pollSet.Length; i++)
{
SelectItem item = m_pollSet[i];
NetMQSelector.Item item = m_pollSet[i];

if (item.Socket != null)
{
Expand Down Expand Up @@ -473,7 +473,7 @@ private void RebuildPollset()
#endif

// Recreate the m_pollSet and m_pollact arrays.
m_pollSet = new SelectItem[m_sockets.Count + m_pollinSockets.Count];
m_pollSet = new NetMQSelector.Item[m_sockets.Count + m_pollinSockets.Count];
m_pollact = new NetMQSocket[m_sockets.Count];

// For each socket in m_sockets,
Expand All @@ -482,14 +482,14 @@ private void RebuildPollset()

foreach (var socket in m_sockets)
{
m_pollSet[index] = new SelectItem(socket.SocketHandle, socket.GetPollEvents());
m_pollSet[index] = new NetMQSelector.Item(socket, socket.GetPollEvents());
m_pollact[index] = socket;
index++;
}

foreach (var socket in m_pollinSockets.Keys)
{
m_pollSet[index] = new SelectItem(socket, PollEvents.PollError | PollEvents.PollIn);
m_pollSet[index] = new NetMQSelector.Item(socket, PollEvents.PollError | PollEvents.PollIn);
index++;
}

Expand Down
67 changes: 35 additions & 32 deletions src/NetMQ/Core/Utils/Selector.cs → src/NetMQ/NetMQSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,58 @@
using System.Diagnostics;
using System.Net.Sockets;
using JetBrains.Annotations;
using NetMQ.Core;
using NetMQ.Core.Utils;

namespace NetMQ.Core.Utils
{
namespace NetMQ
{
/// <summary>
/// A SelectItem is a pairing of a (Socket or SocketBase) and a PollEvents value.
/// For selecting on NetMQSocket or regualr .net Socket.
/// This is for advanced usages, for most cases NetMQPoller is the suggested alternative.
/// </summary>
internal sealed class SelectItem
public sealed class NetMQSelector
{
public SelectItem(SocketBase socket, PollEvents @event)
{
Socket = socket;
Event = @event;
}
private readonly List<Socket> m_checkRead = new List<Socket>();
private readonly List<Socket> m_checkWrite = new List<Socket>();
private readonly List<Socket> m_checkError = new List<Socket>();

public SelectItem(Socket fileDescriptor, PollEvents @event)
/// <summary>
/// Selector Item used to hold the NetMQSocket/Socket and PollEvents
/// </summary>
public sealed class Item
{
FileDescriptor = fileDescriptor;
Event = @event;
}
public Item(NetMQSocket socket, PollEvents @event)
{
Socket = socket;
Event = @event;
}

public Socket FileDescriptor { get; }
public Item(Socket fileDescriptor, PollEvents @event)
{
FileDescriptor = fileDescriptor;
Event = @event;
}

public SocketBase Socket { get; }
public Socket FileDescriptor { get; }

public PollEvents Event { get; }
public NetMQSocket Socket { get; }

public PollEvents ResultEvent { get; set; }
}
public PollEvents Event { get; }

/// <summary>
/// A Selector holds three lists of Sockets - for read, write, and error,
/// and provides a Select method.
/// </summary>
internal sealed class Selector
{
private readonly List<Socket> m_checkRead = new List<Socket>();
private readonly List<Socket> m_checkWrite = new List<Socket>();
private readonly List<Socket> m_checkError = new List<Socket>();
public PollEvents ResultEvent { get; set; }
}

/// <summary>
/// Select on NetMQSocket or Socket, similar behavior to Socket.Select.
/// </summary>
/// <param name="items"> (must not be null)</param>
/// <param name="itemsCount"></param>
/// <param name="items">Items to select on (must not be null)</param>
/// <param name="itemsCount">Number of items in the array to select on</param>
/// <param name="timeout">a time-out period, in milliseconds</param>
/// <returns></returns>
/// <exception cref="FaultException">The internal select operation failed.</exception>
/// <exception cref="ArgumentNullException"><paramref name="items"/> is <c>null</c>.</exception>
/// <exception cref="TerminatingException">The socket has been stopped.</exception>
public bool Select([NotNull] SelectItem[] items, int itemsCount, long timeout)
public bool Select([NotNull] Item[] items, int itemsCount, long timeout)
{
if (items == null)
throw new ArgumentNullException(nameof(items));
Expand Down Expand Up @@ -103,8 +106,8 @@ public bool Select([NotNull] SelectItem[] items, int itemsCount, long timeout)

if (pollItem.Socket != null)
{
if (pollItem.Event != PollEvents.None && pollItem.Socket.Handle.Connected)
m_checkRead.Add(pollItem.Socket.Handle);
if (pollItem.Event != PollEvents.None && pollItem.Socket.SocketHandle.Handle.Connected)
m_checkRead.Add(pollItem.Socket.SocketHandle.Handle);
}
else
{
Expand Down
10 changes: 5 additions & 5 deletions src/NetMQ/NetMQSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public abstract class NetMQSocket : IOutgoingSocket, IReceivingSocket, ISocketPo
{
private readonly SocketBase m_socketHandle;
private readonly NetMQSocketEventArgs m_socketEventArgs;
private readonly Selector m_selector;
private readonly NetMQSelector m_netMqSelector;

private EventHandler<NetMQSocketEventArgs> m_receiveReady;
private EventHandler<NetMQSocketEventArgs> m_sendReady;
Expand All @@ -37,7 +37,7 @@ internal enum DefaultAction
internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction)
{
m_socketHandle = NetMQConfig.Context.CreateSocket(socketType);
m_selector = new Selector();
m_netMqSelector = new NetMQSelector();
Options = new SocketOptions(this);
m_socketEventArgs = new NetMQSocketEventArgs(this);

Expand Down Expand Up @@ -77,7 +77,7 @@ internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultA
/// <param name="socketHandle">a SocketBase object to assign to the new socket</param>
internal NetMQSocket([NotNull] SocketBase socketHandle)
{
m_selector = new Selector();
m_netMqSelector = new NetMQSelector();
m_socketHandle = socketHandle;
Options = new SocketOptions(this);
m_socketEventArgs = new NetMQSocketEventArgs(this);
Expand Down Expand Up @@ -296,9 +296,9 @@ public bool Poll(TimeSpan timeout)
/// <exception cref="TerminatingException">The socket has been stopped.</exception>
public PollEvents Poll(PollEvents pollEvents, TimeSpan timeout)
{
SelectItem[] items = { new SelectItem(SocketHandle, pollEvents) };
NetMQSelector.Item[] items = { new NetMQSelector.Item(this, pollEvents) };

m_selector.Select(items, 1, (long)timeout.TotalMilliseconds);
m_netMqSelector.Select(items, 1, (long)timeout.TotalMilliseconds);
return items[0].ResultEvent;
}

Expand Down

0 comments on commit 91de4a0

Please sign in to comment.