Skip to content

Commit

Permalink
clean up of patterns and patterns utils
Browse files Browse the repository at this point in the history
  • Loading branch information
somdoron committed Nov 6, 2014
1 parent 411cb45 commit 0e799c9
Show file tree
Hide file tree
Showing 26 changed files with 529 additions and 748 deletions.
38 changes: 19 additions & 19 deletions src/NetMQ/NetMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,16 @@
<Compile Include="zmq\CommandType.cs" />
<Compile Include="zmq\Config.cs" />
<Compile Include="zmq\Ctx.cs" />
<Compile Include="zmq\Dealer.cs" />
<Compile Include="zmq\Patterns\Dealer.cs" />
<Compile Include="zmq\Decoder.cs" />
<Compile Include="zmq\DecoderBase.cs" />
<Compile Include="zmq\Dist.cs" />
<Compile Include="zmq\Patterns\Utils\ArrayExtensions.cs" />
<Compile Include="zmq\Patterns\Utils\Distribution.cs" />
<Compile Include="zmq\Encoder.cs" />
<Compile Include="zmq\EncoderBase.cs" />
<Compile Include="zmq\Enums.cs" />
<Compile Include="zmq\ErrorCode.cs" />
<Compile Include="zmq\ErrorHelper.cs" />
<Compile Include="zmq\FQ.cs" />
<Compile Include="zmq\IDecoder.cs" />
<Compile Include="zmq\IEncoder.cs" />
<Compile Include="zmq\IEngine.cs" />
Expand All @@ -149,19 +149,20 @@
<Compile Include="zmq\IProcatorEvents.cs" />
<Compile Include="zmq\ITimerEvent.cs" />
<Compile Include="zmq\IZmqMonitor.cs" />
<Compile Include="zmq\LB.cs" />
<Compile Include="zmq\Patterns\Utils\FairQueueing.cs" />
<Compile Include="zmq\Patterns\Utils\LoadBalancer.cs" />
<Compile Include="zmq\Mailbox.cs" />
<Compile Include="zmq\MonitorEvent.cs" />
<Compile Include="zmq\Msg.cs" />
<Compile Include="zmq\Mtrie.cs" />
<Compile Include="zmq\Patterns\Utils\MultiTrie.cs" />
<None Include="NetMQ.nuspec" />
<None Include="NetMQ.snk" />
<None Include="packages.config" />
<None Include="zmq\Native\NativeMethods.cs" />
<Compile Include="zmq\Native\OpCode.cs" />
<Compile Include="zmq\Options.cs" />
<Compile Include="zmq\Own.cs" />
<Compile Include="zmq\Pair.cs" />
<Compile Include="zmq\Patterns\Pair.cs" />
<Compile Include="zmq\Pgm\PgmAddress.cs" />
<Compile Include="zmq\Pgm\PgmListener.cs" />
<Compile Include="zmq\Pgm\PgmSender.cs" />
Expand All @@ -176,37 +177,36 @@
<Compile Include="zmq\Proxy.cs" />
<Compile Include="JetBrains.Annotations.cs" />
<None Include="zmq\Proxy.cs" />
<Compile Include="zmq\Pub.cs" />
<Compile Include="zmq\Pull.cs" />
<Compile Include="zmq\Push.cs" />
<Compile Include="zmq\Patterns\Pub.cs" />
<Compile Include="zmq\Patterns\Pull.cs" />
<Compile Include="zmq\Patterns\Push.cs" />
<Compile Include="zmq\RawDecoder.cs" />
<Compile Include="zmq\RawEncoder.cs" />
<Compile Include="zmq\Reaper.cs" />
<Compile Include="zmq\Rep.cs" />
<Compile Include="zmq\Req.cs" />
<Compile Include="zmq\Router.cs" />
<Compile Include="zmq\Patterns\Rep.cs" />
<Compile Include="zmq\Patterns\Req.cs" />
<Compile Include="zmq\Patterns\Router.cs" />
<Compile Include="zmq\SessionBase.cs" />
<Compile Include="zmq\Signaler.cs" />
<Compile Include="zmq\SocketBase.cs" />
<Compile Include="zmq\Stream.cs" />
<Compile Include="zmq\Patterns\Stream.cs" />
<Compile Include="zmq\StreamEngine.cs" />
<Compile Include="zmq\Sub.cs" />
<Compile Include="zmq\Patterns\Sub.cs" />
<Compile Include="zmq\Tcp\TcpAddress.cs" />
<Compile Include="zmq\Tcp\TcpConnecter.cs" />
<Compile Include="zmq\Tcp\TcpListener.cs" />
<Compile Include="zmq\Trie.cs" />
<Compile Include="zmq\Utils.cs" />
<Compile Include="zmq\Patterns\Utils\Trie.cs" />
<Compile Include="zmq\V1Decoder.cs" />
<Compile Include="zmq\V1Encoder.cs" />
<Compile Include="zmq\V1Protocol.cs" />
<Compile Include="zmq\XPub.cs" />
<Compile Include="zmq\XSub.cs" />
<Compile Include="zmq\Patterns\XPub.cs" />
<Compile Include="zmq\Patterns\XSub.cs" />
<Compile Include="zmq\YPipe.cs" />
<Compile Include="zmq\YQueue.cs" />
<None Include="zmq\ZError.cs" />
<Compile Include="zmq\ZMQ.cs" />
<Compile Include="zmq\ZObject.cs" />
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
Expand Down
37 changes: 18 additions & 19 deletions src/NetMQ/NetMQ3.5.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,13 @@
<Compile Include="zmq\CommandType.cs" />
<Compile Include="zmq\Config.cs" />
<Compile Include="zmq\Ctx.cs" />
<Compile Include="zmq\Dealer.cs" />
<Compile Include="zmq\Decoder.cs" />
<Compile Include="zmq\DecoderBase.cs" />
<Compile Include="zmq\Dist.cs" />
<Compile Include="zmq\Encoder.cs" />
<Compile Include="zmq\EncoderBase.cs" />
<Compile Include="zmq\Enums.cs" />
<Compile Include="zmq\ErrorCode.cs" />
<Compile Include="zmq\ErrorHelper.cs" />
<Compile Include="zmq\FQ.cs" />
<Compile Include="zmq\IDecoder.cs" />
<Compile Include="zmq\IEncoder.cs" />
<Compile Include="zmq\IEngine.cs" />
Expand All @@ -118,19 +115,34 @@
<Compile Include="zmq\IProcatorEvents.cs" />
<Compile Include="zmq\ITimerEvent.cs" />
<Compile Include="zmq\IZmqMonitor.cs" />
<Compile Include="zmq\LB.cs" />
<Compile Include="zmq\Mailbox.cs" />
<Compile Include="zmq\MonitorEvent.cs" />
<Compile Include="zmq\Msg.cs" />
<Compile Include="zmq\Mtrie.cs" />
<None Include="NetMQ.nuspec" />
<None Include="NetMQ.snk" />
<None Include="packages.NetMQ3.5.config" />
<None Include="zmq\Native\NativeMethods.cs" />
<Compile Include="zmq\Native\OpCode.cs" />
<Compile Include="zmq\Options.cs" />
<Compile Include="zmq\Own.cs" />
<Compile Include="zmq\Pair.cs" />
<Compile Include="zmq\Patterns\Dealer.cs" />
<Compile Include="zmq\Patterns\Pair.cs" />
<Compile Include="zmq\Patterns\Pub.cs" />
<Compile Include="zmq\Patterns\Pull.cs" />
<Compile Include="zmq\Patterns\Push.cs" />
<Compile Include="zmq\Patterns\Rep.cs" />
<Compile Include="zmq\Patterns\Req.cs" />
<Compile Include="zmq\Patterns\Router.cs" />
<Compile Include="zmq\Patterns\Stream.cs" />
<Compile Include="zmq\Patterns\Sub.cs" />
<Compile Include="zmq\Patterns\Utils\ArrayExtensions.cs" />
<Compile Include="zmq\Patterns\Utils\Distribution.cs" />
<Compile Include="zmq\Patterns\Utils\FairQueueing.cs" />
<Compile Include="zmq\Patterns\Utils\LoadBalancer.cs" />
<Compile Include="zmq\Patterns\Utils\MultiTrie.cs" />
<Compile Include="zmq\Patterns\Utils\Trie.cs" />
<Compile Include="zmq\Patterns\XPub.cs" />
<Compile Include="zmq\Patterns\XSub.cs" />
<Compile Include="zmq\Pgm\PgmAddress.cs" />
<Compile Include="zmq\Pgm\PgmListener.cs" />
<Compile Include="zmq\Pgm\PgmSender.cs" />
Expand All @@ -145,34 +157,21 @@
<Compile Include="zmq\Proxy.cs" />
<Compile Include="JetBrains.Annotations.cs" />
<None Include="zmq\Proxy.cs" />
<Compile Include="zmq\Pub.cs" />
<Compile Include="zmq\Pull.cs" />
<Compile Include="zmq\Push.cs" />
<Compile Include="zmq\RawDecoder.cs" />
<Compile Include="zmq\RawEncoder.cs" />
<Compile Include="zmq\Reaper.cs" />
<Compile Include="zmq\Rep.cs" />
<Compile Include="zmq\Req.cs" />
<Compile Include="zmq\Router.cs" />
<Compile Include="zmq\SessionBase.cs" />
<Compile Include="zmq\Signaler.cs" />
<Compile Include="zmq\SocketBase.cs" />
<Compile Include="zmq\Stream.cs" />
<Compile Include="zmq\StreamEngine.cs" />
<Compile Include="zmq\Sub.cs" />
<Compile Include="zmq\Tcp\TcpAddress.cs" />
<Compile Include="zmq\Tcp\TcpConnecter.cs" />
<Compile Include="zmq\Tcp\TcpListener.cs" />
<Compile Include="zmq\Trie.cs" />
<Compile Include="zmq\Utils.cs" />
<Compile Include="zmq\V1Decoder.cs" />
<Compile Include="zmq\V1Encoder.cs" />
<Compile Include="zmq\V1Protocol.cs" />
<Compile Include="zmq\XPub.cs" />
<Compile Include="zmq\XSub.cs" />
<Compile Include="zmq\YPipe.cs" />
<Compile Include="zmq\YQueue.cs" />
<None Include="zmq\ZError.cs" />
<Compile Include="zmq\ZMQ.cs" />
<Compile Include="zmq\ZObject.cs" />
</ItemGroup>
Expand Down
43 changes: 19 additions & 24 deletions src/NetMQ/zmq/Dealer.cs → src/NetMQ/zmq/Patterns/Dealer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ You should have received a copy of the GNU Lesser General Public License

using System;
using System.Diagnostics;
using NetMQ.zmq.Patterns.Utils;

namespace NetMQ.zmq
namespace NetMQ.zmq.Patterns
{
public class Dealer : SocketBase
class Dealer : SocketBase
{

public class DealerSession : SessionBase
Expand All @@ -40,8 +41,8 @@ public DealerSession(IOThread ioThread, bool connect,

// Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes.
private readonly FQ m_fq;
private readonly LB m_lb;
private readonly FairQueueing m_fairQueueing;
private readonly LoadBalancer m_loadBalancer;

// Have we prefetched a message.
private bool m_prefetched;
Expand All @@ -56,13 +57,8 @@ public Dealer(Ctx parent, int threadId, int sid)
m_prefetched = false;
m_options.SocketType = ZmqSocketType.Dealer;

m_fq = new FQ();
m_lb = new LB();
// TODO: Uncomment the following line when DEALER will become true DEALER
// rather than generic dealer socket.
// If the socket is closing we can drop all the outbound requests. There'll
// be noone to receive the replies anyway.
// options.delay_on_close = false;
m_fairQueueing = new FairQueueing();
m_loadBalancer = new LoadBalancer();

m_options.RecvIdentity = true;

Expand All @@ -81,22 +77,21 @@ protected override void XAttachPipe(Pipe pipe, bool icanhasall)
{

Debug.Assert(pipe != null);
m_fq.Attach(pipe);
m_lb.Attach(pipe);
m_fairQueueing.Attach(pipe);
m_loadBalancer.Attach(pipe);
}

protected override bool XSend(ref Msg msg, SendReceiveOptions flags)
{
return m_lb.Send(ref msg, flags);
return m_loadBalancer.Send(ref msg, flags);
}


protected override bool XRecv(SendReceiveOptions flags, ref Msg msg)
{
return xxrecv(flags, ref msg);
return ReceiveInternal(flags, ref msg);
}

private bool xxrecv(SendReceiveOptions flags, ref Msg msg)
private bool ReceiveInternal(SendReceiveOptions flags, ref Msg msg)
{
// If there is a prefetched message, return it.
if (m_prefetched)
Expand All @@ -111,7 +106,7 @@ private bool xxrecv(SendReceiveOptions flags, ref Msg msg)
// DEALER socket doesn't use identities. We can safely drop it and
while (true)
{
bool isMessageAvailable = m_fq.Recv(ref msg);
bool isMessageAvailable = m_fairQueueing.Recv(ref msg);

if (!isMessageAvailable)
{
Expand All @@ -133,7 +128,7 @@ protected override bool XHasIn()

// Try to read the next message to the pre-fetch buffer.

bool isMessageAvailable = xxrecv(SendReceiveOptions.DontWait, ref m_prefetchedMsg);
bool isMessageAvailable = ReceiveInternal(SendReceiveOptions.DontWait, ref m_prefetchedMsg);

if (!isMessageAvailable)
{
Expand All @@ -148,23 +143,23 @@ protected override bool XHasIn()

protected override bool XHasOut()
{
return m_lb.HasOut();
return m_loadBalancer.HasOut();
}

protected override void XReadActivated(Pipe pipe)
{
m_fq.Activated(pipe);
m_fairQueueing.Activated(pipe);
}

protected override void XWriteActivated(Pipe pipe)
{
m_lb.Activated(pipe);
m_loadBalancer.Activated(pipe);
}

protected override void XTerminated(Pipe pipe)
{
m_fq.Terminated(pipe);
m_lb.Terminated(pipe);
m_fairQueueing.Terminated(pipe);
m_loadBalancer.Terminated(pipe);
}
}
}
2 changes: 1 addition & 1 deletion src/NetMQ/zmq/Pair.cs → src/NetMQ/zmq/Patterns/Pair.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ You should have received a copy of the GNU Lesser General Public License

using System.Diagnostics;

namespace NetMQ.zmq
namespace NetMQ.zmq.Patterns
{
public class Pair : SocketBase
{
Expand Down
4 changes: 2 additions & 2 deletions src/NetMQ/zmq/Pub.cs → src/NetMQ/zmq/Patterns/Pub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

namespace NetMQ.zmq
namespace NetMQ.zmq.Patterns
{
public class Pub : XPub
class Pub : XPub
{

public class PubSession : XPub.XPubSession
Expand Down
Loading

0 comments on commit 0e799c9

Please sign in to comment.