diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index 978e86341035..de7558c22b15 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -2039,6 +2039,16 @@ public OffHeapEvictor getOffHeapEvictor() { } } + /** Used by test to inject an evictor */ + void setOffHeapEvictor(OffHeapEvictor evictor) { + this.offHeapEvictor = evictor; + } + + /** Used by test to inject an evictor */ + void setHeapEvictor(HeapEvictor evictor) { + this.heapEvictor = evictor; + } + @Override public PersistentMemberManager getPersistentMemberManager() { return this.persistentMemberManager; @@ -2313,10 +2323,8 @@ public void close(String reason, Throwable systemFailureCause, boolean keepAlive if (cms != null) { cms.close(); } - HeapEvictor he = this.heapEvictor; - if (he != null) { - he.close(); - } + closeHeapEvictor(); + closeOffHeapEvictor(); } catch (CancelException ignore) { // make sure the disk stores get closed closeDiskStores(); @@ -2385,6 +2393,20 @@ public void close(String reason, Throwable systemFailureCause, boolean keepAlive } + private void closeOffHeapEvictor() { + OffHeapEvictor evictor = this.offHeapEvictor; + if (evictor != null) { + evictor.close(); + } + } + + private void closeHeapEvictor() { + HeapEvictor evictor = this.heapEvictor; + if (evictor != null) { + evictor.close(); + } + } + @Override public boolean isReconnecting() { return this.system.isReconnecting(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java index da6a671b46c6..a4677265574d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java @@ -34,7 +34,7 @@ * @since GemFire 6.0 * */ -public class RegionEvictorTask implements Callable { +public class RegionEvictorTask implements Runnable { private static final Logger logger = LogService.getLogger(); @@ -85,7 +85,8 @@ private HeapEvictor getHeapEvictor() { return this.evictor; } - public Object call() throws Exception { + @Override + public void run() { getGemFireCache().getCachePerfStats().incEvictorJobsStarted(); long bytesEvicted = 0; long totalBytesEvicted = 0; @@ -96,7 +97,7 @@ public Object call() throws Exception { synchronized (this.regionSet) { if (this.regionSet.isEmpty()) { lastTaskCompletionTime = System.currentTimeMillis(); - return null; + return; } // TODO: Yogesh : try Fisher-Yates shuffle algorithm Iterator iter = regionSet.iterator(); @@ -111,7 +112,7 @@ public Object call() throws Exception { if (totalBytesEvicted >= bytesToEvictPerTask || !getHeapEvictor().mustEvict() || this.regionSet.size() == 0) { lastTaskCompletionTime = System.currentTimeMillis(); - return null; + return; } } catch (RegionDestroyedException rd) { region.cache.getCancelCriterion().checkCancelInProgress(rd); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java index b22bb0ec5fa5..707b4080a667 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java @@ -89,12 +89,12 @@ public class HeapEvictor implements ResourceListener { protected final Cache cache; - private final ArrayList testTaskSetSizes = new ArrayList(); + private final ArrayList testTaskSetSizes = new ArrayList<>(); public volatile int testAbortAfterLoopCount = Integer.MAX_VALUE; private BlockingQueue poolQueue; - private AtomicBoolean isRunning = new AtomicBoolean(true); + private final AtomicBoolean isRunning = new AtomicBoolean(true); public HeapEvictor(Cache gemFireCache) { this.cache = gemFireCache; @@ -198,12 +198,19 @@ public Thread newThread(Runnable command) { /** * The task(i.e the region on which eviction needs to be performed) is assigned to the threadpool. */ - private void submitRegionEvictionTask(Callable task) { - evictorThreadPool.submit(task); + private void executeInThreadPool(Runnable task) { + try { + evictorThreadPool.execute(task); + } catch (RejectedExecutionException ex) { + // ignore rejection if evictor no longer running + if (isRunning()) { + throw ex; + } + } } public ThreadPoolExecutor getEvictorThreadPool() { - if (isRunning.get()) { + if (isRunning()) { return evictorThreadPool; } return null; @@ -215,7 +222,7 @@ public ThreadPoolExecutor getEvictorThreadPool() { * @return sum of scheduled and running tasks */ public int getRunningAndScheduledTasks() { - if (isRunning.get()) { + if (isRunning()) { return this.evictorThreadPool.getActiveCount() + this.evictorThreadPool.getQueue().size(); } return -1; @@ -243,35 +250,36 @@ private void createAndSubmitWeightedRegionEvictionTasks() { long bytesToEvictPerTask = (long) (getTotalBytesToEvict() * percentage); regionsForSingleTask.add(lr); if (mustEvict()) { - submitRegionEvictionTask( - new RegionEvictorTask(regionsForSingleTask, this, bytesToEvictPerTask)); + executeInThreadPool(new RegionEvictorTask(regionsForSingleTask, this, bytesToEvictPerTask)); } else { break; } } } - private Set> createRegionEvictionTasks() { - Set> evictorTaskSet = new HashSet>(); - int threadsAvailable = getEvictorThreadPool().getCorePoolSize(); + private Set createRegionEvictionTasks() { + ThreadPoolExecutor pool = getEvictorThreadPool(); + if (pool == null) { + return Collections.emptySet(); + } + int threadsAvailable = pool.getCorePoolSize(); long bytesToEvictPerTask = getTotalBytesToEvict() / threadsAvailable; List allRegionList = getAllRegionList(); + if (allRegionList.isEmpty()) { + return Collections.emptySet(); + } // This shuffling is not required when eviction triggered for the first time Collections.shuffle(allRegionList); int allRegionSetSize = allRegionList.size(); - if (allRegionList.isEmpty()) { - return evictorTaskSet; - } + Set evictorTaskSet = new HashSet<>(); if (allRegionSetSize <= threadsAvailable) { for (LocalRegion region : allRegionList) { List regionList = new ArrayList(1); regionList.add(region); - Callable task = new RegionEvictorTask(regionList, this, bytesToEvictPerTask); + RegionEvictorTask task = new RegionEvictorTask(regionList, this, bytesToEvictPerTask); evictorTaskSet.add(task); } - Iterator iterator = evictorTaskSet.iterator(); - while (iterator.hasNext()) { - RegionEvictorTask regionEvictorTask = (RegionEvictorTask) iterator.next(); + for (RegionEvictorTask regionEvictorTask : evictorTaskSet) { testTaskSetSizes.add(regionEvictorTask.getRegionList().size()); } return evictorTaskSet; @@ -295,9 +303,7 @@ private Set> createRegionEvictionTasks() { regionsForSingleTask.add(itr.next()); } - Iterator iterator = evictorTaskSet.iterator(); - while (iterator.hasNext()) { - RegionEvictorTask regionEvictorTask = (RegionEvictorTask) iterator.next(); + for (RegionEvictorTask regionEvictorTask : evictorTaskSet) { testTaskSetSizes.add(regionEvictorTask.getRegionList().size()); } return evictorTaskSet; @@ -327,7 +333,7 @@ public void onEvent(final MemoryEvent event) { // Do we care about eviction events and did the eviction event originate // in this VM ... - if (this.isRunning.get() && event.isLocal()) { + if (isRunning() && event.isLocal()) { if (event.getState().isEviction()) { final LogWriter logWriter = cache.getLogger(); @@ -378,8 +384,8 @@ public void run() { if (EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST) { createAndSubmitWeightedRegionEvictionTasks(); } else { - for (Callable task : createRegionEvictionTasks()) { - submitRegionEvictionTask(task); + for (RegionEvictorTask task : createRegionEvictionTasks()) { + executeInThreadPool(task); } } RegionEvictorTask.setLastTaskCompletionTime(System.currentTimeMillis()); @@ -408,14 +414,14 @@ public void run() { if (HeapEvictor.this.mustEvict.get()) { // Submit this runnable back into the thread pool and execute // another pass at eviction. - HeapEvictor.this.evictorThreadPool.submit(this); + executeInThreadPool(this); } } catch (RegionDestroyedException e) { // A region destroyed exception might be thrown for Region.size() when a bucket // moves due to rebalancing. retry submitting the eviction task without // logging an error message. fixes bug 48162 if (HeapEvictor.this.mustEvict.get()) { - HeapEvictor.this.evictorThreadPool.submit(this); + executeInThreadPool(this); } } } @@ -423,7 +429,7 @@ public void run() { }; // Submit the first pass at eviction into the pool - this.evictorThreadPool.execute(evictionManagerTask); + executeInThreadPool(evictionManagerTask); } else { this.mustEvict.set(false); @@ -447,12 +453,17 @@ public boolean mustEvict() { } public void close() { - getEvictorThreadPool().shutdownNow(); - isRunning.set(false); + if (isRunning.compareAndSet(true, false)) { + evictorThreadPool.shutdownNow(); + } + } + + public boolean isRunning() { + return isRunning.get(); } - public ArrayList testOnlyGetSizeOfTasks() { - if (isRunning.get()) + public ArrayList testOnlyGetSizeOfTasks() { + if (isRunning()) return testTaskSetSizes; return null; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java index a0f7af5849d3..e30636c48e9a 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java @@ -283,7 +283,7 @@ public void createCache() { } } - public ArrayList getTestTaskSetSizes() { + public ArrayList getTestTaskSetSizes() { return getEvictor().testOnlyGetSizeOfTasks(); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java index 6838e748f112..a24fc5a0898e 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java @@ -28,6 +28,8 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.SystemTimer; +import org.apache.geode.internal.cache.lru.HeapEvictor; +import org.apache.geode.internal.cache.lru.OffHeapEvictor; import org.apache.geode.pdx.internal.TypeRegistry; import org.apache.geode.test.fake.Fakes; import org.apache.geode.test.junit.categories.UnitTest; @@ -61,6 +63,25 @@ public void checkPurgeCCPTimer() { } } + @Test + public void checkEvictorsClosed() { + InternalDistributedSystem ds = Fakes.distributedSystem(); + CacheConfig cc = new CacheConfig(); + TypeRegistry typeRegistry = mock(TypeRegistry.class); + SystemTimer ccpTimer = mock(SystemTimer.class); + HeapEvictor he = mock(HeapEvictor.class); + OffHeapEvictor ohe = mock(OffHeapEvictor.class); + GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(ds, cc, typeRegistry); + try { + gfc.setHeapEvictor(he); + gfc.setOffHeapEvictor(ohe); + } finally { + gfc.close(); + } + verify(he, times(1)).close(); + verify(ohe, times(1)).close(); + } + @Test public void checkThatAsyncEventListenersUseAllThreadsInPool() { InternalDistributedSystem ds = Fakes.distributedSystem();