Skip to content

Commit

Permalink
GEODE-7088: Using ConcurrentSets for interested clients
Browse files Browse the repository at this point in the history
  • Loading branch information
mcmellawatt authored Aug 21, 2019
1 parent 9368e09 commit 174af1d
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1822,8 +1822,8 @@ fromData,63
toData,59

org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl,2
fromData,171
toData,162
fromData,172
toData,196

org/apache/geode/internal/cache/tier/sockets/HAEventWrapper,2
fromData,440
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.OverflowAttributes;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.net.SocketCloser;
Expand Down Expand Up @@ -774,10 +775,13 @@ private boolean isClientPermitted(ClientRegistrationMetadata clientRegistrationM
private void incMessagesNotQueuedOriginatorStat(final InternalCacheEvent event,
final Set<ClientProxyMembershipID> ids) {
// don't send to member of origin
if (ids.remove(event.getContext())) {
CacheClientProxy ccp = getClientProxy(event.getContext());
if (ccp != null) {
ccp.getStatistics().incMessagesNotQueuedOriginator();
ClientProxyMembershipID eventOriginator = event.getContext();
if (eventOriginator != null) {
if (ids.remove(eventOriginator)) {
CacheClientProxy ccp = getClientProxy(eventOriginator);
if (ccp != null) {
ccp.getStatistics().incMessagesNotQueuedOriginator();
}
}
}
}
Expand Down Expand Up @@ -912,7 +916,7 @@ private void singletonRouteClientMessage(Conflatable conflatable,
* collection of non-durable identifiers of clients connected to this VM
*/
Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
Set<ClientProxyMembershipID> result = new HashSet<>();
Set<ClientProxyMembershipID> result = new ConcurrentHashSet<>();
for (Object id : mixedDurableAndNonDurableIDs) {
if (id instanceof String) {
CacheClientProxy clientProxy = getClientProxy((String) id, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireIOException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.cache.util.ObjectSizer;
import org.apache.geode.internal.ByteArrayDataInput;
Expand All @@ -46,6 +47,7 @@
import org.apache.geode.internal.cache.ha.HAContainerRegion;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.size.Sizeable;

Expand Down Expand Up @@ -1125,39 +1127,7 @@ public void setClientCqs(ClientCqConcurrentMap clientCqs) {
}
}

/*
* private void writeCqInfo(ObjectOutput out) throws IOException { // Write Client CQ Size
* out.writeInt(this._clientCqs.size()); // For each client. Iterator entries =
* this._clientCqs.entrySet().iterator(); while (entries.hasNext()) { Map.Entry entry =
* (Map.Entry)entries.next();
*
* // Write ProxyId. ClientProxyMembershipID proxyId = (ClientProxyMembershipID)entry.getKey();
* proxyId.toData(out);
*
* HashMap cqs = (HashMap)entry.getValue(); // Write CQ size for each Client.
* out.writeInt(cqs.size()); Iterator clients = cqs.entrySet().iterator(); while
* (clients.hasNext()) { Map.Entry client = (Map.Entry)clients.next(); // Write CQ Name. String cq
* = (String)client.getKey(); out.writeObject(cq); // Write CQ OP. int cqOp =
* ((Integer)client.getValue()).intValue(); out.writeInt(cqOp); } } // while }
*/

/*
* private void readCqInfo(ObjectInput in) throws IOException, ClassNotFoundException { // Read
* Client CQ Size int numClientIds = in.readInt(); this._clientCqs = new HashMap();
*
* // For each Client. for (int cCnt=0; cCnt < numClientIds; cCnt++){ ClientProxyMembershipID
* proxyId = new ClientProxyMembershipID();
*
* // Read Proxy id. proxyId.fromData(in); // read CQ size for each Client. int numCqs =
* in.readInt(); HashMap cqs = new HashMap();
*
* for (int cqCnt=0; cqCnt < numCqs; cqCnt++){ // Get CQ Name and CQ Op. // Read CQ Name. String
* cqName = (String)in.readObject(); int cqOp = in.readInt();
*
* // Read CQ Op. cqs.put(cqName, Integer.valueOf(cqOp)); } this._clientCqs.put(proxyId, cqs); } }
*/

public void addClientInterestList(Set clientIds, boolean receiveValues) {
void addClientInterestList(Set<ClientProxyMembershipID> clientIds, boolean receiveValues) {
if (receiveValues) {
if (this._clientInterestList == null) {
this._clientInterestList = clientIds;
Expand All @@ -1175,23 +1145,16 @@ public void addClientInterestList(Set clientIds, boolean receiveValues) {

public void addClientInterestList(ClientProxyMembershipID clientId, boolean receiveValues) {
// This happens under synchronization on HAContainer.
HashSet<ClientProxyMembershipID> newInterests;
if (receiveValues) {
if (this._clientInterestList == null) {
newInterests = new HashSet<ClientProxyMembershipID>();
} else {
newInterests = new HashSet<ClientProxyMembershipID>(this._clientInterestList);
this._clientInterestList = new ConcurrentHashSet<>();
}
newInterests.add(clientId);
this._clientInterestList = newInterests;
this._clientInterestList.add(clientId);
} else {
if (this._clientInterestListInv == null) {
newInterests = new HashSet<ClientProxyMembershipID>();
} else {
newInterests = new HashSet<ClientProxyMembershipID>(this._clientInterestListInv);
this._clientInterestListInv = new ConcurrentHashSet<>();
}
newInterests.add(clientId);
this._clientInterestListInv = newInterests;
this._clientInterestListInv.add(clientId);
}
}

Expand All @@ -1209,6 +1172,16 @@ public boolean isClientInterestedInInvalidates(ClientProxyMembershipID clientId)
return (this._clientInterestListInv != null && this._clientInterestListInv.contains(clientId));
}

@VisibleForTesting
boolean hasClientsInterestedInUpdates() {
return this._clientInterestList != null;
}

@VisibleForTesting
boolean hasClientsInterestedInInvalidates() {
return this._clientInterestListInv != null;
}

protected Object deserialize(byte[] serializedBytes) {
Object deserializedObject = serializedBytes;
// This is a debugging method so ignore all exceptions like
Expand Down Expand Up @@ -1257,17 +1230,24 @@ public void toData(DataOutput out) throws IOException {
}
out.writeByte(_valueIsObject);
DataSerializer.writeObject(_membershipId, out);
// DataSerializer.writeObject(_eventIdentifier,out);
out.writeBoolean(_shouldConflate);
out.writeBoolean(_isInterestListPassed);
DataSerializer.writeByteArray(this.deltaBytes, out);
out.writeBoolean(_hasCqs);
// if (_hasCqs) {
// DataSerializer.writeHashMap(this._clientCqs, out);
// }
DataSerializer.writeObject(_callbackArgument, out);
DataSerializer.writeHashSet((HashSet) this._clientInterestList, out);
DataSerializer.writeHashSet((HashSet) this._clientInterestListInv, out);

HashSet<ClientProxyMembershipID> clientInterestListSnapshot =
this._clientInterestList != null
? new HashSet<>(this._clientInterestList)
: null;
DataSerializer.writeHashSet(clientInterestListSnapshot, out);

HashSet<ClientProxyMembershipID> clientInterestListInvSnapshot =
this._clientInterestListInv != null
? new HashSet<>(this._clientInterestListInv)
: null;
DataSerializer.writeHashSet(clientInterestListInvSnapshot, out);

DataSerializer.writeObject(this.versionTag, out);
}

Expand All @@ -1279,33 +1259,25 @@ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
this._value = DataSerializer.readByteArray(in);
this._valueIsObject = in.readByte();
this._membershipId = ClientProxyMembershipID.readCanonicalized(in);
// this._eventIdentifier = (EventID)DataSerializer.readObject(in);;
this._shouldConflate = in.readBoolean();
this._isInterestListPassed = in.readBoolean();
this.deltaBytes = DataSerializer.readByteArray(in);
this._hasCqs = in.readBoolean();

// if (this._hasCqs) {
// this._clientCqs = DataSerializer.readHashMap(in);
// }
this._callbackArgument = DataSerializer.readObject(in);

CacheClientNotifier ccn = CacheClientNotifier.getInstance();

HashSet ids = DataSerializer.readHashSet(in);
Set<ClientProxyMembershipID> clientInterestList = DataSerializer.readHashSet(in);
this._clientInterestList = ccn != null && clientInterestList != null
? ccn.getProxyIDs(clientInterestList)
: null;

if (ccn != null && ids != null) { // use canonical IDs in servers
ids = (HashSet) ccn.getProxyIDs(ids);
}
this._clientInterestList = ids;

ids = DataSerializer.readHashSet(in);
if (ccn != null && ids != null) {
ids = (HashSet) ccn.getProxyIDs(ids);
}
this._clientInterestListInv = ids;
Set<ClientProxyMembershipID> clientInterestListInv = DataSerializer.readHashSet(in);
this._clientInterestListInv = ccn != null && clientInterestListInv != null
? ccn.getProxyIDs(clientInterestListInv)
: null;

this.versionTag = (VersionTag) DataSerializer.readObject(in);
this.versionTag = DataSerializer.readObject(in);
}

private Object getOriginalCallbackArgument() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.internal.cache.tier.sockets;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

import java.io.Serializable;

import org.junit.Test;

import org.apache.geode.CopyHelper;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DurableClientAttributes;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.test.fake.Fakes;

public class ClientUpdateMessageImplTest implements Serializable {
@Test
public void addInterestedClientTest() {
ClientUpdateMessageImpl clientUpdateMessageImpl = new ClientUpdateMessageImpl();
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);

assertThat(clientUpdateMessageImpl.isClientInterestedInUpdates(clientProxyMembershipID))
.isFalse();
clientUpdateMessageImpl.addClientInterestList(clientProxyMembershipID, true);
assertThat(clientUpdateMessageImpl.isClientInterestedInUpdates(clientProxyMembershipID))
.isTrue();

assertThat(clientUpdateMessageImpl.isClientInterestedInInvalidates(clientProxyMembershipID))
.isFalse();
clientUpdateMessageImpl.addClientInterestList(clientProxyMembershipID, false);
assertThat(clientUpdateMessageImpl.isClientInterestedInInvalidates(clientProxyMembershipID))
.isTrue();
}

@Test
public void serializeClientUpdateMessageNullInterestLists() {
ClientUpdateMessageImpl clientUpdateMessageImpl = getTestClientUpdateMessage();

ClientUpdateMessageImpl clientUpdateMessageCopy = CopyHelper.copy(clientUpdateMessageImpl);

assertNotNull(clientUpdateMessageCopy);
assertThat(clientUpdateMessageCopy.hasClientsInterestedInUpdates()).isFalse();
assertThat(clientUpdateMessageCopy.hasClientsInterestedInInvalidates()).isFalse();
}

@Test
public void serializeClientUpdateMessageWithInterestLists() {
ClientUpdateMessageImpl clientUpdateMessageImpl = getTestClientUpdateMessage();

DistributedMember distributedMember =
mock(DistributedMember.class, withSettings().serializable());
when(distributedMember.getDurableClientAttributes())
.thenReturn(mock(DurableClientAttributes.class, withSettings().serializable()));

ClientProxyMembershipID interestedClientID = new ClientProxyMembershipID(distributedMember);

clientUpdateMessageImpl.addClientInterestList(interestedClientID, false);
clientUpdateMessageImpl.addClientInterestList(interestedClientID, true);

// This creates the CacheClientNotifier singleton which is null checked in
// ClientUpdateMessageImpl.fromData(), so we need to do this for serialization to
// succeed.
CacheClientNotifier cacheClientNotifier =
CacheClientNotifier.getInstance(Fakes.cache(), mock(StatisticsClock.class),
mock(CacheServerStats.class), 10, 10,
mock(ConnectionListener.class), null, true);

// Mock the deserializing side to include the cache client
// proxy with the interested client ID, so that the ID is added to the interest
// collection in the message copy
CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
when(cacheClientProxy.getProxyID()).thenReturn(interestedClientID);

cacheClientNotifier.addClientProxy(cacheClientProxy);

ClientUpdateMessageImpl clientUpdateMessageCopy = CopyHelper.copy(clientUpdateMessageImpl);

assertNotNull(clientUpdateMessageCopy);
assertThat(clientUpdateMessageCopy.isClientInterestedInUpdates(interestedClientID)).isTrue();
assertThat(clientUpdateMessageCopy.isClientInterestedInInvalidates(interestedClientID))
.isTrue();
}

private ClientUpdateMessageImpl getTestClientUpdateMessage() {
LocalRegion localRegion = mock(LocalRegion.class);
String regionName = "regionName";
when(localRegion.getName()).thenReturn(regionName);
return new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE, null, null);
}
}

0 comments on commit 174af1d

Please sign in to comment.