Skip to content

Commit

Permalink
Feature/geode 6016 (apache#2820)
Browse files Browse the repository at this point in the history
* GEODE-6016: Make client onRegion function transactional.

 * Do not use a function execution thread to send message to server even if
   singleHop is enabled when function is in transaction.
 * Throw UnsupportedOperationException if client executes onMembers function
   using transaction - to behave similar to onServers function when using
   transaction.
  • Loading branch information
pivotal-eshu authored Nov 9, 2018
1 parent 3b2ff44 commit c14d960
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
Expand Down Expand Up @@ -78,6 +79,13 @@ public class FixedPartitioningWithTransactionDistributedTest implements
private VM server1;
private VM server2;
private VM accessor;
private int port1;
private int port2;
private transient PoolImpl pool;

private enum Type {
ON_REGION, ON_SERVER, ON_MEMBER
}

private static final String FIXED_PARTITION_NAME = "singleBucket";

Expand Down Expand Up @@ -122,14 +130,11 @@ public void tearDown() {
public void executeFunctionOnMovedPrimaryBucketFailWithTransactionDataRebalancedException() {
createData();

server1.invoke(() -> {
cacheRule.closeAndNullCache();
});
server1.invoke(() -> cacheRule.closeAndNullCache());

TransactionId transactionId = server2.invoke(() -> doFunctionTransactionAndSuspend());

server1.invoke(() -> restartPrimary());

server2.invoke(() -> {
assertThatThrownBy(() -> resumeFunctionTransaction(transactionId))
.isInstanceOf(TransactionDataRebalancedException.class);
Expand Down Expand Up @@ -188,6 +193,100 @@ public void clientExecuteFunctionOnMovedPrimaryBucketFailWithTransactionDataReba
.isInstanceOf(TransactionDataRebalancedException.class);
}

@Test
public void clientCanRollbackFunctionOnRegionWithFilterAndWithSingleHopEnabled() {
setupServers();
setupClient();

Region region = clientCacheRule.getClientCache().getRegion(regionName);
CacheTransactionManager txManager =
clientCacheRule.getClientCache().getCacheTransactionManager();

TransactionId transactionId =
doFunctionTransactionAndSuspend(region, txManager, new MyTransactionFunction());
txManager.resume(transactionId);
txManager.rollback();

server1.invoke(() -> {
assertThat(cacheRule.getCache().getRegion(regionName).get(2)).isEqualTo(2);
});
}

@Test
public void clientCanRollbackFunctionOnRegionWithoutFilterAndWithSingleHopEnabled() {
setupServers();
setupClient();

Region region = clientCacheRule.getClientCache().getRegion(regionName);
CacheTransactionManager txManager =
clientCacheRule.getClientCache().getCacheTransactionManager();

try {
TransactionId transactionId = doFunctionTransactionAndSuspend(region, txManager,
new MyTransactionFunction(), Type.ON_REGION, false);
txManager.resume(transactionId);
txManager.rollback();
} catch (FunctionException functionException) {
// without filter function can target to any server and may not go to primary.
assertThat(functionException.getCause()).isInstanceOf(ServerOperationException.class);
assertThat(functionException.getCause().getCause()).isInstanceOf(FunctionException.class);
assertThat(functionException.getCause().getCause().getCause())
.isInstanceOf(TransactionDataRebalancedException.class);
txManager.rollback();
}

server1.invoke(() -> {
assertThat(cacheRule.getCache().getRegion(regionName).get(2)).isEqualTo(2);
});
}

@Test
public void clientTransactionFailsIfExecuteFunctionOnMember() {
setupServers();
setupClient();

Region region = clientCacheRule.getClientCache().getRegion(regionName);
CacheTransactionManager txManager =
clientCacheRule.getClientCache().getCacheTransactionManager();

Throwable caughtException = catchThrowable(() -> doFunctionTransactionAndSuspend(region,
txManager, new MyTransactionFunction(), Type.ON_MEMBER));

assertThat(caughtException).isInstanceOf(UnsupportedOperationException.class);
txManager.rollback();
}

@Test
public void clientTransactionFailsIfExecuteFunctionOnServer() {
setupServers();
setupClient();

Region region = clientCacheRule.getClientCache().getRegion(regionName);
CacheTransactionManager txManager =
clientCacheRule.getClientCache().getCacheTransactionManager();

Throwable caughtException = catchThrowable(() -> doFunctionTransactionAndSuspend(region,
txManager, new MyTransactionFunction(), Type.ON_SERVER));

assertThat(caughtException).isInstanceOf(FunctionException.class);
assertThat(caughtException.getCause()).isInstanceOf(UnsupportedOperationException.class);
txManager.rollback();
}

private void setupServers() {
port1 = server1.invoke(() -> createServerRegion(true, 1, 1));
port2 = server2.invoke(() -> createServerRegion(false, 1, 1));

server1.invoke(() -> registerFunctions());
server2.invoke(() -> registerFunctions());
}

private void setupClient() {
createClientRegion(true, true, port1, port2);
Region region = clientCacheRule.getClientCache().getRegion(regionName);
doPuts(region);
}

private void restartPrimary() throws Exception {
createServerRegion(true, 1, 1);
PartitionedRegion partitionedRegion =
Expand All @@ -204,7 +303,10 @@ private void createData() {
}

private void doPuts() {
Region region = cacheRule.getCache().getRegion(regionName);
doPuts(cacheRule.getCache().getRegion(regionName));
}

private void doPuts(Region region) {
region.put(1, 1);
region.put(2, 2);
region.put(3, 3);
Expand Down Expand Up @@ -239,12 +341,16 @@ private int createServerRegion(boolean isPrimary, int redundantCopies, int total
}

private void createClientRegion(boolean connectToFirstPort, int... ports) {
createClientRegion(connectToFirstPort, false, ports);
}

private void createClientRegion(boolean connectToFirstPort, boolean singleHopEnabled,
int... ports) {
clientCacheRule.createClientCache();

CacheServerTestUtil.disableShufflingOfEndpoints();
PoolImpl pool;
try {
pool = getPool(ports);
pool = getPool(singleHopEnabled, ports);
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
Expand All @@ -260,12 +366,12 @@ private void createClientRegion(boolean connectToFirstPort, int... ports) {
}
}

private PoolImpl getPool(int... ports) {
private PoolImpl getPool(boolean singleHopEnabled, int... ports) {
PoolFactory factory = PoolManager.createFactory();
for (int port : ports) {
factory.addServer(hostName, port);
}
factory.setPRSingleHopEnabled(false);
factory.setPRSingleHopEnabled(singleHopEnabled);
return (PoolImpl) factory.create(uniqueName);
}

Expand All @@ -282,11 +388,43 @@ private TransactionId doFunctionTransactionAndSuspend() {

private TransactionId doFunctionTransactionAndSuspend(Region region,
CacheTransactionManager manager) {
Execution execution = FunctionService.onRegion(region);
manager.begin();
return doFunctionTransactionAndSuspend(region, manager, new MySuspendTransactionFunction());
}

private TransactionId doFunctionTransactionAndSuspend(Region region,
CacheTransactionManager manager, Function function) {
return doFunctionTransactionAndSuspend(region, manager, function, Type.ON_REGION);
}

private TransactionId doFunctionTransactionAndSuspend(Region region,
CacheTransactionManager manager, Function function, Type type) {
return doFunctionTransactionAndSuspend(region, manager, function, type, true);
}

private TransactionId doFunctionTransactionAndSuspend(Region region,
CacheTransactionManager manager, Function function, Type type, boolean withFilter) {
Execution execution;
Set keySet = new HashSet();
keySet.add(2);
ResultCollector resultCollector = execution.execute(new MySuspendTransactionFunction());
switch (type) {
case ON_MEMBER:
execution = FunctionService.onMembers();
break;
case ON_REGION:
execution = FunctionService.onRegion(region);
if (withFilter) {
execution = execution.withFilter(keySet);
}
break;
case ON_SERVER:
execution = FunctionService.onServers(pool);
break;
default:
throw new RuntimeException("unexpected type");
}

manager.begin();
ResultCollector resultCollector = execution.execute(function);
resultCollector.getResult();
return manager.suspend();
}
Expand All @@ -312,7 +450,7 @@ private void resumeFunctionTransaction(TransactionId transactionId, Region regio
}
}

private static class MyFixedPartitionResolver implements FixedPartitionResolver {
public static class MyFixedPartitionResolver implements FixedPartitionResolver {

public MyFixedPartitionResolver() {}

Expand Down Expand Up @@ -372,4 +510,26 @@ public void fromData(DataInput in) throws IOException, ClassNotFoundException {

}
}

public static class MyTransactionFunction implements Function, DataSerializable {
@Override
public void execute(FunctionContext context) {
if (context instanceof RegionFunctionContext) {
PartitionedRegion region =
(PartitionedRegion) ((RegionFunctionContext) context).getDataSet();
region.destroy(2);
context.getResultSender().lastResult(Boolean.TRUE);
}
}

@Override
public void toData(DataOutput out) throws IOException {

}

@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,9 @@ public void executeFunction(String rgnName, Function function,
serverRegionExecutor, resultCollector, Byte.valueOf(hasResult));

int retryAttempts = pool.getRetryAttempts();
boolean inTransaction = TXManagerImpl.getCurrentTXState() != null;

if (this.pool.getPRSingleHopEnabled()) {
if (this.pool.getPRSingleHopEnabled() && !inTransaction) {
ClientMetadataService cms = region.getCache().getClientMetadataService();
if (cms.isMetadataStable()) {
if (serverRegionExecutor.getFilter().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ public void validateExecution(final Function function, final Set dest) {
"Function inside a transaction cannot execute on more than one node");
} else {
assert dest.size() == 1;
if (cache.isClient()) {
throw new UnsupportedOperationException(
"Client function execution on members is not supported with transaction");
}
DistributedMember funcTarget = (DistributedMember) dest.iterator().next();
DistributedMember target = cache.getTxManager().getTXState().getTarget();
if (target == null) {
Expand Down

0 comments on commit c14d960

Please sign in to comment.