Skip to content

Commit 7d7a98b

Browse files
authored
GEODE-10331: schedule delayed CloseEndpoint (apache#7849)
* GEODE-10331: schedule delayed CloseEndpoint * GEODE-10331: added TCs
1 parent c4e5a03 commit 7d7a98b

File tree

4 files changed

+168
-29
lines changed

4 files changed

+168
-29
lines changed

geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java

+2-18
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import org.apache.geode.internal.tcp.ConnectExceptions;
7575
import org.apache.geode.internal.tcp.ConnectionException;
7676
import org.apache.geode.internal.util.Breadcrumbs;
77-
import org.apache.geode.logging.internal.executors.LoggingThread;
7877
import org.apache.geode.logging.internal.log4j.api.LogService;
7978
import org.apache.geode.security.AuthenticationRequiredException;
8079
import org.apache.geode.security.GemFireSecurityException;
@@ -648,28 +647,13 @@ private void setDirectChannelLocalAddress(final InternalDistributedMember addres
648647
}
649648
}
650649

651-
private void destroyMember(final InternalDistributedMember member, final String reason) {
650+
void destroyMember(final InternalDistributedMember member, final String reason) {
652651
final DirectChannel dc = directChannel;
653652
if (dc != null) {
654653
// Bug 37944: make sure this is always done in a separate thread,
655654
// so that shutdown conditions don't wedge the view lock
656655
// fix for bug 34010
657-
new LoggingThread("disconnect thread for " + member, () -> {
658-
try {
659-
Thread.sleep(Integer.getInteger("p2p.disconnectDelay", 3000));
660-
} catch (InterruptedException ie) {
661-
Thread.currentThread().interrupt();
662-
// Keep going, try to close the endpoint.
663-
}
664-
if (!dc.isOpen()) {
665-
return;
666-
}
667-
if (logger.isDebugEnabled()) {
668-
logger.debug("Membership: closing connections for departed member {}", member);
669-
}
670-
// close connections, but don't do membership notification since it's already been done
671-
dc.closeEndpoint(member, reason, false);
672-
}).start();
656+
dc.scheduleCloseEndpoint(member, reason, false);
673657
}
674658
}
675659

geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java

+46
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import java.util.Map;
2525
import java.util.Properties;
2626
import java.util.Set;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.ScheduledThreadPoolExecutor;
29+
import java.util.concurrent.TimeUnit;
2730

2831
import org.apache.logging.log4j.Logger;
2932

@@ -56,6 +59,7 @@
5659
import org.apache.geode.internal.tcp.MsgStreamer;
5760
import org.apache.geode.internal.tcp.TCPConduit;
5861
import org.apache.geode.internal.util.Breadcrumbs;
62+
import org.apache.geode.logging.internal.executors.LoggingExecutors;
5963
import org.apache.geode.logging.internal.log4j.api.LogService;
6064

6165
/**
@@ -86,6 +90,11 @@ public class DirectChannel {
8690

8791
InternalDistributedMember localAddr;
8892

93+
private ScheduledExecutorService closeEndpointExecutor;
94+
95+
private final int CLOSE_ENDPOINT_POOL_SIZE =
96+
Integer.getInteger("DirectChannel.CLOSE_ENDPOINT_POOL_SIZE", 1);
97+
8998
/**
9099
* Callback to set the local address, must be done before this channel is used.
91100
*
@@ -147,6 +156,9 @@ public DirectChannel(Membership<InternalDistributedMember> mgr,
147156
logger.info("GemFire P2P Listener started on {}",
148157
conduit.getSocketId());
149158

159+
closeEndpointExecutor = LoggingExecutors.newScheduledThreadPool(CLOSE_ENDPOINT_POOL_SIZE,
160+
"DirectChannel.closeEndpoint", false);
161+
150162
} catch (ConnectionException ce) {
151163
logger.fatal(String.format("Unable to initialize direct channel because: %s",
152164
ce.getMessage()),
@@ -667,6 +679,7 @@ public void emergencyClose() {
667679
public synchronized void disconnect(Exception cause) {
668680
disconnected = true;
669681
disconnectCompleted = false;
682+
closeEndpointExecutor.shutdownNow();
670683
conduit.stop(cause);
671684
disconnectCompleted = true;
672685
}
@@ -765,4 +778,37 @@ public void waitForChannelState(DistributedMember member, Map channelState)
765778
public boolean hasReceiversFor(DistributedMember mbr) {
766779
return conduit.hasReceiversFor(mbr);
767780
}
781+
782+
public void scheduleCloseEndpoint(InternalDistributedMember member, String reason,
783+
boolean notifyDisconnect) {
784+
if (disconnected) {
785+
return;
786+
}
787+
closeEndpointExecutor.schedule(new CloseEndpointRunnable(member, reason, notifyDisconnect),
788+
Integer.getInteger("p2p.disconnectDelay", 3000), TimeUnit.MILLISECONDS);
789+
}
790+
791+
int getCloseEndpointExecutorQueueSize() {
792+
ScheduledThreadPoolExecutor implementation =
793+
(ScheduledThreadPoolExecutor) closeEndpointExecutor;
794+
return implementation.getQueue().size();
795+
}
796+
797+
public class CloseEndpointRunnable implements Runnable {
798+
799+
protected final InternalDistributedMember member;
800+
protected final String reason;
801+
protected final boolean notifyDisconnect;
802+
803+
public CloseEndpointRunnable(InternalDistributedMember member, String reason, boolean notify) {
804+
this.member = member;
805+
this.reason = reason;
806+
this.notifyDisconnect = notify;
807+
}
808+
809+
@Override
810+
public void run() {
811+
closeEndpoint(member, reason, notifyDisconnect);
812+
}
813+
}
768814
}

geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java

+20-11
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
package org.apache.geode.distributed.internal;
1616

1717
import static org.apache.geode.distributed.internal.DistributionImpl.EMPTY_MEMBER_ARRAY;
18+
import static org.assertj.core.api.Assertions.assertThat;
1819
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertTrue;
21-
import static org.junit.Assert.fail;
20+
import static org.assertj.core.api.Assertions.fail;
2221
import static org.mockito.ArgumentMatchers.any;
2322
import static org.mockito.ArgumentMatchers.anyInt;
2423
import static org.mockito.ArgumentMatchers.anyLong;
24+
import static org.mockito.ArgumentMatchers.eq;
2525
import static org.mockito.ArgumentMatchers.isA;
2626
import static org.mockito.Mockito.doThrow;
2727
import static org.mockito.Mockito.mock;
@@ -109,7 +109,7 @@ public void testDirectChannelSend() throws Exception {
109109
m.setRecipients(recipients);
110110
Set<InternalDistributedMember> failures = distribution
111111
.directChannelSend(recipients, m);
112-
assertTrue(failures == null);
112+
assertThat(failures == null).isTrue();
113113
verify(dc).send(any(), any(),
114114
any(), anyLong(), anyLong());
115115
}
@@ -126,9 +126,9 @@ public void testDirectChannelSendFailureToOneRecipient() throws Exception {
126126
when(dc.send(any(), any(mockMembers.getClass()),
127127
any(DistributionMessage.class), anyLong(), anyLong())).thenThrow(exception);
128128
failures = distribution.directChannelSend(recipients, m);
129-
assertTrue(failures != null);
130-
assertEquals(1, failures.size());
131-
assertEquals(recipients.get(0), failures.iterator().next());
129+
assertThat(failures != null).isTrue();
130+
assertThat(failures).hasSize(1);
131+
assertThat(failures.iterator().next()).isEqualTo(recipients.get(0));
132132
}
133133

134134
@Test
@@ -154,10 +154,10 @@ public void testDirectChannelSendAllRecipients() throws Exception {
154154
HighPriorityAckedMessage m = new HighPriorityAckedMessage();
155155
when(membership.getAllMembers(EMPTY_MEMBER_ARRAY)).thenReturn(mockMembers);
156156
m.setRecipient(DistributionMessage.ALL_RECIPIENTS);
157-
assertTrue(m.forAll());
157+
assertThat(m.forAll()).isTrue();
158158
Set<InternalDistributedMember> failures = distribution
159159
.directChannelSend(null, m);
160-
assertTrue(failures == null);
160+
assertThat(failures == null).isTrue();
161161
verify(dc).send(any(), isA(mockMembers.getClass()),
162162
isA(DistributionMessage.class), anyLong(), anyLong());
163163
}
@@ -188,8 +188,8 @@ public void testSendAdminMessageFailsDuringShutdown() throws Exception {
188188
Set<InternalDistributedMember> failures =
189189
distribution.send(Collections.singletonList(mockMembers[0]), m);
190190
verify(membership, never()).send(any(), any());
191-
assertEquals(1, failures.size());
192-
assertEquals(mockMembers[0], failures.iterator().next());
191+
assertThat(failures).hasSize(1);
192+
assertThat(failures.iterator().next()).isEqualTo(mockMembers[0]);
193193
}
194194

195195
@Test
@@ -228,4 +228,13 @@ public void testExceptionNestedOnStartStartupError() throws Exception {
228228
.isInstanceOf(SystemConnectException.class)
229229
.hasCause(exception);
230230
}
231+
232+
@Test
233+
public void testMemberDestroyed() throws Exception {
234+
distribution.destroyMember(mockMembers[0], null);
235+
distribution.destroyMember(mockMembers[1], null);
236+
237+
verify(dc).scheduleCloseEndpoint(eq(mockMembers[0]), eq(null), eq(false));
238+
verify(dc).scheduleCloseEndpoint(eq(mockMembers[1]), eq(null), eq(false));
239+
}
231240
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the License
11+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
* or implied. See the License for the specific language governing permissions and limitations under
13+
* the License.
14+
*/
15+
package org.apache.geode.distributed.internal.direct;
16+
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.when;
21+
22+
import java.util.Random;
23+
24+
import org.jgroups.util.UUID;
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
28+
import org.apache.geode.distributed.internal.ClusterDistributionManager;
29+
import org.apache.geode.distributed.internal.DistributionConfig;
30+
import org.apache.geode.distributed.internal.InternalDistributedSystem;
31+
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
32+
import org.apache.geode.distributed.internal.membership.api.Membership;
33+
import org.apache.geode.distributed.internal.membership.api.MessageListener;
34+
import org.apache.geode.internal.net.SocketCreatorFactory;
35+
import org.apache.geode.internal.security.SecurableCommunicationChannel;
36+
37+
public class DirectChannelTest {
38+
39+
40+
private DirectChannel directChannel;
41+
private InternalDistributedMember[] mockMembers;
42+
43+
Membership<InternalDistributedMember> mgr;
44+
MessageListener<InternalDistributedMember> listener;
45+
ClusterDistributionManager dm;
46+
47+
DistributionConfig dc;
48+
49+
/**
50+
* Some tests require a DirectChannel mock
51+
*/
52+
@Before
53+
public void setUp() throws Exception {
54+
listener = mock(MessageListener.class);
55+
mgr = mock(Membership.class);
56+
dm = mock(ClusterDistributionManager.class);
57+
dc = mock(DistributionConfig.class);
58+
59+
Random r = new Random();
60+
mockMembers = new InternalDistributedMember[5];
61+
for (int i = 0; i < mockMembers.length; i++) {
62+
mockMembers[i] = new InternalDistributedMember("localhost", 8888 + i);
63+
UUID uuid = new UUID(r.nextLong(), r.nextLong());
64+
mockMembers[i].setUUID(uuid);
65+
}
66+
when(dm.getConfig()).thenReturn(dc);
67+
when(dm.getSystem()).thenReturn(mock(InternalDistributedSystem.class));
68+
69+
int[] range = new int[2];
70+
range[0] = 41000;
71+
range[1] = 61000;
72+
when(dc.getMembershipPortRange()).thenReturn(range);
73+
SecurableCommunicationChannel[] sslEnabledComponent = new SecurableCommunicationChannel[1];
74+
sslEnabledComponent[0] = SecurableCommunicationChannel.CLUSTER;
75+
76+
when(dc.getSecurableCommunicationChannels()).thenReturn(sslEnabledComponent);
77+
78+
SocketCreatorFactory.setDistributionConfig(dc);
79+
directChannel = new DirectChannel(mgr, listener, dm);
80+
}
81+
82+
@Test
83+
public void testScheduleCloseEndpoint() throws Exception {
84+
directChannel.scheduleCloseEndpoint(mockMembers[0], null, false);
85+
directChannel.scheduleCloseEndpoint(mockMembers[1], null, false);
86+
directChannel.scheduleCloseEndpoint(mockMembers[2], null, false);
87+
88+
assertThat(directChannel.getCloseEndpointExecutorQueueSize()).isEqualTo(3);
89+
}
90+
91+
@Test
92+
public void testScheduleCloseEndpointAndClearAllAtDisconnect() throws Exception {
93+
directChannel.scheduleCloseEndpoint(mockMembers[0], null, false);
94+
directChannel.scheduleCloseEndpoint(mockMembers[1], null, false);
95+
directChannel.scheduleCloseEndpoint(mockMembers[2], null, false);
96+
directChannel.disconnect(null);
97+
98+
assertThat(directChannel.getCloseEndpointExecutorQueueSize()).isEqualTo(0);
99+
}
100+
}

0 commit comments

Comments
 (0)