Skip to content

Commit

Permalink
GEODE-1569: post process for serialized domain objects
Browse files Browse the repository at this point in the history
* for client/server retreival, post process the value before it was put into the message
* for gfsh commands, post process the value before it was put into the command result json
  • Loading branch information
jinmeiliao committed Aug 1, 2016
1 parent c7530d0 commit 90e00bf
Show file tree
Hide file tree
Showing 15 changed files with 479 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,39 +157,14 @@ protected static boolean processQueryUsingParams(Message msg, Query query,
if (result instanceof SelectResults) {
SelectResults selectResults = (SelectResults)result;

// post process, iterate through the result for post processing
if(GeodeSecurityUtil.needPostProcess()) {
List list = selectResults.asList();
for (Iterator<Object> valItr = list.iterator(); valItr.hasNext(); ) {
Object value = valItr.next();
if (value == null)
continue;

if (value instanceof CqEntry) {
CqEntry cqEntry = (CqEntry) value;
Object cqNewValue = GeodeSecurityUtil.postProcess(null, cqEntry.getKey(), cqEntry.getValue());
if (!cqEntry.getValue().equals(cqNewValue)) {
selectResults.remove(value);
selectResults.add(new CqEntry(cqEntry.getKey(), cqNewValue));
}
} else {
Object newValue = GeodeSecurityUtil.postProcess(null, null, value);
if (!value.equals(newValue)) {
selectResults.remove(value);
selectResults.add(newValue);
}
}
}
}

if (logger.isDebugEnabled()) {
logger.debug("Query Result size for : {} is {}", query.getQueryString(), selectResults.size());
}

CollectionType collectionType = null;
boolean sendCqResultsWithKey = true;
boolean isStructs = false;

// check if resultset has serialized objects, so that they could be sent
// as ObjectPartList
boolean hasSerializedObjects = ((DefaultQuery) query)
Expand All @@ -201,7 +176,7 @@ protected static boolean processQueryUsingParams(Message msg, Query query,
// The results in a StructSet are stored in Object[]s
// Get them as Object[]s for the objs[] in order to avoid duplicating
// the StructTypes

// Object[] objs = new Object[selectResults.size()];
// Get the collection type (which includes the element type)
// (used to generate the appropriate instance on the client)
Expand All @@ -215,23 +190,23 @@ protected static boolean processQueryUsingParams(Message msg, Query query,
if (cqQuery != null){
// Check if the key can be sent to the client based on its version.
sendCqResultsWithKey = sendCqResultsWithKey(servConn);

if (sendCqResultsWithKey){
// Update the collection type to include key info.
collectionType = new CollectionTypeImpl(Collection.class,
collectionType = new CollectionTypeImpl(Collection.class,
new StructTypeImpl(new String[]{"key", "value"}));
isStructs = collectionType.getElementType().isStructType();
isStructs = collectionType.getElementType().isStructType();
}
}

int numberOfChunks = (int)Math.ceil(selectResults.size() * 1.0
/ maximumChunkSize);

if (logger.isTraceEnabled()) {
logger.trace("{}: Query results size: {}: Entries in chunk: {}: Number of chunks: {}",
servConn.getName(), selectResults.size(), maximumChunkSize, numberOfChunks);
}

long oldStart = start;
start = DistributionStats.getStatTime();
stats.incProcessQueryTime(start - oldStart);
Expand Down Expand Up @@ -265,12 +240,12 @@ protected static boolean processQueryUsingParams(Message msg, Query query,
isStructs, collectionType, queryString, cqQuery, sendCqResultsWithKey, sendResults);
}
}

if(cqQuery != null){
// Set the CQ query result cache initialized flag.
cqQuery.setCqResultsCacheInitialized();
}

}
else if (result instanceof Integer) {
if (sendResults) {
Expand Down Expand Up @@ -319,7 +294,7 @@ else if (result instanceof Integer) {
return false;
} finally {
// Since the query object is being shared in case of bind queries,
// resetting the flag may cause inconsistency.
// resetting the flag may cause inconsistency.
// Also since this flag is only being set in code path executed by
// remote query execution, resetting it is not required.

Expand All @@ -333,7 +308,7 @@ else if (result instanceof Integer) {
stats.incWriteQueryResponseTime(DistributionStats.getStatTime() - start);
return true;
}

private static boolean sendCqResultsWithKey(ServerConnection servConn) {
Version clientVersion = servConn.getClientVersion();
if (clientVersion.compareTo(Version.GFE_65) >= 0) {
Expand Down Expand Up @@ -390,9 +365,9 @@ protected static void sendCqResponse(int msgType, String msgStr, int txId,
logger.debug("CQ Response sent successfully");
}
}

private static void sendResultsAsObjectArray(SelectResults selectResults,
int numberOfChunks, ServerConnection servConn,
int numberOfChunks, ServerConnection servConn,
boolean isStructs, CollectionType collectionType, String queryString, ServerCQ cqQuery, boolean sendCqResultsWithKey, boolean sendResults)
throws IOException {
int resultIndex = 0;
Expand Down Expand Up @@ -422,7 +397,7 @@ private static void sendResultsAsObjectArray(SelectResults selectResults,
// that results[i] is not null.
i--;
continue;
}
}
// Add the key into CQ results cache.
// For PR the Result caching is not yet supported.
// cqQuery.cqResultsCacheInitialized is added to take care
Expand All @@ -431,13 +406,13 @@ private static void sendResultsAsObjectArray(SelectResults selectResults,
if (!cqQuery.isPR()) {
cqQuery.addToCqResultKeys(e.getKey());
}

// Add to the Results object array.
if (sendCqResultsWithKey) {
results[i] = e.getKeyValuePair();
} else {
results[i] = e.getValue();
}
}
} else {
// instance check added to fix bug 40516.
if (isStructs && (objs[resultIndex] instanceof Struct)) {
Expand Down Expand Up @@ -468,7 +443,7 @@ private static void sendResultsAsObjectArray(SelectResults selectResults,
if (sendResults) {
writeQueryResponseChunk(results, collectionType,
(resultIndex == selectResults.size()), servConn);

if (logger.isDebugEnabled()) {
logger.debug("{}: Sent chunk ({} of {}) of query response for query: {}",
servConn.getName(), (j + 1), numberOfChunks, queryString);
Expand Down Expand Up @@ -507,7 +482,8 @@ private static void sendResultsAsObjectPartList(int numberOfChunks,
if (e.getValue() == null) {
resultIndex++;
continue;
}
}

// Add the key into CQ results cache.
// For PR the Result caching is not yet supported.
// cqQuery.cqResultsCacheInitialized is added to take care
Expand All @@ -516,13 +492,13 @@ private static void sendResultsAsObjectPartList(int numberOfChunks,
if (!cqQuery.isPR()) {
cqQuery.addToCqResultKeys(e.getKey());
}

// Add to the Results object array.
if (sendCqResultsWithKey) {
result = e.getKeyValuePair();
} else {
result = e.getValue();
}
}
}
else {
result = objs.get(resultIndex);
Expand All @@ -533,7 +509,7 @@ private static void sendResultsAsObjectPartList(int numberOfChunks,
}
resultIndex++;
}

if (sendResults) {
writeQueryResponseChunk(serializedObjs, collectionType,
((j + 1) == numberOfChunks), servConn);
Expand All @@ -545,24 +521,17 @@ private static void sendResultsAsObjectPartList(int numberOfChunks,
}
}
}

private static void addToObjectPartList(ObjectPartList serializedObjs,
Object res, CollectionType collectionType, boolean lastChunk,
ServerConnection servConn, boolean isStructs) throws IOException {

if (isStructs && (res instanceof Struct)) {
Object[] values = ((Struct) res).getFieldValues();
// create another ObjectPartList for the struct
ObjectPartList serializedValueObjs = new ObjectPartList(values.length,
false);
for (Object value : values) {
if (value instanceof CachedDeserializable) {
serializedValueObjs.addPart(null,
((CachedDeserializable) value).getSerializedValue(),
ObjectPartList.OBJECT, null);
} else {
addDeSerializedObjectToObjectPartList(serializedValueObjs, value);
}
addObjectToPartList(serializedValueObjs, null, value);
}
serializedObjs.addPart(null, serializedValueObjs, ObjectPartList.OBJECT,
null);
Expand All @@ -571,33 +540,33 @@ private static void addToObjectPartList(ObjectPartList serializedObjs,
// create another ObjectPartList for the Object[]
ObjectPartList serializedValueObjs = new ObjectPartList(values.length,
false);
for (Object value : values) {
if (value instanceof CachedDeserializable) {
serializedValueObjs.addPart(null,
((CachedDeserializable) value).getSerializedValue(),
ObjectPartList.OBJECT, null);
} else {
addDeSerializedObjectToObjectPartList(serializedValueObjs, value);
}
for(int i=0; i<values.length; i+=2) {
Object key = values[i];
Object value = values[i+1];
addObjectToPartList(serializedValueObjs, key, value);
}
serializedObjs.addPart(null, serializedValueObjs, ObjectPartList.OBJECT,
null);
} else if (res instanceof CachedDeserializable) {
serializedObjs.addPart(null,
((CachedDeserializable) res).getSerializedValue(),
ObjectPartList.OBJECT, null);
} else { // for deserialized objects
addDeSerializedObjectToObjectPartList(serializedObjs, res);
} else { //for deserialized objects
addObjectToPartList(serializedObjs, null, res);
}
}

private static void addDeSerializedObjectToObjectPartList(
ObjectPartList objPartList, Object obj) {
if (obj instanceof byte[]) {
objPartList.addPart(null, obj, ObjectPartList.BYTES, null);
} else {
objPartList.addPart(null, obj, ObjectPartList.OBJECT, null);

private static void addObjectToPartList(ObjectPartList objPartList, Object key, Object value) {
Object object = value;
boolean isObject = true;
if (value instanceof CachedDeserializable) {
object = ((CachedDeserializable)value).getSerializedValue();
}
else if(value instanceof byte[]){
isObject = false;
}

object = GeodeSecurityUtil.postProcess(null, key, object, isObject);
if(key!=null){
objPartList.addPart(null, key, ObjectPartList.OBJECT, null);
}
objPartList.addPart(null, object, isObject?ObjectPartList.OBJECT:ObjectPartList.BYTES, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.GemFireIOException;
import com.gemstone.gemfire.StatisticsFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
Expand Down Expand Up @@ -84,7 +83,6 @@
import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.FilterProfile;
Expand All @@ -109,7 +107,6 @@
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
import com.gemstone.gemfire.internal.security.GeodeSecurityUtil;
import com.gemstone.gemfire.internal.util.BlobHelper;
import com.gemstone.gemfire.security.AccessControl;

/**
Expand Down Expand Up @@ -1678,18 +1675,8 @@ protected void deliverMessage(Conflatable conflatable)
// post process
if(GeodeSecurityUtil.needPostProcess()) {
Object oldValue = clientMessage.getValue();
if (clientMessage.valueIsObject()) {
Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(), clientMessage.getKeyOfInterest(), EntryEventImpl
.deserialize((byte[]) oldValue));
try {
clientMessage.setLatestValue(BlobHelper.serializeToBlob(newValue));
} catch (IOException e) {
throw new GemFireIOException("Exception serializing entry value", e);
}
} else {
Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(), clientMessage.getKeyOfInterest(), oldValue);
clientMessage.setLatestValue(newValue);
}
Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(), clientMessage.getKeyOfInterest(), oldValue, clientMessage.valueIsObject());
clientMessage.setLatestValue(newValue);
}

if (clientMessage.needsNoAuthorizationCheck() || postDeliverAuthCheckPassed(clientMessage)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void cmdExecute(Message msg, ServerConnection servConn, long startparam)
}

// post process
data = GeodeSecurityUtil.postProcess(regionName, key, data);
data = GeodeSecurityUtil.postProcess(regionName, key, data, entry.isObject);

long oldStart = start;
start = DistributionStats.getStatTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void fillAndSendGetAllResponseChunks(Region region,
}

// post process
value = GeodeSecurityUtil.postProcess(regionName, key, value);
value = GeodeSecurityUtil.postProcess(regionName, key, value, isObject);

if (logger.isDebugEnabled()) {
logger.debug("{}: Returning value for key={}: {}", servConn.getName(), key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void fillAndSendGetAllResponseChunks(Region region,
}
}

value = GeodeSecurityUtil.postProcess(regionName, key, value);
value = GeodeSecurityUtil.postProcess(regionName, key, value, isObject);

if (isDebugEnabled) {
logger.debug("{}: Returning value for key={}: {}", servConn.getName(), key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ private void fillAndSendGetAllResponseChunks(Region region,
}
}

data = GeodeSecurityUtil.postProcess(regionName, key, data);
data = GeodeSecurityUtil.postProcess(regionName, key, data, entry.isObject);

// Add the entry to the list that will be returned to the client
if (keyNotPresent) {
Expand Down
Loading

0 comments on commit 90e00bf

Please sign in to comment.