Skip to content
This repository was archived by the owner on Jan 27, 2021. It is now read-only.

Commit 27b8e47

Browse files
GEODE-8333: Change Redis adapter threading model - fixes pubsub issues (apache#5488)
- Do not use Netty threads for the entire request lifecycle. Each instance of ExecutionHandlerContext (essentially each client connection) uses a command queue which is actioned by a single thread taken from the Geode 'waiting pool'. - Every SUBSCRIBEed client is moved to a separate EventGroupLoop so that PUBLISHed messages are not sent back using the 'normal' Worker EventLoopGroup. This avoids a deadlock issue where PUBLISHed messages need to be sent using the same thread that the PUBLISH response needs to happen on. - Fix issues with PubSub where switching the EventLoopGroup may fail (because a client has already closed the connection) thus resulting in hanging PUBLISHers waiting for a Subscription to be marked as 'readyForPublish'. - Consolidate various MockSubscriber classes
1 parent 20a35ec commit 27b8e47

File tree

31 files changed

+930
-309
lines changed

31 files changed

+930
-309
lines changed

boms/geode-all-bom/src/test/resources/expected-pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,12 @@
547547
<version>3.2.0</version>
548548
<scope>compile</scope>
549549
</dependency>
550+
<dependency>
551+
<groupId>io.lettuce</groupId>
552+
<artifactId>lettuce-core</artifactId>
553+
<version>5.2.1.RELEASE</version>
554+
<scope>compile</scope>
555+
</dependency>
550556
<dependency>
551557
<groupId>xerces</groupId>
552558
<artifactId>xercesImpl</artifactId>

buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy

+1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ class DependencyConstraints implements Plugin<Project> {
169169
api(group: 'org.testcontainers', name: 'testcontainers', version: '1.13.0')
170170
api(group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.0')
171171
api(group: 'redis.clients', name: 'jedis', version: '3.2.0')
172+
api(group: 'io.lettuce', name: 'lettuce-core', version: '5.2.1.RELEASE')
172173
api(group: 'xerces', name: 'xercesImpl', version: '2.12.0')
173174
}
174175
}

geode-redis/build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,14 @@ dependencies {
4545

4646
commonTestImplementation(project(':geode-junit'))
4747
commonTestImplementation(project(':geode-dunit'))
48+
commonTestImplementation('redis.clients:jedis')
4849

4950
integrationTestImplementation(project(':geode-dunit'))
5051
integrationTestImplementation(project(':geode-junit'))
5152
integrationTestImplementation(sourceSets.commonTest.output)
5253
integrationTestImplementation('redis.clients:jedis')
54+
integrationTestImplementation('io.lettuce:lettuce-core')
55+
integrationTestImplementation('org.apache.logging.log4j:log4j-core')
5356
integrationTestRuntime(project(':geode-log4j'))
5457

5558
acceptanceTestImplementation(sourceSets.integrationTest.output)
@@ -65,6 +68,7 @@ dependencies {
6568
}
6669
acceptanceTestImplementation('org.springframework.boot:spring-boot-starter-data-redis')
6770
acceptanceTestImplementation('org.springframework.session:spring-session-data-redis')
71+
acceptanceTestImplementation('org.apache.logging.log4j:log4j-core')
6872

6973
distributedTestCompile('org.apache.logging.log4j:log4j-core')
7074
distributedTestImplementation(project(':geode-dunit'))

geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public class PubSubNativeRedisAcceptanceTest extends PubSubIntegrationTest {
4242
public static void setUp() {
4343
redisContainer = new GenericContainer<>("redis:5.0.6").withExposedPorts(6379);
4444
redisContainer.start();
45-
subscriber = new Jedis("localhost", redisContainer.getFirstMappedPort(), REDIS_CLIENT_TIMEOUT);
46-
publisher = new Jedis("localhost", redisContainer.getFirstMappedPort(), REDIS_CLIENT_TIMEOUT);
45+
subscriber = new Jedis("localhost", redisContainer.getFirstMappedPort(), JEDIS_TIMEOUT);
46+
publisher = new Jedis("localhost", redisContainer.getFirstMappedPort(), JEDIS_TIMEOUT);
4747
}
4848

4949
@AfterClass
@@ -55,4 +55,5 @@ public static void tearDown() {
5555
public int getPort() {
5656
return redisContainer.getFirstMappedPort();
5757
}
58+
5859
}

geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java

+84-16
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
1212
* or implied. See the License for the specific language governing permissions and limitations under
1313
* the License.
14-
*
1514
*/
1615

1716
package org.apache.geode.redis.mocks;
@@ -20,33 +19,112 @@
2019
import java.util.Collections;
2120
import java.util.List;
2221
import java.util.Objects;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
2324

25+
import redis.clients.jedis.Client;
2426
import redis.clients.jedis.JedisPubSub;
2527

2628
public class MockSubscriber extends JedisPubSub {
29+
30+
private final CountDownLatch subscriptionLatch;
31+
private final CountDownLatch unsubscriptionLatch;
2732
private final List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());
2833
private final List<String> receivedPMessages = Collections.synchronizedList(new ArrayList<>());
34+
public final List<UnsubscribeInfo> unsubscribeInfos =
35+
Collections.synchronizedList(new ArrayList<>());
36+
public final List<UnsubscribeInfo> punsubscribeInfos =
37+
Collections.synchronizedList(new ArrayList<>());
38+
private String localSocketAddress;
39+
40+
public MockSubscriber() {
41+
this(new CountDownLatch(1));
42+
}
43+
44+
public MockSubscriber(CountDownLatch subscriptionLatch) {
45+
this(subscriptionLatch, new CountDownLatch(1));
46+
}
47+
48+
public MockSubscriber(CountDownLatch subscriptionLatch, CountDownLatch unsubscriptionLatch) {
49+
this.subscriptionLatch = subscriptionLatch;
50+
this.unsubscriptionLatch = unsubscriptionLatch;
51+
}
52+
53+
@Override
54+
public void proceed(Client client, String... channels) {
55+
localSocketAddress = client.getSocket().getLocalSocketAddress().toString();
56+
super.proceed(client, channels);
57+
}
58+
59+
private void switchThreadName(String suffix) {
60+
String threadName = Thread.currentThread().getName();
61+
int suffixIndex = threadName.indexOf(" -- ");
62+
if (suffixIndex >= 0) {
63+
threadName = threadName.substring(0, suffixIndex);
64+
}
65+
66+
threadName += " -- " + suffix + " [" + localSocketAddress + "]";
67+
Thread.currentThread().setName(threadName);
68+
}
2969

3070
public List<String> getReceivedMessages() {
31-
return new ArrayList<>(receivedMessages);
71+
return receivedMessages;
3272
}
3373

3474
public List<String> getReceivedPMessages() {
3575
return new ArrayList<>(receivedPMessages);
3676
}
3777

38-
public final List<UnsubscribeInfo> unsubscribeInfos =
39-
Collections.synchronizedList(new ArrayList<>());
40-
public final List<UnsubscribeInfo> punsubscribeInfos =
41-
Collections.synchronizedList(new ArrayList<>());
78+
@Override
79+
public void onMessage(String channel, String message) {
80+
switchThreadName(String.format("MESSAGE %s %s", channel, message));
81+
receivedMessages.add(message);
82+
}
83+
84+
@Override
85+
public void onPMessage(String pattern, String channel, String message) {
86+
switchThreadName(String.format("PMESSAGE %s %s %s", pattern, channel, message));
87+
receivedPMessages.add(message);
88+
}
89+
90+
@Override
91+
public void onSubscribe(String channel, int subscribedChannels) {
92+
switchThreadName(String.format("SUBSCRIBE %s", channel));
93+
subscriptionLatch.countDown();
94+
}
95+
96+
private static final int AWAIT_TIMEOUT_MILLIS = 30000;
97+
98+
public void awaitSubscribe(String channel) {
99+
try {
100+
if (!subscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
101+
throw new RuntimeException("awaitSubscribe timed out for channel: " + channel);
102+
}
103+
} catch (InterruptedException e) {
104+
throw new RuntimeException(e);
105+
}
106+
}
42107

43108
@Override
44109
public void onUnsubscribe(String channel, int subscribedChannels) {
110+
switchThreadName(String.format("UNSUBSCRIBE %s %d", channel, subscribedChannels));
45111
unsubscribeInfos.add(new UnsubscribeInfo(channel, subscribedChannels));
112+
unsubscriptionLatch.countDown();
113+
}
114+
115+
public void awaitUnsubscribe(String channel) {
116+
try {
117+
if (!unsubscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
118+
throw new RuntimeException("awaitUnsubscribe timed out for channel: " + channel);
119+
}
120+
} catch (InterruptedException e) {
121+
throw new RuntimeException(e);
122+
}
46123
}
47124

48125
@Override
49126
public void onPUnsubscribe(String pattern, int subscribedChannels) {
127+
switchThreadName(String.format("PUNSUBSCRIBE %s %d", pattern, subscribedChannels));
50128
punsubscribeInfos.add(new UnsubscribeInfo(pattern, subscribedChannels));
51129
}
52130

@@ -86,14 +164,4 @@ public String toString() {
86164
}
87165
}
88166

89-
90-
@Override
91-
public void onMessage(String channel, String message) {
92-
receivedMessages.add(message);
93-
}
94-
95-
@Override
96-
public void onPMessage(String pattern, String channel, String message) {
97-
receivedPMessages.add(message);
98-
}
99167
}

geode-redis/src/distributedTest/java/org/apache/geode/redis/MockSubscriber.java

-62
This file was deleted.

0 commit comments

Comments
 (0)