Skip to content

Commit 4cb75ae

Browse files
authored
GEODE-10395 remove locks from List if dlock.acquireTryLocks return false (apache#7846)
1 parent 7d7a98b commit 4cb75ae

File tree

3 files changed

+95
-6
lines changed

3 files changed

+95
-6
lines changed

geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java

+22-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.logging.log4j.Logger;
2424

25+
import org.apache.geode.annotations.VisibleForTesting;
2526
import org.apache.geode.cache.Cache;
2627
import org.apache.geode.cache.CommitConflictException;
2728
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -53,7 +54,7 @@ public class TXLockServiceImpl extends TXLockService {
5354
/**
5455
* List of active txLockIds
5556
*/
56-
protected List txLockIdList = new ArrayList();
57+
protected final List<TXLockId> txLockIdList = new ArrayList<>();
5758

5859
/**
5960
* True if grantor recovery is in progress; used to keep <code>release</code> from waiting for
@@ -70,6 +71,14 @@ public class TXLockServiceImpl extends TXLockService {
7071
/** The distributed system for cancellation checks. */
7172
private final InternalDistributedSystem system;
7273

74+
@VisibleForTesting
75+
TXLockServiceImpl(InternalDistributedSystem sys, StoppableReentrantReadWriteLock recoveryLock,
76+
DLockService dlock) {
77+
system = sys;
78+
this.recoveryLock = recoveryLock;
79+
this.dlock = dlock;
80+
}
81+
7382
TXLockServiceImpl(String name, InternalDistributedSystem sys) {
7483
if (sys == null) {
7584
throw new IllegalStateException(
@@ -129,10 +138,16 @@ public TXLockId txLock(List regionLockReqs, Set txParticipants) throws CommitCon
129138
if (gotLocks) { // ...otherwise race can occur between tryLocks and readLock
130139
acquireRecoveryReadLock();
131140
} else if (keyIfFail[0] != null) {
141+
synchronized (txLockIdList) {
142+
txLockIdList.remove(txLockId);
143+
}
132144
throw new CommitConflictException(
133145
String.format("Concurrent transaction commit detected %s",
134146
keyIfFail[0]));
135147
} else {
148+
synchronized (txLockIdList) {
149+
txLockIdList.remove(txLockId);
150+
}
136151
throw new CommitConflictException(
137152
String.format("Failed to request try locks from grantor: %s",
138153
dlock.getLockGrantorId()));
@@ -225,9 +240,7 @@ public void release(TXLockId txLockId) {
225240
txLockId));
226241
}
227242

228-
dlock.releaseTryLocks(txLockId, () -> {
229-
return recovering;
230-
});
243+
dlock.releaseTryLocks(txLockId, () -> recovering);
231244

232245
txLockIdList.remove(txLockId);
233246
releaseRecoveryReadLock();
@@ -277,4 +290,9 @@ void basicDestroy() {
277290
dlock.destroyAndRemove();
278291
}
279292

293+
@VisibleForTesting
294+
public int getTxLockIdList() {
295+
return this.txLockIdList.size();
296+
}
297+
280298
}

geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void startupMessageGetProcessorTypeIsWaitingPool() {
9595
startupMessage.process(distributionManager);
9696

9797
assertThat(
98-
startupMessage.getProcessorType() == OperationExecutors.WAITING_POOL_EXECUTOR);
98+
startupMessage.getProcessorType()).isEqualTo(OperationExecutors.WAITING_POOL_EXECUTOR);
9999
}
100100

101101
@Test
@@ -111,6 +111,6 @@ public void startupResponseMessageGetProcessorTypeIsWaitingPool() {
111111

112112
assertThat(
113113
startupResponseMessage
114-
.getProcessorType() == OperationExecutors.WAITING_POOL_EXECUTOR);
114+
.getProcessorType()).isEqualTo(OperationExecutors.WAITING_POOL_EXECUTOR);
115115
}
116116
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the License
11+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
* or implied. See the License for the specific language governing permissions and limitations under
13+
* the License.
14+
*/
15+
16+
package org.apache.geode.internal.cache.locks;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.junit.jupiter.api.Assertions.assertThrows;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.when;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Set;
26+
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
30+
import org.apache.geode.cache.CommitConflictException;
31+
import org.apache.geode.distributed.internal.DistributionManager;
32+
import org.apache.geode.distributed.internal.InternalDistributedSystem;
33+
import org.apache.geode.distributed.internal.locks.DLockService;
34+
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
35+
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
36+
37+
public class TXLockServiceImplTest {
38+
private TXLockServiceImpl txLockService;
39+
private InternalDistributedSystem internalDistributedSystem;
40+
private DLockService dlock;
41+
private List distLocks;
42+
private Set otherMembers;
43+
private DistributionManager distributionManager;
44+
private InternalDistributedMember distributedMember;
45+
private StoppableReentrantReadWriteLock recoverylock;
46+
47+
@Before
48+
public void setUp() {
49+
internalDistributedSystem = mock(InternalDistributedSystem.class);
50+
dlock = mock(DLockService.class);
51+
distributionManager = mock(DistributionManager.class);
52+
distributedMember = mock(InternalDistributedMember.class);
53+
recoverylock = mock(StoppableReentrantReadWriteLock.class);
54+
}
55+
56+
@Test
57+
public void testTxLockService() {
58+
distLocks = new ArrayList();
59+
txLockService = new TXLockServiceImpl(internalDistributedSystem, recoverylock, dlock);
60+
61+
when(dlock.getDistributionManager()).thenReturn(distributionManager);
62+
when(dlock.getDistributionManager().getId()).thenReturn(distributedMember);
63+
64+
assertThat((txLockService).getTxLockIdList()).isEqualTo(0);
65+
66+
assertThrows(CommitConflictException.class,
67+
() -> txLockService.txLock(distLocks, otherMembers));
68+
69+
assertThat((txLockService).getTxLockIdList()).isEqualTo(0);
70+
}
71+
}

0 commit comments

Comments
 (0)