Skip to content

Commit

Permalink
GEODE-8958: When tombstone timestamps get corrupted. (apache#6042)
Browse files Browse the repository at this point in the history
- The system would wait, now it does expires it and moves on.
  • Loading branch information
mhansonp authored Mar 1, 2021
1 parent 6edf695 commit ab59830
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -48,7 +49,10 @@
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.TombstoneService;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
Expand Down Expand Up @@ -117,35 +121,139 @@ public void testTombstoneGcMessagesBetweenPersistentAndNonPersistentRegion() {
});
}


@Test
public void testGetOldestTombstoneTimeReplicate() {
public void testWhenAnOutOfRangeTimeStampIsSeenWeExpireItInReplicateTombstoneSweeper() {
VM server1 = VM.getVM(0);
VM server2 = VM.getVM(1);
final int FAR_INTO_THE_FUTURE = 1000000; // 1 million millis into the future
final int count = 10;

// Create a cache and load some boiler plate entries
server1.invoke(() -> {
createCacheAndRegion(REPLICATE_PERSISTENT);
region.put("K1", "V1");
region.put("K2", "V2");
createCacheAndRegion(RegionShortcut.REPLICATE);
for (int i = 0; i < count; i++) {
region.put("K" + i, "V" + i);
}
});

server2.invoke(() -> createCacheAndRegion(REPLICATE));
server2.invoke(() -> createCacheAndRegion(RegionShortcut.REPLICATE));

server1.invoke(() -> {
// Send tombstone gc message to vm1.
region.destroy("K1");

// Now that we have a cache and a region specifically with data, we can start the real work
TombstoneService.TombstoneSweeper tombstoneSweeper =
((InternalCache) cache).getTombstoneService().getSweeper((LocalRegion) region);

assertThat(tombstoneSweeper.getOldestTombstoneTime()).isGreaterThan(0)
.isLessThan(((InternalCache) cache).cacheTimeMillis());
performGC(1);
// Get one of the entries
RegionEntry regionEntry = ((LocalRegion) region).getRegionEntry("K0");

/*
* Create a version tag with a timestamp far off in the future...
* It should be in the near past, but we are testing that a future tombstone will be cleared
*/
VersionTag<?> versionTag = regionEntry.getVersionStamp().asVersionTag();
versionTag.setVersionTimeStamp(System.currentTimeMillis() + FAR_INTO_THE_FUTURE);

// Create the forged tombstone with the versionTag from the future
TombstoneService.Tombstone modifiedTombstone =
new TombstoneService.Tombstone(regionEntry, (LocalRegion) region,
versionTag);

// Add it to the list of tombstones so that when checkOldestUnexpired is called it will see it
tombstoneSweeper.tombstones.add(modifiedTombstone);
tombstoneSweeper.checkOldestUnexpired(System.currentTimeMillis());

// Validate that the tombstone was cleared.
assertThat(tombstoneSweeper.getOldestTombstoneTime()).isEqualTo(0);
});
}

@Test
public void testGetOldestTombstoneTimeNonReplicate() {
public void testWhenAnOutOfRangeTimeStampIsSeenWeExpireItInNonReplicateTombstoneSweeper() {
VM server1 = VM.getVM(0);
VM server2 = VM.getVM(1);
final int FAR_INTO_THE_FUTURE = 1000000; // 1 million millis into the future
final int count = 2000;
Logger logger = LogService.getLogger();
// Create a cache and load some boiler plate entries
server1.invoke(() -> {
createCacheAndRegion(RegionShortcut.PARTITION);
for (int i = 0; i < count; i++) {
region.put("K" + i, "V" + i);
}
});

server2.invoke(() -> createCacheAndRegion(RegionShortcut.PARTITION));

server1.invoke(() -> {

// Now that we have a cache and a region specifically with data, we can start the real work
TombstoneService.TombstoneSweeper tombstoneSweeper =
((InternalCache) cache).getTombstoneService().getSweeper((LocalRegion) region);

// Get one of the entries

PartitionedRegion partitionedRegion = (PartitionedRegion) region;
RegionEntry regionEntry = partitionedRegion.getBucketRegion("K0").getRegionEntry("K0");

/*
* Create a version tag with a timestamp far off in the future...
* It should be in the near past, but we are testing that a future tombstone will be cleared
*/

VersionTag<?> versionTag = regionEntry.getVersionStamp().asVersionTag();
versionTag.setVersionTimeStamp(System.currentTimeMillis() + FAR_INTO_THE_FUTURE);

// Create the forged tombstone with the versionTag from the future
TombstoneService.Tombstone modifiedTombstone =
new TombstoneService.Tombstone(regionEntry, (LocalRegion) region,
versionTag);

// Add it to the list of tombstones so that when checkOldestUnexpired is called it will see it
tombstoneSweeper.tombstones.add(modifiedTombstone);
tombstoneSweeper.checkOldestUnexpired(System.currentTimeMillis());

// Validate that the tombstone was cleared.
assertThat(tombstoneSweeper.getOldestTombstoneTime()).isEqualTo(0);
});
}



@Test
public void testGetOldestTombstoneTimeForReplicateTombstoneSweeper() {
VM server1 = VM.getVM(0);
VM server2 = VM.getVM(1);
final int count = 10;
server1.invoke(() -> {
createCacheAndRegion(RegionShortcut.REPLICATE);
for (int i = 0; i < count; i++) {
region.put("K" + i, "V" + i);
}
});

server2.invoke(() -> createCacheAndRegion(RegionShortcut.REPLICATE));

server1.invoke(() -> {
TombstoneService.TombstoneSweeper tombstoneSweeper =
((InternalCache) cache).getTombstoneService().getSweeper((LocalRegion) region);
// Send tombstone gc message to vm1.
for (int i = 0; i < count; i++) {
region.destroy("K" + i);
assertThat(
tombstoneSweeper.getOldestTombstoneTime()
+ TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT_DEFAULT - System.currentTimeMillis())
.isGreaterThan(0);
performGC(1);
}

assertThat(tombstoneSweeper.getOldestTombstoneTime()).isEqualTo(0);
});
}

@Test
public void testGetOldestTombstoneTimeForNonReplicateTombstoneSweeper() {
VM client = VM.getVM(0);
VM server = VM.getVM(1);

Expand Down Expand Up @@ -188,12 +296,12 @@ public void testGetOldestTombstoneTimeNonReplicate() {
* and validate that it matches the tombstone of the entry we removed.
*/
@Test
public void testGetOldestTombstoneReplicate() {
public void testGetOldestTombstoneForReplicateTombstoneSweeper() {
VM server1 = VM.getVM(0);
VM server2 = VM.getVM(1);

server1.invoke(() -> {
createCacheAndRegion(REPLICATE_PERSISTENT);
createCacheAndRegion(REPLICATE);
region.put("K1", "V1");
region.put("K2", "V2");
});
Expand Down Expand Up @@ -221,7 +329,7 @@ public void testGetOldestTombstoneReplicate() {
* as a client is required to have this non-replicate tombstone.
*/
@Test
public void testGetOldestTombstoneNonReplicate() {
public void testGetOldestTombstoneForNonReplicateTombstoneSweeper() {
VM client = VM.getVM(0);
VM server = VM.getVM(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ public Object getBlockGCLock() {
return this.replicatedTombstoneSweeper.getBlockGCLock();
}

protected static class Tombstone extends CompactVersionHolder {
@VisibleForTesting
public static class Tombstone extends CompactVersionHolder {
// tombstone overhead size
public static final int PER_TOMBSTONE_OVERHEAD =
ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
Expand All @@ -385,7 +386,8 @@ protected static class Tombstone extends CompactVersionHolder {
RegionEntry entry;
LocalRegion region;

Tombstone(RegionEntry entry, LocalRegion region, VersionTag destroyedVersion) {
@VisibleForTesting
public Tombstone(RegionEntry entry, LocalRegion region, VersionTag destroyedVersion) {
super(destroyedVersion);
this.entry = entry;
this.region = region;
Expand Down Expand Up @@ -423,8 +425,13 @@ protected void updateStatistics() {
}

@Override
protected boolean hasExpired(long msTillHeadTombstoneExpires) {
return msTillHeadTombstoneExpires <= 0;
protected boolean hasExpired(long msUntilTombstoneExpires) {
/*
* In case the tombstone expiration time would be too far out lets cap it. This is just
* making the system fault tolerant in the case that there are large clock jumps or
* unrealistically large timestamps.
*/
return msUntilTombstoneExpires <= 0 || msUntilTombstoneExpires > EXPIRY_TIME;
}

@Override
Expand Down Expand Up @@ -744,12 +751,17 @@ private boolean isFreeMemoryLow() {
}

@Override
protected boolean hasExpired(long msTillHeadTombstoneExpires) {
protected boolean hasExpired(long msUntilTombstoneExpires) {
if (testHook_forceExpirationCount > 0) {
testHook_forceExpirationCount--;
return true;
}
return msTillHeadTombstoneExpires <= 0;
/*
* In case the tombstone expiration time would be too far out lets cap it. This is just
* making the system fault tolerant in the case that there are large clock jumps or
* unrealistically large timestamps.
*/
return msUntilTombstoneExpires <= 0 || msUntilTombstoneExpires > EXPIRY_TIME;
}

@Override
Expand Down Expand Up @@ -847,7 +859,8 @@ public abstract static class TombstoneSweeper implements Runnable {
* are left in this queue and the sweeper thread figures out that they are no longer valid
* tombstones.
*/
protected final Queue<Tombstone> tombstones;
@VisibleForTesting
public final Queue<Tombstone> tombstones;
/**
* Estimate of the amount of memory used by this sweeper
*/
Expand Down Expand Up @@ -1063,7 +1076,8 @@ private void purgeObsoleteTombstones(final long now) {
/**
* See if the oldest unexpired tombstone should be expired.
*/
private void checkOldestUnexpired(long now) {
@VisibleForTesting
public void checkOldestUnexpired(long now) {
sleepTime = 0;
lockQueueHead();
Tombstone oldest = tombstones.peek();
Expand All @@ -1078,8 +1092,8 @@ private void checkOldestUnexpired(long now) {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
logger.trace(LogMarker.TOMBSTONE_VERBOSE, "oldest unexpired tombstone is {}", oldest);
}
long msTillHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
if (hasExpired(msTillHeadTombstoneExpires)) {
long msUntilHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
if (hasExpired(msUntilHeadTombstoneExpires)) {
try {
tombstones.remove();
expireTombstone(oldest);
Expand All @@ -1089,7 +1103,7 @@ private void checkOldestUnexpired(long now) {
logger.warn("Unexpected exception while processing tombstones", e);
}
} else {
sleepTime = msTillHeadTombstoneExpires;
sleepTime = Math.min(msUntilHeadTombstoneExpires, EXPIRY_TIME);
}
}
} finally {
Expand Down Expand Up @@ -1121,7 +1135,7 @@ public String toString() {

protected abstract void handleNoUnexpiredTombstones();

protected abstract boolean hasExpired(long msTillTombstoneExpires);
protected abstract boolean hasExpired(long msUntilTombstoneExpires);

protected abstract void expireTombstone(Tombstone tombstone);

Expand Down

0 comments on commit ab59830

Please sign in to comment.