Skip to content

Commit

Permalink
GEODE-1571: post process for get, query, cq and register interest.
Browse files Browse the repository at this point in the history
  • Loading branch information
jinmeiliao committed Jun 30, 2016
1 parent beb50ad commit 39c5684
Show file tree
Hide file tree
Showing 18 changed files with 515 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,34 @@ protected static boolean processQueryUsingParams(Message msg, Query query,

if (result instanceof SelectResults) {
SelectResults selectResults = (SelectResults)result;

// post process, iterate through the result for post processing
List list = selectResults.asList();
for(Iterator<Object> valItr = list.iterator(); valItr.hasNext();){
Object value = valItr.next();
Object newValue = value;
if(value instanceof CqEntry){
CqEntry cqEntry = (CqEntry)value;
Object cqNewValue = GeodeSecurityUtil.postProcess(null, cqEntry.getKey(), cqEntry.getValue());
if(!cqEntry.getValue().equals(cqNewValue)){
selectResults.remove(value);
if(cqNewValue!=null){
selectResults.add(new CqEntry(cqEntry.getKey(), cqNewValue));
}
}
}
else {
newValue = GeodeSecurityUtil.postProcess(null, null, value);
if(!value.equals(newValue)){
selectResults.remove(value);
// only add the newValue back if it's not null
if(newValue!=null){
selectResults.add(newValue);
}
}
}
}

if (logger.isDebugEnabled()) {
logger.debug("Query Result size for : {} is {}", query.getQueryString(), selectResults.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;
import org.apache.shiro.subject.Subject;

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
Expand Down Expand Up @@ -394,12 +395,14 @@ protected void registerGFEClient(DataInputStream dis, DataOutputStream dos,
new IllegalArgumentException("Invalid conflation byte"), clientVersion);
return;
}


proxy = registerClient(socket, proxyID, proxy, isPrimary, clientConflation,
clientVersion, acceptorId, notifyBySubscription);

//TODO:hitesh
Properties credentials = HandShake.readCredentials(dis, dos,
authenticator, system);
if (credentials != null) {
if (credentials != null && proxy!=null) {
if (securityLogWriter.fineEnabled()) {
securityLogWriter.fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID);
}
Expand All @@ -424,11 +427,14 @@ protected void registerGFEClient(DataInputStream dis, DataOutputStream dos,
authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null);
authzCallback.init(principal, member, this.getCache());
}
proxy.setPostAuthzCallback(authzCallback);
}
else if(subject instanceof Subject){
proxy.setSubject((Subject)subject);
}
}
}
catch (ClassNotFoundException e) {

throw new IOException(LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0.toLocalizedString(e));
}
catch (AuthenticationRequiredException ex) {
Expand All @@ -441,24 +447,19 @@ protected void registerGFEClient(DataInputStream dis, DataOutputStream dos,
writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion);
return;
}
catch (Exception ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, new Object[] {proxyID, ""}), ex);
writeException(dos, Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT, ex, clientVersion);
return;
}
try {
proxy = registerClient(socket, proxyID, proxy, isPrimary, clientConflation,
clientVersion, acceptorId, notifyBySubscription);
}
catch (CacheException e) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_0_REGISTERCLIENT_EXCEPTION_ENCOUNTERED_IN_REGISTRATION_1, new Object[] {this, e}), e);
IOException io = new IOException(LocalizedStrings.CacheClientNotifier_EXCEPTION_OCCURRED_WHILE_TRYING_TO_REGISTER_INTEREST_DUE_TO_0.toLocalizedString(e.getMessage()));
io.initCause(e);
throw io;
}
if (authzCallback != null && proxy != null) {
proxy.setPostAuthzCallback(authzCallback);
catch (Exception ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, new Object[] {proxyID, ""}), ex);
writeException(dos, Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT, ex, clientVersion);
return;
}


this._statistics.endClientRegistration(startTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,55 @@

package com.gemstone.gemfire.internal.cache.tier.sockets;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;

import org.apache.logging.log4j.Logger;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.util.ThreadState;

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.StatisticsFactory;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.ClientSession;
import com.gemstone.gemfire.cache.DynamicRegionFactory;
import com.gemstone.gemfire.cache.InterestRegistrationEvent;
import com.gemstone.gemfire.cache.InterestResultPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.RegionExistsException;
import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker;
import com.gemstone.gemfire.cache.operations.*;
import com.gemstone.gemfire.cache.operations.DestroyOperationContext;
import com.gemstone.gemfire.cache.operations.InvalidateOperationContext;
import com.gemstone.gemfire.cache.operations.OperationContext;
import com.gemstone.gemfire.cache.operations.PutOperationContext;
import com.gemstone.gemfire.cache.operations.RegionClearOperationContext;
import com.gemstone.gemfire.cache.operations.RegionCreateOperationContext;
import com.gemstone.gemfire.cache.operations.RegionDestroyOperationContext;
import com.gemstone.gemfire.cache.query.CqException;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
Expand All @@ -34,8 +77,21 @@
import com.gemstone.gemfire.internal.SystemTimer;
import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.*;
import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisee;
import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
import com.gemstone.gemfire.internal.cache.ClientServerObserver;
import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.FilterProfile;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.InterestRegistrationEventImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.StateFlushOperation;
import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueueAttributes;
Expand All @@ -51,24 +107,8 @@
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
import com.gemstone.gemfire.internal.security.GeodeSecurityUtil;
import com.gemstone.gemfire.security.AccessControl;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;

/**
* Class <code>CacheClientProxy</code> represents the server side of the
Expand Down Expand Up @@ -223,6 +263,7 @@ public class CacheClientProxy implements ClientSession {
boolean keepalive = false;

private AccessControl postAuthzCallback;
private Subject subject;

/**
* For multiuser environment..
Expand Down Expand Up @@ -362,6 +403,16 @@ public void setPostAuthzCallback(AccessControl authzCallback) {
this.postAuthzCallback = authzCallback;
}
}

public void setSubject(Subject subject) {
//TODO:hitesh synchronization
synchronized (this.clientUserAuthsLock) {
if (this.subject != null) {
subject.logout();
}
this.subject = subject;
}
}

public void setCQVsUserAuth(String cqName, long uniqueId, boolean isDurable)
{
Expand Down Expand Up @@ -1591,6 +1642,7 @@ else if (this.postAuthzCallback != null) {
*/
protected void deliverMessage(Conflatable conflatable)
{
ThreadState state = GeodeSecurityUtil.bindSubject(this.subject);
ClientUpdateMessage clientMessage = null;
if(conflatable instanceof HAEventWrapper) {
clientMessage = ((HAEventWrapper)conflatable).getClientUpdateMessage();
Expand All @@ -1600,6 +1652,21 @@ protected void deliverMessage(Conflatable conflatable)

this._statistics.incMessagesReceived();

// post process
Object oldValue = clientMessage.getValue();
if(oldValue instanceof byte[]){
EntryEventImpl.deserialize((byte[])oldValue);
Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(),
clientMessage.getKeyOfInterest(),
EntryEventImpl.deserialize((byte[])oldValue));
clientMessage.setLatestValue(EntryEventImpl.serialize(newValue));
}
else{
Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(),
clientMessage.getKeyOfInterest(), oldValue);
clientMessage.setLatestValue(newValue);
}

if (clientMessage.needsNoAuthorizationCheck() || postDeliverAuthCheckPassed(clientMessage)) {
// If dispatcher is getting initialized, add the event to temporary queue.
if (this.messageDispatcherInit) {
Expand All @@ -1625,6 +1692,9 @@ protected void deliverMessage(Conflatable conflatable)
} else {
this._statistics.incMessagesFailedQueued();
}

if(state!=null)
state.clear();
}

protected void sendMessageDirectly(ClientMessage message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.security.AuthorizeRequest;
import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
import com.gemstone.gemfire.security.NotAuthorizedException;

public class ClientUserAuths
{
Expand Down Expand Up @@ -133,11 +132,6 @@ public void setUserAuthAttributesForCq(String cqName, long uniqueId, boolean isD
}
}
}
else
{
//TODO:throw not authorized exception? will this ever happen??
throw new NotAuthorizedException("User is not authorized for CQ");
}
}

public void removeUserAuthAttributesForCq(String cqName, boolean isDurable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,15 +630,6 @@ private boolean acceptHandShake(byte epType, int qSize)
}
return true;
}

// public static AuthorizeRequestPP getPostAuthorizeCallback(ClientProxyMembershipID proxyId, String cqName)
// {
// ClientUserAuths cua = proxyIdVsClientUserAuths.get(proxyId);
// UserAuthAttributes uaa = cua.getUserAuthAttributes(cqName);
// if (uaa != null)
// return uaa.getPostAuthzRequest();
// return null;
// }

public void setCq(String cqName, boolean isDurable) throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ public static Object postProcess(String regionPath, Object key, Object result){
if(subject == null)
return result;

return postProcessor.processRegionValue((Principal)subject.getPrincipal(), regionPath, key, result);
String regionName = StringUtils.stripStart(regionPath, "/");
return postProcessor.processRegionValue((Principal)subject.getPrincipal(), regionName, key, result);
}

public static Object getObject(String factoryName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@

public interface PostProcessor {
void init(Properties securityProps);
Object processRegionValue(Principal principal, String regionPath, Object key, Object value);
Object processRegionValue(Principal principal, String regionName, Object key, Object value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public void init(final Properties securityProps) {

@Override
public Object processRegionValue(Principal principal,
String regionPath,
String regionName,
Object key,
Object value) {
return principal.getName()+"/"+regionPath+"/"+key+"/"+value;
return principal.getName()+"/"+regionName+"/"+key+"/"+value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
*/
package com.gemstone.gemfire.management.internal.security;

import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.management.CacheServerMXBean;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.management.CacheServerMXBean;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;

@Category(IntegrationTest.class)
public class CacheServerMBeanAuthenticationJUnitTest {
private static int jmxManagerPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
Expand All @@ -33,7 +34,7 @@ public class CacheServerMBeanAuthenticationJUnitTest {

@ClassRule
public static JsonAuthorizationCacheStartRule serverRule = new JsonAuthorizationCacheStartRule(
jmxManagerPort, "cacheServer.json", false);
jmxManagerPort, "cacheServer.json");

@Rule
public MBeanServerConnectionRule connectionRule = new MBeanServerConnectionRule(jmxManagerPort);
Expand Down
Loading

0 comments on commit 39c5684

Please sign in to comment.