Skip to content

Commit

Permalink
GEODE-8333: Fix PUBSUB hang (apache#5349)
Browse files Browse the repository at this point in the history
PUBSUB hangs with concurrent publishers and subscribers on multiple
servers. Changed Publish executor to execute publish on background
thread. Removed separate subscriber group. Added DUnit test to
recreate failure.
A commandQueue is now used to preserve the order
in which commands are executed even if a command
is executed async. Currently the only async command
is PUBLISH.

Co-authored-by: Murtuza Boxwala <[email protected]>
Co-authored-by: Sarah Abbey <[email protected]>
  • Loading branch information
3 people authored Jul 24, 2020
1 parent c3cefe1 commit 067194e
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -39,6 +40,7 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;

import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.redis.MockSubscriber;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.MemberVM;
Expand All @@ -49,6 +51,7 @@
public class PubSubDUnitTest {

public static final String CHANNEL_NAME = "salutations";
public static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());

@ClassRule
public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(6);
Expand Down Expand Up @@ -125,6 +128,67 @@ public static void tearDown() {
server5.stop();
}

/**
 * Regression test for the PUBSUB hang with concurrent publishers and subscribers.
 *
 * <p>
 * Starts 200 threads, one per channel. Each thread attaches five subscribers (every subscriber on
 * its own connection, driven by the shared executor), publishes a single "hi" message, waits
 * until every subscriber has received it, unsubscribes them and closes the connections. The test
 * passes only if all 200 publishes and all 1000 subscribe calls complete.
 */
@Test
public void shouldNotHang_givenPublishingAndSubscribingSimultaneously() {
  List<Thread> threads = new ArrayList<>();
  AtomicInteger subscribeCount = new AtomicInteger();
  AtomicInteger publishCount = new AtomicInteger();
  Random random = new Random();

  for (int i = 0; i < 200; i++) {
    String channelName = "theBestChannel" + i;
    Thread thread = new LoggingThread(channelName, () -> {
      List<MockSubscriber> mockSubscribers = new ArrayList<>();
      List<JedisWithLatch> clients = new ArrayList<>();
      for (int j = 0; j < 5; j++) {
        // presumably counted down by MockSubscriber once the subscription is acknowledged;
        // verify against MockSubscriber
        CountDownLatch latch = new CountDownLatch(1);
        MockSubscriber mockSubscriber = new MockSubscriber(latch);
        executor.submit(() -> {
          Jedis client = getConnection(random);
          JedisWithLatch jedisWithLatch = new JedisWithLatch(client);
          clients.add(jedisWithLatch);
          // The statements below only run once subscribe() has returned.
          client.subscribe(mockSubscriber, channelName);
          subscribeCount.getAndIncrement();
          jedisWithLatch.finishSubscribe();
        });
        try {
          latch.await();
        } catch (InterruptedException e) {
          // Keep the interrupt visible to whoever handles the failure.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
        mockSubscribers.add(mockSubscriber);
      }

      Jedis localPublisher = getConnection(random);
      try {
        localPublisher.publish(channelName, "hi");
        publishCount.getAndIncrement();
      } finally {
        // Always release the publisher connection, even when publish fails.
        try {
          localPublisher.close();
        } catch (Exception ignored) {
          // Best-effort close: a failure here must not mask the outcome of the publish.
        }
      }

      mockSubscribers.forEach(s -> {
        // ignoreExceptions() covers the window in which the received-message list is still empty.
        GeodeAwaitility.await().ignoreExceptions()
            .until(() -> s.getReceivedMessages().get(0).equals("hi"));
        s.unsubscribe(channelName);
      });
      // close() waits for each subscribe call to finish before closing its connection.
      clients.forEach(JedisWithLatch::close);
    });
    threads.add(thread);
    thread.start();
  }

  threads.forEach(thread -> {
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  });
  assertThat(publishCount.get()).isEqualTo(200);
  assertThat(subscribeCount.get()).isEqualTo(1000);
}

@Test
public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSubscribersOnePublisher()
throws InterruptedException {
Expand All @@ -133,13 +197,12 @@ public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSu
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);

Future<Void> subscriber1Future = executor.submit(
() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future = executor.submit(
() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
Future<Void> subscriber1Future =
executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));

assertThat(latch.await(30, TimeUnit.SECONDS))
.as("channel subscription was not received")
assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();

Long result = publisher1.publish(CHANNEL_NAME, "hello");
Expand All @@ -164,13 +227,12 @@ public void shouldContinueToFunction_whenOneServerShutsDownAbruptly_givenTwoSubs
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);

Future<Void> subscriber1Future = executor.submit(
() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future = executor.submit(
() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
Future<Void> subscriber1Future =
executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));

assertThat(latch.await(30, TimeUnit.SECONDS))
.as("channel subscription was not received")
assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();

Long result = publisher1.publish(CHANNEL_NAME, "hello");
Expand Down Expand Up @@ -216,13 +278,12 @@ public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSu
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);

Future<Void> subscriber1Future = executor.submit(
() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future = executor.submit(
() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
Future<Void> subscriber1Future =
executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));

assertThat(latch.await(30, TimeUnit.SECONDS))
.as("channel subscription was not received")
assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();

Long resultPublisher1 = publisher1.publish(CHANNEL_NAME, "hello");
Expand All @@ -249,13 +310,12 @@ public void testSubscribePublishUsingDifferentServers() throws Exception {
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);

Future<Void> subscriber1Future = executor.submit(
() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future = executor.submit(
() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
Future<Void> subscriber1Future =
executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));

assertThat(latch.await(30, TimeUnit.SECONDS))
.as("channel subscription was not received")
assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();

Long result = publisher1.publish(CHANNEL_NAME, "hello");
Expand All @@ -277,13 +337,12 @@ public void testConcurrentPubSub() throws Exception {
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);

Future<Void> subscriber1Future = executor.submit(
() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future = executor.submit(
() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
Future<Void> subscriber1Future =
executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));

assertThat(latch.await(30, TimeUnit.SECONDS))
.as("channel subscription was not received")
assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();

List<Future<Void>> futures = new LinkedList<>();
Expand Down Expand Up @@ -410,10 +469,8 @@ private void restartServerVM2() {
}

private void waitForRestart() {
await()
.untilAsserted(() -> gfsh.executeAndAssertThat("list members")
.statusIsSuccess()
.hasTableSection()
await().untilAsserted(
() -> gfsh.executeAndAssertThat("list members").statusIsSuccess().hasTableSection()
.hasColumn("Name")
.containsOnly("locator-0", "server-1", "server-2", "server-3", "server-4", "server-5"));
}
Expand All @@ -425,4 +482,48 @@ private void reconnectSubscriber1() {
/**
 * Replaces {@code subscriber2} with a brand-new Jedis connection to {@code LOCAL_HOST} on
 * {@code redisServerPort2}. The previous connection object is not closed here.
 */
private void reconnectSubscriber2() {
  final Jedis freshConnection = new Jedis(LOCAL_HOST, redisServerPort2);
  subscriber2 = freshConnection;
}


/**
 * Pairs a Jedis connection with a latch that records when the subscribe call made on that
 * connection has returned, so the connection is not closed while the call is still in progress.
 */
private static class JedisWithLatch {
  public final Jedis jedis;
  public final CountDownLatch latch;

  JedisWithLatch(Jedis jedis) {
    this.latch = new CountDownLatch(1);
    this.jedis = jedis;
  }

  /**
   * Signals that the subscribe call on this connection has finished; releases {@link #close()}.
   */
  public void finishSubscribe() {
    latch.countDown();
  }

  /**
   * Waits until {@link #finishSubscribe()} has been called, then closes the connection. If the
   * wait is interrupted the connection is still closed and the interrupt status is restored for
   * the caller.
   */
  public void close() {
    try {
      latch.await();
    } catch (InterruptedException e) {
      // Restore the flag instead of just printing the stack trace, so callers can see it.
      Thread.currentThread().interrupt();
    }
    jedis.close();
  }
}

/**
 * Opens a Jedis connection to a randomly chosen server and verifies it with a PING, retrying up
 * to 10 times with a freshly chosen server each time.
 *
 * @param random source of randomness used to pick the server index
 * @return a connection that has answered a PING; the caller is responsible for closing it
 * @throws RuntimeException if none of the 10 attempts produced a working connection; the cause
 *         is the failure of the last attempt
 */
private Jedis getConnection(Random random) {
  Exception lastFailure = null;

  for (int attempt = 0; attempt < 10; attempt++) {
    // Picks an index in 1..4.
    // NOTE(review): index 5 is never selected - confirm this is intentional.
    int randPort = random.nextInt(4) + 1;
    Jedis client = new Jedis("localhost", cluster.getRedisPort(randPort), JEDIS_TIMEOUT);
    try {
      client.ping();
      return client;
    } catch (Exception e) {
      // Remember why this attempt failed so the final error is diagnosable.
      lastFailure = e;
      try {
        client.close();
      } catch (Exception ignored) {
        // Best-effort cleanup of a connection that is already known to be bad.
      }
    }
  }
  throw new RuntimeException("Tried 10 times, but could not get a good connection.",
      lastFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand All @@ -44,6 +45,7 @@ public class PubSubIntegrationTest {
static Jedis publisher;
static Jedis subscriber;
static final int REDIS_CLIENT_TIMEOUT = 100000;
public static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());

@ClassRule
public static GeodeRedisServerRule server = new GeodeRedisServerRule();
Expand All @@ -63,6 +65,11 @@ public static void tearDown() {
publisher.close();
}

/**
 * Per-test cleanup hook: requests a garbage collection from the JVM after each test method.
 * NOTE(review): the reason for forcing a collection is not visible here - presumably to release
 * resources left behind by the previous test; confirm.
 */
@After
public void afterMethodTearDown() {
  Runtime.getRuntime().gc();
}

/**
 * Returns the port reported by the {@code server} rule.
 *
 * @return the value of {@code server.getPort()}
 */
public int getPort() {
  final int serverPort = server.getPort();
  return serverPort;
}
Expand Down Expand Up @@ -378,12 +385,12 @@ public void testSubscribingAndUnsubscribingFromMultipleChannels() {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());

List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
.map(x -> x.channel).collect(Collectors.toList());
List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream().map(x -> x.channel)
.collect(Collectors.toList());
assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");

List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
.map(x -> x.count).collect(Collectors.toList());
List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream().map(x -> x.count)
.collect(Collectors.toList());
assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);

}
Expand All @@ -403,12 +410,12 @@ public void testUnsubscribingImplicitlyFromAllChannels() {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());

List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
.map(x -> x.channel).collect(Collectors.toList());
List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream().map(x -> x.channel)
.collect(Collectors.toList());
assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");

List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
.map(x -> x.count).collect(Collectors.toList());
List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream().map(x -> x.count)
.collect(Collectors.toList());
assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);

Long result = publisher.publish("salutations", "greetings");
Expand All @@ -429,9 +436,9 @@ public void testPsubscribingAndPunsubscribingFromMultipleChannels() {
mockSubscriber.punsubscribe("yul*", "sal*");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.punsubscribeInfos).containsExactly(
new MockSubscriber.UnsubscribeInfo("yul*", 1),
new MockSubscriber.UnsubscribeInfo("sal*", 0));
assertThat(mockSubscriber.punsubscribeInfos)
.containsExactly(new MockSubscriber.UnsubscribeInfo("yul*", 1),
new MockSubscriber.UnsubscribeInfo("sal*", 0));
}

@Test
Expand All @@ -448,9 +455,9 @@ public void testPunsubscribingImplicitlyFromAllChannels() {
mockSubscriber.punsubscribe();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.punsubscribeInfos).containsExactly(
new MockSubscriber.UnsubscribeInfo("sal*", 1),
new MockSubscriber.UnsubscribeInfo("yul*", 0));
assertThat(mockSubscriber.punsubscribeInfos)
.containsExactly(new MockSubscriber.UnsubscribeInfo("sal*", 1),
new MockSubscriber.UnsubscribeInfo("yul*", 0));
}

@Test
Expand Down Expand Up @@ -588,8 +595,7 @@ public void testSubscribeToSamePattern() {

waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.psubscribe("sal*s");
GeodeAwaitility.await()
.during(5, TimeUnit.SECONDS)
GeodeAwaitility.await().during(5, TimeUnit.SECONDS)
.until(() -> mockSubscriber.getSubscribedChannels() == 1);

String message = "hello-" + System.currentTimeMillis();
Expand Down Expand Up @@ -675,8 +681,8 @@ public void testPatternWithoutAGlob() {
}

private void waitFor(Callable<Boolean> booleanCallable) {
GeodeAwaitility.await()
.ignoreExceptions() // ignoring socket closed exceptions
GeodeAwaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions() // ignoring socket
// closed exceptions
.until(booleanCallable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public GeodeRedisServer(String bindAddress, int port, InternalCache cache) {
new PassiveExpirationManager(regionProvider.getDataRegion(), redisStats);
nettyRedisServer = new NettyRedisServer(() -> cache.getInternalDistributedSystem().getConfig(),
regionProvider, pubSub,
this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats);
this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats,
cache.getInternalDistributedSystem().getDistributionManager().getExecutors()
.getWaitingThreadPool());
}

public void setAllowUnsupportedCommands(boolean allowUnsupportedCommands) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ public boolean isUnimplemented() {
return supportLevel == UNIMPLEMENTED;
}

/**
 * Reports whether this command is executed asynchronously. PUBLISH is the only such command.
 *
 * @return {@code true} only for {@code PUBLISH}
 */
public boolean isAsync() {
  return PUBLISH == this;
}

public RedisResponse executeCommand(Command command,
ExecutionHandlerContext executionHandlerContext) {
parameterRequirements.checkParameters(command, executionHandlerContext);
Expand Down
Loading

0 comments on commit 067194e

Please sign in to comment.