Skip to content

Commit

Permalink
Merge pull request apache#2015 from apache/feature/GEODE-5277
Browse files Browse the repository at this point in the history
GEODE-5277: Release server affinity immediately after commit
  • Loading branch information
jchen21 authored Jun 6, 2018
2 parents c9e7d88 + 4e2f2bc commit bdfab05
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,17 @@ public ClientTXStateStub(InternalCache cache, DistributionManager dm, TXStatePro
public void commit() throws CommitConflictException {
obtainLocalLocks();
try {
TXCommitMessage txcm = firstProxy.commit(proxy.getTxId().getUniqId());
TXCommitMessage txcm = null;
try {
txcm = firstProxy.commit(proxy.getTxId().getUniqId());
} finally {
this.firstProxy.getPool().releaseServerAffinity();
}
afterServerCommit(txcm);
} catch (TransactionDataNodeHasDepartedException e) {
throw new TransactionInDoubtException(e);
} finally {
lockReq.releaseLocal();
this.firstProxy.getPool().releaseServerAffinity();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,33 @@

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.InOrder;

import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.client.internal.InternalPool;
import org.apache.geode.cache.client.internal.ServerRegionProxy;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXCommitMessage;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.TXLockRequest;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXRegionLockRequestImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.test.junit.categories.UnitTest;
Expand All @@ -50,6 +57,7 @@ public class ClientTXStateStubTest {
private LocalRegion region;
private ServerRegionProxy serverRegionProxy;
private CancelCriterion cancelCriterion;
private InternalPool internalPool;

@Before
public void setUp() {
Expand All @@ -59,10 +67,11 @@ public void setUp() {
target = mock(DistributedMember.class);
region = mock(LocalRegion.class);
serverRegionProxy = mock(ServerRegionProxy.class);
internalPool = mock(InternalPool.class);
cancelCriterion = mock(CancelCriterion.class);

when(region.getServerProxy()).thenReturn(serverRegionProxy);
when(serverRegionProxy.getPool()).thenReturn(mock(InternalPool.class));
when(serverRegionProxy.getPool()).thenReturn(internalPool);
when(stateProxy.getTxId()).thenReturn(mock(TXId.class));
when(cache.getCancelCriterion()).thenReturn(cancelCriterion);
doThrow(new CacheClosedException()).when(cancelCriterion).checkCancelInProgress(any());
Expand All @@ -79,4 +88,39 @@ public void commitThrowsCancelExceptionIfCacheIsClosed() {
assertThatThrownBy(() -> stub.commit()).isInstanceOf(CancelException.class);
}

@Test
public void commitReleasesServerAffinityAfterCommit() {
TXCommitMessage txCommitMessage = mock(TXCommitMessage.class);
TXManagerImpl txManager = mock(TXManagerImpl.class);
when(cache.getTxManager()).thenReturn(txManager);
when(serverRegionProxy.commit(anyInt())).thenReturn(txCommitMessage);

doNothing().when(cancelCriterion).checkCancelInProgress(null);
doNothing().when(txManager).setTXState(null);
ClientTXStateStub stub = spy(new ClientTXStateStub(cache, dm, stateProxy, target, region));

InOrder order = inOrder(serverRegionProxy, internalPool, cancelCriterion);
stub.commit();

order.verify(serverRegionProxy).commit(anyInt());
order.verify(internalPool).releaseServerAffinity();
order.verify(cancelCriterion).checkCancelInProgress(null);
}

@Test
public void commitReleasesServerAffinity_whenCommitThrowsAnException() {
TXManagerImpl txManager = mock(TXManagerImpl.class);
when(cache.getTxManager()).thenReturn(txManager);
when(serverRegionProxy.commit(anyInt())).thenThrow(new InternalGemFireError());

doNothing().when(cancelCriterion).checkCancelInProgress(null);
doNothing().when(txManager).setTXState(null);
ClientTXStateStub stub = spy(new ClientTXStateStub(cache, dm, stateProxy, target, region));

InOrder order = inOrder(serverRegionProxy, internalPool);
assertThatThrownBy(() -> stub.commit()).isInstanceOf(InternalGemFireError.class);

order.verify(serverRegionProxy).commit(anyInt());
order.verify(internalPool).releaseServerAffinity();
}
}

0 comments on commit bdfab05

Please sign in to comment.