Skip to content

Commit

Permalink
GEODE-1691: protect EntryEvent off-heap readers
Browse files Browse the repository at this point in the history
The methods on EntryEvent that can read off-heap
data from the event are now protected from a concurrent
release of the off-heap data.
  • Loading branch information
dschneider-pivotal committed Aug 17, 2016
1 parent e08c1f5 commit b2b5fca
Show file tree
Hide file tree
Showing 2 changed files with 409 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.util.function.Function;

import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
Expand Down Expand Up @@ -740,12 +741,13 @@ public final Object getOldValue() {
boolean doCopyOnRead = getRegion().isCopyOnRead();
if (ov != null) {
if (ov instanceof CachedDeserializable) {
CachedDeserializable cd = (CachedDeserializable)ov;
if (doCopyOnRead) {
return cd.getDeserializedWritableCopy(this.region, this.re);
} else {
return cd.getDeserializedValue(this.region, this.re);
}
return callWithOffHeapLock((CachedDeserializable)ov, oldValueCD -> {
if (doCopyOnRead) {
return oldValueCD.getDeserializedWritableCopy(this.region, this.re);
} else {
return oldValueCD.getDeserializedValue(this.region, this.re);
}
});
}
else {
if (doCopyOnRead) {
Expand Down Expand Up @@ -955,15 +957,16 @@ public final Object getNewValue() {
return AbstractRegion.handleNotAvailable(nv);
}
if (nv instanceof CachedDeserializable) {
CachedDeserializable cd = (CachedDeserializable)nv;
Object v = null;
if (doCopyOnRead) {
v = cd.getDeserializedWritableCopy(this.region, this.re);
} else {
v = cd.getDeserializedValue(this.region, this.re);
}
assert !(v instanceof CachedDeserializable) : "for key "+this.getKey()+" found nested CachedDeserializable";
return v;
return callWithOffHeapLock((CachedDeserializable)nv, newValueCD -> {
Object v = null;
if (doCopyOnRead) {
v = newValueCD.getDeserializedWritableCopy(this.region, this.re);
} else {
v = newValueCD.getDeserializedValue(this.region, this.re);
}
assert !(v instanceof CachedDeserializable) : "for key "+this.getKey()+" found nested CachedDeserializable";
return v;
});
}
else {
if (doCopyOnRead) {
Expand All @@ -975,6 +978,25 @@ public final Object getNewValue() {
}
return null;
}

/**
* Invoke the given function with a lock if the given value is offheap.
* @return the value returned from invoking the function
*/
private <T,R> R callWithOffHeapLock(T value, Function<T, R> function) {
if (isOffHeapReference(value)) {
synchronized (this.offHeapLock) {
if (!this.offHeapOk) {
throw new IllegalStateException("Attempt to access off heap value after the EntryEvent was released.");
}
return function.apply(value);
}
} else {
return function.apply(value);
}
}

private final Object offHeapLock = new Object();

public final String getNewValueStringForm() {
return StringUtils.forceToString(basicGetNewValue());
Expand Down Expand Up @@ -2001,13 +2023,17 @@ public String toString() {
buf.append(this.getKey());
buf.append(";oldValue=");
try {
ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
synchronized (this.offHeapLock) {
ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
}
} catch (IllegalStateException ex) {
buf.append("OFFHEAP_VALUE_FREED");
}
buf.append(";newValue=");
try {
ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
synchronized (this.offHeapLock) {
ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
}
} catch (IllegalStateException ex) {
buf.append("OFFHEAP_VALUE_FREED");
}
Expand Down Expand Up @@ -2542,11 +2568,14 @@ public static final class SerializedCacheValueImpl
this.serializedValue = serializedBytes;
}

@Override
public byte[] getSerializedValue() {
if(this.serializedValue != null){
return this.serializedValue;
}
return getCd().getSerializedValue();
return callWithOffHeapLock(cd -> {
return cd.getSerializedValue();
});
}

private CachedDeserializable getCd() {
Expand All @@ -2555,19 +2584,37 @@ private CachedDeserializable getCd() {
}
return this.cd;
}
/**
* The only methods that need to use this method are those on the external SerializedCacheValue interface
* and any other method that a customer could call that may access the off-heap values.
* For example if toString was implemented on this class to access the value then it would
* need to use this method.
*/
private <R> R callWithOffHeapLock(Function<CachedDeserializable, R> function) {
if (this.event != null) {
// this call does not use getCd() to access this.cd
// because the check for offHeapOk is done by event.callWithOffHeapLock
return this.event.callWithOffHeapLock(this.cd, function);
} else {
return function.apply(getCd());
}
}

@Override
public Object getDeserializedValue() {
return getDeserializedValue(this.r, this.re);
}
public Object getDeserializedForReading() {
return OffHeapHelper.getHeapForm(getCd().getDeserializedForReading());
return getCd().getDeserializedForReading();
}
public Object getDeserializedWritableCopy(Region rgn, RegionEntry entry) {
return OffHeapHelper.getHeapForm(getCd().getDeserializedWritableCopy(rgn, entry));
return getCd().getDeserializedWritableCopy(rgn, entry);
}

public Object getDeserializedValue(Region rgn, RegionEntry reentry) {
return OffHeapHelper.getHeapForm(getCd().getDeserializedValue(rgn, reentry));
return callWithOffHeapLock(cd -> {
return cd.getDeserializedValue(rgn, reentry);
});
}
public Object getValue() {
if(this.serializedValue != null){
Expand Down Expand Up @@ -2694,31 +2741,38 @@ public boolean isSingleHopPutOp() {
* True if it is ok to use old/new values that are stored off heap.
* False if an exception should be thrown if an attempt is made to access old/new offheap values.
*/
private transient boolean offHeapOk = true;
transient boolean offHeapOk = true;

@Override
@Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE})
public void release() {
// noop if already freed or values can not be off-heap
if (!this.offHeapOk) return;
// Note that this method does not set the old/new values to null but
// leaves them set to the off-heap value so that future calls to getOld/NewValue
// will fail with an exception.
Object ov = basicGetOldValue();
Object nv = basicGetNewValue();
this.offHeapOk = false;

if (ov instanceof StoredObject) {
//this.region.getCache().getLogger().info("DEBUG freeing ref to old value on " + System.identityHashCode(ov));
if (ReferenceCountHelper.trackReferenceCounts()) {
ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
((StoredObject) ov).release();
ReferenceCountHelper.setReferenceCountOwner(null);
} else {
((StoredObject) ov).release();
synchronized (this.offHeapLock) {
// Note that this method does not set the old/new values to null but
// leaves them set to the off-heap value so that future calls to getOld/NewValue
// will fail with an exception.
testHookReleaseInProgress();
Object ov = basicGetOldValue();
Object nv = basicGetNewValue();
this.offHeapOk = false;

if (ov instanceof StoredObject) {
//this.region.getCache().getLogger().info("DEBUG freeing ref to old value on " + System.identityHashCode(ov));
if (ReferenceCountHelper.trackReferenceCounts()) {
ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
((StoredObject) ov).release();
ReferenceCountHelper.setReferenceCountOwner(null);
} else {
((StoredObject) ov).release();
}
}
OffHeapHelper.releaseAndTrackOwner(nv, this);
}
OffHeapHelper.releaseAndTrackOwner(nv, this);
}

void testHookReleaseInProgress() {
// unit test can mock or override this method
}

/**
Expand All @@ -2729,7 +2783,9 @@ public void disallowOffHeapValues() {
if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
throw new IllegalStateException("This event does not support off-heap values");
}
this.offHeapOk = false;
synchronized (this.offHeapLock) {
this.offHeapOk = false;
}
}

/**
Expand All @@ -2738,26 +2794,28 @@ public void disallowOffHeapValues() {
*/
@Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE})
public void copyOffHeapToHeap() {
Object ov = basicGetOldValue();
if (isOffHeapReference(ov)) {
if (ReferenceCountHelper.trackReferenceCounts()) {
ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
synchronized (this.offHeapLock) {
Object ov = basicGetOldValue();
if (isOffHeapReference(ov)) {
if (ReferenceCountHelper.trackReferenceCounts()) {
ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
ReferenceCountHelper.setReferenceCountOwner(null);
} else {
this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
}
}
Object nv = basicGetNewValue();
if (isOffHeapReference(nv)) {
ReferenceCountHelper.setReferenceCountOwner(this);
this.newValue = OffHeapHelper.copyAndReleaseIfNeeded(nv);
ReferenceCountHelper.setReferenceCountOwner(null);
} else {
this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
}
if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
throw new IllegalStateException("event's old/new value still off-heap after calling copyOffHeapToHeap");
}
this.offHeapOk = false;
}
Object nv = basicGetNewValue();
if (isOffHeapReference(nv)) {
ReferenceCountHelper.setReferenceCountOwner(this);
this.newValue = OffHeapHelper.copyAndReleaseIfNeeded(nv);
ReferenceCountHelper.setReferenceCountOwner(null);
}
if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
throw new IllegalStateException("event's old/new value still off-heap after calling copyOffHeapToHeap");
}
this.offHeapOk = false;
}

public boolean isOldValueOffHeap() {
Expand Down
Loading

0 comments on commit b2b5fca

Please sign in to comment.