Skip to content

Commit

Permalink
GEODE-4738: move eventSeqNum and versionVector setting in constructor…
Browse files Browse the repository at this point in the history
…s. (apache#1504)

* GEODE-4738: move eventSeqNum and versionVector setting in constructors.
  • Loading branch information
pivotal-eshu authored Feb 27, 2018
1 parent 889da89 commit 3dad0a3
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public BucketRegion(String regionName, RegionAttributes attrs, LocalRegion paren
Assert.assertTrue(internalRegionArgs.getPartitionedRegion() != null);
this.redundancy = internalRegionArgs.getPartitionedRegionBucketRedundancy();
this.partitionedRegion = internalRegionArgs.getPartitionedRegion();
setEventSeqNum();
}

// Attempt to direct the GII process to the primary first
Expand All @@ -228,28 +229,6 @@ public void initialize(InputStream snapshotInputStream, InternalDistributedMembe
getBucketAdvisor().getProxyBucketRegion().setBucketRegion(this);
boolean success = false;
try {
if (this.partitionedRegion.isShadowPR()
&& this.partitionedRegion.getColocatedWith() != null) {
PartitionedRegion parentPR = ColocationHelper.getLeaderRegion(this.partitionedRegion);
BucketRegion parentBucket = parentPR.getDataStore().getLocalBucketById(getId());
// needs to be set only once.
if (parentBucket.eventSeqNum == null) {
parentBucket.eventSeqNum = new AtomicLong5(getId());
}
}
if (this.partitionedRegion.getColocatedWith() == null) {
this.eventSeqNum = new AtomicLong5(getId());
} else {
PartitionedRegion parentPR = ColocationHelper.getLeaderRegion(this.partitionedRegion);
BucketRegion parentBucket = parentPR.getDataStore().getLocalBucketById(getId());
if (parentBucket == null && logger.isDebugEnabled()) {
logger.debug("The parentBucket of region {} bucketId {} is NULL",
this.partitionedRegion.getFullPath(), getId());
}
Assert.assertTrue(parentBucket != null);
this.eventSeqNum = parentBucket.eventSeqNum;
}

final InternalDistributedMember primaryHolder = getBucketAdvisor().basicGetPrimaryMember();
if (primaryHolder != null && !primaryHolder.equals(getMyId())) {
// Ignore the provided image target, use an existing primary (if any)
Expand All @@ -267,6 +246,28 @@ public void initialize(InputStream snapshotInputStream, InternalDistributedMembe
}
}

private void setEventSeqNum() {
if (this.partitionedRegion.isShadowPR() && this.partitionedRegion.getColocatedWith() != null) {
PartitionedRegion parentPR = ColocationHelper.getLeaderRegion(this.partitionedRegion);
BucketRegion parentBucket = parentPR.getDataStore().getLocalBucketById(getId());
// needs to be set only once.
if (parentBucket.eventSeqNum == null) {
parentBucket.eventSeqNum = new AtomicLong5(getId());
}
}
if (this.partitionedRegion.getColocatedWith() == null) {
this.eventSeqNum = new AtomicLong5(getId());
} else {
PartitionedRegion parentPR = ColocationHelper.getLeaderRegion(this.partitionedRegion);
BucketRegion parentBucket = parentPR.getDataStore().getLocalBucketById(getId());
if (parentBucket == null && logger.isDebugEnabled()) {
logger.debug("The parentBucket of region {} bucketId {} is NULL",
this.partitionedRegion.getFullPath(), getId());
}
Assert.assertTrue(parentBucket != null);
this.eventSeqNum = parentBucket.eventSeqNum;
}
}


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,11 +1029,6 @@ public void initialize(InputStream snapshotInputStream, InternalDistributedMembe
logger.debug("DistributedRegion.initialize BEGIN: {}", getFullPath());
}

// if we're versioning entries we need a region-level version vector
if (this.scope.isDistributed() && this.getConcurrencyChecksEnabled()) {
createVersionVector();
}

if (this.scope.isGlobal()) {
getLockService(); // create lock service eagerly now
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public enum IteratorType {
/**
* tracks region-level version information for members
*/
private RegionVersionVector versionVector;
private final RegionVersionVector versionVector;

private static final Pattern[] QUERY_PATTERNS = new Pattern[] {
Pattern.compile("^\\(*select .*",
Expand Down Expand Up @@ -661,6 +661,8 @@ protected LocalRegion(String regionName, RegionAttributes attrs, LocalRegion par

this.testCallable = internalRegionArgs.getTestCallable();
eventTracker = createEventTracker();

versionVector = createRegionVersionVector();
}

protected EventTracker createEventTracker() {
Expand Down Expand Up @@ -711,24 +713,32 @@ Object getSizeGuard() {
}
}

protected RegionVersionVector createRegionVersionVector() {
if (getConcurrencyChecksEnabled()) {
return createVersionVector();
}
return null;
}

/** initializes a new version vector for this region */
void createVersionVector() {
this.versionVector = RegionVersionVector.create(getVersionMember(), this);
private RegionVersionVector createVersionVector() {
RegionVersionVector regionVersionVector = RegionVersionVector.create(getVersionMember(), this);

if (this.getDataPolicy().withPersistence()) {
// copy the versions that we have recovered from disk into
// the version vector.
RegionVersionVector diskVector = this.diskRegion.getRegionVersionVector();
this.versionVector.recordVersions(diskVector.getCloneForTransmission());
RegionVersionVector diskVector = diskRegion.getRegionVersionVector();
regionVersionVector.recordVersions(diskVector.getCloneForTransmission());
} else if (!this.getDataPolicy().withStorage()) {
// version vectors are currently only necessary in empty regions for
// tracking canonical member IDs
this.versionVector.turnOffRecordingForEmptyRegion();
regionVersionVector.turnOffRecordingForEmptyRegion();
}
if (this.serverRegionProxy != null) {
this.versionVector.setIsClientVector();
if (serverRegionProxy != null) {
regionVersionVector.setIsClientVector();
}
this.cache.getDistributionManager().addMembershipListener(this.versionVector);
cache.getDistributionManager().addMembershipListener(regionVersionVector);
return regionVersionVector;
}

@Override
Expand Down Expand Up @@ -2288,10 +2298,6 @@ public void initialize(InputStream snapshotInputStream, InternalDistributedMembe
}
}

// if we're versioning entries we need a region-level version vector
if (this.getConcurrencyChecksEnabled() && this.versionVector == null) {
createVersionVector();
}
// if not local, then recovery happens in InitialImageOperation
if (this.scope.isLocal()) {
createOQLIndexes(internalRegionArgs);
Expand Down Expand Up @@ -3357,7 +3363,6 @@ protected void enableConcurrencyChecks() {
Assert.assertTrue(this.entries.isEmpty(),
"RegionMap should be empty but was of size:" + this.entries.size());
this.entries.setEntryFactory(versionedEntryFactory);
createVersionVector();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,11 @@ public void initialize(InputStream snapshotInputStream, InternalDistributedMembe

}

@Override
protected RegionVersionVector createRegionVersionVector() {
return null;
}

/**
* Initializes the Node for this Map.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ protected void setInternalRegionArguments(InternalRegionArguments ira) {
ReadWriteLock primaryMoveLock = new ReentrantReadWriteLock();
Lock activeWriteLock = primaryMoveLock.readLock();
when(ba.getActiveWriteLock()).thenReturn(activeWriteLock);
when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class));
when(ba.isPrimary()).thenReturn(true);

ira.setPartitionedRegion(pr).setPartitionedRegionBucketRedundancy(1).setBucketAdvisor(ba);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.versions;

import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.util.Properties;
import java.util.Set;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.test.junit.categories.IntegrationTest;

@Category(IntegrationTest.class)
public class RegionVersionVectorIntegrationTest {

private Properties props = new Properties();
private InternalCache cache = null;
private final String REGION_NAME = "region";
private Region region = null;

@Before
public void setup() {
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
createCache();
}

@After
public void tearDown() {
cache.close();
}

private void createCache() {
cache = (InternalCache) new CacheFactory(props).create();
}

private void createData() {
// create buckets
for (int i = 0; i < 10; i++) {
region.put(i, "value");
}
}

@Test
public void partitionedRegionDoesNotCreateRegionVersionVector() {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setTotalNumBuckets(10);
region = cache.createRegionFactory(RegionShortcut.PARTITION)
.setPartitionAttributes(paf.create()).create(REGION_NAME);
createData();
assertNull(((LocalRegion) region).getVersionVector());
}

@Test
public void persistPartitionedRegionDoesNotCreateRegionVersionVector() {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setTotalNumBuckets(10);
region = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
.setPartitionAttributes(paf.create()).create(REGION_NAME);
createData();
assertNull(((LocalRegion) region).getVersionVector());
}

@Test
public void bucketRegionCreatesRegionVersionVector() {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setTotalNumBuckets(10);
region = cache.createRegionFactory(RegionShortcut.PARTITION)
.setPartitionAttributes(paf.create()).create(REGION_NAME);
createData();
PartitionedRegion partitionedRegion = (PartitionedRegion) region;
Set<BucketRegion> bucketRegions = partitionedRegion.getDataStore().getAllLocalBucketRegions();
for (BucketRegion bucketRegion : bucketRegions) {
assertNotNull(bucketRegion.getVersionVector());
}
}

@Test
public void persistBucketRegionCreatesRegionVersionVector() {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setTotalNumBuckets(10);
region = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
.setPartitionAttributes(paf.create()).create(REGION_NAME);
createData();
PartitionedRegion partitionedRegion = (PartitionedRegion) region;
Set<BucketRegion> bucketRegions = partitionedRegion.getDataStore().getAllLocalBucketRegions();
for (BucketRegion bucketRegion : bucketRegions) {
assertNotNull(bucketRegion.getVersionVector());
}
}

@Test
public void bucketRegionOnPartitionedRegionWithConcurrencyCheckDisabledDoesNotCreateRegionVersionVector() {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setTotalNumBuckets(10);
region = cache.createRegionFactory(RegionShortcut.PARTITION).setConcurrencyChecksEnabled(false)
.setPartitionAttributes(paf.create()).create(REGION_NAME);
createData();
PartitionedRegion partitionedRegion = (PartitionedRegion) region;
Set<BucketRegion> bucketRegions = partitionedRegion.getDataStore().getAllLocalBucketRegions();
for (BucketRegion bucketRegion : bucketRegions) {
assertNull(bucketRegion.getVersionVector());
}
}

@Test
public void distributedRegionCreatesRegionVersionVector() {
region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME);
assertNotNull(((LocalRegion) region).getVersionVector());
}

@Test
public void persistentDistributedRegionWithPersistenceCreateRegionVersionVector() {
region = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT).create(REGION_NAME);
assertNotNull(((LocalRegion) region).getVersionVector());
}

@Test
public void distributedRegionWithConcurrencyCheckDisabledDoesNotCreateRegionVersionVector() {
region = cache.createRegionFactory(RegionShortcut.REPLICATE).setConcurrencyChecksEnabled(false)
.create(REGION_NAME);
assertNull(((LocalRegion) region).getVersionVector());
}

@Test
public void localRegionCreateRegionVersionVector() {
region = cache.createRegionFactory(RegionShortcut.LOCAL).create(REGION_NAME);
assertNotNull(((LocalRegion) region).getVersionVector());
}

@Test
public void localPersistentRegionCreateRegionVersionVector() {
region = cache.createRegionFactory(RegionShortcut.LOCAL_PERSISTENT).create(REGION_NAME);
assertNotNull(((LocalRegion) region).getVersionVector());
}

@Test
public void localRegionDisableConcurrencyCheckDoesNotCreateRegionVersionVector() {
region = cache.createRegionFactory(RegionShortcut.LOCAL).setConcurrencyChecksEnabled(false)
.create(REGION_NAME);
assertNull(((LocalRegion) region).getVersionVector());
}

}

0 comments on commit 3dad0a3

Please sign in to comment.