Skip to content

Commit

Permalink
GEODE-9484: Improve sending message to multy destinations (apache#7381)
Browse files Browse the repository at this point in the history
* GEODE-9484: New solution to first try only one attempt to create all connections
  • Loading branch information
mivanac authored May 2, 2022
1 parent 50f5d6c commit 62cd12c
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
Expand Down Expand Up @@ -50,6 +51,8 @@
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
Expand All @@ -68,53 +71,89 @@
* the same across servers
*/
@Category({ClientSubscriptionTest.class})
public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {
public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase {

private static final String REGION_NAME = "UpdatePropagationDUnitTest_region";

private VM server1 = null;
private VM server2 = null;
private VM server3 = null;
private VM client1 = null;
private VM client2 = null;

private int PORT1;
private int PORT2;
private int PORT3;

private final int minNumEntries = 2;

private String hostnameServer1;
private String hostnameServer3;

@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();

final Host host = Host.getHost(0);
// Server1 VM

server1 = host.getVM(0);

// Server2 VM
server2 = host.getVM(1);

// Client 1 VM
client1 = host.getVM(2);
server3 = host.getVM(2);

// client 2 VM
client2 = host.getVM(3);
client1 = host.getVM(3);

PORT1 = server1.invoke(this::createServerCache);
PORT2 = server2.invoke(this::createServerCache);
client2 = host.getVM(4);

client1.invoke(
() -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
client2.invoke(
() -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
PORT1 = server1.invoke(() -> createServerCache());
PORT2 = server2.invoke(() -> createServerCache());
PORT3 = server3.invoke(() -> createServerCache());

hostnameServer1 = NetworkUtils.getServerHostName(server1.getHost());
hostnameServer3 = NetworkUtils.getServerHostName(server3.getHost());

IgnoredException.addIgnoredException("java.net.SocketException");
IgnoredException.addIgnoredException("Unexpected IOException");
}



@Test
public void updatesArePropagatedToAllMembersWhenOneKilled() throws Exception {
client1.invoke(
() -> createClientCache(hostnameServer1, PORT1));
client2.invoke(
() -> createClientCache(hostnameServer3, PORT3));
int entries = 20;
AsyncInvocation invocation = client1.invokeAsync(() -> doPuts(entries));

// Wait for some entries to be put
server1.invoke(this::verifyMinEntriesInserted);

// Simulate crash
server2.invoke(() -> {
MembershipManagerHelper.crashDistributedSystem(getSystemStatic());
});

invocation.await();

int notNullEntriesIn1 = client1.invoke(() -> getNotNullEntriesNumber(entries));
int notNullEntriesIn3 = client2.invoke(() -> getNotNullEntriesNumber(entries));
assertThat(notNullEntriesIn3).isEqualTo(notNullEntriesIn1);
}

/**
* This tests whether the updates are received by other clients or not , if there are situation of
* Interest List fail over
*/
@Test
public void updatesAreProgegatedAfterFailover() {
client1.invoke(
() -> createClientCache(hostnameServer1, PORT1, PORT2));
client2.invoke(
() -> createClientCache(hostnameServer1, PORT1, PORT2));

// First create entries on both servers via the two client
client1.invoke(this::createEntriesK1andK2);
client2.invoke(this::createEntriesK1andK2);
Expand Down Expand Up @@ -248,6 +287,18 @@ private void createClientCache(String host, Integer port1, Integer port2) throws
.addCacheListener(new EventTrackingCacheListener()).create(REGION_NAME);
}

private void createClientCache(String host, Integer port1) {
Properties props = new Properties();
props.setProperty(LOCATORS, "");
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(host, port1).setPoolSubscriptionEnabled(false)
.setPoolSubscriptionRedundancy(-1).setPoolMinConnections(4).setPoolSocketBufferSize(1000)
.setPoolReadTimeout(100).setPoolPingInterval(300);
ClientCache cache = getClientCache(cf);
cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(REGION_NAME);
}

private Integer createServerCache() throws Exception {
Cache cache = getCache();
RegionAttributes attrs = createCacheServerAttributes();
Expand All @@ -258,7 +309,7 @@ private Integer createServerCache() throws Exception {
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
return server.getPort();
return new Integer(server.getPort());
}

protected RegionAttributes createCacheServerAttributes() {
Expand Down Expand Up @@ -305,6 +356,36 @@ private void verifyUpdates() {
});
}

private void verifyMinEntriesInserted() {
await().untilAsserted(() -> assertThat(getCache().getRegion(SEPARATOR + REGION_NAME))
.hasSizeGreaterThan(minNumEntries));
}

private void doPuts(int entries) throws Exception {
Region<String, String> r1 = getCache().getRegion(REGION_NAME);
assertThat(r1).isNotNull();
for (int i = 0; i < entries; i++) {
try {
r1.put("" + i, "" + i);
} catch (Exception e) {
}
Thread.sleep(1000);
}
}

private int getNotNullEntriesNumber(int entries) {
int notNullEntries = 0;
Region<String, String> r1 = getCache().getRegion(SEPARATOR + REGION_NAME);
assertThat(r1).isNotNull();
for (int i = 0; i < entries; i++) {
Object value = r1.get("" + i, "" + i);
if (value != null) {
notNullEntries++;
}
}
return notNullEntries;
}

private static class EventTrackingCacheListener extends CacheListenerAdapter {

List<EntryEvent> receivedEvents = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* subclass of UpdatePropagationDUnitTest to exercise partitioned regions
*/
public class UpdatePropagationPRDUnitTest extends UpdatePropagationDUnitTest {
public class UpdatePropagationPRDistributedTest extends UpdatePropagationDistributedTest {

@Override
protected RegionAttributes createCacheServerAttributes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void sharedSenderShouldRecoverFromClosedSocket() {
InternalDistributedSystem distributedSystem = getCache().getInternalDistributedSystem();
InternalDistributedMember otherMember = distributedSystem.getDistributionManager()
.getOtherNormalDistributionManagerIds().iterator().next();
Connection connection = conTable.getConduit().getConnection(otherMember, true, false,
Connection connection = conTable.getConduit().getConnection(otherMember, true,
System.currentTimeMillis(), 15000, 0);
await().untilAsserted(() -> {
// grab the shared, ordered "sender" connection to vm0. It should have a residual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void basicAcceptConnection() throws Exception {
assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();

Connection sharedUnordered = connectionTable.get(otherMember, false,
System.currentTimeMillis(), 15000, 0);
System.currentTimeMillis(), 15000, 0, false);
sharedUnordered.requestClose("for testing");
// the sender connection has been closed so we should only have 2 senders now
assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,27 @@ private int sendToMany(final Membership mgr,
directReply = false;
}
if (ce != null) {
if (failedCe != null) {
failedCe.getMembers().addAll(ce.getMembers());
failedCe.getCauses().addAll(ce.getCauses());

if (!retry) {
retryInfo = ce;
} else {
failedCe = ce;

if (failedCe != null) {
failedCe.getMembers().addAll(ce.getMembers());
failedCe.getCauses().addAll(ce.getCauses());
} else {
failedCe = ce;
}
}
ce = null;
}
if (cons.isEmpty()) {
if (failedCe != null) {
throw failedCe;
}
if (retryInfo != null) {
continue;
}
return bytesWritten;
}

Expand Down Expand Up @@ -338,7 +347,12 @@ private int sendToMany(final Membership mgr,
}

if (ce != null) {
retryInfo = ce;
if (retryInfo != null) {
retryInfo.getMembers().addAll(ce.getMembers());
retryInfo.getCauses().addAll(ce.getCauses());
} else {
retryInfo = ce;
}
ce = null;
}

Expand Down Expand Up @@ -423,13 +437,13 @@ private ConnectExceptions readAcks(List sentCons, long startTime, long ackTimeou
* @param retry whether this is a retransmission
* @param ackTimeout the ack warning timeout
* @param ackSDTimeout the ack severe alert timeout
* @param cons a list to hold the connections
* @param connectionsList a list to hold the connections
* @return null if everything went okay, or a ConnectExceptions object if some connections
* couldn't be obtained
*/
private ConnectExceptions getConnections(Membership mgr, DistributionMessage msg,
InternalDistributedMember[] destinations, boolean preserveOrder, boolean retry,
long ackTimeout, long ackSDTimeout, List cons) {
long ackTimeout, long ackSDTimeout, List<Connection> connectionsList) {
ConnectExceptions ce = null;
for (InternalDistributedMember destination : destinations) {
if (destination == null) {
Expand Down Expand Up @@ -458,12 +472,18 @@ private ConnectExceptions getConnections(Membership mgr, DistributionMessage msg
if (ackTimeout > 0) {
startTime = System.currentTimeMillis();
}
Connection con = conduit.getConnection(destination, preserveOrder, retry, startTime,
ackTimeout, ackSDTimeout);
final Connection connection;
if (!retry) {
connection = conduit.getFirstScanForConnection(destination, preserveOrder, startTime,
ackTimeout, ackSDTimeout);
} else {
connection = conduit.getConnection(destination, preserveOrder, startTime,
ackTimeout, ackSDTimeout);
}

con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
cons.add(con);
if (con.isSharedResource() && msg instanceof DirectReplyMessage) {
connection.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
connectionsList.add(connection);
if (connection.isSharedResource() && msg instanceof DirectReplyMessage) {
DirectReplyMessage directMessage = (DirectReplyMessage) msg;
directMessage.registerProcessor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ static Connection createSender(final Membership<InternalDistributedMember> mgr,
final ConnectionTable t,
final boolean preserveOrder, final InternalDistributedMember remoteAddr,
final boolean sharedResource,
final long startTime, final long ackTimeout, final long ackSATimeout)
final long startTime, final long ackTimeout, final long ackSATimeout, boolean doNotRetry)
throws IOException, DistributedSystemDisconnectedException {
boolean success = false;
Connection conn = null;
Expand Down Expand Up @@ -1021,7 +1021,9 @@ static Connection createSender(final Membership<InternalDistributedMember> mgr,
// do not change the text of this exception - it is looked for in exception handlers
throw new IOException("Cannot form connection to alert listener " + remoteAddr);
}

if (doNotRetry) {
throw new IOException("Connection not created in first try to " + remoteAddr);
}
// Wait briefly...
interrupted = Thread.interrupted() || interrupted;
try {
Expand Down
Loading

0 comments on commit 62cd12c

Please sign in to comment.