Skip to content

Commit

Permalink
[FIXED] issue with reconnect buffer blocking protocol messages
Browse files Browse the repository at this point in the history
Now protocol messages have a fast path during reconnect
During normal communication protocol messages share the queue

Fixes nats-io#177
  • Loading branch information
Stephen Asbury committed Sep 18, 2018
1 parent 5ae8bdb commit 4fb2bb9
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
## Version 2.1.1

* [FIXED] Issue with version in Nats.java, also updated deploying.md with checklist
* [FIXED] Fixed issue during reconnect where buffered messages blocked protocol messages

## Version 2.1.0

Expand Down
25 changes: 19 additions & 6 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ void reconnect() throws InterruptedException {
return;
}

this.writer.setReconnectMode(true);

while (!isConnected() && !isClosed() && !this.isClosing()) {
Collection<String> serversToTry = buildReconnectList();

Expand Down Expand Up @@ -259,6 +261,8 @@ void reconnect() throws InterruptedException {
} catch (Exception exp) {
this.processException(exp);
}

this.writer.setReconnectMode(false);

processConnectionEvent(Events.RESUBSCRIBED);
}
Expand Down Expand Up @@ -700,7 +704,7 @@ void sendUnsub(NatsSubscription sub, int after) {
protocolBuilder.append(String.valueOf(after));
}
NatsMessage unsubMsg = new NatsMessage(protocolBuilder.toString());
queueOutgoing(unsubMsg);
queueProtocolOutgoing(unsubMsg);
}

// Assumes the null/empty checks were handled elsewhere
Expand Down Expand Up @@ -740,7 +744,7 @@ void sendSubscriptionMessage(CharSequence sid, String subject, String queueName)
protocolBuilder.append(" ");
protocolBuilder.append(sid);
NatsMessage subMsg = new NatsMessage(protocolBuilder.toString());
queueOutgoing(subMsg);
queueProtocolOutgoing(subMsg);
}

String createInbox() {
Expand Down Expand Up @@ -965,9 +969,9 @@ void sendConnect(String serverURI) {
String connectOptions = this.options.buildProtocolConnectOptionsString(serverURI, info.isAuthRequired());
connectString.append(connectOptions);
NatsMessage msg = new NatsMessage(connectString.toString());
queueOutgoing(msg);
queueProtocolOutgoing(msg);
}

// Send a ping request and push a pong future on the queue.
// futures are completed in order, keep this one if a thread wants to wait
// for a specific pong. Note, if no pong returns the wait will not return
Expand All @@ -989,14 +993,16 @@ CompletableFuture<Boolean> sendPing() {
CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
NatsMessage msg = new NatsMessage(NatsConnection.OP_PING);
pongQueue.add(pongFuture);
queueOutgoing(msg);

queueProtocolOutgoing(msg);

this.statistics.incrementPingCount();
return pongFuture;
}

void sendPong() {
NatsMessage msg = new NatsMessage(NatsConnection.OP_PONG);
queueOutgoing(msg);
queueProtocolOutgoing(msg);
}

// Called by the reader
Expand Down Expand Up @@ -1086,6 +1092,13 @@ void queueOutgoing(NatsMessage msg) {
this.writer.queue(msg);
}

void queueProtocolOutgoing(NatsMessage msg) {
if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
throw new IllegalArgumentException("Control line is too long");
}
this.writer.queueProtocolMessage(msg);
}

void deliverMessage(NatsMessage msg) {
this.statistics.incrementInMsgs();
this.statistics.incrementInBytes(msg.getSizeInBytes());
Expand Down
29 changes: 27 additions & 2 deletions src/main/java/io/nats/client/impl/NatsConnectionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,25 @@ class NatsConnectionWriter implements Runnable {
private CompletableFuture<Boolean> stopped;
private Future<DataPort> dataPortFuture;
private final AtomicBoolean running;
private final AtomicBoolean reconnectMode;

private byte[] sendBuffer;

private MessageQueue outgoing;
private MessageQueue reconnectOutgoing;

NatsConnectionWriter(NatsConnection connection) {
this.connection = connection;

this.running = new AtomicBoolean(false);
this.reconnectMode = new AtomicBoolean(false);
this.stopped = new CompletableFuture<>();
this.stopped.complete(Boolean.TRUE); // we are stopped on creation

this.sendBuffer = new byte[connection.getOptions().getBufferSize()];

outgoing = new MessageQueue(true);
reconnectOutgoing = new MessageQueue(true);
}

// Should only be called if the current thread has exited.
Expand All @@ -68,6 +72,7 @@ void start(Future<DataPort> dataPortFuture) {
Future<Boolean> stop() {
this.running.set(false);
this.outgoing.pause();
this.reconnectOutgoing.pause();

// Clear old ping/pong requests
byte[] pingRequest = NatsConnection.OP_PING.getBytes(StandardCharsets.UTF_8);
Expand All @@ -80,16 +85,24 @@ Future<Boolean> stop() {

public void run() {
Duration waitForMessage = Duration.ofMinutes(2); // This can be long since no one is sending
Duration reconnectWait = Duration.ofMillis(1); // This can be long since no one is sending
long maxMessages = 1000;

try {
DataPort dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
NatsStatistics stats = this.connection.getNatsStatistics();
this.outgoing.resume();
this.reconnectOutgoing.resume();

while (this.running.get()) {
int sendPosition = 0;
NatsMessage msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage);
NatsMessage msg = null;

if (this.reconnectMode.get()) {
msg = this.reconnectOutgoing.accumulate(this.sendBuffer.length, maxMessages, reconnectWait);
} else {
msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage);
}

if (msg == null) { // Make sure we are still running
continue;
Expand Down Expand Up @@ -149,11 +162,23 @@ public void run() {
}
}

void setReconnectMode(boolean tf) {
reconnectMode.set(tf);
}

boolean canQueue(NatsMessage msg, long maxSize) {
return (maxSize <= 0 || (outgoing.sizeInBytes() + msg.getSizeInBytes()) < maxSize);
}

void queue(NatsMessage msg) {
outgoing.push(msg);
this.outgoing.push(msg);
}

void queueProtocolMessage(NatsMessage msg) {
if (this.reconnectMode.get()) {
this.reconnectOutgoing.push(msg);
} else {
this.outgoing.push(msg);
}
}
}
6 changes: 4 additions & 2 deletions src/test/java/io/nats/client/AuthTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.junit.Test;

import io.nats.client.Connection.Status;
import io.nats.client.ConnectionListener.Events;

public class AuthTests {
Expand Down Expand Up @@ -174,8 +175,9 @@ public void testUserPassInURLOnReconnect() throws Exception {
}

handler.waitForStatusChange(5, TimeUnit.SECONDS);
assertTrue("Reconnecting status", Connection.Status.RECONNECTING == nc.getStatus() ||
Connection.Status.DISCONNECTED == nc.getStatus());
Status status = nc.getStatus();
assertTrue("Reconnecting status", Connection.Status.RECONNECTING == status ||
Connection.Status.DISCONNECTED == status);
handler.prepForStatusChange(Events.RESUBSCRIBED);


Expand Down
87 changes: 86 additions & 1 deletion src/test/java/io/nats/client/impl/ReconnectTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public void testSimpleReconnect() throws Exception { //Includes test for subscri
}
}


@Test
public void testSubscribeDuringReconnect() throws Exception {
NatsConnection nc = null;
Expand Down Expand Up @@ -192,6 +191,92 @@ public void testSubscribeDuringReconnect() throws Exception {
}
}

@Test
public void testReconnectBuffer() throws Exception {
NatsConnection nc = null;
TestHandler handler = new TestHandler();
int port = NatsTestServer.nextPort();
Subscription sub;
long start = 0;
long end = 0;
String[] customArgs = {"--user","stephen","--pass","password"};

handler.setPrintExceptions(true);

try {
try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) {
Options options = new Options.Builder().
server(ts.getURI()).
maxReconnects(-1).
userInfo("stephen", "password").
reconnectWait(Duration.ofMillis(1000)).
connectionListener(handler).
build();
nc = (NatsConnection) Nats.connect(options);
assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus());

sub = nc.subscribe("subsubject");

final NatsConnection nnc = nc;
Dispatcher d = nc.createDispatcher((msg) -> {
nnc.publish(msg.getReplyTo(), msg.getData());
});
d.subscribe("dispatchSubject");
nc.flush(Duration.ofMillis(1000));

Future<Message> inc = nc.request("dispatchSubject", "test".getBytes(StandardCharsets.UTF_8));
Message msg = inc.get();
assertNotNull(msg);

nc.publish("subsubject", null);
msg = sub.nextMessage(Duration.ofMillis(100));
assertNotNull(msg);

handler.prepForStatusChange(Events.DISCONNECTED);
start = System.nanoTime();
}

flushAndWait(nc, handler);
checkReconnectingStatus(nc);

// Send a message to the dispatcher and one to the subscriber
// These should be sent on reconnect
Future<Message> inc = nc.request("dispatchSubject", "test".getBytes(StandardCharsets.UTF_8));
nc.publish("subsubject", null);
nc.publish("subsubject", null);

handler.prepForStatusChange(Events.RESUBSCRIBED);

try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) {
handler.waitForStatusChange(5000, TimeUnit.MILLISECONDS);
assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus());

end = System.nanoTime();

assertTrue("reconnect wait", 1_000_000 * (end-start) > 1000);

// Check the message we sent to dispatcher
Message msg = inc.get(500, TimeUnit.MILLISECONDS);
assertNotNull(msg);

// Check the two we sent to subscriber
msg = sub.nextMessage(Duration.ofMillis(500));
assertNotNull(msg);

msg = sub.nextMessage(Duration.ofMillis(500));
assertNotNull(msg);
}

assertEquals("reconnect count", 1, nc.getNatsStatistics().getReconnects());
assertTrue("exception count", nc.getNatsStatistics().getExceptions() > 0);
} finally {
if (nc != null) {
nc.close();
assertTrue("Closed Status", Connection.Status.CLOSED == nc.getStatus());
}
}
}

@Test
public void testMaxReconnects() throws Exception {
Connection nc = null;
Expand Down

0 comments on commit 4fb2bb9

Please sign in to comment.