Skip to content

Commit 6cb7675

Browse files
authored
GEODE-10420: Finish distribute() work if interrupted (apache#7854)
It is possible that an event of which a gateway sender is to be notified is lost if during the process the thread is interrupted. The reason is that the distribute() method in the AbstractGatewaySender when it catches the InterruptedException at some point, just returns, but does not put the event in the queue and neither drops it. The fix consists of handling the event correctly (put it in the queue or drop it) if the InterruptedException is caught but when the method returns set again the interrupt flag so that the caller is aware.
1 parent 0852113 commit 6cb7675

File tree

3 files changed

+192
-9
lines changed

3 files changed

+192
-9
lines changed

geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,9 @@ public EntryEventImpl(
340340
op = other.op;
341341
distributedMember = other.distributedMember;
342342
filterInfo = other.filterInfo;
343-
keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.keyInfo)
344-
: new KeyInfo(other.keyInfo);
343+
keyInfo =
344+
other.getKeyInfo().isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.getKeyInfo())
345+
: new KeyInfo(other.getKeyInfo());
345346
if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) {
346347
keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument(
347348
(GatewaySenderEventCallbackArgument) other.getRawCallbackArgument())));

geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,7 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
10421042
List<Integer> allRemoteDSIds, boolean isLastEventInTransaction) {
10431043

10441044
final boolean isDebugEnabled = logger.isDebugEnabled();
1045+
boolean wasInterrupted = false;
10451046

10461047
// released by this method or transfers ownership to TmpQueueEvent
10471048
@Released
@@ -1156,15 +1157,17 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
11561157
}
11571158
}
11581159
if (enqueuedAllTempQueueEvents) {
1159-
try {
1160-
while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) {
1161-
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
1162-
return;
1160+
while (true) {
1161+
try {
1162+
while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) {
1163+
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
1164+
return;
1165+
}
11631166
}
1167+
break;
1168+
} catch (InterruptedException e) {
1169+
wasInterrupted = true;
11641170
}
1165-
} catch (InterruptedException e) {
1166-
Thread.currentThread().interrupt();
1167-
return;
11681171
}
11691172
}
11701173
}
@@ -1213,6 +1216,9 @@ this, getId(), operation, clonedEvent),
12131216
if (freeClonedEvent) {
12141217
clonedEvent.release(); // fix for bug 48035
12151218
}
1219+
if (wasInterrupted) {
1220+
Thread.currentThread().interrupt();
1221+
}
12161222
}
12171223
}
12181224

geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java

+176
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,28 @@
1818
import static org.mockito.Mockito.mock;
1919
import static org.mockito.Mockito.when;
2020

21+
import java.io.IOException;
2122
import java.util.ArrayList;
2223
import java.util.Collection;
24+
import java.util.Collections;
2325
import java.util.HashSet;
26+
import java.util.List;
2427
import java.util.Set;
28+
import java.util.concurrent.CountDownLatch;
2529

2630
import org.junit.Test;
2731

32+
import org.apache.geode.cache.CacheException;
33+
import org.apache.geode.cache.DataPolicy;
34+
import org.apache.geode.cache.EntryEvent;
35+
import org.apache.geode.cache.Operation;
2836
import org.apache.geode.cache.Region;
2937
import org.apache.geode.cache.wan.GatewayQueueEvent;
38+
import org.apache.geode.distributed.internal.DistributionAdvisor;
39+
import org.apache.geode.internal.cache.EntryEventImpl;
40+
import org.apache.geode.internal.cache.EnumListenerEvent;
41+
import org.apache.geode.internal.cache.InternalRegion;
42+
import org.apache.geode.internal.cache.KeyInfo;
3043
import org.apache.geode.internal.cache.RegionQueue;
3144

3245
public class AbstractGatewaySenderTest {
@@ -58,4 +71,167 @@ public void getSynchronizationEventCanHandleRegionIsNullCase() {
5871

5972
assertThat(event).isSameAs(gatewaySenderEvent);
6073
}
74+
75+
@Test
76+
public void distributeFinishesWorkWhenInterrupted() throws InterruptedException {
77+
DummyGatewaySenderEventProcessor processor = new DummyGatewaySenderEventProcessor();
78+
TestableGatewaySender gatewaySender = new TestableGatewaySender(processor);
79+
EnumListenerEvent operationType = EnumListenerEvent.AFTER_CREATE;
80+
EntryEventImpl event = mock(EntryEventImpl.class);
81+
when(event.getKeyInfo()).thenReturn(mock(KeyInfo.class));
82+
Operation operation = mock(Operation.class);
83+
when(operation.isLocal()).thenReturn(false);
84+
when(operation.isExpiration()).thenReturn(false);
85+
when(event.getOperation()).thenReturn(operation);
86+
InternalRegion region = mock(InternalRegion.class);
87+
when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
88+
when(event.getRegion()).thenReturn(region);
89+
List<Integer> allRemoteDSIds = Collections.singletonList(1);
90+
91+
CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
92+
CountDownLatch unlockLatch = new CountDownLatch(1);
93+
94+
// Get lifeCycleLock in write mode in new thread so that
95+
// the thread calling distribute will not be able
96+
// to acquire it
97+
Thread thread = new Thread(() -> {
98+
gatewaySender.getLifeCycleLock().writeLock().lock();
99+
lockAcquiredLatch.countDown();
100+
try {
101+
unlockLatch.await();
102+
} catch (InterruptedException ignore) {
103+
}
104+
gatewaySender.getLifeCycleLock().writeLock().unlock();
105+
});
106+
thread.start();
107+
lockAcquiredLatch.await();
108+
109+
// Send interrupted and then call distribute
110+
Thread.currentThread().interrupt();
111+
gatewaySender.distribute(operationType, event, allRemoteDSIds, true);
112+
113+
unlockLatch.countDown();
114+
115+
// Check that the interrupted exception has been reset
116+
assertThat(Thread.currentThread().isInterrupted()).isTrue();
117+
// Check that the work was finished even if the interrupt signal was set
118+
assertThat(processor.getTimesRegisterEventDroppedInPrimaryQueueCalled()).isEqualTo(1);
119+
}
120+
121+
public static class TestableGatewaySender extends AbstractGatewaySender {
122+
private int isRunningTimesCalled = 0;
123+
124+
public TestableGatewaySender(AbstractGatewaySenderEventProcessor eventProcessor) {
125+
this.eventProcessor = eventProcessor;
126+
enqueuedAllTempQueueEvents = true;
127+
}
128+
129+
@Override
130+
public void fillInProfile(DistributionAdvisor.Profile profile) {}
131+
132+
@Override
133+
public void start() {}
134+
135+
@Override
136+
public boolean isPrimary() {
137+
return true;
138+
}
139+
140+
@Override
141+
public void startWithCleanQueue() {}
142+
143+
@Override
144+
public void prepareForStop() {}
145+
146+
@Override
147+
public void stop() {}
148+
149+
@Override
150+
public void setModifiedEventId(EntryEventImpl clonedEvent) {}
151+
152+
@Override
153+
public GatewaySenderStats getStatistics() {
154+
return mock(GatewaySenderStats.class);
155+
}
156+
157+
@Override
158+
public GatewaySenderAdvisor getSenderAdvisor() {
159+
return mock(GatewaySenderAdvisor.class);
160+
}
161+
162+
@Override
163+
public boolean isRunning() {
164+
if (isRunningTimesCalled++ == 0) {
165+
return true;
166+
}
167+
return false;
168+
}
169+
170+
@Override
171+
public String getId() {
172+
return "test";
173+
}
174+
}
175+
176+
public static class DummyGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
177+
178+
private int timesEnqueueEventCalled = 0;
179+
private int timesRegisterEventDroppedInPrimaryQueueCalled = 0;
180+
181+
public DummyGatewaySenderEventProcessor() {
182+
super("", new DummyGatewaySender(), null);
183+
}
184+
185+
@Override
186+
public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue,
187+
boolean isLastEventInTransaction) throws IOException, CacheException {
188+
timesEnqueueEventCalled++;
189+
}
190+
191+
public int getTimesEnqueueEventCalled() {
192+
return timesEnqueueEventCalled;
193+
}
194+
195+
@Override
196+
protected void initializeMessageQueue(String id, boolean cleanQueues) {}
197+
198+
@Override
199+
protected void rebalance() {}
200+
201+
public int getTimesRegisterEventDroppedInPrimaryQueueCalled() {
202+
return timesRegisterEventDroppedInPrimaryQueueCalled;
203+
}
204+
205+
@Override
206+
protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
207+
timesRegisterEventDroppedInPrimaryQueueCalled++;
208+
}
209+
210+
@Override
211+
public void initializeEventDispatcher() {}
212+
213+
@Override
214+
protected void enqueueEvent(GatewayQueueEvent event) {}
215+
}
216+
217+
public static class DummyGatewaySender extends AbstractGatewaySender {
218+
@Override
219+
public void fillInProfile(DistributionAdvisor.Profile profile) {}
220+
221+
@Override
222+
public void start() {}
223+
224+
@Override
225+
public void startWithCleanQueue() {}
226+
227+
@Override
228+
public void prepareForStop() {}
229+
230+
@Override
231+
public void stop() {}
232+
233+
@Override
234+
public void setModifiedEventId(EntryEventImpl clonedEvent) {}
235+
236+
}
61237
}

0 commit comments

Comments
 (0)