Skip to content

Commit

Permalink
[pulsar-proxy] Remove zk direct dependency from Proxy (apache#9973)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Mar 22, 2021
1 parent b1085d0 commit cdd87fb
Show file tree
Hide file tree
Showing 27 changed files with 31 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.discovery.service.web;
package org.apache.pulsar.broker.resources;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -28,8 +28,6 @@

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.resources.LoadManagerReportResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
Expand All @@ -48,7 +46,7 @@ public class MetadataStoreCacheLoader implements Closeable {
private volatile List<LoadManagerReport> availableBrokers;

private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8)
.name("pulsar-discovery-ordered-cache").build();
.name("pulsar-metadata-cache-loader-ordered-cache").build();

public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";

Expand Down Expand Up @@ -145,4 +143,4 @@ private CompletableFuture<Void> updateBrokerList(List<String> brokerNodes) {

private static final Logger log = LoggerFactory.getLogger(MetadataStoreCacheLoader.class);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.discovery.service.web.MetadataStoreCacheLoader;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;

import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.discovery.service;

import static org.apache.pulsar.discovery.service.web.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.discovery.service;

import static org.apache.pulsar.discovery.service.web.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.discovery.service.web;

import static org.apache.pulsar.discovery.service.web.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.discovery.service.web;

import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;
import static org.apache.pulsar.discovery.service.web.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -54,6 +54,7 @@
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;

import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.discovery.service.web;

import static org.apache.pulsar.discovery.service.web.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.fail;

import java.io.IOException;
Expand All @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,7 +52,7 @@
*/
public class BrokerDiscoveryProvider implements Closeable {

final ZookeeperCacheLoader localZkCache;
final MetadataStoreCacheLoader metadataStoreCacheLoader;
final PulsarResources pulsarResources;

private final AtomicInteger counter = new AtomicInteger();
Expand All @@ -65,13 +64,12 @@ public class BrokerDiscoveryProvider implements Closeable {

private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";

public BrokerDiscoveryProvider(ProxyConfiguration config, ZooKeeperClientFactory zkClientFactory,
PulsarResources pulsarResources)
public BrokerDiscoveryProvider(ProxyConfiguration config, PulsarResources pulsarResources)
throws PulsarServerException {
try {
localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
config.getZookeeperSessionTimeoutMs());
this.pulsarResources = pulsarResources;
this.metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarResources,
config.getZookeeperSessionTimeoutMs());
} catch (Exception e) {
LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
throw new PulsarServerException("Failed to start zookeeper :" + e.getMessage(), e);
Expand All @@ -85,7 +83,7 @@ public BrokerDiscoveryProvider(ProxyConfiguration config, ZooKeeperClientFactory
* @throws PulsarServerException
*/
LoadManagerReport nextBroker() throws PulsarServerException {
List<LoadManagerReport> availableBrokers = localZkCache.getAvailableBrokers();
List<LoadManagerReport> availableBrokers = metadataStoreCacheLoader.getAvailableBrokers();

if (availableBrokers.isEmpty()) {
throw new PulsarServerException("No active broker is available");
Expand Down Expand Up @@ -165,7 +163,7 @@ public static String path(String... parts) {

@Override
public void close() throws IOException {
localZkCache.close();
metadataStoreCacheLoader.close();
orderedExecutor.shutdown();
scheduledExecutorScheduler.shutdownNow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -76,7 +74,6 @@ public class ProxyService implements Closeable {
private ConfigurationMetadataCacheService configurationCacheService;
private final AuthenticationService authenticationService;
private AuthorizationService authorizationService;
private ZooKeeperClientFactory zkClientFactory = null;
private MetadataStoreExtended localMetadataStore;
private MetadataStoreExtended configMetadataStore;
private PulsarResources pulsarResources;
Expand Down Expand Up @@ -172,8 +169,7 @@ public void start() throws Exception {
localMetadataStore = createLocalMetadataStore();
configMetadataStore = createConfigurationMetadataStore();
pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore);
discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory(),
pulsarResources);
discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, pulsarResources);
this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
authorizationService = new AuthorizationService(PulsarConfigurationLoader.convertFrom(proxyConfig),
configurationCacheService);
Expand Down Expand Up @@ -223,14 +219,6 @@ public void start() throws Exception {
}
}

public ZooKeeperClientFactory getZooKeeperClientFactory() {
if (zkClientFactory == null) {
zkClientFactory = new ZookeeperClientFactoryImpl();
}
// Return default factory
return zkClientFactory;
}

public BrokerDiscoveryProvider getDiscoveryProvider() {
return discoveryProvider;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void setup() throws Exception {
new ZKMetadataStore(mockZooKeeperGlobal));
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory, resource));
discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ protected void setup() throws Exception {

proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();

Expand Down
Loading

0 comments on commit cdd87fb

Please sign in to comment.