Skip to content

Commit

Permalink
[ADDED] Global Message Delivery Thread Pool
Browse files Browse the repository at this point in the history
This PR adds the ability to set a library level thread pool
responsible to dispatch asynchronous subscriptions' messages.
All messages from a given subscription will be delivered in
the order they arrived.
A connection option allows the user to force the subscription
to use its own thread to dispatch its message even if the
library pool has been set.

Resolves nats-io#114
  • Loading branch information
kozlovic committed Oct 4, 2017
1 parent e10590d commit 818c837
Show file tree
Hide file tree
Showing 40 changed files with 892 additions and 767 deletions.
3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ deploy:
repo: nats-io/java-nats
tags: true
jdk: oraclejdk8
notifications:
email:
- [email protected]
env:
global:
- GPG_DIR=${TRAVIS_BUILD_DIR}/.travis/keyrings
Expand Down
29 changes: 2 additions & 27 deletions src/it/java/io/nats/client/ITAuthTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2016 Apcera Inc. All rights reserved. This program and the accompanying
* Copyright (c) 2015-2017 Apcera Inc. All rights reserved. This program and the accompanying
* materials are made available under the terms of the MIT License (MIT) which accompanies this
* distribution, and is available at http://opensource.org/licenses/MIT
*/
Expand All @@ -17,11 +17,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand All @@ -30,29 +25,9 @@
import java.util.concurrent.TimeUnit;

@Category(IntegrationTest.class)
public class ITAuthTest {
@Rule
public TestCasePrinterRule pr = new TestCasePrinterRule(System.out);

public class ITAuthTest extends ITBaseTest {
private int hitDisconnect;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
}

@Before
public void setUp() throws Exception {

}

@After
public void tearDown() throws Exception {
}

@Test
public void testAuth() {
String noAuthUrl = "nats://localhost:1222";
Expand Down
58 changes: 58 additions & 0 deletions src/it/java/io/nats/client/ITBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2017 Apcera Inc. All rights reserved. This program and the accompanying
* materials are made available under the terms of the MIT License (MIT) which accompanies this
* distribution, and is available at http://opensource.org/licenses/MIT
*/

package io.nats.client;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.experimental.categories.Category;

@Category(IntegrationTest.class)
public class ITBaseTest {
static int msgDeliveryPoolSize = 0;

@Rule
public TestCasePrinterRule pr = new TestCasePrinterRule(System.out);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
String prop = System.getProperty("msgDeliveryPoolSize");
if (prop != null) {
msgDeliveryPoolSize = Integer.parseInt(prop);
}
if (msgDeliveryPoolSize == 0) {
// If the system environment variable JNATS_MSG_DELIVERY_THREAD_POOL_SIZE
// exists (and number if positive), the library will create the thread
// pool. We need to shutdown the pool here. Individual tests may create
// the pool with the size they want.
Nats.shutdownMsgDeliveryThreadPool();
}
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
}

@Before
public void setUp() throws Exception {
// If need be, setup the message delivery thread pool for each
// test. This allows finding out if a test is holding onto
// a resource in the message callback preventing the pool to
// be shutdown.
if (msgDeliveryPoolSize > 0) {
Nats.setMsgDeliveryThreadPoolSize(msgDeliveryPoolSize);
}
}

@After
public void tearDown() throws Exception {
// Shutdown the message delivery pool if one was set.
Nats.shutdownMsgDeliveryThreadPool();
}
}
39 changes: 9 additions & 30 deletions src/it/java/io/nats/client/ITBasicTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2016 Apcera Inc. All rights reserved. This program and the accompanying
* Copyright (c) 2015-2017 Apcera Inc. All rights reserved. This program and the accompanying
* materials are made available under the terms of the MIT License (MIT) which accompanies this
* distribution, and is available at http://opensource.org/licenses/MIT
*/
Expand All @@ -25,10 +25,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -49,37 +46,19 @@
import java.util.concurrent.atomic.AtomicInteger;

@Category(IntegrationTest.class)
public class ITBasicTest {

public class ITBasicTest extends ITBaseTest {
@Rule
public final ExpectedException thrown = ExpectedException.none();

@Rule
public TestCasePrinterRule pr = new TestCasePrinterRule(System.out);

private final ExecutorService executor = Executors.newCachedThreadPool();
UnitTestUtilities utils = new UnitTestUtilities();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
}

@Before
public void setUp() throws Exception {
super.setUp();
MockitoAnnotations.initMocks(this);
}

/**
* @throws java.lang.Exception if a problem occurs
*/
@After
public void tearDown() throws Exception {
}

@Test
public void testConnectedServer() throws Exception {
try (NatsServer srv = runDefaultServer()) {
Expand Down Expand Up @@ -285,17 +264,17 @@ public void testQueueSubscriber() throws Exception {
c.publish("foo", omsg);
c.flush();

int r1 = s1.getQueuedMessageCount();
int r2 = s2.getQueuedMessageCount();
int r1 = s1.getPendingMsgs();
int r2 = s2.getPendingMsgs();
assertEquals("Received too many messages for multiple queue subscribers", 1,
r1 + r2);

// Drain the messages.
s1.nextMessage(250, TimeUnit.MILLISECONDS);
assertEquals(0, s1.getQueuedMessageCount());
assertEquals(0, s1.getPendingMsgs());

s2.nextMessage(250, TimeUnit.MILLISECONDS);
assertEquals(0, s2.getQueuedMessageCount());
assertEquals(0, s2.getPendingMsgs());

int total = 1000;
for (int i = 0; i < total; i++) {
Expand All @@ -308,8 +287,8 @@ public void testQueueSubscriber() throws Exception {
assertEquals(total, si1.getChannel().size() + si2.getChannel().size());

final int variance = (int) (total * 0.15);
r1 = s1.getQueuedMessageCount();
r2 = s2.getQueuedMessageCount();
r1 = s1.getPendingMsgs();
r2 = s2.getPendingMsgs();
assertEquals("Incorrect total number of messages: ", total, r1 + r2);

double expected = total / 2;
Expand Down
29 changes: 2 additions & 27 deletions src/it/java/io/nats/client/ITClusterTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2016 Apcera Inc. All rights reserved. This program and the accompanying
* Copyright (c) 2015-2017 Apcera Inc. All rights reserved. This program and the accompanying
* materials are made available under the terms of the MIT License (MIT) which accompanies this
* distribution, and is available at http://opensource.org/licenses/MIT
*/
Expand All @@ -20,11 +20,6 @@

import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand All @@ -43,27 +38,7 @@
import java.util.concurrent.atomic.AtomicInteger;

@Category(IntegrationTest.class)
public class ITClusterTest {

@Rule
public TestCasePrinterRule pr = new TestCasePrinterRule(System.out);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
}

@Before
public void setUp() throws Exception {
}

@After
public void tearDown() throws Exception {
}

public class ITClusterTest extends ITBaseTest {
private static final String[] testServers = new String[] {"nats://localhost:1222",
"nats://localhost:1223", "nats://localhost:1224", "nats://localhost:1225",
"nats://localhost:1226", "nats://localhost:1227", "nats://localhost:1228"};
Expand Down
29 changes: 2 additions & 27 deletions src/it/java/io/nats/client/ITConnectionTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2016 Apcera Inc. All rights reserved. This program and the accompanying
* Copyright (c) 2015-2017 Apcera Inc. All rights reserved. This program and the accompanying
* materials are made available under the terms of the MIT License (MIT) which accompanies this
* distribution, and is available at http://opensource.org/licenses/MIT
*/
Expand All @@ -23,16 +23,11 @@

import io.nats.client.ConnectionImpl.Srv;
import org.hamcrest.core.IsNot;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -46,32 +41,12 @@
import java.util.concurrent.atomic.AtomicLong;

@Category(IntegrationTest.class)
public class ITConnectionTest {

public class ITConnectionTest extends ITBaseTest {
@Rule
public ExpectedException thrown = ExpectedException.none();

@Rule
public TestCasePrinterRule pr = new TestCasePrinterRule(System.out);

ExecutorService executor = Executors.newFixedThreadPool(5);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
}

@Before
public void setUp() throws Exception {
}

@After
public void tearDown() throws Exception {
}

@Test
public void testDefaultConnection() throws Exception {
try (NatsServer srv = runDefaultServer()) {
Expand Down
28 changes: 2 additions & 26 deletions src/it/java/io/nats/client/ITReconnectTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2016 Apcera Inc. All rights reserved. This program and the accompanying
* Copyright (c) 2015-2017 Apcera Inc. All rights reserved. This program and the accompanying
* materials are made available under the terms of the MIT License (MIT) which accompanies this
* distribution, and is available at http://opensource.org/licenses/MIT
*/
Expand All @@ -16,11 +16,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand All @@ -33,29 +28,10 @@
import java.util.concurrent.atomic.AtomicInteger;

@Category(IntegrationTest.class)
public class ITReconnectTest {

@Rule
public TestCasePrinterRule pr = new TestCasePrinterRule(System.out);
public class ITReconnectTest extends ITBaseTest {

private final Properties reconnectOptions = getReconnectOptions();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
}

@Before
public void setUp() throws Exception {
}

@After
public void tearDown() throws Exception {
}

private static Properties getReconnectOptions() {
Properties props = new Properties();

Expand Down
Loading

0 comments on commit 818c837

Please sign in to comment.