Skip to content

Commit

Permalink
GEODE-8127: Reintroduces changes that account for primary bucket chan…
Browse files Browse the repository at this point in the history
…ging (apache#5179)

* flaky test ignored

Co-authored-by: john Hutchison <[email protected]>
Co-authored-by: Jens Deppe <[email protected]>
  • Loading branch information
3 people authored May 29, 2020
1 parent 83b40f5 commit 343e114
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.eviction.HeapEvictor;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.cache.execute.BucketMovedException;
import org.apache.geode.internal.cache.execute.FunctionExecutionNodePruner;
import org.apache.geode.internal.cache.execute.FunctionRemoteContext;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
Expand Down Expand Up @@ -610,6 +611,29 @@ PartitionedRegionRedundancyTracker getRedundancyTracker() {
return redundancyTracker;
}

public void computeWithPrimaryLocked(Object key, Runnable r) {
int bucketId = PartitionedRegionHelper.getHashKey(this, null, key, null, null);

BucketRegion br;
try {
br = this.dataStore.getInitializedBucketForId(key, bucketId);
} catch (ForceReattemptException e) {
throw new BucketMovedException(e);
}

try {
br.doLockForPrimary(false);
} catch (PrimaryBucketException e) {
throw new BucketMovedException(e);
}

try {
r.run();
} finally {
br.doUnlockForPrimary();
}
}


public static class PRIdMap extends HashMap {
private static final long serialVersionUID = 3667357372967498179L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package org.apache.geode.redis;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.LocalDataSet;
import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.internal.executor.SingleResultRedisFunction;
import org.apache.geode.test.awaitility.GeodeAwaitility;

@SuppressWarnings("unchecked")
public class CheckPrimaryBucketFunction implements Function {
private static final CountDownLatch signalFunctionHasStarted = new CountDownLatch(1);
private static final CountDownLatch signalPrimaryHasMoved = new CountDownLatch(1);

public static void waitForFunctionToStart() {
try {
signalFunctionHasStarted.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void finishedMovingPrimary() {
signalPrimaryHasMoved.countDown();
}

@Override
public void execute(FunctionContext context) {
RegionFunctionContextImpl regionFunctionContext = (RegionFunctionContextImpl) context;
String key = (String) regionFunctionContext.getFilter().iterator().next();
boolean releaseLatchEarly = (boolean) context.getArguments();

ResultSender result = context.getResultSender();
DistributedMember member = context.getCache().getDistributedSystem().getDistributedMember();

if (!isMemberPrimary(regionFunctionContext, key, member)) {
LogService.getLogger().error("Member is not primary.");
result.lastResult(false);
return;
}

Region<?, ?> localRegion =
regionFunctionContext.getLocalDataSet(regionFunctionContext.getDataSet());

if (releaseLatchEarly) {
signalFunctionHasStarted.countDown();
// now wait until test has moved primary
try {
signalPrimaryHasMoved.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}

Runnable r = () -> {
if (!releaseLatchEarly) {
signalFunctionHasStarted.countDown();
}

try {
GeodeAwaitility.await()
.during(10, TimeUnit.SECONDS)
.atMost(11, TimeUnit.SECONDS)
.until(() -> isMemberPrimary(regionFunctionContext, key, member));
result.lastResult(true);
} catch (Exception e) {
e.printStackTrace();
result.lastResult(false);
}
};

SingleResultRedisFunction.computeWithPrimaryLocked(key, (LocalDataSet) localRegion, r);
}

private boolean isMemberPrimary(RegionFunctionContextImpl context, String key,
DistributedMember member) {
DistributedMember primaryForKey = PartitionRegionHelper
.getPrimaryMemberForKey(context.getDataSet(), key);

return primaryForKey.equals(member);
}

@Override
public boolean optimizeForWrite() {
return true;
}

@Override
public boolean isHA() {
return true;
}

@Override
public String getId() {
return CheckPrimaryBucketFunction.class.getName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package org.apache.geode.redis;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.internal.cache.BucketAdvisor;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.GfshCommandRule;

@Ignore("Geode-8127")
public class EnsurePrimaryStaysPutDUnitTest {

@Rule
public ClusterStartupRule cluster = new ClusterStartupRule();

@Rule
public GfshCommandRule gfsh = new GfshCommandRule();

private MemberVM locator;
private MemberVM server1;
private MemberVM server2;

private static final String KEY = "foo";
private static final String VALUE = "bar";

@Before
public void setup() throws Exception {
locator = cluster.startLocatorVM(0);
int locatorPort = locator.getPort();
server1 = cluster.startServerVM(1, cf -> cf.withConnectionToLocator(locatorPort));
server2 = cluster.startServerVM(2, cf -> cf.withConnectionToLocator(locatorPort));

gfsh.connectAndVerify(locator);
gfsh.executeAndAssertThat("create region --name=TEST --type=PARTITION_REDUNDANT")
.statusIsSuccess();

server1.invoke(() -> FunctionService.registerFunction(new CheckPrimaryBucketFunction()));
server2.invoke(() -> FunctionService.registerFunction(new CheckPrimaryBucketFunction()));
}

@Test
public void primaryRemainsWhileLocalFunctionExecutes() throws InterruptedException {
primaryRemainsWhileFunctionExecutes(true, false);
}

@Test
public void primaryRemainsWhileRemoteFunctionExecutes() throws InterruptedException {
primaryRemainsWhileFunctionExecutes(false, false);
}

@Test
public void localFunctionRetriesIfNotOnPrimary() throws InterruptedException {
primaryRemainsWhileFunctionExecutes(true, true);
}

@Test
public void remoteFunctionRetriesIfNotOnPrimary() throws InterruptedException {
primaryRemainsWhileFunctionExecutes(false, true);
}

private void primaryRemainsWhileFunctionExecutes(boolean runLocally, boolean releaseLatchEarly)
throws InterruptedException {
// Create entry and return name of primary
String memberForPrimary = server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
Region<String, String> region = cache.getRegion("TEST");
region.put(KEY, VALUE);

GeodeAwaitility.await()
.until(() -> PartitionRegionHelper.getRedundantMembersForKey(region, KEY).size() == 1);

rebalanceRegions(cache, region);

return PartitionRegionHelper.getPrimaryMemberForKey(region, KEY).getName();
});

// who is primary?
MemberVM primary = memberForPrimary.equals("server-1") ? server1 : server2;
MemberVM secondary = memberForPrimary.equals("server-1") ? server2 : server1;

MemberVM memberToRunOn = runLocally ? primary : secondary;

AsyncInvocation<Boolean> asyncChecking = memberToRunOn.invokeAsync(() -> {
InternalCache cache = ClusterStartupRule.getCache();
Region<String, String> region = cache.getRegion("TEST");

@SuppressWarnings("unchecked")
ResultCollector<?, List<Boolean>> rc = FunctionService.onRegion(region)
.withFilter(Collections.singleton(KEY))
.setArguments(releaseLatchEarly)
.execute(CheckPrimaryBucketFunction.class.getName());

return rc.getResult().get(0);
});

primary.invoke(CheckPrimaryBucketFunction::waitForFunctionToStart);

// switch primary to secondary while running test fn()
secondary.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("TEST");

// get bucketId
int bucketId = PartitionedRegionHelper.getHashKey(region, KEY);
BucketAdvisor bucketAdvisor = region.getRegionAdvisor().getBucketAdvisor(bucketId);

bucketAdvisor.becomePrimary(false);
CheckPrimaryBucketFunction.finishedMovingPrimary();
});
primary.invoke(CheckPrimaryBucketFunction::finishedMovingPrimary);

assertThat(asyncChecking.get())
.as("CheckPrimaryBucketFunction determined that the primary has moved")
.isTrue();
}

private static void rebalanceRegions(Cache cache, Region<?, ?> region) {
ResourceManager manager = cache.getResourceManager();
Set<String> includeRegions = new HashSet<>();
includeRegions.add(region.getName());

RebalanceFactory factory = manager.createRebalanceFactory();
factory.includeRegions(includeRegions);

try {
factory.start().getResults();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import redis.clients.jedis.Jedis;
Expand Down Expand Up @@ -110,7 +109,6 @@ public void run() {
}

@Test
@Ignore("GEODE-8127")
public void testConcurrentSaddOperations_runWithoutException_orDataLoss()
throws InterruptedException {
List<String> set1 = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.internal.cache.LocalDataSet;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.RedisCommandType;
Expand All @@ -38,12 +40,19 @@ public void execute(FunctionContext<Object[]> context) {

ByteArrayWrapper key =
(ByteArrayWrapper) regionFunctionContext.getFilter().iterator().next();

Region<ByteArrayWrapper, RedisSet> localRegion =
regionFunctionContext.getLocalDataSet(regionFunctionContext.getDataSet());

Object[] args = context.getArguments();
RedisCommandType command = (RedisCommandType) args[0];
Object result = compute(localRegion, key, command, args);
context.getResultSender().lastResult(result);

Runnable computation = () -> {
Object result = compute(localRegion, key, command, args);
context.getResultSender().lastResult(result);
};

computeWithPrimaryLocked(key, (LocalDataSet) localRegion, computation);
}

@Override
Expand All @@ -55,4 +64,11 @@ public boolean optimizeForWrite() {
public boolean isHA() {
return true;
}

public static void computeWithPrimaryLocked(Object key, LocalDataSet localDataSet, Runnable r) {
PartitionedRegion partitionedRegion = localDataSet.getProxy();

partitionedRegion.computeWithPrimaryLocked(key, r);
}

}

0 comments on commit 343e114

Please sign in to comment.