Skip to content

Commit

Permalink
GEODE-2485: fix leak in tx suspend/resume
Browse files Browse the repository at this point in the history
The CCPTimer is now purged after every 1000 cancels.
So we will now no longer have more than 1000
cancelled tasks eating up memory.
Now uses internalSuspend in two places that previously used suspend.
Since internalSuspend does not schedule a timer task
these places will have no more issues with leaking memory
and these code paths will perform better.

Renamed resume(TXStateProxy) to internalResume for clarity.

internalResume no longer checks for a TimerTask to cancel
since internalSuspend does not add one.
Instead the only code that checks for a TimerTask is "resume".
  • Loading branch information
dschneider-pivotal committed Apr 10, 2017
1 parent 8183b1f commit 344f93d
Show file tree
Hide file tree
Showing 21 changed files with 188 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ public Object executeUsingContext(ExecutionContext context) throws FunctionDomai
updateStatistics(endTime - startTime);
pdxClassToFieldsMap.remove();
pdxClassToMethodsMap.remove();
((TXManagerImpl) this.cache.getCacheTransactionManager()).resume(tx);
((TXManagerImpl) this.cache.getCacheTransactionManager()).internalResume(tx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public Index createIndex(String indexName, IndexType indexType, String origIndex
DefaultQuery.setPdxReadSerialized(this.region.getCache(), oldReadSerialized);

if (tx != null) {
((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).resume(tx);
((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).internalResume(tx);
}
}
}
Expand Down Expand Up @@ -1138,7 +1138,7 @@ private void processAction(RegionEntry entry, int action, int opCode) throws Que
} finally {
DefaultQuery.setPdxReadSerialized(this.region.getCache(), false);
if (tx != null) {
((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).resume(tx);
((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).internalResume(tx);
}
getCachePerfStats().endIndexUpdate(startPA);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* TODO -- with Java 1.5, this will be a template type so that the swarm's class can be
* specified.
*/
public final class SystemTimer {
public class SystemTimer {
private static final Logger logger = LogService.getLogger();

final static private boolean isIBM =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3949,7 +3949,7 @@ private void setMemoryThresholdReachedCounterTrue(final DistributedMember idm) {
protected VersionTag fetchRemoteVersionTag(Object key) {
VersionTag tag = null;
assert this.dataPolicy != DataPolicy.REPLICATE;
TransactionId txId = cache.getCacheTransactionManager().suspend();
final TXStateProxy tx = cache.getTXMgr().internalSuspend();
try {
boolean retry = true;
InternalDistributedMember member = getRandomReplicate();
Expand All @@ -3971,8 +3971,8 @@ protected VersionTag fetchRemoteVersionTag(Object key) {
}
}
} finally {
if (txId != null) {
cache.getCacheTransactionManager().resume(txId);
if (tx != null) {
cache.getTXMgr().internalResume(tx);
}
}
return tag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2437,7 +2437,7 @@ public void close(String reason, Throwable systemFailureCause, boolean keepalive
}
((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).close();
if (this.txMgr != null) {
this.txMgr.resume(tx);
this.txMgr.internalResume(tx);
}
TXCommitMessage.getTracker().clearForCacheClose();
}
Expand Down Expand Up @@ -3936,6 +3936,32 @@ public SystemTimer getCCPTimer() {
}
}

/**
 * For use by unit tests to inject a mocked ccpTimer.
 *
 * <p>
 * NOTE(review): this assignment is not made while holding ccpTimerMutex, whereas purgeCCPTimer
 * reads the field under that mutex -- presumably acceptable because only tests call this;
 * confirm.
 *
 * @param ccpTimer the timer instance to store in this cache's ccpTimer field
 */
void setCCPTimer(SystemTimer ccpTimer) {
this.ccpTimer = ccpTimer;
}

/** Number of counted cancels after which purgeCCPTimer() purges the CCPTimer. */
static final int PURGE_INTERVAL = 1000;
/** Cancels counted since the last purge; updated by purgeCCPTimer() under ccpTimerMutex. */
private int cancelCount = 0;

/**
 * Records one cancelled CCPTimer task and, once {@link #PURGE_INTERVAL} cancels have been
 * counted, resets the count and purges the timer. This keeps a large number of cancelled tasks
 * from building up in the CCPTimer. See GEODE-2485.
 *
 * <p>
 * Nothing is counted while no CCPTimer exists. All of the work is done while holding
 * ccpTimerMutex.
 */
public void purgeCCPTimer() {
  synchronized (ccpTimerMutex) {
    if (ccpTimer == null) {
      // no timer, so there is nothing to count or purge
      return;
    }
    cancelCount += 1;
    if (cancelCount != PURGE_INTERVAL) {
      return;
    }
    // interval reached: start a new counting window and purge the timer
    cancelCount = 0;
    ccpTimer.timerPurge();
  }
}

/**
* @see LocalRegion
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6556,7 +6556,7 @@ void basicDestroyRegion(RegionEventImpl event, boolean cacheWrite, boolean lock,
}
}
} finally {
this.cache.getTXMgr().resume(tx);
this.cache.getTXMgr().internalResume(tx);
}
}

Expand Down Expand Up @@ -7180,7 +7180,7 @@ void basicInvalidateRegion(RegionEventImpl event) {
dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_INVALIDATE, event);
}
} finally {
this.cache.getTXMgr().resume(tx);
this.cache.getTXMgr().internalResume(tx);
}
}

Expand Down Expand Up @@ -9572,7 +9572,7 @@ Map basicGetAll(Collection keys, Object callback) {
key);
}
} finally {
this.cache.getTXMgr().resume(tx);
this.cache.getTXMgr().internalResume(tx);
}
getCachePerfStats().endPut(startPut, event.isOriginRemote());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Entry getEntry(KeyInfo keyInfo, LocalRegion localRegion, boolean allowTom
PartitionedRegion pr = (PartitionedRegion) localRegion;
return pr.nonTXGetEntry(keyInfo, false, allowTombstones);
} finally {
localRegion.cache.getTXMgr().resume(tx);
localRegion.cache.getTXMgr().internalResume(tx);
}
}

Expand All @@ -72,7 +72,7 @@ public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean g
return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead,
preferCD, requestingClient, clientEvent, returnTombstones);
} finally {
r.cache.getTXMgr().resume(tx);
r.cache.getTXMgr().internalResume(tx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,26 +675,26 @@ public final TXStateProxy internalSuspend() {
* @param tx the transaction to activate.
* @throws IllegalStateException if this thread already has an active transaction
*/
public final void resume(TXStateProxy tx) {
public void internalResume(TXStateProxy tx) {
if (tx != null) {
TransactionId tid = getTransactionId();
if (tid != null) {
throw new java.lang.IllegalStateException(
LocalizedStrings.TXManagerImpl_TRANSACTION_0_ALREADY_IN_PROGRESS
.toLocalizedString(tid));
}
if (tx instanceof TXState) {
throw new java.lang.IllegalStateException("Found instance of TXState: " + tx);
}
setTXState(tx);
tx.resume();
SystemTimerTask task = this.expiryTasks.remove(tx.getTransactionId());
if (task != null) {
task.cancel();
}
}
}

/**
 * Activates the given transaction on the calling thread by delegating to
 * {@link #internalResume(TXStateProxy)}. Kept only so existing callers continue to compile.
 *
 * @param tx the transaction to activate; {@code null} is ignored
 * @throws IllegalStateException if this thread already has an active transaction
 * @deprecated use {@link #internalResume(TXStateProxy)} instead
 */
@Deprecated
public final void resume(TXStateProxy tx) {
  internalResume(tx);
}

public final boolean isClosed() {
return this.closed;
}
Expand Down Expand Up @@ -1265,7 +1265,7 @@ public void resume(TransactionId transactionId) {
throw new IllegalStateException(
LocalizedStrings.TXManagerImpl_UNKNOWN_TRANSACTION_OR_RESUMED.toLocalizedString());
}
resume(txProxy);
resumeProxy(txProxy);
}

public boolean isSuspended(TransactionId transactionId) {
Expand All @@ -1278,12 +1278,24 @@ public boolean tryResume(TransactionId transactionId) {
}
TXStateProxy txProxy = this.suspendedTXs.remove(transactionId);
if (txProxy != null) {
resume(txProxy);
resumeProxy(txProxy);
return true;
}
return false;
}

/**
 * Resumes the given transaction via {@link #internalResume(TXStateProxy)} and then removes any
 * expiry task recorded for its transaction id. If a task was found and its cancel succeeds, the
 * cache is asked to count the cancel towards its periodic CCPTimer purge.
 *
 * @param txProxy the transaction to resume; must not be null
 */
private void resumeProxy(TXStateProxy txProxy) {
  assert txProxy != null;
  internalResume(txProxy);
  final SystemTimerTask expiryTask = this.expiryTasks.remove(txProxy.getTransactionId());
  // only a successful cancel counts towards the purge interval
  if (expiryTask != null && expiryTask.cancel()) {
    ((GemFireCacheImpl) this.cache).purgeCCPTimer();
  }
}

/**
* this map keeps track of all the threads that are waiting in
* {@link #tryResume(TransactionId, long, TimeUnit)} for a particular transactionId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1679,11 +1679,11 @@ public Object getEntryForIterator(KeyInfo curr, LocalRegion currRgn, boolean rem
if (!pr.getBucketPrimary(curr.getBucketId()).equals(pr.cache.getMyId())) {
// to fix bug 47893 suspend the tx before calling nonTXGetEntry
final TXManagerImpl txmgr = pr.getGemFireCache().getTXMgr();
TransactionId tid = txmgr.suspend();
final TXStateProxy tx = txmgr.internalSuspend();
try {
return pr.nonTXGetEntry(curr, false, allowTombstones);
} finally {
txmgr.resume(tid);
txmgr.internalResume(tx);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public interface TXStateProxy extends TXStateInterface {
public void suspend();

/**
* Called by {@link TXManagerImpl#resume(TXStateProxy)} to perform additional tasks required to
* resume a transaction
* Called by {@link TXManagerImpl#internalResume(TXStateProxy)} to perform additional tasks
* required to resume a transaction
*/
public void resume();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ public int entryCount(LocalRegion localRegion) {
return getRealDeal(null, localRegion).entryCount(localRegion);
} finally {
if (resetTXState) {
getTxMgr().resume(txp);
getTxMgr().internalResume(txp);
} else if (txUnlocked) {
getLock().lock();
}
Expand Down Expand Up @@ -742,7 +742,7 @@ public Object getEntryForIterator(KeyInfo key, LocalRegion currRgn, boolean reme
allowTombstones);
} finally {
if (resetTxState) {
getTxMgr().resume(txp);
getTxMgr().internalResume(txp);
}
}

Expand Down Expand Up @@ -770,7 +770,7 @@ public Object getKeyForIterator(KeyInfo keyInfo, LocalRegion currRgn, boolean re
allowTombstones);
} finally {
if (resetTxState) {
getTxMgr().resume(txp);
getTxMgr().internalResume(txp);
}
}
}
Expand Down Expand Up @@ -940,7 +940,7 @@ public Set getBucketKeys(LocalRegion localRegion, int bucketId, boolean allowTom
return getRealDeal(null, localRegion).getBucketKeys(localRegion, bucketId, false);
} finally {
if (resetTxState) {
getTxMgr().resume(txp);
getTxMgr().internalResume(txp);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,9 @@ public void run2() {
*/
Object task = _durableExpirationTask.getAndSet(null);
if (task != null) {
((SystemTimerTask) task).cancel();
if (((SystemTimerTask) task).cancel()) {
_cache.purgeCCPTimer();
}
}
}

Expand All @@ -1988,7 +1990,9 @@ protected void cancelDurableExpirationTask(boolean logMessage) {
LocalizedStrings.CacheClientProxy_0_CANCELLING_EXPIRATION_TASK_SINCE_THE_CLIENT_HAS_RECONNECTED,
this));
}
task.cancel();
if (task.cancel()) {
_cache.purgeCCPTimer();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ private TXStateProxy suspendTX() {
private void resumeTX(TXStateProxy state) {
if (state != null) {
TXManagerImpl txManager = state.getTxMgr();
txManager.resume(state);
txManager.internalResume(state);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void afterRegionDestroy(RegionEvent<String, String> event) {
final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
TXStateProxy tx = txMgrImpl.internalSuspend();
exprReg.put("key0", checkVal);
txMgrImpl.resume(tx);
txMgrImpl.internalResume(tx);
try {
this.txMgr.commit();
fail("Expected CommitConflictException!");
Expand Down
Loading

0 comments on commit 344f93d

Please sign in to comment.