Skip to content

Commit

Permalink
Remove ZMQ class
Browse files Browse the repository at this point in the history
  • Loading branch information
somdoron committed Nov 6, 2014
1 parent 28b18d4 commit f41e5dc
Show file tree
Hide file tree
Showing 22 changed files with 444 additions and 764 deletions.
1 change: 0 additions & 1 deletion src/NetMQ.Tests/NetMQ.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
<Compile Include="SocketTests.cs" />
<Compile Include="StreamTests.cs" />
<Compile Include="zmq\YQueueTests.cs" />
<Compile Include="zmq\ZMQPollTests.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\NetMQ\NetMQ.csproj">
Expand Down
1 change: 0 additions & 1 deletion src/NetMQ.Tests/NetMQ3.5.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
<Compile Include="SocketTests.cs" />
<Compile Include="StreamTests.cs" />
<Compile Include="zmq\YQueueTests.cs" />
<Compile Include="zmq\ZMQPollTests.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\NetMQ\NetMQ3.5.csproj">
Expand Down
42 changes: 0 additions & 42 deletions src/NetMQ.Tests/zmq/ZMQPollTests.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/NetMQ/Monitoring/NetMQMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public NetMQMonitor(NetMQContext context, NetMQSocket monitoredSocket, string en
Endpoint = endpoint;
Timeout = TimeSpan.FromSeconds(0.5);

ZMQ.SocketMonitor(monitoredSocket.SocketHandle, Endpoint, eventsToMonitor);
monitoredSocket.Monitor(endpoint, eventsToMonitor);

MonitoringSocket = context.CreatePairSocket();
MonitoringSocket.Options.Linger = TimeSpan.Zero;
Expand Down
5 changes: 1 addition & 4 deletions src/NetMQ/NetMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
<Compile Include="zmq\AtomicCounter.cs" />
<Compile Include="zmq\Blob.cs" />
<Compile Include="zmq\BufferPool.cs" />
<Compile Include="zmq\Selector.cs" />
<Compile Include="zmq\Transports\ByteArraySegment.cs" />
<Compile Include="zmq\Clock.cs" />
<Compile Include="zmq\Command.cs" />
Expand Down Expand Up @@ -172,11 +173,8 @@
<None Include="zmq\Native\Poller.cs" />
<Compile Include="zmq\Poller.cs" />
<Compile Include="zmq\PollerBase.cs" />
<Compile Include="zmq\PollItem.cs" />
<Compile Include="zmq\Proactor.cs" />
<Compile Include="zmq\Proxy.cs" />
<Compile Include="JetBrains.Annotations.cs" />
<None Include="zmq\Proxy.cs" />
<Compile Include="zmq\Patterns\Pub.cs" />
<Compile Include="zmq\Patterns\Pull.cs" />
<Compile Include="zmq\Patterns\Push.cs" />
Expand All @@ -203,7 +201,6 @@
<Compile Include="zmq\Patterns\XSub.cs" />
<Compile Include="zmq\YPipe.cs" />
<Compile Include="zmq\YQueue.cs" />
<Compile Include="zmq\ZMQ.cs" />
<Compile Include="zmq\ZObject.cs" />
</ItemGroup>
<ItemGroup />
Expand Down
5 changes: 1 addition & 4 deletions src/NetMQ/NetMQ3.5.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,10 @@
<None Include="zmq\Native\Poller.cs" />
<Compile Include="zmq\Poller.cs" />
<Compile Include="zmq\PollerBase.cs" />
<Compile Include="zmq\PollItem.cs" />
<Compile Include="zmq\Proactor.cs" />
<Compile Include="zmq\Proxy.cs" />
<Compile Include="JetBrains.Annotations.cs" />
<None Include="zmq\Proxy.cs" />
<Compile Include="zmq\Reaper.cs" />
<Compile Include="zmq\Selector.cs" />
<Compile Include="zmq\SessionBase.cs" />
<Compile Include="zmq\Signaler.cs" />
<Compile Include="zmq\SocketBase.cs" />
Expand Down Expand Up @@ -172,7 +170,6 @@
<Compile Include="zmq\Transports\V2Protocol.cs" />
<Compile Include="zmq\YPipe.cs" />
<Compile Include="zmq\YQueue.cs" />
<Compile Include="zmq\ZMQ.cs" />
<Compile Include="zmq\ZObject.cs" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
Expand Down
67 changes: 48 additions & 19 deletions src/NetMQ/NetMQContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,57 @@ private NetMQContext(Ctx ctx)
/// <returns>The new context</returns>
public static NetMQContext Create()
{
return new NetMQContext(ZMQ.CtxNew());
return new NetMQContext(new Ctx());
}

/// <summary>
/// Number of IO Threads in the context, default is 1, 1 is good for most cases
/// </summary>
public int ThreadPoolSize
{
get { return ZMQ.CtxGet(m_ctx, ContextOption.IOThreads); }
set { ZMQ.CtxSet(m_ctx, ContextOption.IOThreads, value); }
get
{
m_ctx.CheckDisposed();

return m_ctx.Get(ContextOption.IOThreads);
}
set
{
m_ctx.CheckDisposed();

m_ctx.Set(ContextOption.IOThreads, value);
}
}

/// <summary>
/// Maximum number of sockets
/// </summary>
public int MaxSockets
{
get { return ZMQ.CtxGet(m_ctx, zmq.ContextOption.MaxSockets); }
set { ZMQ.CtxSet(m_ctx, ContextOption.MaxSockets, value); }
get
{
m_ctx.CheckDisposed();

return m_ctx.Get(ContextOption.MaxSockets);
}
set
{
m_ctx.CheckDisposed();

m_ctx.Set(ContextOption.MaxSockets, value);
}
}

private SocketBase CreateHandle(ZmqSocketType socketType)
{
m_ctx.CheckDisposed();

return m_ctx.CreateSocket(socketType);
}

public NetMQSocket CreateSocket(ZmqSocketType socketType)
{
var socketHandle = ZMQ.Socket(m_ctx, socketType);
var socketHandle = CreateHandle(socketType);

switch (socketType)
{
Expand Down Expand Up @@ -91,7 +118,7 @@ public NetMQSocket CreateSocket(ZmqSocketType socketType)
/// <returns></returns>
public RequestSocket CreateRequestSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Req);
var socketHandle = CreateHandle(ZmqSocketType.Req);

return new RequestSocket(socketHandle);
}
Expand All @@ -102,7 +129,7 @@ public RequestSocket CreateRequestSocket()
/// <returns></returns>
public ResponseSocket CreateResponseSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Rep);
var socketHandle = CreateHandle(ZmqSocketType.Rep);

return new ResponseSocket(socketHandle);
}
Expand All @@ -113,7 +140,7 @@ public ResponseSocket CreateResponseSocket()
/// <returns></returns>
public DealerSocket CreateDealerSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Dealer);
var socketHandle = CreateHandle(ZmqSocketType.Dealer);

return new DealerSocket(socketHandle);
}
Expand All @@ -124,7 +151,7 @@ public DealerSocket CreateDealerSocket()
/// <returns></returns>
public RouterSocket CreateRouterSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Router);
var socketHandle = CreateHandle(ZmqSocketType.Router);

return new RouterSocket(socketHandle);
}
Expand All @@ -135,7 +162,7 @@ public RouterSocket CreateRouterSocket()
/// <returns></returns>
public XPublisherSocket CreateXPublisherSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Xpub);
var socketHandle = CreateHandle(ZmqSocketType.Xpub);

return new XPublisherSocket(socketHandle);
}
Expand All @@ -146,7 +173,7 @@ public XPublisherSocket CreateXPublisherSocket()
/// <returns></returns>
public PairSocket CreatePairSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Pair);
var socketHandle = CreateHandle(ZmqSocketType.Pair);

return new PairSocket(socketHandle);
}
Expand All @@ -157,7 +184,7 @@ public PairSocket CreatePairSocket()
/// <returns></returns>
public PushSocket CreatePushSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Push);
var socketHandle = CreateHandle(ZmqSocketType.Push);

return new PushSocket(socketHandle);
}
Expand All @@ -168,7 +195,7 @@ public PushSocket CreatePushSocket()
/// <returns></returns>
public PublisherSocket CreatePublisherSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Pub);
var socketHandle = CreateHandle(ZmqSocketType.Pub);

return new PublisherSocket(socketHandle);
}
Expand All @@ -179,7 +206,7 @@ public PublisherSocket CreatePublisherSocket()
/// <returns></returns>
public PullSocket CreatePullSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Pull);
var socketHandle = CreateHandle(ZmqSocketType.Pull);

return new PullSocket(socketHandle);
}
Expand All @@ -190,7 +217,7 @@ public PullSocket CreatePullSocket()
/// <returns></returns>
public SubscriberSocket CreateSubscriberSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Sub);
var socketHandle = CreateHandle(ZmqSocketType.Sub);

return new SubscriberSocket(socketHandle);
}
Expand All @@ -201,14 +228,14 @@ public SubscriberSocket CreateSubscriberSocket()
/// <returns></returns>
public XSubscriberSocket CreateXSubscriberSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Xsub);
var socketHandle = CreateHandle(ZmqSocketType.Xsub);

return new XSubscriberSocket(socketHandle);
}

public StreamSocket CreateStreamSocket()
{
var socketHandle = ZMQ.Socket(m_ctx, ZmqSocketType.Stream);
var socketHandle = CreateHandle(ZmqSocketType.Stream);

return new StreamSocket(socketHandle);
}
Expand All @@ -235,7 +262,9 @@ public void Terminate()
{
if (Interlocked.CompareExchange(ref m_isClosed, 1, 0) == 0)
{
ZMQ.Term(m_ctx);
m_ctx.CheckDisposed();

m_ctx.Terminate();
}
}

Expand Down
Loading

0 comments on commit f41e5dc

Please sign in to comment.