Skip to content

Commit

Permalink
GEODE-3967: fix the offheap memory leak in serial gateway sender's un…
Browse files Browse the repository at this point in the history
…processedEvents.

When a ConcurrentCacheModificationException (CME) happens, GatewaySenderEventImpl
should save the status and notify the gateway sender if it holds the primary queue,
because another member might have put the event into the secondary queue.

Let an event with CME be enqueued to the primary queue only, but not dispatched. The old
logic did not allow a CME event to be enqueued. This is wrong, because an event
without CME might have been added into the secondary queue.

We should not dispatch the CME event; otherwise it will cause data inconsistency
at the remote site, since these CME events are misordered.

So we should enqueue it, but not dispatch it.

Also add rollingUpgradeTests
  • Loading branch information
gesterzhou committed Jan 4, 2019
1 parent a075b0e commit 7f2950c
Show file tree
Hide file tree
Showing 11 changed files with 652 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ public void run() {
// ARM.updateEntry will be called one more time, so there will be 2 conflicted events
assertThat(CCRegion.getCachePerfStats().getConflatedEventsCount())
.describedAs("conflated event count")
.isEqualTo(1);
.isEqualTo(2);
}
});
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1988,9 +1988,11 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
fromData,63
toData,87

org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2
fromData,183
toData,133
org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4
fromData,30
fromDataPre_GEODE_1_9_0_0,183
toData,17
toDataPre_GEODE_1_9_0_0,134

org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2
fromData,20
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,25 @@ private static boolean shouldDoRemoteCreate(LocalRegion rgn, EntryEventImpl ev)
}
}

/**
 * Decides whether an update should still be attempted after the create attempt did not
 * apply because the entry already exists.
 *
 * <p>
 * No update is attempted when the event's old value is the destroyed token (in which case the
 * event's version tag, if any, is recorded in the region's version vector when one is
 * present) or when the event is flagged as a concurrency conflict.
 *
 * @param rgn the region the event is being applied to
 * @param ev the event whose create attempt did not apply
 * @return {@code true} if the caller should go on to try an update, {@code false} otherwise
 */
private static boolean checkIfToUpdateAfterCreateFailed(LocalRegion rgn, EntryEventImpl ev) {
  final boolean blockedByDestroyedToken = ev.oldValueIsDestroyedToken();
  if (blockedByDestroyedToken && rgn.getVersionVector() != null
      && ev.getVersionTag() != null) {
    // The create was blocked by the destroyed token; still record the event's version.
    rgn.getVersionVector().recordVersion(
        (InternalDistributedMember) ev.getDistributedMember(), ev.getVersionTag());
  }

  final boolean concurrencyConflict = ev.isConcurrencyConflict();
  if (concurrencyConflict && logger.isDebugEnabled()) {
    logger.debug("basicUpdate failed with CME, not to retry:" + ev);
  }

  // Either condition on its own is enough to suppress the follow-up update.
  return !(blockedByDestroyedToken || concurrencyConflict);
}

/**
* Does a remote update (could be create or put). This code was factored into a static for
* QueuedOperation.
Expand Down Expand Up @@ -134,13 +153,7 @@ public static boolean doPutOrCreate(LocalRegion rgn, EntryEventImpl ev, long las
updated = true;
} else { // already exists. If it was blocked by the DESTROYED token, then
// do no update.
if (ev.oldValueIsDestroyedToken()) {
if (rgn.getVersionVector() != null && ev.getVersionTag() != null) {
rgn.getVersionVector().recordVersion(
(InternalDistributedMember) ev.getDistributedMember(), ev.getVersionTag());
}
doUpdate = false;
}
doUpdate = checkIfToUpdateAfterCreateFailed(rgn, ev);
}
} finally {
if (isBucket) {
Expand Down Expand Up @@ -174,7 +187,7 @@ public static boolean doPutOrCreate(LocalRegion rgn, EntryEventImpl ev, long las
|| (rgn.getDataPolicy().withReplication() && rgn.getConcurrencyChecksEnabled())) {
overwriteDestroyed = true;
ev.makeCreate();
rgn.basicUpdate(ev, true /* ifNew */, false/* ifOld */, lastMod,
rgn.basicUpdate(ev, false /* ifNew */, false/* ifOld */, lastMod,
overwriteDestroyed);
rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote());
updated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5708,6 +5708,8 @@ public boolean virtualPut(final EntryEventImpl event, final boolean ifNew, final
logger.debug("caught concurrent modification attempt when applying {}", event);
}
notifyBridgeClients(event);
notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
: EnumListenerEvent.AFTER_CREATE, event);
return false;
}

Expand Down Expand Up @@ -6204,8 +6206,7 @@ public boolean notifiesMultipleSerialGateways() {
}

protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
// isConcurrencyConflict is usually a concurrent cache modification problem
if (isPdxTypesRegion()) {
return;
}

Expand Down Expand Up @@ -6600,6 +6601,7 @@ private boolean mapDestroy(final EntryEventImpl event, final boolean cacheWrite,
// Notify clients only if its NOT a gateway event.
if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
notifyBridgeClients(event);
notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
}
return true; // event was elided

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,20 @@ protected void processQueue() {
}
}
}

// filter out the events with CME
Iterator<GatewaySenderEventImpl> cmeItr = filteredList.iterator();
while (cmeItr.hasNext()) {
GatewaySenderEventImpl event = cmeItr.next();
if (event.isConcurrencyConflict()) {
cmeItr.remove();
logger.debug("The CME event: {} is removed from Gateway Sender queue: {}", event,
sender);
statistics.incEventsNotQueued();
continue;
}
}

/*
* if (filteredList.isEmpty()) { eventQueueRemove(events.size()); continue; }
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.VersionedDataInputStream;
import org.apache.geode.internal.VersionedDataSerializable;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.CachedDeserializableFactory;
import org.apache.geode.internal.cache.Conflatable;
Expand All @@ -42,6 +43,7 @@
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.WrappedCallbackArgument;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.offheap.ReferenceCountHelper;
import org.apache.geode.internal.offheap.Releasable;
Expand All @@ -61,12 +63,15 @@
*
*/
public class GatewaySenderEventImpl
implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable {
implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable,
VersionedDataSerializable {
private static final long serialVersionUID = -5690172020872255422L;

protected static final Object TOKEN_NULL = new Object();

protected static final short VERSION = 0x11;
// It should use current version. But it was hard-coded to be 0x11, i.e. GEODE_120_ORDINAL,
// by mistake since 120 to pre-190
protected static final short VERSION = Version.GEODE_190.ordinal();

protected EnumListenerEvent operation;

Expand All @@ -85,7 +90,7 @@ public class GatewaySenderEventImpl
/**
* The number of parts for the <code>Message</code>
*
* @see org.apache.geode.internal.cache.tier.sockets.Message
* @see Message
*/
protected int numberOfParts;

Expand Down Expand Up @@ -170,6 +175,10 @@ public class GatewaySenderEventImpl

protected boolean isInitialized;

private transient boolean isConcurrencyConflict = false;

private short version;

/**
* Is this thread in the process of serializing this event?
*/
Expand Down Expand Up @@ -309,6 +318,7 @@ public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent ce, Object
if (initialize) {
initialize();
}
this.isConcurrencyConflict = event.isConcurrencyConflict();
}

/**
Expand Down Expand Up @@ -670,7 +680,13 @@ public int getDSFID() {
return GATEWAY_SENDER_EVENT_IMPL;
}

/**
 * Serializes this event: first everything written by
 * {@link #toDataPre_GEODE_1_9_0_0(DataOutput)}, then the concurrency-conflict flag as a
 * boolean.
 *
 * @param out the stream to write this event to
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void toData(DataOutput out) throws IOException {
toDataPre_GEODE_1_9_0_0(out);
// The flag is appended after the pre-1.9.0 payload, so it is the last field on the wire.
DataSerializer.writeBoolean(this.isConcurrencyConflict, out);
}

public void toDataPre_GEODE_1_9_0_0(DataOutput out) throws IOException {
// Make sure we are initialized before we serialize.
initialize();
out.writeShort(VERSION);
Expand All @@ -694,11 +710,16 @@ protected void serializeKey(DataOutput out) throws IOException {
DataSerializer.writeObject(this.key, out);
}

@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
short version = in.readShort();
if (version != VERSION) {
// warning?`
fromDataPre_GEODE_1_9_0_0(in);
if (version >= Version.GEODE_190.ordinal()) {
this.isConcurrencyConflict = DataSerializer.readBoolean(in);
}
}

public void fromDataPre_GEODE_1_9_0_0(DataInput in) throws IOException, ClassNotFoundException {
version = in.readShort();
this.isInitialized = true;
this.action = in.readInt();
this.numberOfParts = in.readInt();
Expand Down Expand Up @@ -741,7 +762,8 @@ public String toString() {
.append(";creationTime=").append(this.creationTime).append(";shadowKey=")
.append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
.append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
.append(";bucketId=").append(this.bucketId).append("]");
.append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
.append(this.isConcurrencyConflict).append("]");
return buffer.toString();
}

Expand Down Expand Up @@ -1123,6 +1145,10 @@ public int getBucketId() {
return bucketId;
}

/**
 * Returns the concurrency-conflict flag stored on this event.
 *
 * @return the current value of the {@code isConcurrencyConflict} field
 */
public boolean isConcurrencyConflict() {
return isConcurrencyConflict;
}

/**
* @param tailKey the tailKey to set
*/
Expand All @@ -1139,7 +1165,7 @@ public Long getShadowKey() {

@Override
public Version[] getSerializationVersions() {
return null;
return new Version[] {Version.GEODE_190};
}

public int getSerializedValueSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,9 @@ private void releaseUnprocessedEvents() {
if (m != null) {
for (EventWrapper ew : m.values()) {
GatewaySenderEventImpl gatewayEvent = ew.event;
if (logger.isDebugEnabled()) {
logger.debug("releaseUnprocessedEvents:" + gatewayEvent);
}
gatewayEvent.release();
}
this.unprocessedEvents = null;
Expand Down Expand Up @@ -412,10 +415,18 @@ public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object s
isPrimary = true;
} else {
// If it is not, create an uninitialized GatewayEventImpl and
// put it into the map of unprocessed events.
senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP
// ok
handleSecondaryEvent(senderEvent);
// put it into the map of unprocessed events, except 2 Special cases:
// 1) UPDATE_VERSION_STAMP: only enqueue to primary
// 2) CME && !originRemote: only enqueue to primary
boolean isUpdateVersionStamp =
event.getOperation().equals(Operation.UPDATE_VERSION_STAMP);
boolean isCME_And_NotOriginRemote =
((EntryEventImpl) event).isConcurrencyConflict() && !event.isOriginRemote();
if (!(isUpdateVersionStamp || isCME_And_NotOriginRemote)) {
senderEvent =
new GatewaySenderEventImpl(operation, event, substituteValue, false);
handleSecondaryEvent(senderEvent);
}
}
}
}
Expand Down
Loading

0 comments on commit 7f2950c

Please sign in to comment.