Skip to content

Commit

Permalink
port a fix for disconnect from zeromq:zeromq/libzmq@cb35fd7
Browse files Browse the repository at this point in the history
  • Loading branch information
somdoron committed Aug 11, 2015
1 parent 831aef6 commit 8eb08ee
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 9 deletions.
33 changes: 33 additions & 0 deletions src/NetMQ.Tests/SocketTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,39 @@ public void BindRandomThenUnbind()
}
}

[Test]
public void ReconnectOnRouterBug()
{
using (var context = NetMQContext.Create())
{
using (var dealer = context.CreateDealerSocket())
{
dealer.Options.Identity = Encoding.ASCII.GetBytes("dealer");
dealer.Bind("tcp://localhost:6667");

using (var router = context.CreateRouterSocket())
{
router.Options.RouterMandatory = true;
router.Connect("tcp://localhost:6667");
Thread.Sleep(100);

router.SendMoreFrame("dealer").SendFrame("Hello");
var message = dealer.ReceiveFrameString();
Assert.That(message == "Hello");

router.Disconnect("tcp://localhost:6667");
Thread.Sleep(1000);
router.Connect("tcp://localhost:6667");
Thread.Sleep(100);

router.SendMoreFrame("dealer").SendFrame("Hello");
message = dealer.ReceiveFrameString();
Assert.That(message == "Hello");
}
}
}
}

[Test]
public void InprocRouterDealerTest()
{
Expand Down
35 changes: 26 additions & 9 deletions src/NetMQ/Core/SocketBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,19 @@ namespace NetMQ.Core
{
internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents
{
[NotNull] private readonly Dictionary<string, Own> m_endpoints = new Dictionary<string, Own>();
class Endpoint
{
public Endpoint(Own own, Pipe pipe)
{
Own = own;
Pipe = pipe;
}

public Own Own { get; private set; }
public Pipe Pipe { get; private set; }
}

[NotNull] private readonly Dictionary<string, Endpoint> m_endpoints = new Dictionary<string, Endpoint>();

[NotNull] private readonly Dictionary<string, Pipe> m_inprocs = new Dictionary<string, Pipe>();

Expand Down Expand Up @@ -473,7 +485,7 @@ public void Bind([NotNull] string addr)
}

m_options.LastEndpoint = listener.Address;
AddEndpoint(addr, listener);
AddEndpoint(addr, listener, null);
break;
}
case Address.PgmProtocol:
Expand All @@ -493,7 +505,7 @@ public void Bind([NotNull] string addr)
}

m_options.LastEndpoint = addr;
AddEndpoint(addr, listener);
AddEndpoint(addr, listener, null);
break;
}
case Address.IpcProtocol:
Expand All @@ -513,7 +525,7 @@ public void Bind([NotNull] string addr)
}

m_options.LastEndpoint = listener.Address;
AddEndpoint(addr, listener);
AddEndpoint(addr, listener,null);
break;
}
default:
Expand Down Expand Up @@ -682,6 +694,7 @@ public void Connect([NotNull] string addr)
// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe.
bool icanhasall = protocol == Address.PgmProtocol || protocol == Address.EpgmProtocol;
Pipe newPipe = null;

if (!m_options.DelayAttachOnConnect || icanhasall)
{
Expand All @@ -693,6 +706,7 @@ public void Connect([NotNull] string addr)

// Attach local end of the pipe to the socket object.
AttachPipe(pipes[0], icanhasall);
newPipe = pipes[0];

// Attach remote end of the pipe to the session object later on.
session.AttachPipe(pipes[1]);
Expand All @@ -701,7 +715,7 @@ public void Connect([NotNull] string addr)
// Save last endpoint URI
m_options.LastEndpoint = paddr.ToString();

AddEndpoint(addr, session);
AddEndpoint(addr, session, newPipe);
}

/// <summary>
Expand All @@ -723,11 +737,11 @@ private static void DecodeAddress([NotNull] string addr, out string address, out
/// <summary>
/// Take ownership of the given <paramref name="endpoint"/> and register it against the given <paramref name="address"/>.
/// </summary>
private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint)
private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe)
{
// Activate the session. Make it a child of this socket.
LaunchChild(endpoint);
m_endpoints[address] = endpoint;
m_endpoints[address] = new Endpoint(endpoint, pipe);
}

/// <summary>
Expand Down Expand Up @@ -771,11 +785,14 @@ public void TermEndpoint([NotNull] string addr)
}
else
{
Own endpoint;
Endpoint endpoint;
if (!m_endpoints.TryGetValue(addr, out endpoint))
throw new EndpointNotFoundException("Endpoint was not found and cannot be disconnected");

TermChild(endpoint);
if (endpoint.Pipe != null)
endpoint.Pipe.Terminate(false);

TermChild(endpoint.Own);
m_endpoints.Remove(addr);
}
}
Expand Down

0 comments on commit 8eb08ee

Please sign in to comment.