Skip to content

Commit

Permalink
Merge pull request #115 from nats-io/v2.1.3
Browse files Browse the repository at this point in the history
V2.1.3
  • Loading branch information
sasbury authored Dec 21, 2018
2 parents 9450d31 + 109ef36 commit 8c7793e
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change Log

## Version 2.1.3

* [Added] Methods to include error and connection listener with factory #113
* [Fixed] Protocol buffer build issues #112
* [Fixed] Made use of ossh password optional #111
* [Fixed] Typo in readme pointing to java repo #110

## Version 2.1.2

* [Removed] Dependency on guava
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ Subscription sub2 = sc.subscribe("foo", mcb2,
The build depends on Gradle, and contains `gradlew` to simplify the process. After cloning, you can build the repository and run the tests with a single command:
```bash
> git clone https://github.com/nats-io/java-nats.git
> cd java-nats
> git clone https://github.com/nats-io/java-nats-streaming.git
> cd java-nats-streaming
> ./gradlew build
```
Expand Down
9 changes: 6 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ plugins {
// Update version here, repeated check-ins not into master will have snapshot on them
def versionMajor = 2
def versionMinor = 1
def versionPatch = 2
def versionPatch = 3
def versionModifier = ""
def jarVersion = "2.1.2"
def jarVersion = "2.1.3"
def branch = System.getenv("TRAVIS_BRANCH");

def getVersionName = { ->
Expand Down Expand Up @@ -64,6 +64,9 @@ sourceSets {
java.srcDirs = ['gen/main/java']
}
main {
proto {
srcDir 'src/main/proto'
}
java {
srcDirs = ['src/main/java', 'src/examples/java', 'gen/main/java']
}
Expand Down Expand Up @@ -191,7 +194,7 @@ uploadArchives {
repository(url: "file://$buildDir/repos")
if (project.hasProperty("local_archives") || ("true".equals(System.getenv("TRAVIS_PULL_REQUEST")))) {
repository(url: "file://$buildDir/repos")
} else {
} else if (project.hasProperty('ossrhUsername') && project.hasProperty('ossrhPassword')) {
beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }

repository(url: "https://oss.sonatype.org/service/local/staging/deploy/maven2/") {
Expand Down
38 changes: 37 additions & 1 deletion src/main/java/io/nats/streaming/StreamingConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package io.nats.streaming;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.ErrorListener;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,6 +35,8 @@ public class StreamingConnectionFactory {
private Connection natsConn;
private String clientId;
private String clusterId;
private ConnectionListener connListener;
private ErrorListener errListener;

public StreamingConnectionFactory() {
}
Expand All @@ -58,7 +63,7 @@ public StreamingConnection createConnection() throws IOException, InterruptedExc
Options options() {
return new Options.Builder().connectWait(connectTimeout).pubAckWait(ackTimeout)
.discoverPrefix(discoverPrefix).maxPubAcksInFlight(maxPubAcksInFlight)
.natsConn(natsConn).natsUrl(natsUrl).build();
.natsConn(natsConn).natsUrl(natsUrl).connectionListener(connListener).errorListener(errListener).build();
}

/**
Expand Down Expand Up @@ -242,4 +247,35 @@ public void setClusterId(String clusterId) {

this.clusterId = clusterId;
}

/**
* @return the connection listener configured for this factory
*/
public ConnectionListener getConnectionListener() {
return this.connListener;
}

/**
* Set a connection listener for the underlying nats connection.
* @param l The new connection listener
*/
public void setConnectionListener(ConnectionListener l) {
this.connListener = l;
}

/**
* @return the error listener associated with this factory
*/
public ErrorListener getErrorListener() {
return this.errListener;
}


/**
* Set a error listener for the underlying nats connection.
* @param l The new error listener
*/
public void setErrorListener(ErrorListener l) {
this.errListener = l;
}
}
1 change: 1 addition & 0 deletions src/main/java/io/nats/streaming/SubscriptionOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ public Builder deliverAllAvailable() {
*
* A unique dispatcher will be created automatically for each name. Reusing the name reuses the dispatcher.
*
* @param dispatcherName the shared name to use for the dispatcher
* @return this
*/
public Builder dispatcher(String dispatcherName) {
Expand Down
49 changes: 49 additions & 0 deletions src/test/java/io/nats/streaming/OptionsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,38 @@ public void testErrorListener() throws Exception {
}
}

@Test
public void testErrorListenerViaFactory() throws Exception {
TestHandler handler = new TestHandler();
try (NatsStreamingTestServer srv = new NatsStreamingTestServer(clusterName, false)) {
StreamingConnectionFactory factory = new StreamingConnectionFactory(clusterName, clientName);
factory.setNatsUrl(srv.getURI());
factory.setErrorListener(handler);
try (StreamingConnection sc = factory.createConnection()) {
final CountDownLatch latch = new CountDownLatch(1);
assertNotNull(sc);

SubscriptionOptions sopts = new SubscriptionOptions.Builder().build();
Subscription sub = sc.subscribe("foo", msg -> {
latch.countDown();
throw new RuntimeException(); // trigger the error handler
}, sopts);
assertNotNull(sub);

sc.publish("foo", "Hello World!".getBytes());

// Wait for the latch, then wait a bit more for the exception to flow to the handler
latch.await(1, TimeUnit.SECONDS);
try {
Thread.sleep(500);
} catch (Exception ex) {
// ignore
}
assertEquals(handler.getExceptionCount(), 1);
}
}
}

@Test
public void testConnectionListener() throws Exception {
TestHandler handler = new TestHandler();
Expand All @@ -86,4 +118,21 @@ public void testConnectionListener() throws Exception {

assertEquals(handler.getEventCount(Events.CLOSED), 1);
}

@Test
public void testConnectionListenerViaFactory() throws Exception {
TestHandler handler = new TestHandler();
try (NatsStreamingTestServer srv = new NatsStreamingTestServer(clusterName, false)) {
StreamingConnectionFactory factory = new StreamingConnectionFactory(clusterName, clientName);
factory.setNatsUrl(srv.getURI());
factory.setConnectionListener(handler);
try (StreamingConnection sc = factory.createConnection()) {
assertNotNull(sc);
assertEquals(handler.getEventCount(Events.CONNECTED), 1);
assertNotNull(handler.getConnection());
}
}

assertEquals(handler.getEventCount(Events.CLOSED), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.time.Duration;
import org.junit.Test;

import io.nats.client.ConnectionListener;
import io.nats.client.ErrorListener;

public class StreamingConnectionFactoryTest {
private static final String clusterName = "test-cluster";
private static final String clientName = "me";
Expand Down Expand Up @@ -55,6 +58,12 @@ public void testOptions() throws Exception {
cf.setDiscoverPrefix("_FOO");
cf.setMaxPubAcksInFlight(1000);

ErrorListener err = new TestHandler();
ConnectionListener conn = new TestHandler();

cf.setErrorListener(err);
cf.setConnectionListener(conn);

cf.setNatsUrl("nats://foobar:1234");

Options opts = cf.options();
Expand All @@ -65,6 +74,8 @@ public void testOptions() throws Exception {
assertEquals(cf.getMaxPubAcksInFlight(), opts.getMaxPubAcksInFlight());
assertEquals(cf.getNatsUrl(), opts.getNatsUrl());
assertEquals(cf.getNatsConnection(), opts.getNatsConn());
assertEquals(cf.getConnectionListener(), opts.getConnectionListener());
assertEquals(cf.getErrorListener(), opts.getErrorListener());
}

@Test(expected = NullPointerException.class)
Expand Down

0 comments on commit 8c7793e

Please sign in to comment.