Skip to content

Commit

Permalink
GEODE-10403: Fix distributed deadlock with stop gw sender (apache#7830)
Browse files Browse the repository at this point in the history
A distributed deadlock can occur when stopping the gateway sender.
It is triggered by a race condition in which the stop gateway sender
command blocks indefinitely while trying to get the size of the queue
from remote peers (ParallelGatewaySenderQueue.size() call) and,
at the same time, a call that stores an event in the queue tries to
acquire the lifecycle lock (held by the stop gateway sender command).

These two calls could get into a deadlock under heavy load and
make the system unresponsive for any traffic request (get, put, ...).

To avoid it, when the event is stored in the gateway
sender queue (AbstractGatewaySender.distribute() call),
instead of trying to acquire the lifecycle lock without
any timeout, the acquisition is now attempted with a timeout. If the
attempt fails, a check is made on whether the gateway sender is running. If
it is not running, the event is dropped and there is no need to get the lock.
Otherwise, the lifecycle lock acquisition is retried until it succeeds or
the gateway sender is stopped.
  • Loading branch information
albertogpz authored Aug 18, 2022
1 parent 4cbb960 commit 6c257d7
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.gms.messages.ViewAckMessage;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
Expand Down Expand Up @@ -167,6 +169,8 @@ public class ClusterOperationExecutors implements OperationExecutors {

private SerialQueuedExecutorPool serialQueuedExecutorPool;

@MutableForTesting
public static final AtomicInteger maxPrThreadsForTest = new AtomicInteger(-1);

ClusterOperationExecutors(DistributionStats stats,
InternalDistributedSystem system) {
Expand Down Expand Up @@ -252,10 +256,11 @@ public class ClusterOperationExecutors implements OperationExecutors {
this::doWaitingThread, stats.getWaitingPoolHelper(),
threadMonitor);

if (MAX_PR_THREADS > 1) {
int maxPrThreads = maxPrThreadsForTest.get() > 0 ? maxPrThreadsForTest.get() : MAX_PR_THREADS;
if (maxPrThreads > 1) {
partitionedRegionPool =
CoreLoggingExecutors.newThreadPoolWithFeedStatistics(
MAX_PR_THREADS, INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper(),
maxPrThreads, INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper(),
"PartitionedRegion Message Processor",
thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread,
stats.getPartitionedRegionPoolHelper(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -238,6 +239,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di

protected boolean enforceThreadsConnectSameReceiver;

@MutableForTesting
public static final AtomicBoolean doSleepForTestingInDistribute = new AtomicBoolean(false);

protected AbstractGatewaySender() {
statisticsClock = disabledClock();
}
Expand Down Expand Up @@ -1125,16 +1129,17 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
}

// If this gateway is not running, return
if (!isRunning()) {
if (isPrimary()) {
recordDroppedEvent(clonedEvent);
}
if (isDebugEnabled) {
logger.debug("Returning back without putting into the gateway sender queue:" + event);
}
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
return;
}

if (AbstractGatewaySender.doSleepForTestingInDistribute.get()) {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!getLifeCycleLock().readLock().tryLock()) {
synchronized (queuedEventsSync) {
if (!enqueuedAllTempQueueEvents) {
Expand All @@ -1151,19 +1156,22 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
}
}
if (enqueuedAllTempQueueEvents) {
getLifeCycleLock().readLock().lock();
try {
while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) {
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
return;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
try {
// If this gateway is not running, return
// The sender may have stopped, after we have checked the status in the beginning.
if (!isRunning()) {
if (isDebugEnabled) {
logger.debug("Returning back without putting into the gateway sender queue:" + event);
}
if (isPrimary()) {
recordDroppedEvent(clonedEvent);
}
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
return;
}

Expand Down Expand Up @@ -1208,6 +1216,20 @@ this, getId(), operation, clonedEvent),
}
}

/**
 * Reports whether this gateway sender is running and, when it is not, performs the bookkeeping
 * for an event that will not be queued.
 *
 * <p>
 * When the sender is not running, the cloned event is recorded as dropped if this sender is
 * primary, and a debug message naming the original event is logged if debug logging is enabled.
 *
 * @param event the original event, used only in the debug message
 * @param isDebugEnabled whether the debug message should be logged
 * @param clonedEvent the copy of the event that is recorded as dropped
 * @return {@code true} if the sender is running; {@code false} if the caller must not queue
 *         the event
 */
private boolean getIsRunningAndDropEventIfNotRunning(EntryEventImpl event, boolean isDebugEnabled,
    EntryEventImpl clonedEvent) {
  final boolean running = isRunning();
  if (!running) {
    // Only the primary keeps track of events dropped from its queue.
    if (isPrimary()) {
      recordDroppedEvent(clonedEvent);
    }
    if (isDebugEnabled) {
      logger.debug("Returning back without putting into the gateway sender queue:" + event);
    }
  }
  return running;
}

private void recordDroppedEvent(EntryEventImpl event) {
if (eventProcessor != null) {
eventProcessor.registerEventDroppedInPrimaryQueue(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,13 @@ public static void createManagementCache(Integer locPort) {
createCache(true, locPort);
}

/**
 * Invokes {@code createCacheConserveSockets(conserveSockets, locPort)} in each of the given
 * VMs, one after another in the order they are passed.
 *
 * @param conserveSockets value forwarded to {@code createCacheConserveSockets}
 * @param locPort locator port forwarded to {@code createCacheConserveSockets}
 * @param vms the VMs in which the call is made
 */
public static void createCacheConserveSocketsInVMs(Boolean conserveSockets, Integer locPort,
    VM... vms) {
  for (int index = 0; index < vms.length; index++) {
    final VM target = vms[index];
    target.invoke(() -> createCacheConserveSockets(conserveSockets, locPort));
  }
}

public static void createCacheConserveSockets(Boolean conserveSockets, Integer locPort) {
WANTestBase test = new WANTestBase();
Properties props = test.getDistributedSystemProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.ClusterOperationExecutors;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.internal.cache.BucketRegion;
Expand Down Expand Up @@ -346,6 +347,66 @@ public void testParallelPropagationSenderStop() {
vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100));
}

/**
 * Verifies that no distributed deadlock occurs when stopping a gateway sender while receiving
 * traffic.
 * The distributed deadlock may occur when the gateway sender tries to get the
 * size of the gateway sender queue (sending a size message to other members) while holding the
 * lifeCycleLock lock. This lock is also taken when an event is to be distributed by the gateway
 * sender.
 * As this issue has only been observed in the field with a lot of traffic, in order to reproduce
 * it in a test case, conserve-sockets is set to true (although the deadlock has also
 * been seen with conserve-sockets=false), the size of the PartitionedRegion thread pool is set
 * to a small value and an artificial sleep is added at a point in the distribute() call
 * of the AbstractGatewaySender class.
 *
 * <p>
 * Both testing hooks are static state in the member VMs, so they are restored to their defaults
 * in the {@code finally} block to avoid affecting tests that run later in the same VMs.
 */
@Test
public void testNoDistributedDeadlockWithGatewaySenderStop() throws Exception {
  addIgnoredException("Broken pipe");
  Integer[] locatorPorts = createLNAndNYLocators();
  Integer lnPort = locatorPorts[0];
  Integer nyPort = locatorPorts[1];
  VM[] senders = {vm4, vm5, vm6, vm7};
  VM[] receivers = {vm2, vm3};
  try {
    // The hooks must be set before the caches are created, because the size of the
    // PartitionedRegion thread pool is read when the operation executors are constructed.
    for (VM sender : senders) {
      sender.invoke(() -> AbstractGatewaySender.doSleepForTestingInDistribute.set(true));
      sender.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2));
    }
    for (VM receiver : receivers) {
      receiver.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2));
    }

    createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true, true);

    // make sure all the senders are running before doing any puts
    waitForSendersRunning();

    // Send a fairly big amount of operations to provoke the deadlock
    int invocationsPerServer = 4;
    AsyncInvocation[] invocations = new AsyncInvocation[senders.length * invocationsPerServer];
    for (int i = 0; i < senders.length; i++) {
      for (int j = 0; j < invocationsPerServer; j++) {
        // Row-major slot: unique for every (i, j) regardless of how many senders there are.
        invocations[(i * invocationsPerServer) + j] =
            senders[i].invokeAsync(() -> doPuts(getUniqueName() + "_PR", 100));
      }
    }

    // Wait for some elements to be replicated before stopping the senders
    for (VM sender : senders) {
      sender.invoke(() -> await()
          .untilAsserted(() -> assertThat(getSenderStats("ln", -1).get(3)).isGreaterThan(1)));
    }

    stopSendersAsync();
    for (AsyncInvocation invocation : invocations) {
      invocation.await();
    }
  } finally {
    // Restore the static testing hooks to their default values in every VM that was touched.
    for (VM sender : senders) {
      sender.invoke(() -> AbstractGatewaySender.doSleepForTestingInDistribute.set(false));
      sender.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(-1));
    }
    for (VM receiver : receivers) {
      receiver.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(-1));
    }
  }
}

/**
* Normal scenario in which a sender is stopped and then started again.
*/
Expand Down Expand Up @@ -1271,7 +1332,13 @@ private void clearShadowBucketRegions(PartitionedRegion shadowRegion) {

/**
 * Four-argument convenience overload: forwards to the five-argument
 * createSendersReceiversAndPartitionedRegion, passing {@code false} as the last argument.
 */
private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort,
boolean createAccessors, boolean startSenders) {
// NOTE(review): this two-argument call looks like the pre-change (removed) line of the diff
// shown next to its replacement below — confirm against the actual file before relying on it.
createSendersAndReceivers(lnPort, nyPort);
// Delegate to the overload that takes an extra boolean, passing false for it.
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, createAccessors, startSenders,
false);
}

private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort,
boolean createAccessors, boolean startSenders, boolean conserveSockets) {
createSendersAndReceivers(lnPort, nyPort, conserveSockets);

createPartitionedRegions(createAccessors);

Expand All @@ -1280,11 +1347,11 @@ private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer
}
}

private void createSendersAndReceivers(Integer lnPort, Integer nyPort) {
createCacheInVMs(nyPort, vm2, vm3);
private void createSendersAndReceivers(Integer lnPort, Integer nyPort, boolean conserveSockets) {
createCacheConserveSocketsInVMs(conserveSockets, nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);

createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
createCacheConserveSocketsInVMs(conserveSockets, lnPort, vm4, vm5, vm6, vm7);

vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
Expand Down Expand Up @@ -1578,6 +1645,17 @@ private void stopSenders() {
vm7.invoke(() -> stopSender("ln"));
}

/**
 * Starts stopping the "ln" sender asynchronously in vm4, vm5, vm6 and vm7, then waits for the
 * four invocations in that same order.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void stopSendersAsync() throws InterruptedException {
  // Launch every stop first so that they run concurrently, and only then start waiting.
  AsyncInvocation[] pendingStops = {
      vm4.invokeAsync(() -> stopSender("ln")),
      vm5.invokeAsync(() -> stopSender("ln")),
      vm6.invokeAsync(() -> stopSender("ln")),
      vm7.invokeAsync(() -> stopSender("ln"))
  };
  for (AsyncInvocation pendingStop : pendingStops) {
    pendingStop.await();
  }
}

private void waitForSendersRunning() {
vm4.invoke(() -> waitForSenderRunningState("ln"));
vm5.invoke(() -> waitForSenderRunningState("ln"));
Expand Down

0 comments on commit 6c257d7

Please sign in to comment.