Skip to content

Commit

Permalink
[pulsar-discovery] Replace MetadataStore with ZooKeeper in discoverys…
Browse files Browse the repository at this point in the history
…ervice (apache#9967)

* [pulsar-discovery] Replace MetadataStore with ZooKeeper in discovery service

fix test

fix test

fix test

* fix test
  • Loading branch information
rdhabalia authored Mar 19, 2021
1 parent 058236c commit dd32435
Show file tree
Hide file tree
Showing 15 changed files with 367 additions and 421 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resources;

import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;

public class LoadManagerReportResources extends BaseResources<LoadManagerReport> {

public LoadManagerReportResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
super(configurationStore, LoadManagerReport.class, operationTimeoutSec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class PulsarResources {
private NamespaceResources namespaceResources;
private DynamicConfigurationResources dynamicConfigResources;
private LocalPoliciesResources localPolicies;
private LoadManagerReportResources loadReportResources;
private Optional<MetadataStoreExtended> localMetadataStore;
private Optional<MetadataStoreExtended> configurationMetadataStore;

Expand All @@ -52,6 +53,7 @@ public PulsarResources(MetadataStoreExtended localMetadataStore, MetadataStoreEx
if (localMetadataStore != null) {
dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
localPolicies = new LocalPoliciesResources(localMetadataStore, operationTimeoutSec);
loadReportResources = new LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
}
this.localMetadataStore = Optional.ofNullable(localMetadataStore);
this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.discovery.service.DiscoveryService;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
Expand Down Expand Up @@ -477,11 +478,7 @@ public void testDiscoveryLookup() throws Exception {
config.setBindOnLocalhost(true);

@Cleanup
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(discoveryService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(discoveryService).createConfigurationMetadataStore();
discoveryService.start();
DiscoveryService discoveryService = createAndStartDiscoveryService(config);

// (2) lookup using discovery service
final String discoverySvcUrl = discoveryService.getServiceUrl();
Expand Down Expand Up @@ -548,11 +545,7 @@ public void testDiscoveryLookupTls() throws Exception {
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

@Cleanup
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(discoveryService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(discoveryService).createConfigurationMetadataStore();
discoveryService.start();
DiscoveryService discoveryService = createAndStartDiscoveryService(config);

// (3) lookup using discovery service
final String discoverySvcUrl = discoveryService.getServiceUrlTls();
Expand Down Expand Up @@ -610,11 +603,7 @@ public void testDiscoveryLookupAuthAndAuthSuccess() throws Exception {
config.setConfigurationStoreServers("localhost:3181");

@Cleanup
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(discoveryService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(discoveryService).createConfigurationMetadataStore();
discoveryService.start();
DiscoveryService discoveryService = createAndStartDiscoveryService(config);

// (2) lookup using discovery service
final String discoverySvcUrl = discoveryService.getServiceUrl();
Expand Down Expand Up @@ -688,11 +677,7 @@ public void testDiscoveryLookupAuthenticationFailure() throws Exception {
config.setAuthorizationEnabled(true);

@Cleanup
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(discoveryService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(discoveryService).createConfigurationMetadataStore();
discoveryService.start();
DiscoveryService discoveryService = createAndStartDiscoveryService(config);
// (2) lookup using discovery service
final String discoverySvcUrl = discoveryService.getServiceUrl();

Expand Down Expand Up @@ -752,11 +737,7 @@ public void testDiscoveryLookupAuthorizationFailure() throws Exception {
config.setAuthorizationEnabled(true);

@Cleanup
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(discoveryService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(discoveryService).createConfigurationMetadataStore();
discoveryService.start();
DiscoveryService discoveryService = createAndStartDiscoveryService(config);
// (2) lookup using discovery service
final String discoverySvcUrl = discoveryService.getServiceUrl();

Expand Down Expand Up @@ -1211,4 +1192,16 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
return "invalid";
}
}

private DiscoveryService createAndStartDiscoveryService(ServiceConfig config) throws Exception {
MetadataStoreExtended localMetadatastore = new ZKMetadataStore(mockZooKeeper);
MetadataStoreExtended configMetadatastore = new ZKMetadataStore(mockZooKeeperGlobal);
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(localMetadatastore).when(discoveryService).createLocalMetadataStore();
doReturn(configMetadatastore).when(discoveryService).createConfigurationMetadataStore();
doReturn(localMetadatastore).when(discoveryService).createLocalMetadataStore();
doReturn(configMetadatastore).when(discoveryService).createConfigurationMetadataStore();
discoveryService.start();
return discoveryService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;

import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

import javax.ws.rs.HttpMethod;
import javax.ws.rs.client.Client;
Expand All @@ -44,8 +44,8 @@
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.discovery.service.server.ServerManager;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.logging.LoggingFeature;
import org.testng.annotations.AfterMethod;
Expand All @@ -56,6 +56,10 @@
public class DiscoveryServiceWebTest extends ProducerConsumerBase {

private final Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
// DiscoveryServiceServlet gets initialized by a server and this map will help to retrieve ZK while mocking
// DiscoveryServiceServlet
private static final Map<String, MetadataStoreExtended> metadataStoreInstanceCache = Maps.newConcurrentMap();
private ServerManager server;

@BeforeMethod
@Override
Expand All @@ -64,12 +68,25 @@ protected void setup() throws Exception {
super.producerBaseSetup();
super.conf.setAuthorizationEnabled(true);
super.conf.setAuthenticationEnabled(true);

// start server
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(Optional.of(0));
server = new ServerManager(config);
Map<String, String> params = new TreeMap<>();
String zkServerUrl = "mockZkServerUrl";
metadataStoreInstanceCache.put(zkServerUrl, pulsar.createLocalMetadataStore());
params.put("zookeeperServers", zkServerUrl);
server.addServlet("/", DiscoveryServiceServletTest.class, params);
server.start();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
server.stop();
metadataStoreInstanceCache.clear();
}

/**
Expand All @@ -80,17 +97,6 @@ protected void cleanup() throws Exception {
*/
@Test
public void testRedirectUrlWithServerStarted() throws Exception {
// 1. start server
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(Optional.of(0));
ServerManager server = new ServerManager(config);
DiscoveryZooKeeperClientFactoryImpl.zk = mockZooKeeper;
Map<String, String> params = new TreeMap<>();
params.put("zookeeperServers", "");
params.put("zookeeperClientFactoryClass", DiscoveryZooKeeperClientFactoryImpl.class.getName());
server.addServlet("/", DiscoveryServiceServlet.class, params);
server.start();

String serviceUrl = server.getServiceUri().toString();
String putRequestUrl = serviceUrl + "admin/v2/namespaces/p1/n1";
String postRequestUrl = serviceUrl + "admin/v2/namespaces/p1/n1/replication";
Expand All @@ -105,9 +111,6 @@ public void testRedirectUrlWithServerStarted() throws Exception {
"Need to authenticate to perform the request");
assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Need to authenticate to perform the request");
assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Need to authenticate to perform the request");

server.stop();

}

public String hitBrokerService(String method, String url, Object data) throws JsonParseException {
Expand All @@ -134,14 +137,10 @@ public String hitBrokerService(String method, String url, Object data) throws Js
return jsonObject.get("reason").getAsString();
}

static class DiscoveryZooKeeperClientFactoryImpl implements ZooKeeperClientFactory {
static ZooKeeper zk;

public static class DiscoveryServiceServletTest extends DiscoveryServiceServlet {
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {
return CompletableFuture.completedFuture(zk);
public MetadataStoreExtended createLocalMetadataStore(String zookeeperServers, int operationimeoutMs) throws MetadataStoreException {
return metadataStoreInstanceCache.get(zookeeperServers);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader;
import org.apache.pulsar.discovery.service.web.MetadataStoreCacheLoader;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,7 +53,7 @@
*/
public class BrokerDiscoveryProvider implements Closeable {

final ZookeeperCacheLoader localZkCache;
final MetadataStoreCacheLoader metadataStoreCacheLoader;
private final AtomicInteger counter = new AtomicInteger();
private PulsarResources pulsarResources;

Expand All @@ -66,11 +64,10 @@ public class BrokerDiscoveryProvider implements Closeable {

private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";

public BrokerDiscoveryProvider(ServiceConfig config, ZooKeeperClientFactory zkClientFactory,
PulsarResources pulsarResources) throws PulsarServerException {
public BrokerDiscoveryProvider(ServiceConfig config, PulsarResources pulsarResources) throws PulsarServerException {
try {
this.pulsarResources = pulsarResources;
localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
this.metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarResources,
config.getZookeeperSessionTimeoutMs());
} catch (Exception e) {
LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
Expand All @@ -97,7 +94,7 @@ LoadManagerReport nextBroker() throws PulsarServerException {
}

List<LoadManagerReport> getAvailableBrokers() {
List<LoadManagerReport> availableBrokers = localZkCache.getAvailableBrokers();
List<LoadManagerReport> availableBrokers = metadataStoreCacheLoader.getAvailableBrokers();
return availableBrokers;
}

Expand All @@ -110,7 +107,8 @@ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(Discover
final String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
topicName.getNamespaceObject().toString(), "persistent", topicName.getEncodedLocalName());
// gets the number of partitions from the zk cache
pulsarResources.getNamespaceResources().getPartitionedTopicResources().getAsync(path).thenAccept(metadata -> {
pulsarResources.getNamespaceResources().getPartitionedTopicResources().getAsync(path)
.thenAccept(metadata -> {
// if the partitioned topic is not found in zk, then the topic
// is not partitioned
if (metadata.isPresent()) {
Expand Down Expand Up @@ -153,7 +151,8 @@ protected static void checkAuthorization(DiscoveryService service, TopicName top
throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
topicName.getTenant(), e.getMessage()));
}
if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
if (!service.getAuthorizationService()
.isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
throw new IllegalAccessException("Don't have permission to administrate resources on this property");
}
}
Expand All @@ -171,7 +170,7 @@ public static String path(String... parts) {

@Override
public void close() throws IOException {
localZkCache.close();
metadataStoreCacheLoader.close();
orderedExecutor.shutdown();
scheduledExecutorScheduler.shutdownNow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -66,7 +64,6 @@ public class DiscoveryService implements Closeable {
private ConfigurationMetadataCacheService configurationCacheService;
private AuthenticationService authenticationService;
private AuthorizationService authorizationService;
private ZooKeeperClientFactory zkClientFactory = null;
private BrokerDiscoveryProvider discoveryProvider;
private final EventLoopGroup acceptorGroup;
private MetadataStoreExtended localMetadataStore;
Expand Down Expand Up @@ -98,7 +95,7 @@ public void start() throws Exception {
localMetadataStore = createLocalMetadataStore();
configMetadataStore = createConfigurationMetadataStore();
pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore);
discoveryProvider = new BrokerDiscoveryProvider(this.config, getZooKeeperClientFactory(), pulsarResources);
discoveryProvider = new BrokerDiscoveryProvider(this.config, pulsarResources);
this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
authenticationService = new AuthenticationService(serviceConfiguration);
Expand Down Expand Up @@ -145,14 +142,6 @@ public void startServer() throws Exception {
this.serviceUrlTls = serviceUrlTls();
}

public ZooKeeperClientFactory getZooKeeperClientFactory() {
if (zkClientFactory == null) {
zkClientFactory = new ZookeeperClientFactoryImpl();
}
// Return default factory
return zkClientFactory;
}

public BrokerDiscoveryProvider getDiscoveryProvider() {
return discoveryProvider;
}
Expand Down Expand Up @@ -235,8 +224,6 @@ public void setConfigurationCacheService(ConfigurationMetadataCacheService confi
this.configurationCacheService = configurationCacheService;
}

private static final Logger LOG = LoggerFactory.getLogger(DiscoveryService.class);

public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return PulsarResources.createMetadataStore(config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs());
}
Expand All @@ -245,4 +232,6 @@ public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataS
return PulsarResources.createMetadataStore(config.getConfigurationStoreServers(),
config.getZookeeperSessionTimeoutMs());
}

private static final Logger LOG = LoggerFactory.getLogger(DiscoveryService.class);
}
Loading

0 comments on commit dd32435

Please sign in to comment.