Skip to content

Commit

Permalink
GEODE-10097: Avoid Thread.sleep for re-auth in MessageDispatcher (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
jinmeiliao authored Apr 11, 2022
1 parent 934f517 commit 4fbc35c
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.stream.IntStream;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
Expand Down Expand Up @@ -65,6 +66,12 @@ public class AuthExpirationDistributedTest {

private ClientVM clientVM;

@Before
public void before() throws Exception {
// this is enabled to show how many times authorize call is made with each permission key
getSecurityManager().setAllowDuplicate(true);
}

@After
public void after() {
if (clientVM != null) {
Expand Down Expand Up @@ -109,7 +116,7 @@ public void cqClientWillReAuthenticateAutomatically() throws Exception {
Map<String, List<String>> authorizedOps = getSecurityManager().getAuthorizedOps();
assertThat(authorizedOps.keySet().size()).isEqualTo(2);
assertThat(authorizedOps.get("user1")).asList().containsExactly("DATA:READ:region",
"DATA:READ:region:1");
"DATA:READ:region", "DATA:READ:region:1");
assertThat(authorizedOps.get("user2")).asList().containsExactly("DATA:READ:region:2");

Map<String, List<String>> unAuthorizedOps = getSecurityManager().getUnAuthorizedOps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,13 @@ public boolean isWaitingForReAuthentication() {
return _messageDispatcher.isWaitingForReAuthentication();
}

public void notifyReAuthentication() {
if (_messageDispatcher == null) {
return;
}
_messageDispatcher.notifyReAuthentication();
}

/**
* Returns whether the proxy is paused. It is paused if its message dispatcher is paused. This
* only applies to durable clients.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class MessageDispatcher extends LoggingThread {
private volatile boolean _isStopped = true;

private volatile long waitForReAuthenticationStartTime = -1;
private final Object reAuthenticationLock = new Object();

/**
* A lock object used to control pausing this dispatcher
Expand Down Expand Up @@ -193,6 +194,15 @@ public boolean isWaitingForReAuthentication() {
return waitForReAuthenticationStartTime > 0;
}

private boolean subjectUpdated = false;

public void notifyReAuthentication() {
synchronized (reAuthenticationLock) {
subjectUpdated = true;
reAuthenticationLock.notifyAll();
}
}

private CacheClientProxy getProxy() {
return _proxy;
}
Expand Down Expand Up @@ -361,9 +371,6 @@ protected void runDispatcher() {
logger.debug("{}: Beginning to process events", this);
}

long reAuthenticateWaitTime =
getSystemProperty(RE_AUTHENTICATE_WAIT_TIME, DEFAULT_RE_AUTHENTICATE_WAIT_TIME);

ClientMessage clientMessage = null;

while (!isStopped()) {
Expand Down Expand Up @@ -432,34 +439,8 @@ protected void runDispatcher() {
_messageQueue.remove();
clientMessage = null;
} catch (AuthenticationExpiredException expired) {
if (waitForReAuthenticationStartTime == -1) {
waitForReAuthenticationStartTime = System.currentTimeMillis();
// only send the message to clients who can handle the message
if (getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) {
EventID eventId = createEventId();
sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
}
// We wait for all versions of clients to re-authenticate. For older clients we still
// wait, just in case client will perform some operations to
// trigger credential refresh on its own.
Thread.sleep(200);
} else {
long elapsedTime = System.currentTimeMillis() - waitForReAuthenticationStartTime;
if (elapsedTime > reAuthenticateWaitTime) {
// reset the timer here since we are no longer waiting for re-auth to happen anymore
waitForReAuthenticationStartTime = -1;
synchronized (_stopDispatchingLock) {
logger.warn("Client did not re-authenticate back successfully in " + elapsedTime
+ "ms. Unregister this client proxy.");
pauseOrUnregisterProxy(expired);
}
exceptionOccurred = true;
} else {
Thread.sleep(200);
}
}
exceptionOccurred = handleAuthenticationExpiredException(expired);
}

} catch (MessageTooLargeException e) {
logger.warn("Message too large to send to client: {}, {}", clientMessage, e.getMessage());
} catch (IOException e) {
Expand Down Expand Up @@ -548,6 +529,48 @@ protected void runDispatcher() {
}
}

private boolean handleAuthenticationExpiredException(AuthenticationExpiredException expired)
throws InterruptedException {
long reAuthenticateWaitTime =
getSystemProperty(RE_AUTHENTICATE_WAIT_TIME, DEFAULT_RE_AUTHENTICATE_WAIT_TIME);
synchronized (reAuthenticationLock) {
// turn on the "isWaitingForReAuthentication" flag before we send the re-auth message
// if we do it the other way around, the re-auth might be finished before we turn on the
// flag for the notification to happen.
waitForReAuthenticationStartTime = System.currentTimeMillis();
subjectUpdated = false;
// only send the message to clients who can handle the message
if (getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) {
EventID eventId = createEventId();
sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
}

// We wait for all versions of clients to re-authenticate. For older clients we still
// wait, just in case client will perform some operations to
// trigger credential refresh on its own.
long waitFinishTime = waitForReAuthenticationStartTime + reAuthenticateWaitTime;
long remainingWaitTime = waitFinishTime - System.currentTimeMillis();
while (!subjectUpdated && remainingWaitTime > 0) {
reAuthenticationLock.wait(remainingWaitTime);
remainingWaitTime = waitFinishTime - System.currentTimeMillis();
}
}
// the above wait timed out
if (!subjectUpdated) {
long elapsedTime = System.currentTimeMillis() - waitForReAuthenticationStartTime;
// reset the timer here since we are no longer waiting for re-auth to happen anymore
waitForReAuthenticationStartTime = -1;
synchronized (_stopDispatchingLock) {
logger.warn(
"Client did not re-authenticate back successfully in {} ms. Unregister this client proxy.",
elapsedTime);
pauseOrUnregisterProxy(expired);
return true;
}
}
return false;
}

@VisibleForTesting
void dispatchResidualMessages() {
List<ClientMessage> list = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,7 @@ long putSubject(Subject subject, long existingUniqueId) {
secureLogger.debug("update subject on client proxy {} with uniqueId {}", clientProxy,
uniqueId);
clientProxy.setSubject(subject);
clientProxy.notifyReAuthentication();
}
return uniqueId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.geode.internal.cache.tier.sockets;

import static org.apache.geode.internal.lang.SystemPropertyHelper.RE_AUTHENTICATE_WAIT_TIME;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
Expand All @@ -35,8 +36,8 @@
import java.io.IOException;

import org.apache.shiro.subject.Subject;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.RegionDestroyedException;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class MessageDispatcherTest {
private CacheClientProxyStats proxyStats;
private EventID eventID;

@Before
@BeforeEach
public void before() throws Exception {
proxy = mock(CacheClientProxy.class);
message = mock(ClientUpdateMessageImpl.class);
Expand Down Expand Up @@ -137,15 +138,15 @@ public void normalRunWillDispatchMessage() throws Exception {

@Test
public void newClientWillGetClientReAuthenticateMessage() throws Exception {
doReturn(false, false, false, false, false, true).when(dispatcher).isStopped();
doReturn(false, false, false, true).when(dispatcher).isStopped();
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
when(messageQueue.peek()).thenReturn(message);
when(proxy.getVersion()).thenReturn(KnownVersion.GEODE_1_15_0);
doReturn(eventID).when(dispatcher).createEventId();
doNothing().when(dispatcher).sendMessageDirectly(any());

// make sure wait time is short
doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong());
doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong());
dispatcher.runDispatcher();

// verify a ReAuthenticate message will be send to the user
Expand All @@ -159,9 +160,9 @@ public void newClientWillGetClientReAuthenticateMessage() throws Exception {

@Test
public void oldClientWillNotGetClientReAuthenticateMessage() throws Exception {
doReturn(false, false, false, false, false, true).when(dispatcher).isStopped();
doReturn(false, false, true).when(dispatcher).isStopped();
// make sure wait time is short
doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong());
doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong());

doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
when(messageQueue.peek()).thenReturn(message);
Expand All @@ -173,6 +174,32 @@ public void oldClientWillNotGetClientReAuthenticateMessage() throws Exception {
verify(dispatcher, never()).dispatchResidualMessages();
}


@Test
public void oldClientWillContinueToDeliverMessageIfNotified() throws Exception {
doReturn(false, false, true).when(dispatcher).isStopped();
// make sure wait time is short
doReturn(10000L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong());
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
when(messageQueue.peek()).thenReturn(message);
when(proxy.getVersion()).thenReturn(KnownVersion.GEODE_1_14_0);

Thread dispatcherThread = new Thread(() -> dispatcher.runDispatcher());
Thread notifyThread = new Thread(() -> dispatcher.notifyReAuthentication());

dispatcherThread.start();
await().until(() -> dispatcher.isWaitingForReAuthentication());
notifyThread.start();

dispatcherThread.join();
notifyThread.join();

verify(dispatcher, never()).sendMessageDirectly(any());
// dispatcher will dispatch message
verify(dispatcher, never()).pauseOrUnregisterProxy(any(AuthenticationExpiredException.class));
verify(dispatcher).dispatchResidualMessages();
}

@Test
public void ioExceptionHappenedForDurableClientWillContinueToPeekForNextMessage()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ExpirableSecurityManager extends SimpleSecurityManager implements S
new ConcurrentHashMap<>();
private final Map<String, List<String>> unauthorizedOps =
new ConcurrentHashMap<>();
private boolean allowDuplicate = false;

@Override
public Object authenticate(final Properties credentials) throws AuthenticationFailedException {
Expand All @@ -65,6 +66,10 @@ public void addExpiredUser(String user) {
expired_users.add(user);
}

public void setAllowDuplicate(boolean allowDuplicate) {
this.allowDuplicate = allowDuplicate;
}

public Set<String> getExpiredUsers() {
return expired_users;
}
Expand All @@ -83,7 +88,7 @@ private void addToMap(Map<String, List<String>> maps, Object user,
if (list == null) {
list = new ArrayList<>();
}
if (!list.contains(permission.toString())) {
if (allowDuplicate || !list.contains(permission.toString())) {
list.add(permission.toString());
}
maps.put(user.toString(), list);
Expand All @@ -93,6 +98,7 @@ public void close() {
expired_users.clear();
authorizedOps.clear();
unauthorizedOps.clear();
allowDuplicate = false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ org/apache/geode/pdx/Day,false
org/apache/geode/pdx/DomainObjectPdxAuto$Day,false
org/apache/geode/pdx/DomainObjectPdxAutoNoDefaultConstructor$Day,false
org/apache/geode/pdx/SimpleClass$SimpleEnum,false
org/apache/geode/security/ExpirableSecurityManager,false,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map
org/apache/geode/security/ExpirableSecurityManager,false,allowDuplicate:boolean,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map
org/apache/geode/security/query/data/PdxQueryTestObject,false,age:int,id:int,name:java/lang/String,shouldThrowException:boolean
org/apache/geode/security/query/data/PdxTrade,false,cusip:java/lang/String,id:java/lang/String,price:int,shares:int
org/apache/geode/security/query/data/QueryTestObject,false,dateField:java/util/Date,id:int,mapField:java/util/Map,name:java/lang/String
Expand Down

0 comments on commit 4fbc35c

Please sign in to comment.