diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java index 3e7db6d5d063..70d6979d5175 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java @@ -545,7 +545,7 @@ public String getUniqueId() { return memberIdentifier.getUniqueId(); } - public void setVersionForTest(Version v) { + public void setVersionForTest(KnownVersion v) { memberIdentifier.setVersionForTest(v); } diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberDataVersionJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberDataVersionJUnitTest.java index af27e49a3fef..7a6895b9d46f 100644 --- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberDataVersionJUnitTest.java +++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberDataVersionJUnitTest.java @@ -85,6 +85,13 @@ public void testReadEssentialData() throws IOException, ClassNotFoundException { validate(newMember); } + @Test + public void testSetVersionOrdinal() { + final GMSMemberData memberData = new GMSMemberData(); + memberData.setVersionOrdinal(unknownVersionOrdinal); + validate(memberData); + } + private AbstractShortAssert validate(final MemberData memberData) { return assertThat(memberData.getVersionOrdinal()).isEqualTo(unknownVersionOrdinal); } diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java index e7a2edd5f65c..49b8b3a09415 100644 --- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java +++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java @@ -59,21 +59,12 @@ import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave; import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger; import org.apache.geode.internal.serialization.DSFIDSerializer; -import org.apache.geode.internal.serialization.KnownVersion; -import org.apache.geode.internal.serialization.Version; -import org.apache.geode.internal.serialization.Versioning; import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl; import org.apache.geode.test.junit.categories.MembershipTest; @Category({MembershipTest.class}) public class GMSMembershipJUnitTest { - private static final Version OLDER_THAN_CURRENT_VERSION = - Versioning.getVersion((short) (KnownVersion.CURRENT_ORDINAL - 1)); - private static final Version NEWER_THAN_CURRENT_VERSION = - Versioning.getVersion((short) (KnownVersion.CURRENT_ORDINAL + 1));; - private static final int DEFAULT_PORT = 8888; - private Services services; private MembershipConfig mockConfig; private Authenticator authenticator; @@ -148,7 +139,7 @@ public long getAckSevereAlertThreshold() { Random r = new Random(); mockMembers = new MemberIdentifier[5]; for (int i = 0; i < mockMembers.length; i++) { - mockMembers[i] = createMemberID(DEFAULT_PORT + i); + mockMembers[i] = createMemberID(8888 + i); uuid = new UUID(r.nextLong(), r.nextLong()); mockMembers[i].setUUID(uuid); } @@ -184,7 +175,7 @@ public void testSendMessage() throws Exception { MemberIdentifier myGMSMemberId = myMemberId; List gmsMembers = members.stream().map(x -> ((MemberIdentifier) x)).collect(Collectors.toList()); - manager.getGMSManager().installView(new GMSMembershipView<>(myGMSMemberId, 1, gmsMembers)); + manager.getGMSManager().installView(new GMSMembershipView(myGMSMemberId, 1, gmsMembers)); MemberIdentifier[] destinations = new MemberIdentifier[] {mockMembers[0]}; Set failures = manager.send(destinations, m); @@ -208,9 +199,9 @@ public void testStartupEvents() throws Exception { manager.getGMSManager().started(); manager.isJoining = true; - List viewMembers = + List viewmembers = Arrays.asList(new MemberIdentifier[] {mockMembers[0], myMemberId}); - manager.getGMSManager().installView(createView(myMemberId, 2, viewMembers)); + manager.getGMSManager().installView(createView(myMemberId, 2, viewmembers)); // add a surprise member that will be shunned due to it's having // an old view ID @@ -228,7 +219,7 @@ public void testStartupEvents() throws Exception { // suspect a member MemberIdentifier suspectMember = mockMembers[1]; manager.handleOrDeferSuspect( - new SuspectMember<>(mockMembers[0], suspectMember, "testing")); + new SuspectMember(mockMembers[0], suspectMember, "testing")); // suspect messages aren't queued - they're ignored before joining the system assertEquals(2, manager.getStartupEvents().size()); verify(listener, never()).memberSuspect(suspectMember, mockMembers[0], "testing"); @@ -241,9 +232,9 @@ public void testStartupEvents() throws Exception { assertEquals(3, manager.getStartupEvents().size()); // this view officially adds surpriseMember2 - viewMembers = Arrays + viewmembers = Arrays .asList(new MemberIdentifier[] {mockMembers[0], myMemberId, surpriseMember2}); - manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 3, viewMembers)); + manager.handleOrDeferViewEvent(new MembershipView(myMemberId, 3, viewmembers)); assertEquals(4, manager.getStartupEvents().size()); // add a surprise member that will be shunned due to it's having @@ -256,13 +247,13 @@ public void testStartupEvents() throws Exception { // process a new view after we finish joining but before event processing has started manager.isJoining = false; mockMembers[4].setVmViewId(4); - viewMembers = Arrays.asList(new MemberIdentifier[] {mockMembers[0], myMemberId, + viewmembers = Arrays.asList(new MemberIdentifier[] {mockMembers[0], myMemberId, surpriseMember2, mockMembers[4]}); - manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 4, viewMembers)); + manager.handleOrDeferViewEvent(new MembershipView(myMemberId, 4, viewmembers)); assertEquals(6, manager.getStartupEvents().size()); // exercise the toString methods for code coverage - for (StartupEvent ev : manager.getStartupEvents()) { + for (StartupEvent ev : manager.getStartupEvents()) { ev.toString(); } @@ -280,14 +271,14 @@ public void testStartupEvents() throws Exception { // for code coverage also install a view after we finish joining but before // event processing has started. This should notify the distribution manager // with a LocalViewMessage to process the view - manager.handleOrDeferViewEvent(new MembershipView<>(myMemberId, 5, viewMembers)); + manager.handleOrDeferViewEvent(new MembershipView(myMemberId, 5, viewmembers)); await().untilAsserted(() -> assertEquals(manager.getView().getViewId(), 5)); // process a suspect now - it will be passed to the listener reset(listener); suspectMember = mockMembers[1]; manager.handleOrDeferSuspect( - new SuspectMember<>(mockMembers[0], suspectMember, "testing")); + new SuspectMember(mockMembers[0], suspectMember, "testing")); verify(listener).memberSuspect(suspectMember, mockMembers[0], "testing"); } @@ -301,15 +292,15 @@ public void testAddressesWithoutUUIDs() throws Exception { manager.getGMSManager().started(); manager.isJoining = true; - List viewMembers = + List viewmembers = Arrays.asList(new MemberIdentifier[] {mockMembers[0], mockMembers[1], myMemberId}); - GMSMembershipView view = createView(myMemberId, 2, viewMembers); + GMSMembershipView view = createView(myMemberId, 2, viewmembers); manager.getGMSManager().installView(view); when(services.getJoinLeave().getView()).thenReturn(view); - MemberIdentifier[] destinations = new MemberIdentifier[viewMembers.size()]; + MemberIdentifier[] destinations = new MemberIdentifier[viewmembers.size()]; for (int i = 0; i < destinations.length; i++) { - MemberIdentifier id = viewMembers.get(i); + MemberIdentifier id = viewmembers.get(i); destinations[i] = createMemberID(id.getMembershipPort()); } manager.checkAddressesForUUIDs(destinations); @@ -337,83 +328,4 @@ public void noDispatchWhenSick() throws MemberShunnedException, MemberStartupExc assertThat(spy.getStartupEvents()).isEmpty(); } - @Test - public void testIsMulticastAllowedWithOldVersionSurpriseMember() { - MembershipView view = createMembershipView(); - manager.addSurpriseMember(createSurpriseMember(OLDER_THAN_CURRENT_VERSION)); - - manager.processView(view); - - assertThat(manager.getGMSManager().isMulticastAllowed()).isFalse(); - } - - @Test - public void testIsMulticastAllowedWithCurrentVersionSurpriseMember() { - MembershipView view = createMembershipView(); - manager.addSurpriseMember(createSurpriseMember(KnownVersion.CURRENT)); - - manager.processView(view); - - assertThat(manager.getGMSManager().isMulticastAllowed()).isTrue(); - } - - @Test - public void testIsMulticastAllowedWithNewVersionSurpriseMember() { - MembershipView view = createMembershipView(); - manager.addSurpriseMember(createSurpriseMember(NEWER_THAN_CURRENT_VERSION)); - - manager.processView(view); - - assertThat(manager.getGMSManager().isMulticastAllowed()).isTrue(); - } - - @Test - public void testIsMulticastAllowedWithOldVersionViewMember() { - MembershipView view = createMembershipView(); - view.getMembers().get(0).setVersionForTest(OLDER_THAN_CURRENT_VERSION); - - manager.processView(view); - - assertThat(manager.getGMSManager().isMulticastAllowed()).isFalse(); - } - - @Test - public void testMulticastAllowedWithCurrentVersionViewMember() { - MembershipView view = createMembershipView(); - - manager.processView(view); - - assertThat(manager.getGMSManager().isMulticastAllowed()).isTrue(); - } - - @Test - public void testMulticastAllowedWithNewVersionViewMember() { - MembershipView view = createMembershipView(); - view.getMembers().get(0).setVersionForTest(NEWER_THAN_CURRENT_VERSION); - - manager.processView(view); - - assertThat(manager.getGMSManager().isMulticastAllowed()).isTrue(); - } - - private MemberIdentifier createSurpriseMember(Version version) { - MemberIdentifier surpriseMember = createMemberID(DEFAULT_PORT + 5); - surpriseMember.setVmViewId(3); - surpriseMember.setVersionForTest(version); - return surpriseMember; - } - - private MembershipView createMembershipView() { - List viewMembers = createMemberIdentifiers(); - return new MembershipView<>(myMemberId, 2, viewMembers); - } - - private List createMemberIdentifiers() { - List viewMembers = new ArrayList<>(); - for (int i = 0; i < 2; ++i) { - MemberIdentifier memberIdentifier = createMemberID(DEFAULT_PORT + 6 + i); - viewMembers.add(memberIdentifier); - } - return viewMembers; - } } diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MemberData.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MemberData.java index a8d038ee2335..3c44b39514c8 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MemberData.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MemberData.java @@ -21,6 +21,7 @@ import org.jgroups.util.UUID; import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.serialization.SerializationContext; import org.apache.geode.internal.serialization.Version; @@ -49,6 +50,8 @@ public interface MemberData { String getUniqueTag(); + void setVersionOrdinal(short versionOrdinal); + void setUUID(UUID u); UUID getUUID(); @@ -89,7 +92,7 @@ public interface MemberData { void setVmKind(int vmKind); - void setVersion(Version v); + void setVersion(KnownVersion v); void setDirectChannelPort(int directPort); diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MemberIdentifier.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MemberIdentifier.java index 6b4546c32438..30cc8dd96c99 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MemberIdentifier.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MemberIdentifier.java @@ -27,6 +27,7 @@ import org.apache.geode.internal.serialization.DataSerializableFixedID; import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.serialization.SerializationContext; import org.apache.geode.internal.serialization.Version; @@ -190,7 +191,7 @@ void _readEssentialData(DataInput in, Function hostnameReso String getUniqueId(); - void setVersionForTest(Version v); + void setVersionForTest(KnownVersion v); void setUniqueTag(String tag); diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberData.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberData.java index cb0efe2593ed..9e349fbcb9fd 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberData.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberData.java @@ -235,6 +235,11 @@ public String getUniqueTag() { return uniqueTag; } + @Override + public void setVersionOrdinal(short versionOrdinal) { + this.version = Versioning.getVersion(versionOrdinal); + } + @Override public void setUUID(UUID u) { if (u == null) { @@ -502,8 +507,8 @@ public void setVmKind(int vmKind) { @Override - public void setVersion(Version version) { - this.version = version; + public void setVersion(KnownVersion v) { + setVersionOrdinal(v.ordinal()); } @Override @@ -578,7 +583,7 @@ public void setHostName(String hostName) { @Override public void readEssentialData(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException { - setVersion(Versioning.getVersion(VersioningIO.readOrdinal(in))); + setVersionOrdinal(VersioningIO.readOrdinal(in)); int flags = in.readShort(); this.networkPartitionDetectionEnabled = (flags & NPD_ENABLED_BIT) != 0; diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java index b4a91b85826b..f8ef7dd36c2f 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java @@ -39,7 +39,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.logging.log4j.Logger; @@ -62,6 +61,7 @@ import org.apache.geode.distributed.internal.membership.api.StopShunningMarker; import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager; import org.apache.geode.internal.serialization.KnownVersion; +import org.apache.geode.internal.serialization.Version; import org.apache.geode.logging.internal.executors.LoggingExecutors; import org.apache.geode.logging.internal.executors.LoggingThread; import org.apache.geode.util.internal.GeodeGlossary; @@ -359,7 +359,7 @@ boolean isDistributionMessage() { /** * Analyze a given view object, generate events as appropriate */ - public void processView(MembershipView newView) { + public void processView(long newViewId, MembershipView newView) { // Sanity check... if (logger.isDebugEnabled()) { StringBuilder msg = new StringBuilder(200); @@ -377,17 +377,35 @@ public void processView(MembershipView newView) { // incoming events will not be lost in terms of our global view. latestViewWriteLock.lock(); try { - setIsMulticastAllowedFrom(newView, surpriseMembers); + // first determine the version for multicast message serialization + Version version = KnownVersion.CURRENT; + for (final Entry internalIDLongEntry : surpriseMembers + .entrySet()) { + ID mbr = internalIDLongEntry.getKey(); + final Version itsVersion = mbr.getVersion(); + if (itsVersion != null && version.compareTo(itsVersion) < 0) { + version = itsVersion; + } + } + for (ID mbr : newView.getMembers()) { + final Version itsVersion = mbr.getVersion(); + if (itsVersion != null && itsVersion.compareTo(version) < 0) { + version = mbr.getVersion(); + } + } + disableMulticastForRollingUpgrade = !version.equals(KnownVersion.CURRENT); + // Save previous view, for delta analysis MembershipView priorView = latestView; - if (newView.getViewId() < priorView.getViewId()) { + if (newViewId < priorView.getViewId()) { // ignore this view since it is old news return; } // update the view to reflect our changes, so that // callbacks will see the new (updated) view. + long newlatestViewId = newViewId; MembershipView newlatestView = new MembershipView<>(newView, newView.getViewId()); // look for additions @@ -513,18 +531,6 @@ public void processView(MembershipView newView) { } } - private void setIsMulticastAllowedFrom(final MembershipView newView, - final Map surpriseMembers) { - disableMulticastForRollingUpgrade = - anyMemberHasOlderVersion( - Stream.concat(surpriseMembers.keySet().stream(), newView.getMembers().stream())); - } - - private boolean anyMemberHasOlderVersion(final Stream members) { - return members - .anyMatch(member -> KnownVersion.CURRENT.isNewerThan(member.getVersion())); - } - @Override public V doWithViewLocked(Supplier function) { latestViewReadLock.lock(); @@ -977,7 +983,7 @@ protected void handleOrDeferViewEvent(MembershipView viewArg) { } } - viewExecutor.submit(() -> processView(viewArg)); + viewExecutor.submit(() -> processView(viewArg.getViewId(), viewArg)); } finally { latestViewWriteLock.unlock(); @@ -1036,7 +1042,7 @@ private void processStartupEvent(StartupEvent o) { // message from non-member - ignore } } else if (o.isGmsView()) { // view event - processView(o.gmsView); + processView(o.gmsView.getViewId(), o.gmsView); } else if (o.isSurpriseConnect()) { // connect processSurpriseConnect(o.member); } else { diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MemberIdentifierImpl.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MemberIdentifierImpl.java index f227a976cf5f..daa1a20b8310 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MemberIdentifierImpl.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MemberIdentifierImpl.java @@ -982,7 +982,7 @@ public String getUniqueId() { return sb.toString(); } - public void setVersionForTest(Version v) { + public void setVersionForTest(KnownVersion v) { memberData.setVersion(v); cachedToString = null; }