Skip to content

Commit

Permalink
GEODE-4712 GEODE-5943: shut down the bucketSorter when destroying the…
Browse files Browse the repository at this point in the history
… partitioned region (apache#2845)
  • Loading branch information
jinmeiliao authored Nov 15, 2018
1 parent c39030a commit 3f4474c
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void before() {
server1 = cluster.startServerVM(1, s -> s.withNoCacheServer()
.withProperties(properties).withConnectionToLocator(locatorPort));

VMProvider.invokeInEveryMember(() -> {
VMProvider.invokeInEveryMember("setup VM", () -> {
HeapMemoryMonitor.setTestDisableMemoryUpdates(true);
System.setProperty("gemfire.memoryEventTolerance", "0");
InternalCache cache = ClusterStartupRule.getCache();
Expand All @@ -104,7 +104,7 @@ public void before() {

@Test
public void testDummyInlineNCentralizedEviction() {
VMProvider.invokeInEveryMember(() -> {
VMProvider.invokeInEveryMember("create region", () -> {
ServerStarterRule server = (ServerStarterRule) ClusterStartupRule.memberStarter;
server.createPartitionRegion("PR1",
f -> f.setOffHeap(offHeap).setEvictionAttributes(
Expand All @@ -113,7 +113,7 @@ public void testDummyInlineNCentralizedEviction() {

}, server0, server1);

server0.invoke(() -> {
server0.invoke("put data", () -> {
Region region = ClusterStartupRule.getCache().getRegion("PR1");
for (int counter = 1; counter <= 50; counter++) {
region.put(counter, new byte[ENTRY_SIZE]);
Expand All @@ -124,7 +124,7 @@ public void testDummyInlineNCentralizedEviction() {
int server1ExpectedEviction = server1.invoke(() -> sendEventAndWaitForExpectedEviction("PR1"));

// do 4 puts again in PR1
server0.invoke(() -> {
server0.invoke("put more data", () -> {
Region region = ClusterStartupRule.getCache().getRegion("PR1");
for (int counter = 1; counter <= 4; counter++) {
region.put(counter, new byte[ENTRY_SIZE]);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.apache.geode.internal.cache;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.ScheduledExecutorService;

import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.test.junit.rules.ServerStarterRule;

public class PartitionedRegionIntegrationTest {

@Rule
public ServerStarterRule server = new ServerStarterRule().withNoCacheServer().withAutoStart();

@Test
public void bucketSorterShutdownAfterRegionDestroy() {
PartitionedRegion region =
(PartitionedRegion) server.createRegion(RegionShortcut.PARTITION, "PR1",
f -> f.setEvictionAttributes(
EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.LOCAL_DESTROY)));

ScheduledExecutorService bucketSorter = region.getBucketSorter();
assertThat(bucketSorter).isNotNull();

region.destroyRegion();

assertThat(bucketSorter.isShutdown()).isTrue();
}

@Test
public void bucketSorterIsNotCreatedIfNoEviction() {
PartitionedRegion region =
(PartitionedRegion) server.createRegion(RegionShortcut.PARTITION, "PR1",
rf -> rf.setOffHeap(false));
ScheduledExecutorService bucketSorter = region.getBucketSorter();
assertThat(bucketSorter).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ public void removeColocationListener(ColocationListener colocationListener) {
colocationListeners.remove(colocationListener);
}

ScheduledExecutorService getBucketSorter() {
return bucketSorter;
}

static PRIdMap getPrIdToPR() {
return prIdToPR;
Expand Down Expand Up @@ -7628,6 +7631,10 @@ protected void postDestroyRegion(boolean destroyDiskRegion, RegionEventImpl even
colocatedWithRegion.getColocatedByList().remove(this);
}

if (bucketSorter != null) {
bucketSorter.shutdown();
}

RegionLogger.logDestroy(getName(),
this.cache.getInternalDistributedSystem().getDistributedMember(), null, op.isClose());
}
Expand Down Expand Up @@ -9243,11 +9250,11 @@ public PartitionedRegion getColocatedWithRegion() {
public List<BucketRegion> getSortedBuckets() {
if (!bucketSorterStarted.get()) {
bucketSorterStarted.set(true);
this.bucketSorter.scheduleAtFixedRate(new BucketSorterThread(), 0,
this.bucketSorter.scheduleAtFixedRate(new BucketSorterRunnable(), 0,
HeapEvictor.BUCKET_SORTING_INTERVAL, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug(
"Started BucketSorter to sort the buckets according to numver of entries in each bucket for every {} milliseconds",
"Started BucketSorter to sort the buckets according to number of entries in each bucket for every {} milliseconds",
HeapEvictor.BUCKET_SORTING_INTERVAL);
}
}
Expand All @@ -9259,7 +9266,7 @@ public List<BucketRegion> getSortedBuckets() {
return bucketList;
}

class BucketSorterThread implements Runnable {
class BucketSorterRunnable implements Runnable {
@Override
public void run() {
try {
Expand Down Expand Up @@ -9290,7 +9297,7 @@ public int compare(BucketRegion buk1, BucketRegion buk2) {
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("BucketSorterThread : encountered Exception ", e);
logger.debug("BucketSorterRunnable : encountered Exception ", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ public static void invokeInEveryMember(SerializableRunnableIF runnableIF, VMProv
Arrays.stream(members).forEach(member -> member.invoke(runnableIF));
}

public static void invokeInEveryMember(String name, SerializableRunnableIF runnableIF,
VMProvider... members) {
if (ArrayUtils.isEmpty(members)) {
throw new IllegalArgumentException("Array of members must not be null nor empty.");
}

Arrays.stream(members).forEach(member -> member.invoke(name, runnableIF));
}

public abstract VM getVM();

public void stop() {
Expand Down

0 comments on commit 3f4474c

Please sign in to comment.