Skip to content

Commit

Permalink
GEODE-2811: close OffHeapEvictor when cache is closed
Browse files Browse the repository at this point in the history
Rejected executions are now ignored if shutting down.
execute now used instead of submit.
Close logic on HeapEvictor improved to prevent race conditions and NPEs.
  • Loading branch information
dschneider-pivotal committed Apr 27, 2017
1 parent c98bc8b commit bee0b7d
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2039,6 +2039,16 @@ public OffHeapEvictor getOffHeapEvictor() {
}
}

/** Used by test to inject an evictor */
void setOffHeapEvictor(OffHeapEvictor evictor) {
this.offHeapEvictor = evictor;
}

/** Used by test to inject an evictor */
void setHeapEvictor(HeapEvictor evictor) {
this.heapEvictor = evictor;
}

@Override
public PersistentMemberManager getPersistentMemberManager() {
return this.persistentMemberManager;
Expand Down Expand Up @@ -2313,10 +2323,8 @@ public void close(String reason, Throwable systemFailureCause, boolean keepAlive
if (cms != null) {
cms.close();
}
HeapEvictor he = this.heapEvictor;
if (he != null) {
he.close();
}
closeHeapEvictor();
closeOffHeapEvictor();
} catch (CancelException ignore) {
// make sure the disk stores get closed
closeDiskStores();
Expand Down Expand Up @@ -2385,6 +2393,20 @@ public void close(String reason, Throwable systemFailureCause, boolean keepAlive

}

private void closeOffHeapEvictor() {
OffHeapEvictor evictor = this.offHeapEvictor;
if (evictor != null) {
evictor.close();
}
}

private void closeHeapEvictor() {
HeapEvictor evictor = this.heapEvictor;
if (evictor != null) {
evictor.close();
}
}

@Override
public boolean isReconnecting() {
return this.system.isReconnecting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* @since GemFire 6.0
*
*/
public class RegionEvictorTask implements Callable<Object> {
public class RegionEvictorTask implements Runnable {

private static final Logger logger = LogService.getLogger();

Expand Down Expand Up @@ -85,7 +85,8 @@ private HeapEvictor getHeapEvictor() {
return this.evictor;
}

public Object call() throws Exception {
@Override
public void run() {
getGemFireCache().getCachePerfStats().incEvictorJobsStarted();
long bytesEvicted = 0;
long totalBytesEvicted = 0;
Expand All @@ -96,7 +97,7 @@ public Object call() throws Exception {
synchronized (this.regionSet) {
if (this.regionSet.isEmpty()) {
lastTaskCompletionTime = System.currentTimeMillis();
return null;
return;
}
// TODO: Yogesh : try Fisher-Yates shuffle algorithm
Iterator<LocalRegion> iter = regionSet.iterator();
Expand All @@ -111,7 +112,7 @@ public Object call() throws Exception {
if (totalBytesEvicted >= bytesToEvictPerTask || !getHeapEvictor().mustEvict()
|| this.regionSet.size() == 0) {
lastTaskCompletionTime = System.currentTimeMillis();
return null;
return;
}
} catch (RegionDestroyedException rd) {
region.cache.getCancelCriterion().checkCancelInProgress(rd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {

protected final Cache cache;

private final ArrayList testTaskSetSizes = new ArrayList();
private final ArrayList<Integer> testTaskSetSizes = new ArrayList<>();
public volatile int testAbortAfterLoopCount = Integer.MAX_VALUE;

private BlockingQueue<Runnable> poolQueue;

private AtomicBoolean isRunning = new AtomicBoolean(true);
private final AtomicBoolean isRunning = new AtomicBoolean(true);

public HeapEvictor(Cache gemFireCache) {
this.cache = gemFireCache;
Expand Down Expand Up @@ -198,12 +198,19 @@ public Thread newThread(Runnable command) {
/**
* The task(i.e the region on which eviction needs to be performed) is assigned to the threadpool.
*/
private void submitRegionEvictionTask(Callable<Object> task) {
evictorThreadPool.submit(task);
private void executeInThreadPool(Runnable task) {
try {
evictorThreadPool.execute(task);
} catch (RejectedExecutionException ex) {
// ignore rejection if evictor no longer running
if (isRunning()) {
throw ex;
}
}
}

public ThreadPoolExecutor getEvictorThreadPool() {
if (isRunning.get()) {
if (isRunning()) {
return evictorThreadPool;
}
return null;
Expand All @@ -215,7 +222,7 @@ public ThreadPoolExecutor getEvictorThreadPool() {
* @return sum of scheduled and running tasks
*/
public int getRunningAndScheduledTasks() {
if (isRunning.get()) {
if (isRunning()) {
return this.evictorThreadPool.getActiveCount() + this.evictorThreadPool.getQueue().size();
}
return -1;
Expand Down Expand Up @@ -243,35 +250,36 @@ private void createAndSubmitWeightedRegionEvictionTasks() {
long bytesToEvictPerTask = (long) (getTotalBytesToEvict() * percentage);
regionsForSingleTask.add(lr);
if (mustEvict()) {
submitRegionEvictionTask(
new RegionEvictorTask(regionsForSingleTask, this, bytesToEvictPerTask));
executeInThreadPool(new RegionEvictorTask(regionsForSingleTask, this, bytesToEvictPerTask));
} else {
break;
}
}
}

private Set<Callable<Object>> createRegionEvictionTasks() {
Set<Callable<Object>> evictorTaskSet = new HashSet<Callable<Object>>();
int threadsAvailable = getEvictorThreadPool().getCorePoolSize();
private Set<RegionEvictorTask> createRegionEvictionTasks() {
ThreadPoolExecutor pool = getEvictorThreadPool();
if (pool == null) {
return Collections.emptySet();
}
int threadsAvailable = pool.getCorePoolSize();
long bytesToEvictPerTask = getTotalBytesToEvict() / threadsAvailable;
List<LocalRegion> allRegionList = getAllRegionList();
if (allRegionList.isEmpty()) {
return Collections.emptySet();
}
// This shuffling is not required when eviction triggered for the first time
Collections.shuffle(allRegionList);
int allRegionSetSize = allRegionList.size();
if (allRegionList.isEmpty()) {
return evictorTaskSet;
}
Set<RegionEvictorTask> evictorTaskSet = new HashSet<>();
if (allRegionSetSize <= threadsAvailable) {
for (LocalRegion region : allRegionList) {
List<LocalRegion> regionList = new ArrayList<LocalRegion>(1);
regionList.add(region);
Callable<Object> task = new RegionEvictorTask(regionList, this, bytesToEvictPerTask);
RegionEvictorTask task = new RegionEvictorTask(regionList, this, bytesToEvictPerTask);
evictorTaskSet.add(task);
}
Iterator iterator = evictorTaskSet.iterator();
while (iterator.hasNext()) {
RegionEvictorTask regionEvictorTask = (RegionEvictorTask) iterator.next();
for (RegionEvictorTask regionEvictorTask : evictorTaskSet) {
testTaskSetSizes.add(regionEvictorTask.getRegionList().size());
}
return evictorTaskSet;
Expand All @@ -295,9 +303,7 @@ private Set<Callable<Object>> createRegionEvictionTasks() {
regionsForSingleTask.add(itr.next());
}

Iterator iterator = evictorTaskSet.iterator();
while (iterator.hasNext()) {
RegionEvictorTask regionEvictorTask = (RegionEvictorTask) iterator.next();
for (RegionEvictorTask regionEvictorTask : evictorTaskSet) {
testTaskSetSizes.add(regionEvictorTask.getRegionList().size());
}
return evictorTaskSet;
Expand Down Expand Up @@ -327,7 +333,7 @@ public void onEvent(final MemoryEvent event) {

// Do we care about eviction events and did the eviction event originate
// in this VM ...
if (this.isRunning.get() && event.isLocal()) {
if (isRunning() && event.isLocal()) {
if (event.getState().isEviction()) {
final LogWriter logWriter = cache.getLogger();

Expand Down Expand Up @@ -378,8 +384,8 @@ public void run() {
if (EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST) {
createAndSubmitWeightedRegionEvictionTasks();
} else {
for (Callable<Object> task : createRegionEvictionTasks()) {
submitRegionEvictionTask(task);
for (RegionEvictorTask task : createRegionEvictionTasks()) {
executeInThreadPool(task);
}
}
RegionEvictorTask.setLastTaskCompletionTime(System.currentTimeMillis());
Expand Down Expand Up @@ -408,22 +414,22 @@ public void run() {
if (HeapEvictor.this.mustEvict.get()) {
// Submit this runnable back into the thread pool and execute
// another pass at eviction.
HeapEvictor.this.evictorThreadPool.submit(this);
executeInThreadPool(this);
}
} catch (RegionDestroyedException e) {
// A region destroyed exception might be thrown for Region.size() when a bucket
// moves due to rebalancing. retry submitting the eviction task without
// logging an error message. fixes bug 48162
if (HeapEvictor.this.mustEvict.get()) {
HeapEvictor.this.evictorThreadPool.submit(this);
executeInThreadPool(this);
}
}
}
}
};

// Submit the first pass at eviction into the pool
this.evictorThreadPool.execute(evictionManagerTask);
executeInThreadPool(evictionManagerTask);

} else {
this.mustEvict.set(false);
Expand All @@ -447,12 +453,17 @@ public boolean mustEvict() {
}

public void close() {
getEvictorThreadPool().shutdownNow();
isRunning.set(false);
if (isRunning.compareAndSet(true, false)) {
evictorThreadPool.shutdownNow();
}
}

public boolean isRunning() {
return isRunning.get();
}

public ArrayList testOnlyGetSizeOfTasks() {
if (isRunning.get())
public ArrayList<Integer> testOnlyGetSizeOfTasks() {
if (isRunning())
return testTaskSetSizes;
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void createCache() {
}
}

public ArrayList getTestTaskSetSizes() {
public ArrayList<Integer> getTestTaskSetSizes() {
return getEvictor().testOnlyGetSizeOfTasks();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.lru.HeapEvictor;
import org.apache.geode.internal.cache.lru.OffHeapEvictor;
import org.apache.geode.pdx.internal.TypeRegistry;
import org.apache.geode.test.fake.Fakes;
import org.apache.geode.test.junit.categories.UnitTest;
Expand Down Expand Up @@ -61,6 +63,25 @@ public void checkPurgeCCPTimer() {
}
}

@Test
public void checkEvictorsClosed() {
InternalDistributedSystem ds = Fakes.distributedSystem();
CacheConfig cc = new CacheConfig();
TypeRegistry typeRegistry = mock(TypeRegistry.class);
SystemTimer ccpTimer = mock(SystemTimer.class);
HeapEvictor he = mock(HeapEvictor.class);
OffHeapEvictor ohe = mock(OffHeapEvictor.class);
GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(ds, cc, typeRegistry);
try {
gfc.setHeapEvictor(he);
gfc.setOffHeapEvictor(ohe);
} finally {
gfc.close();
}
verify(he, times(1)).close();
verify(ohe, times(1)).close();
}

@Test
public void checkThatAsyncEventListenersUseAllThreadsInPool() {
InternalDistributedSystem ds = Fakes.distributedSystem();
Expand Down

0 comments on commit bee0b7d

Please sign in to comment.