Skip to content

Commit

Permalink
[ISSUE 9783][pulsar-client] Allow pulsar client receive external timer (
Browse files Browse the repository at this point in the history
apache#9802)

Fixed apache#9783

### Motivation

Allow pulsar client to receive external timer instance

### Modifications

Add new constructor to provide an external timer, and share timer in pulsar proxy

### Verifying this change
- [x] Make sure that the change passes the CI checks.
  • Loading branch information
linlinnn authored Mar 29, 2021
1 parent a070c33 commit af6eaba
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class PulsarClientImpl implements PulsarClient {
private LookupService lookup;
private final ConnectionPool cnxPool;
private final Timer timer;
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorService;

Expand Down Expand Up @@ -131,11 +132,16 @@ public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientExcepti
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
throws PulsarClientException {
if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
Expand All @@ -152,7 +158,12 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
}
timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
if (timer == null) {
this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
needStopTimer = true;
} else {
this.timer = timer;
}
producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());

Expand Down Expand Up @@ -671,7 +682,9 @@ public void shutdown() throws PulsarClientException {
try {
lookup.close();
cnxPool.close();
timer.stop();
if (needStopTimer) {
timer.stop();
}
externalExecutorProvider.shutdownNow();
internalExecutorService.shutdownNow();
conf.getAuthentication().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertSame;
Expand All @@ -33,8 +34,10 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
Expand All @@ -53,6 +56,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -146,4 +150,32 @@ public void testConsumerIsClosed() throws Exception {
assertSame(consumer.getState(), HandlerState.State.Closed));
}

@Test
public void testInitializeWithoutTimer() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
PulsarClientImpl client = new PulsarClientImpl(conf);

HashedWheelTimer timer = mock(HashedWheelTimer.class);
Field field = client.getClass().getDeclaredField("timer");
field.setAccessible(true);
field.set(client, timer);

client.shutdown();
verify(timer).stop();
}

@Test
public void testInitializeWithTimer() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl("pulsar://localhost:6650");

HashedWheelTimer timer = new HashedWheelTimer();
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool, timer);

client.shutdown();
client.timer().stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
if (!service.getConfiguration().isAuthenticationEnabled()) {
this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)));
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer());

completeConnect();
return;
Expand Down Expand Up @@ -436,7 +436,7 @@ private PulsarClientImpl createClient(final ClientConfigurationData clientConf,
final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)));
service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
}

private static int getProtocolVersionToAdvertise(CommandConnect connect) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import lombok.Getter;
Expand Down Expand Up @@ -69,6 +71,7 @@
public class ProxyService implements Closeable {

private final ProxyConfiguration proxyConfig;
private final Timer timer;
private String serviceUrl;
private String serviceUrlTls;
private ConfigurationMetadataCacheService configurationCacheService;
Expand Down Expand Up @@ -128,6 +131,7 @@ public ProxyService(ProxyConfiguration proxyConfig,
AuthenticationService authenticationService) throws IOException {
checkNotNull(proxyConfig);
this.proxyConfig = proxyConfig;
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
this.clientCnxs = Sets.newConcurrentHashSet();
this.topicStats = Maps.newConcurrentMap();

Expand Down Expand Up @@ -261,6 +265,9 @@ public void close() throws IOException {
}
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
if (timer != null) {
timer.stop();
}
}

public String getServiceUrl() {
Expand All @@ -275,6 +282,10 @@ public ProxyConfiguration getConfiguration() {
return proxyConfig;
}

public Timer getTimer() {
return timer;
}

public AuthenticationService getAuthenticationService() {
return authenticationService;
}
Expand Down

0 comments on commit af6eaba

Please sign in to comment.