Skip to content

Commit

Permalink
GEODE-2547: Interest registration no longer causes a CacheLoader to b…
Browse files Browse the repository at this point in the history
…e invoked
  • Loading branch information
boglesby committed Feb 28, 2017
1 parent fb1fdf9 commit 1a36d36
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 17 deletions.
16 changes: 16 additions & 0 deletions geode-core/src/main/java/org/apache/geode/cache/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class Operation implements java.io.Serializable {
private static final byte OP_TYPE_CLEAR = OpType.CLEAR;
private static final byte OP_TYPE_MARKER = OpType.MARKER;
private static final byte OP_TYPE_UPDATE_VERSION = OpType.UPDATE_ENTRY_VERSION;
private static final byte OP_TYPE_GET_FOR_REGISTER_INTEREST = OpType.GET_FOR_REGISTER_INTEREST;

private static final int OP_DETAILS_NONE = 0;
private static final int OP_DETAILS_SEARCH = 1;
Expand Down Expand Up @@ -531,6 +532,14 @@ public final class Operation implements java.io.Serializable {
false, // isRegion
OP_TYPE_DESTROY, OP_DETAILS_REMOVEALL);

/**
* A 'get for register interest' operation.
*/
public static final Operation GET_FOR_REGISTER_INTEREST =
new Operation("GET_FOR_REGISTER_INTEREST", false, // isLocal
false, // isRegion
OP_TYPE_GET_FOR_REGISTER_INTEREST, OP_DETAILS_NONE);

/** The name of this mirror type. */
private final transient String name;

Expand Down Expand Up @@ -635,6 +644,13 @@ public boolean isGetEntry() {
return this.opType == OP_TYPE_GET_ENTRY;
}

/**
* Returns true if this operation is a get for register interest.
*/
public boolean isGetForRegisterInterest() {
return this.opType == OP_TYPE_GET_FOR_REGISTER_INTEREST;
}

/**
* Returns true if the operation invalidated an entry.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2302,17 +2302,27 @@ protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateIn
if (requestingClient != null) {
event.setContext(requestingClient);
}
SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
try {
processor.initialize(this, key, aCallbackArgument);
// processor fills in event
processor.doSearchAndLoad(event, txState, localValue);
if (clientEvent != null && clientEvent.getVersionTag() == null) {
clientEvent.setVersionTag(event.getVersionTag());
// If this event is because of a register interest call, don't invoke the CacheLoader
boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null
&& clientEvent.getOperation().isGetForRegisterInterest();
if (!getForRegisterInterest) {
SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
try {
processor.initialize(this, key, aCallbackArgument);
// processor fills in event
processor.doSearchAndLoad(event, txState, localValue);
if (clientEvent != null && clientEvent.getVersionTag() == null) {
clientEvent.setVersionTag(event.getVersionTag());
}
lastModified = processor.getLastModified();
} finally {
processor.release();
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
+ getFullPath() + "; key=" + key);
}
lastModified = processor.getLastModified();
} finally {
processor.release();
}
}
if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ private OpType() {}

public static final byte UPDATE_ENTRY_VERSION = 11;

public static final byte GET_FOR_REGISTER_INTEREST = 12;

public static final byte CLEAR = 16;

public static final byte MARKER = 32;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ private static void handleKVSingleton(LocalRegion region, Object entryKey,

if (region != null) {
if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) {
VersionTagHolder versionHolder = new VersionTagHolder();
VersionTagHolder versionHolder = createVersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
// From Get70.getValueAndIsObject()
Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true);
Expand Down Expand Up @@ -1161,7 +1161,7 @@ private static void handleKVAllKeys(LocalRegion region, String regex, boolean se
}

for (Object key : region.keySet(true)) {
VersionTagHolder versionHolder = new VersionTagHolder();
VersionTagHolder versionHolder = createVersionTagHolder();
if (keyPattern != null) {
if (!(key instanceof String)) {
// key is not a String, cannot apply regex to this entry
Expand Down Expand Up @@ -1263,12 +1263,10 @@ private static void updateValues(VersionedObjectList values, Object key, Object
public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region,
VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn)
throws IOException {
Object key = null;
VersionTagHolder versionHolder = null;
ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID();
for (Iterator it = keySet.iterator(); it.hasNext();) {
key = it.next();
versionHolder = new VersionTagHolder();
Object key = it.next();
VersionTagHolder versionHolder = createVersionTagHolder();

Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true);

Expand Down Expand Up @@ -1454,7 +1452,7 @@ private static void handleKVList(final LocalRegion region, final List keyList,
for (Iterator it = keyList.iterator(); it.hasNext();) {
Object key = it.next();
if (region.containsKey(key) || region.containsTombstone(key)) {
VersionTagHolder versionHolder = new VersionTagHolder();
VersionTagHolder versionHolder = createVersionTagHolder();

ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
data = region.get(key, null, true, true, true, id, versionHolder, true);
Expand All @@ -1475,6 +1473,12 @@ private static void handleKVList(final LocalRegion region, final List keyList,
sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn);
}

private static VersionTagHolder createVersionTagHolder() {
VersionTagHolder versionHolder = new VersionTagHolder();
versionHolder.setOperation(Operation.GET_FOR_REGISTER_INTEREST);
return versionHolder;
}

/**
* Append an interest response
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,80 @@ private void runRegisterInterestWithCacheLoaderTest(boolean addReplicatedRegion)
vm1.invoke(() -> InterestListDUnitTest.confirmNoCacheListenerInvalidates());
}

@Test
public void testRegisterInterestSingleKeyWithDestroyOnReplicatedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
keysToDestroy.add("0");
runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy);
}

@Test
public void testRegisterInterestSingleKeyWithDestroyOnPartitionedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
keysToDestroy.add("0");
runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy);
}

@Test
public void testRegisterInterestListOfKeysWithDestroyOnReplicatedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
for (int i = 0; i < 5; i++) {
keysToDestroy.add(String.valueOf(i));
}
runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy);
}

@Test
public void testRegisterInterestListOfKeysWithDestroyOnPartitionedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
for (int i = 0; i < 5; i++) {
keysToDestroy.add(String.valueOf(i));
}
runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy);
}

@Test
public void testRegisterInterestAllKeysWithDestroyOnReplicatedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
keysToDestroy.add("0");
runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, "ALL_KEYS");
}

@Test
public void testRegisterInterestAllKeysWithDestroyOnPartitionedRegionWithCacheLoader() {
List keysToDestroy = new ArrayList();
keysToDestroy.add("0");
runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, "ALL_KEYS");
}

private void runRegisterInterestWithDestroyAndCacheLoaderTest(boolean addReplicatedRegion,
List keysToDestroy, Object keyToRegister) {
// The server was already started with a replicated region. Bounce it if necessary
int port1 = PORT1;
if (!addReplicatedRegion) {
vm0.invoke(() -> closeCache());
port1 =
((Integer) vm0.invoke(() -> InterestListDUnitTest.createServerCache(addReplicatedRegion)))
.intValue();
}
final int port = port1;

// Add a cache loader to the region
vm0.invoke(() -> addCacheLoader());

// Create client cache
vm1.invoke(() -> createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), port));

// Destroy appropriate key(s)
vm1.invoke(() -> destroyKeys(keysToDestroy));

// Register interest in appropriate keys(s)
vm1.invoke(() -> registerKey(keyToRegister));

// Verify CacheLoader was not invoked
vm0.invoke(() -> verifyNoCacheLoaderLoads());
}

private void createCache(Properties props) throws Exception {
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
Expand Down Expand Up @@ -905,6 +979,20 @@ private static void putAgain(String vm) {
}
}

private static void destroyKeys(List keys) {
Region r = cache.getRegion(REGION_NAME);
for (Object key : keys) {
r.destroy(key);
}
}

private static void verifyNoCacheLoaderLoads() throws Exception {
Region region = cache.getRegion(REGION_NAME);
ReturnKeyCacheLoader cacheLoader =
(ReturnKeyCacheLoader) region.getAttributes().getCacheLoader();
assertEquals(0/* expected */, cacheLoader.getLoads()/* actual */);
}

private static void validateEntriesK1andK2(final String vm) {
WaitCriterion ev = new WaitCriterion() {
@Override
Expand Down Expand Up @@ -1076,14 +1164,25 @@ public boolean hasReceivedAllCreateEvents() {

private static class ReturnKeyCacheLoader implements CacheLoader {

private AtomicInteger loads = new AtomicInteger();

@Override
public void close() {
// Do nothing
}

@Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
incrementLoads();
return helper.getKey();
}

private void incrementLoads() {
this.loads.incrementAndGet();
}

private int getLoads() {
return this.loads.get();
}
}
}

0 comments on commit 1a36d36

Please sign in to comment.