Skip to content

Commit

Permalink
GEODE-8238: message loss during shutdown in Shutdown Hook when JVM ex…
Browse files Browse the repository at this point in the history
…its (apache#5232)

Remove invocation of removeEndpoint when a shared/unordered connection
shuts down.  Endpoint cleanup is already initiated by DistributionImpl
during membership view installation, so it isn't needed here.
  • Loading branch information
bschuchardt authored Jun 10, 2020
1 parent 1231db1 commit ece3a5a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.DistributionImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.SerialAckedMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.test.dunit.DistributedTestCase;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.IgnoredException;
Expand Down Expand Up @@ -96,6 +98,25 @@ public void basicAcceptConnection() throws Exception {
assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(3);
});

// ensure that the closing of a shared/unordered connection to another node does not
// remove all connections for that node
InternalDistributedMember otherMember =
(InternalDistributedMember) system.getAllOtherMembers().iterator().next();
DistributionImpl distribution =
(DistributionImpl) system.getDistributionManager().getDistribution();
final ConnectionTable connectionTable =
distribution.getDirectChannel().getConduit().getConTable();

assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();

Connection sharedUnordered = connectionTable.get(otherMember, false,
System.currentTimeMillis(), 15000, 0);
sharedUnordered.requestClose("for testing");
// the sender connection has been closed so we should only have 2 senders now
assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2);
// there should still be receivers for the other member - endpoint not removed!
assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();

try {
await("for message to be sent").until(() -> {
final SerialAckedMessage serialAckedMessage = new SerialAckedMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ private int sendToMany(final Membership mgr,
final List cons = new ArrayList(destinations.length);
ConnectExceptions ce = getConnections(mgr, msg, destinations, orderedMsg, retry, ackTimeout,
ackSDTimeout, cons);

if (directReply && msg.getProcessorId() > 0) { // no longer a direct-reply message?
directReply = false;
}
Expand Down Expand Up @@ -690,11 +691,6 @@ private InetAddress initAddress(DistributionConfig dc) {
}
}

public void closeEndpoint(InternalDistributedMember member, String reason) {
closeEndpoint(member, reason, true);
}


/**
* Closes any connections used to communicate with the given jgroupsAddress.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ private void batchSend(ByteBuffer src) {
* Invoking this method ensures that the proper synchronization is done.
*/
void requestClose(String reason) {
close(reason, true, true, false, false);
close(reason, true, false, false, false);
}

boolean isClosing() {
Expand Down Expand Up @@ -1519,9 +1519,6 @@ private String p2pReaderName() {
}

private void readMessages() {
if (closing.get()) {
return;
}
// take a snapshot of uniqueId to detect reconnect attempts
SocketChannel channel;
try {
Expand Down Expand Up @@ -3268,8 +3265,9 @@ Version getRemoteVersion() {

@Override
public String toString() {
return String.valueOf(remoteAddr) + '@' + uniqueId
+ (remoteVersion != null ? '(' + remoteVersion.toString() + ')' : "");
return remoteAddr + "(uid=" + uniqueId + ")"
+ (remoteVersion != null && remoteVersion != Version.CURRENT
? "(v" + remoteVersion.toString() + ')' : "");
}

/**
Expand Down

0 comments on commit ece3a5a

Please sign in to comment.