Skip to content

Commit

Permalink
Revert "Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllE…
Browse files Browse the repository at this point in the history
…ventsOrCached" (apache#4189)" (apache#4212)" (apache#4442)

This reverts commit 80efe5c.
  • Loading branch information
jhuynh1 authored Dec 11, 2019
2 parents 03e31f0 + 1448c83 commit bde1f71
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 372 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,6 @@ public class DistributionAdvisor {
*/
private int numActiveProfiles = 0;

/**
* Profiles version number
*/
protected volatile long profilesVersion = 0;


/**
* A collection of MembershipListeners that want to be notified when a profile is added to or
* removed from this DistributionAdvisor. The keys are membership listeners and the values are
Expand Down Expand Up @@ -1319,7 +1313,8 @@ public Set adviseProfileRemove() {
// must synchronize when modifying profile array
private synchronized boolean basicAddProfile(Profile p) {
// don't add more than once, but replace existing profile
profilesVersion++;
// try {

int index = indexOfMemberId(p.getId());
if (index >= 0) {
Profile[] oldProfiles = profiles; // volatile read
Expand All @@ -1345,16 +1340,17 @@ private synchronized boolean basicAddProfile(Profile p) {
* Perform work of removing the given member from this advisor.
*/
private synchronized Profile basicRemoveMemberId(ProfileId id) {

// try {
int i = indexOfMemberId(id);
if (i >= 0) {
profilesVersion++;
Profile profileRemoved = profiles[i];
basicRemoveIndex(i);
return profileRemoved;
} else
return null;

// } finally {
// Assert.assertTrue(-1 == indexOfMemberId(id));
// }
}

private int indexOfMemberId(ProfileId id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,6 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {

// moved removedProfiles to DistributionAdvisor

private Set<InternalDistributedMember> adviseSetforAllEvents = Collections.emptySet();
private volatile long adviseAllEventsVersion = -1;

private Set<InternalDistributedMember> adviseSetforUpdate = Collections.emptySet();
private volatile long adviseUpdateVersion = -1;

private volatile long inRecoveryVersion = 0;
private volatile long adviseInRecoveryVersion = -1;

public synchronized void incInRecoveryVersion() {
inRecoveryVersion++;
}

/** Creates a new instance of CacheDistributionAdvisor */
protected CacheDistributionAdvisor(CacheDistributionAdvisee region) {
super(region);
Expand Down Expand Up @@ -153,35 +140,19 @@ public String toString() {
/**
* Returns a the set of members that either want all events or are caching data.
*
* @param excludeInRecovery if true then members in recovery are excluded
*/
Set<InternalDistributedMember> adviseAllEventsOrCached()
private Set<InternalDistributedMember> adviseAllEventsOrCached(final boolean excludeInRecovery)
throws IllegalStateException {
getAdvisee().getCancelCriterion().checkCancelInProgress(null);

// minimize volatile reads by copying ref to local var
long tempProfilesVersion = profilesVersion; // volatile read
long tempInRecoveryVersion = inRecoveryVersion; // volatile read

if (adviseAllEventsVersion != tempProfilesVersion
|| adviseInRecoveryVersion != tempInRecoveryVersion) {
synchronized (adviseSetforAllEvents) {
if (adviseAllEventsVersion != tempProfilesVersion
|| adviseInRecoveryVersion != tempInRecoveryVersion) {

adviseSetforAllEvents = Collections.unmodifiableSet(adviseFilter(profile -> {
CacheProfile cp = (CacheProfile) profile;
if (cp.getInRecovery()) {
return false;
}
return cp.cachedOrAllEventsWithListener();
}));
adviseAllEventsVersion = tempProfilesVersion;
adviseInRecoveryVersion = tempInRecoveryVersion;
}
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
if (excludeInRecovery && cp.inRecovery) {
return false;
}
}
return adviseSetforAllEvents;

return cp.cachedOrAllEventsWithListener();
});
}

/**
Expand All @@ -191,30 +162,18 @@ Set<InternalDistributedMember> adviseAllEventsOrCached()
Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException {
if (event.hasNewValue() || event.getOperation().isPutAll()) {
// only need to distribute it to members that want all events or cache data
return adviseAllEventsOrCached();
return adviseAllEventsOrCached(true/* fixes 41147 */);
} else {
// The new value is null so this is a create with a null value,
// in which case we only need to distribute this message to replicates
// or all events that are not a proxy or if a proxy has a listener

// minimize volatile reads by copying ref to local var
long tempProfilesVersion = profilesVersion; // volatile read

if (adviseUpdateVersion != tempProfilesVersion) {
synchronized (adviseSetforUpdate) {
if (adviseUpdateVersion != tempProfilesVersion) {

adviseSetforUpdate = Collections.unmodifiableSet(adviseFilter(profile -> {
CacheProfile cp = (CacheProfile) profile;
DataPolicy dp = cp.getDataPolicy();
return dp.withReplication()
|| (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
}));
adviseUpdateVersion = tempProfilesVersion;
}
}
}
return adviseSetforUpdate;
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
DataPolicy dp = cp.dataPolicy;
return dp.withReplication()
|| (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
});
}
}

Expand Down Expand Up @@ -291,7 +250,7 @@ public FilterRoutingInfo adviseFilterRouting(CacheEvent event, Set cacheOpRecipi
* Same as adviseGeneric except in recovery excluded.
*/
public Set<InternalDistributedMember> adviseCacheOp() {
return adviseAllEventsOrCached();
return adviseAllEventsOrCached(true);
}

/*
Expand All @@ -301,7 +260,7 @@ Set<InternalDistributedMember> adviseInvalidateRegion() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return !cp.getInRecovery();
return !cp.inRecovery;
});
}

Expand All @@ -324,7 +283,7 @@ public Set adviseNetWrite() {
assert profile instanceof CacheProfile;
CacheProfile prof = (CacheProfile) profile;
// if region in cache is in recovery, exclude
if (prof.getInRecovery()) {
if (prof.inRecovery) {
return false;
}

Expand Down Expand Up @@ -405,7 +364,7 @@ public InitialImageAdvice adviseInitialImage(InitialImageAdvice previousAdvice,
}

// if region in cache is in recovery, exclude
if (profile.getInRecovery()) {
if (profile.inRecovery) {
uninitialized.add(profile.getDistributedMember());
continue;
}
Expand Down Expand Up @@ -494,13 +453,12 @@ protected boolean evaluateProfiles(Profile newProfile, Profile oldProfile) {
*/
public static class CacheProfile extends DistributionAdvisor.Profile {
public DataPolicy dataPolicy = DataPolicy.REPLICATE;

public InterestPolicy interestPolicy = InterestPolicy.DEFAULT;
public boolean hasCacheLoader = false;
public boolean hasCacheWriter = false;
public boolean hasCacheListener = false;
public Scope scope = Scope.DISTRIBUTED_NO_ACK;
private boolean inRecovery = false;
public boolean inRecovery = false;
public Set<String> gatewaySenderIds = Collections.emptySet();
public Set<String> asyncEventQueueIds = Collections.emptySet();
/**
Expand Down Expand Up @@ -652,18 +610,6 @@ public boolean isPersistent() {
return dataPolicy.withPersistence();
}

public boolean getInRecovery() {
return inRecovery;
};

public void setInRecovery(boolean recovery) {
inRecovery = recovery;
};

public DataPolicy getDataPolicy() {
return dataPolicy;
}

/** Set the profile data information that is stored in a short */
protected void setIntInfo(int s) {
if ((s & REPLICATE_MASK) != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ protected void _distribute() {

try {
// Recipients with CacheOp
Set<InternalDistributedMember> recipients = new HashSet<>(getRecipients());
Set<InternalDistributedMember> recipients = getRecipients();
Map<InternalDistributedMember, PersistentMemberID> persistentIds = null;
if (region.getDataPolicy().withPersistence()) {
persistentIds = region.getDistributionAdvisor().adviseInitializedPersistentMembers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2214,12 +2214,7 @@ public void fillInProfile(Profile profile) {
cacheProfile.hasCacheListener = hasListener();
Assert.assertTrue(scope.isDistributed());
cacheProfile.scope = scope;

boolean newInRecovery = getImageState().getInRecovery();
if (cacheProfile.getInRecovery() != newInRecovery) {
distAdvisor.incInRecoveryVersion();
}
cacheProfile.setInRecovery(newInRecovery);
cacheProfile.inRecovery = getImageState().getInRecovery();
cacheProfile.isPersistent = getDataPolicy().withPersistence();
cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes());

Expand Down
Loading

0 comments on commit bde1f71

Please sign in to comment.