Skip to content

Commit

Permalink
GEODE-9338: Remove strong guarantees for Radish PUBLISH command (apac…
Browse files Browse the repository at this point in the history
…he#6704)

GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Previously we guaranteed that when we published something, we would return the number of members who acknowledged that message. This is not required based on native Redis’s behavior, so this commit makes changes to instead return the number of currently subscribed members.

- Use StripedCoordinator instead of StripedExecutor
- Remove event loop latches in executors
- Remove subscriber group
- Max number of publish threads set to 10
- Use a striped runnable in PubSubImpl to do the publish operation
- Add StripedExecutorService from Heinz Kabutz
- Netty version 4.1.59 -> 4.1.66
- Change pubsub work queue to a LinkedBlockingQueue

Co-authored-by: Jens Deppe <[email protected]>
  • Loading branch information
nonbinaryprogrammer and jdeppe-pivotal authored Aug 11, 2021
1 parent bca2aeb commit 8aeec39
Show file tree
Hide file tree
Showing 39 changed files with 1,273 additions and 279 deletions.
4 changes: 4 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ Copyright 2016 AddThis

This product includes software developed by the MX4J
project (http://mx4j.sourceforge.net).

striped-executor-service
Copyright 2000-2012 Heinz Max Kabutz
From The Java Specialists' Newsletter (http://www.javaspecialists.eu)
2 changes: 1 addition & 1 deletion boms/geode-all-bom/src/test/resources/expected-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.59.Final</version>
<version>4.1.66.Final</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class DependencyConstraints implements Plugin<Project> {
api(group: 'io.github.resilience4j', name: 'resilience4j-retry', version: '1.7.1')
api(group: 'io.lettuce', name: 'lettuce-core', version: '6.1.3.RELEASE')
api(group: 'io.micrometer', name: 'micrometer-core', version: get('micrometer.version'))
api(group: 'io.netty', name: 'netty-all', version: '4.1.59.Final')
api(group: 'io.netty', name: 'netty-all', version: '4.1.66.Final')
api(group: 'io.swagger', name: 'swagger-annotations', version: '1.6.2')
api(group: 'it.unimi.dsi', name: 'fastutil', version: get('fastutil.version'))
api(group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,34 @@

package org.apache.geode.redis.internal.executor.pubsub;

import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
import org.junit.ClassRule;

import org.apache.geode.NativeRedisTestRule;
import org.apache.geode.logging.internal.log4j.api.LogService;

public class PubSubNativeRedisAcceptanceTest extends AbstractPubSubIntegrationTest {

private static final Logger logger = LogService.getLogger();

@ClassRule
public static NativeRedisTestRule redis = new NativeRedisTestRule();

@AfterClass
public static void cleanup() throws InterruptedException {
// This test consumes a lot of sockets and any subsequent tests may fail because of spurious
// bind exceptions. Even though sockets are closed, they will remain in TIME_WAIT state so we
// need to wait for that to clear up. It shouldn't take more than a minute or so.
// There will be a better solution for this from GEODE-9495, but for now a thread sleep is the
// simplest way to wait for the sockets to be out of the TIME_WAIT state. The timeout of 240 sec
// was chosen because that is the default duration for TIME_WAIT on Windows. The timeouts for
// both mac and linux are significantly shorter.
Thread.sleep(240000);
}

@Override
public int getPort() {
return redis.getPort();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package org.apache.geode.redis.mocks;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -26,6 +28,7 @@
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;


public class MockSubscriber extends JedisPubSub {

private final CountDownLatch subscriptionLatch;
Expand All @@ -40,6 +43,8 @@ public class MockSubscriber extends JedisPubSub {
Collections.synchronizedList(new ArrayList<>());
public final List<UnsubscribeInfo> punsubscribeInfos =
Collections.synchronizedList(new ArrayList<>());
private CountDownLatch messageReceivedLatch = new CountDownLatch(0);
private CountDownLatch pMessageReceivedLatch = new CountDownLatch(0);
private String localSocketAddress;
private Client client;

Expand Down Expand Up @@ -99,13 +104,15 @@ public void onMessage(String channel, String message) {
switchThreadName(String.format("MESSAGE %s %s", channel, message));
receivedMessages.add(message);
receivedEvents.add("message");
messageReceivedLatch.countDown();
}

@Override
public void onPMessage(String pattern, String channel, String message) {
switchThreadName(String.format("PMESSAGE %s %s %s", pattern, channel, message));
receivedPMessages.add(message);
receivedEvents.add("pmessage");
pMessageReceivedLatch.countDown();
}

@Override
Expand Down Expand Up @@ -193,6 +200,30 @@ public void awaitPunsubscribe(String pChannel) {
}
}

public void preparePMessagesReceivedLatch(int expectedMessages) {
pMessageReceivedLatch = new CountDownLatch(expectedMessages);
}

public void awaitPMessagesReceived() {
try {
assertThat(pMessageReceivedLatch.await(30, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}

public void prepareMessagesReceivedLatch(int expectedMessages) {
messageReceivedLatch = new CountDownLatch(expectedMessages);
}

public void awaitMessagesReceived() {
try {
assertThat(messageReceivedLatch.await(30, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}

public static class UnsubscribeInfo {
public final String channel;
public final int count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import redis.clients.jedis.exceptions.JedisMovedDataException;

import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.executor.StripedExecutor;
import org.apache.geode.redis.internal.executor.SynchronizedStripedExecutor;
import org.apache.geode.redis.internal.services.StripedCoordinator;
import org.apache.geode.redis.internal.services.SynchronizedStripedCoordinator;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
Expand Down Expand Up @@ -138,13 +138,13 @@ private Set<String> getKeysOnSameRandomStripe(int numKeysNeeded) {
Random random = new Random();
String key1 = "{rename}keyz" + random.nextInt();
RedisKey key1RedisKey = new RedisKey(key1.getBytes());
StripedExecutor stripedExecutor = new SynchronizedStripedExecutor();
StripedCoordinator stripedCoordinator = new SynchronizedStripedCoordinator();
Set<String> keys = new HashSet<>();
keys.add(key1);

do {
String key2 = "{rename}key" + random.nextInt();
if (stripedExecutor.compareStripes(key1RedisKey,
if (stripedCoordinator.compareStripes(key1RedisKey,
new RedisKey(key2.getBytes())) == 0) {
keys.add(key2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ public void shouldNotHang_givenPublishingAndSubscribingSimultaneously() {
mockSubscribers.forEach(x -> x.awaitSubscribe(channelName));

Jedis localPublisher = getConnection(random);
long published = localPublisher.publish(channelName, "hi");
publishCount.getAndAdd(published);
mockSubscribers.forEach(x -> x.prepareMessagesReceivedLatch(1));
localPublisher.publish(channelName, "hi");
mockSubscribers.forEach(MockSubscriber::awaitMessagesReceived);
localPublisher.close();

mockSubscribers.forEach(s -> {
publishCount.addAndGet(s.getReceivedMessages().size());
s.unsubscribe(channelName);
s.awaitUnsubscribe(channelName);
});
Expand Down Expand Up @@ -209,20 +211,19 @@ public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSu
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);

Future<Void> subscriber1Future =
executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));

assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();

Long result = publisher1.publish(CHANNEL_NAME, "hello");
assertThat(result).isEqualTo(2);
publisher1.publish(CHANNEL_NAME, "hello");

server1.stop();
Long resultFromSecondMessage = publisher1.publish(CHANNEL_NAME, "hello again");
assertThat(resultFromSecondMessage).isEqualTo(1);
mockSubscriber2.prepareMessagesReceivedLatch(1);
publisher1.publish(CHANNEL_NAME, "hello again");
mockSubscriber2.awaitMessagesReceived();

mockSubscriber2.unsubscribe(CHANNEL_NAME);
GeodeAwaitility.await().untilAsserted(subscriber2Future::get);
Expand All @@ -244,18 +245,17 @@ public void shouldContinueToFunction_whenOneServerShutsDownAbruptly_givenTwoSubs
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));

assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();
assertThat(latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS))
.as("channel subscription was not received").isTrue();

Long result = publisher1.publish(CHANNEL_NAME, "hello");
assertThat(result).isEqualTo(2);
publisher1.publish(CHANNEL_NAME, "hello");

cluster.crashVM(2);

boolean published = false;
do {
try {
result = publisher1.publish(CHANNEL_NAME, "hello again");
publisher1.publish(CHANNEL_NAME, "hello again");
published = true;
} catch (JedisConnectionException ex) {
if (ex.getMessage().contains("Unexpected end of stream.")) {
Expand All @@ -266,7 +266,6 @@ public void shouldContinueToFunction_whenOneServerShutsDownAbruptly_givenTwoSubs
}
}
} while (!published);
assertThat(result).isLessThanOrEqualTo(1);

mockSubscriber1.unsubscribe(CHANNEL_NAME);

Expand All @@ -291,22 +290,22 @@ public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSu

Future<Void> subscriber1Future =
executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));

assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();

Long resultPublisher1 = publisher1.publish(CHANNEL_NAME, "hello");
Long resultPublisher2 = publisher2.publish(CHANNEL_NAME, "hello");
assertThat(resultPublisher1).isEqualTo(2);
assertThat(resultPublisher2).isEqualTo(2);
publisher1.publish(CHANNEL_NAME, "hello");
publisher2.publish(CHANNEL_NAME, "hello");

server2.stop();

mockSubscriber1.prepareMessagesReceivedLatch(1);
publisher1.publish(CHANNEL_NAME, "hello again");
publisher2.publish(CHANNEL_NAME, "hello again");

mockSubscriber1.awaitMessagesReceived();

mockSubscriber1.unsubscribe(CHANNEL_NAME);

GeodeAwaitility.await().untilAsserted(subscriber1Future::get);
Expand All @@ -329,8 +328,12 @@ public void testSubscribePublishUsingDifferentServers() throws Exception {
assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();

Long result = publisher1.publish(CHANNEL_NAME, "hello");
assertThat(result).isEqualTo(2);
mockSubscriber1.prepareMessagesReceivedLatch(1);
mockSubscriber2.prepareMessagesReceivedLatch(1);
publisher1.publish(CHANNEL_NAME, "hello");

mockSubscriber1.awaitMessagesReceived();
mockSubscriber2.awaitMessagesReceived();

mockSubscriber1.unsubscribe(CHANNEL_NAME);
mockSubscriber2.unsubscribe(CHANNEL_NAME);
Expand Down Expand Up @@ -361,9 +364,10 @@ public void testConcurrentPubSub() throws Exception {
for (int i = 0; i < CLIENT_COUNT; i++) {
Jedis publisher = new Jedis("localhost", ports[i % 2]);

int localI = i;
Callable<Void> callable = () -> {
for (int j = 0; j < ITERATIONS; j++) {
publisher.publish(CHANNEL_NAME, "hello");
publisher.publish(CHANNEL_NAME, String.format("hello-%d-%d", localI, j));
}
publisher.close();
return null;
Expand All @@ -376,14 +380,18 @@ public void testConcurrentPubSub() throws Exception {
GeodeAwaitility.await().untilAsserted(future::get);
}

GeodeAwaitility.await()
.untilAsserted(() -> assertThat(mockSubscriber1.getReceivedMessages().size())
.isEqualTo(CLIENT_COUNT * ITERATIONS));
GeodeAwaitility.await()
.untilAsserted(() -> assertThat(mockSubscriber2.getReceivedMessages().size())
.isEqualTo(CLIENT_COUNT * ITERATIONS));

mockSubscriber1.unsubscribe(CHANNEL_NAME);
mockSubscriber2.unsubscribe(CHANNEL_NAME);

GeodeAwaitility.await().untilAsserted(subscriber1Future::get);
GeodeAwaitility.await().untilAsserted(subscriber2Future::get);

assertThat(mockSubscriber1.getReceivedMessages().size()).isEqualTo(CLIENT_COUNT * ITERATIONS);
assertThat(mockSubscriber2.getReceivedMessages().size()).isEqualTo(CLIENT_COUNT * ITERATIONS);
subscriber1Future.get();
subscriber2Future.get();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package org.apache.geode.redis;

import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
Expand All @@ -35,16 +37,14 @@
public abstract class AbstractCommandPipeliningIntegrationTest implements RedisIntegrationTest {
private Jedis publisher;
private Jedis subscriber;
private static final int REDIS_CLIENT_TIMEOUT =
Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());

@ClassRule
public static ExecutorServiceRule executor = new ExecutorServiceRule();

@Before
public void setUp() {
subscriber = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
publisher = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
subscriber = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
publisher = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
}

@After
Expand Down Expand Up @@ -77,11 +77,12 @@ public void whenPipelining_commandResponsesAreNotCorrupted() {

List<Object> responses = pipe.syncAndReturnAll();

GeodeAwaitility.await().untilAsserted(
() -> assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages));

mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());

assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
}

@Test
Expand Down
Loading

0 comments on commit 8aeec39

Please sign in to comment.