Skip to content

Commit

Permalink
GEODE-8498: make AbstractSubscription write to channel synchronously (a…
Browse files Browse the repository at this point in the history
…pache#5550)

Co-authored-by: Ray Ingles <[email protected]>
Co-authored-by: Darrel Schneider <[email protected]>
Co-authored-by: Jens Deppe <[email protected]>
Co-authored-by: Sarah Abbey <[email protected]>
  • Loading branch information
5 people authored Sep 25, 2020
1 parent 06d7bc8 commit 22f2c52
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class MockSubscriber extends JedisPubSub {
private final CountDownLatch subscriptionLatch;
private final CountDownLatch psubscriptionLatch;
private final CountDownLatch unsubscriptionLatch;
private final CountDownLatch pUnsubscriptionLatch;
private final List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());
private final List<String> receivedPMessages = Collections.synchronizedList(new ArrayList<>());
private final List<String> receivedPings = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -47,14 +48,16 @@ public MockSubscriber() {
}

public MockSubscriber(CountDownLatch subscriptionLatch) {
this(subscriptionLatch, new CountDownLatch(1), new CountDownLatch(1));
this(subscriptionLatch, new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1));
}

public MockSubscriber(CountDownLatch subscriptionLatch, CountDownLatch unsubscriptionLatch,
CountDownLatch psubscriptionLatch) {
CountDownLatch psubscriptionLatch,
CountDownLatch pUnsubscriptionLatch) {
this.subscriptionLatch = subscriptionLatch;
this.psubscriptionLatch = psubscriptionLatch;
this.unsubscriptionLatch = unsubscriptionLatch;
this.pUnsubscriptionLatch = pUnsubscriptionLatch;
}

@Override
Expand Down Expand Up @@ -176,6 +179,18 @@ public void awaitUnsubscribe(String channel) {
public void onPUnsubscribe(String pattern, int subscribedChannels) {
switchThreadName(String.format("PUNSUBSCRIBE %s %d", pattern, subscribedChannels));
punsubscribeInfos.add(new UnsubscribeInfo(pattern, subscribedChannels));
pUnsubscriptionLatch.countDown();
}

public void awaitPunsubscribe(String pChannel) {

try {
if (!pUnsubscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("awaitPUnsubscribe timed out for pattern: " + pChannel);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public static class UnsubscribeInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.geode.redis.GeodeRedisServerRule;
Expand Down Expand Up @@ -152,7 +151,6 @@ public void message(String pattern, String channel, String message) {
}

@Test
@Ignore("GEODE-8498")
public void subscribePsubscribeSameClient() throws InterruptedException {
StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub();
StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub();
Expand Down Expand Up @@ -354,6 +352,7 @@ public void concurrentPublishersToMultipleSubscribers_doNotLosePublishMessages()
}

long publishCount = 0;

for (Future<Long> r : results) {
publishCount += r.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,64 @@ public void testPunsubscribingImplicitlyFromAllChannels() {
new MockSubscriber.UnsubscribeInfo("yul*", 0));
}

@Test
public void ensureOrderingOfPublishedMessages() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);

Future<Void> future1 =
executor.submit(() -> runSubscribeAndPublish(1, 10000, running));

running.set(false);
future1.get();
}

private void runSubscribeAndPublish(int index, int minimumIterations, AtomicBoolean running)
throws Exception {

int iterationCount = 0;
Jedis publisher = getConnection();

while (iterationCount < minimumIterations || running.get()) {
List<String> result = subscribeAndPublish(index, iterationCount, publisher);

assertThat(result)
.as("Failed at iteration " + iterationCount)
.containsExactly("message", "pmessage");

iterationCount++;
}
}

private List<String> subscribeAndPublish(int index, int iteration, Jedis localPublisher)
throws Exception {
String channel = index + ".foo.bar";
String pChannel = index + ".foo.*";

MockSubscriber mockSubscriber = new MockSubscriber();

try (Jedis localSubscriber = getConnection()) {
Future<Void> future =
executor.submit(() -> localSubscriber.subscribe(mockSubscriber, channel));

mockSubscriber.awaitSubscribe(channel);

mockSubscriber.psubscribe(pChannel);
mockSubscriber.awaitPSubscribe(pChannel);

localPublisher.publish(channel, "hello-" + index + "-" + iteration);

mockSubscriber.unsubscribe(channel);
mockSubscriber.awaitUnsubscribe(channel);
mockSubscriber.punsubscribe(pChannel);
mockSubscriber.awaitPunsubscribe(pChannel);

future.get();
}

return mockSubscriber.getReceivedEvents();
}


@Test
public void testTwoSubscribersOneChannel() {
Jedis subscriber2 = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.util.concurrent.CountDownLatch;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelFuture;
import org.apache.logging.log4j.Logger;

import org.apache.geode.logging.internal.log4j.api.LogService;
Expand Down Expand Up @@ -101,18 +101,20 @@ private RedisResponse constructResponse(byte[] channel, byte[] message) {
}

/**
* We want to determine if the response, to the client, resulted in an error - for example if
* the client has disconnected and the write fails. In such cases we need to be able to notify
* the caller.
* We want to determine if the response, to the client, resulted in an error - for example if the
* client has disconnected and the write fails. In such cases we need to be able to notify the
* caller.
*/
private void writeToChannel(RedisResponse response, PublishResultCollector resultCollector) {
context.writeToChannel(response)
.addListener((ChannelFutureListener) future -> {
if (future.cause() == null) {
resultCollector.success();
} else {
resultCollector.failure(client);
}
});

ChannelFuture result = context.writeToChannel(response)
.syncUninterruptibly();

if (result.cause() == null) {
resultCollector.success();
} else {
resultCollector.failure(client);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public ChannelPromise addListener(
return null;
}

@Override
public ChannelPromise syncUninterruptibly() {
return this;
}

@Override
public Throwable cause() {
return new RuntimeException("aeotunhasoen");
Expand Down

0 comments on commit 22f2c52

Please sign in to comment.