Skip to content

Commit

Permalink
GEODE-10380: use waitingThreadPool to notify dispatcher at re_auth (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
jinmeiliao authored Jun 15, 2022
1 parent 03ab34d commit b3fef2a
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -727,7 +728,11 @@ public void notifyReAuthentication() {
if (_messageDispatcher == null) {
return;
}
_messageDispatcher.notifyReAuthentication();

// use another thread to do the notification so that the server operation won't be blocked
ExecutorService threadPool =
_cache.getDistributionManager().getExecutors().getWaitingThreadPool();
threadPool.submit(() -> _messageDispatcher.notifyReAuthentication());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand All @@ -30,19 +32,26 @@

import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.shiro.subject.Subject;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.stubbing.Answer;

import org.apache.geode.StatisticsFactory;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.CacheClientProxyStatsFactory;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.MessageDispatcherFactory;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

public class CacheClientProxyTest {
private CacheClientProxy proxyWithSingleUser;
Expand Down Expand Up @@ -71,6 +80,7 @@ public void before() throws Exception {
when(socket.getInetAddress()).thenReturn(inetAddress);
when(notifier.getAcceptorStats()).thenReturn(stats);
id = mock(ClientProxyMembershipID.class);
when(id.getDurableId()).thenReturn("proxy_id");
version = KnownVersion.TEST_VERSION;
securityService = mock(SecurityService.class);
subject = mock(Subject.class);
Expand Down Expand Up @@ -175,4 +185,37 @@ public void close_multiUser_calls_ClientUserAuthsCleanUp() {
verify(subject, never()).logout();
verify(clientUserAuths, times(1)).cleanup(anyBoolean());
}

@Rule
public ExecutorServiceRule executorService = new ExecutorServiceRule();

@Test
public void notifyReAuthenticationIsNotBlocked() {
CacheClientProxy spy = spy(proxyWithSingleUser);
MessageDispatcher dispatcher = mock(MessageDispatcher.class);
doReturn(dispatcher).when(spy).createMessageDispatcher(any());
spy.initializeMessageDispatcher();
DistributionManager manager = mock(DistributionManager.class);
OperationExecutors executors = mock(OperationExecutors.class);
ExecutorService executor = executorService.getExecutorService();
when(cache.getDistributionManager()).thenReturn(manager);
when(manager.getExecutors()).thenReturn(executors);
when(executors.getWaitingThreadPool()).thenReturn(executor);

AtomicBoolean updated = new AtomicBoolean(false);

// simulating a blocked message dispatcher when notify reauth
doAnswer((Answer<Void>) invocation -> {
while (!updated.get()) {
Thread.sleep(200);
}
return null;
}).when(dispatcher).notifyReAuthentication();

// proxy.notifyReauthentication won't be blocked
spy.notifyReAuthentication();
assertThat(updated.get()).isFalse();
}


}

0 comments on commit b3fef2a

Please sign in to comment.