Skip to content

Commit

Permalink
[pulsar-broker] broker resources use metadata-store api (apache#9346)
Browse files Browse the repository at this point in the history
* [pulsar-broker] broker resources use metadata-store api

fix test

* fix api
  • Loading branch information
rdhabalia authored Jan 30, 2021
1 parent 6b0d226 commit fa66a24
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public void start() throws PulsarServerException {
coordinationService = new CoordinationServiceImpl(localMetadataStore);

configurationMetadataStore = createConfigurationMetadataStore();
pulsarResources = new PulsarResources(configurationMetadataStore);
pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore);

orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -39,7 +37,6 @@
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -417,30 +414,6 @@ protected void validateTopicName(String property, String cluster, String namespa
}
}

/**
* Redirect the call to the specified broker.
*
* @param broker
* Broker name
* @throws MalformedURLException
* In case the redirect happens
*/
protected void validateBrokerName(String broker) throws MalformedURLException {
String brokerUrl = String.format("http://%s", broker);
String brokerUrlTls = String.format("https://%s", broker);
if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
String host = parts[0];
int port = Integer.parseInt(parts[1]);

URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker);
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}

protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,24 @@ public CompletableFuture<Void> setAsync(String path, Function<T, T> modifyFuncti
return cache.readModifyUpdate(path, modifyFunction);
}

public void create(String path, T data) throws MetadataStoreException {
create(path, t -> data);
public void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
try {
setWithCreateAsync(path, createFunction).get();
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
} catch (Exception e) {
throw new MetadataStoreException("Failed to set/create " + path, e);
}
}

public void create(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
public CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional<T>, T> createFunction) {
return cache.readModifyUpdateOrCreate(path, createFunction);
}

public void create(String path, T data) throws MetadataStoreException {
try {
createAsync(path, createFunction).get();
createAsync(path, data).get();
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -113,11 +124,7 @@ public void create(String path, Function<Optional<T>, T> createFunction) throws
}

public CompletableFuture<Void> createAsync(String path, T data) {
return createAsync(path, t -> data);
}

public CompletableFuture<Void> createAsync(String path, Function<Optional<T>, T> createFunction) {
return cache.readModifyUpdateOrCreate(path, createFunction);
return cache.create(path, data);
}

public void delete(String path) throws MetadataStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -40,14 +41,13 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -57,19 +57,14 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Broker admin base.
*/
public class BrokersBase extends AdminResource {
public class BrokersBase extends PulsarWebResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
private int serviceConfigZkVersion = -1;

@GET
@Path("/{cluster}")
Expand All @@ -90,7 +85,7 @@ public Set<String> getActiveBrokers(@PathParam("cluster") String cluster) throws

try {
// Add Native brokers
return pulsar().getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT);
return new HashSet<>(dynamicConfigurationResources().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT));
} catch (Exception e) {
LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), cluster, e);
throw new RestException(e);
Expand Down Expand Up @@ -134,7 +129,7 @@ public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(@PathParam("clus
public void updateDynamicConfiguration(@PathParam("configName") String configName,
@PathParam("configValue") String configValue) throws Exception {
validateSuperUserAccess();
updateDynamicConfigurationOnZk(configName, configValue);
persistDynamicConfiguration(configName, configValue);
}

@DELETE
Expand All @@ -159,12 +154,8 @@ public void deleteDynamicConfiguration(@PathParam("configName") String configNam
@ApiResponse(code = 500, message = "Internal server error")})
public Map<String, String> getAllDynamicConfigurations() throws Exception {
validateSuperUserAccess();

ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = null;
try {
configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
return dynamicConfigurationResources().get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
} catch (RestException e) {
LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
Expand All @@ -173,7 +164,6 @@ public Map<String, String> getAllDynamicConfigurations() throws Exception {
LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
return configurationMap;
}

@GET
Expand Down Expand Up @@ -204,29 +194,17 @@ public Map<String, String> getRuntimeConfiguration() {
* @param configValue
* : configuration value
*/
private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) {
private synchronized void persistDynamicConfiguration(String configName, String configValue) {
try {
if (!BrokerService.validateDynamicConfiguration(configName, configValue)) {
throw new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value");
}
if (BrokerService.isDynamicConfiguration(configName)) {
ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElse(null);
if (configurationMap != null) {
configurationMap.put(configName, configValue);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
serviceConfigZkVersion = localZk()
.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
} else {
configurationMap = Maps.newHashMap();
dynamicConfigurationResources().setWithCreate(BROKER_SERVICE_CONFIGURATION_PATH, (old) -> {
Map<String, String> configurationMap = old.isPresent() ? old.get() : Maps.newHashMap();
configurationMap.put(configName, configValue);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
return configurationMap;
});
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
} else {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -393,17 +371,12 @@ private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture,
private synchronized void deleteDynamicConfigurationOnZk(String configName) {
try {
if (BrokerService.isDynamicConfiguration(configName)) {
ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElse(null);
if (configurationMap != null && configurationMap.containsKey(configName)) {
configurationMap.remove(configName);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
serviceConfigZkVersion = localZk()
.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
}
dynamicConfigurationResources().set(BROKER_SERVICE_CONFIGURATION_PATH, (old) -> {
if (old != null) {
old.remove(configName);
}
return old;
});
LOG.info("[{}] Deleted Service configuration {}", clientAppId(), configName);
} else {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,8 @@ public void setNamespaceIsolationPolicy(
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
try {
namespaceIsolationPolicies().create(nsIsolationPolicyPath, Collections.emptyMap());
namespaceIsolationPolicies().setWithCreate(nsIsolationPolicyPath,
(p) -> Collections.emptyMap());
return new NamespaceIsolationPolicies();
} catch (Exception e) {
throw new RestException(e);
Expand Down Expand Up @@ -835,7 +836,8 @@ public void deleteNamespaceIsolationPolicy(
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
try {
namespaceIsolationPolicies().create(nsIsolationPolicyPath, Collections.emptyMap());
namespaceIsolationPolicies().setWithCreate(nsIsolationPolicyPath,
(p) -> Collections.emptyMap());
return new NamespaceIsolationPolicies();
} catch (Exception e) {
throw new RestException(e);
Expand Down Expand Up @@ -893,7 +895,7 @@ public void setFailureDomain(
try {
String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
FailureDomainResources failureDomainListCache = clusterResources().getFailureDomainResources();
failureDomainListCache.create(domainPath, old -> domain);
failureDomainListCache.setWithCreate(domainPath, old -> domain);
} catch (NotFoundException nne) {
log.warn("[{}] Failed to update domain {}. clusters {} Does not exist", clientAppId(), cluster,
domainName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

public class DynamicConfigurationResources extends BaseResources<Map<String, String>> {

public DynamicConfigurationResources(MetadataStoreExtended store) {
super(store, new TypeReference<Map<String, String>>(){});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ public class PulsarResources {
private TenantResources tenatResources;
private ClusterResources clusterResources;
private NamespaceResources namespaceResources;
private DynamicConfigurationResources dynamicConfigResources;

public PulsarResources(MetadataStoreExtended configurationMetadataStore) {
public PulsarResources(MetadataStoreExtended localMetadataStore, MetadataStoreExtended configurationMetadataStore) {
tenatResources = new TenantResources(configurationMetadataStore);
clusterResources = new ClusterResources(configurationMetadataStore);
namespaceResources = new NamespaceResources(configurationMetadataStore);
dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.ClusterResources;
import org.apache.pulsar.broker.admin.impl.DynamicConfigurationResources;
import org.apache.pulsar.broker.admin.impl.NamespaceResources;
import org.apache.pulsar.broker.admin.impl.NamespaceResources.IsolationPolicyResources;
import org.apache.pulsar.broker.admin.impl.TenantResources;
Expand Down Expand Up @@ -879,6 +880,10 @@ protected IsolationPolicyResources namespaceIsolationPolicies(){
return namespaceResources().getIsolationPolicies();
}

protected DynamicConfigurationResources dynamicConfigurationResources() {
return pulsar().getPulsarResources().getDynamicConfigResources();
}

public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}
Expand Down Expand Up @@ -992,4 +997,28 @@ protected CompletableFuture<Void> canUpdateCluster(String tenant, Set<String> ol
return activeNamespaceFuture.isEmpty() ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(activeNamespaceFuture);
}

/**
* Redirect the call to the specified broker.
*
* @param broker
* Broker name
* @throws MalformedURLException
* In case the redirect happens
*/
protected void validateBrokerName(String broker) throws MalformedURLException {
String brokerUrl = String.format("http://%s", broker);
String brokerUrlTls = String.format("https://%s", broker);
if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
String host = parts[0];
int port = Integer.parseInt(parts[1]);

URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker);
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ public void setup() throws Exception {

clusters = spy(new Clusters());
clusters.setPulsar(pulsar);
/*doReturn(mockZooKeeperGlobal).when(clusters).globalZk();
doReturn(configurationCache.clustersCache()).when(clusters).clustersCache();
doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache();
doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();*/
doReturn("test").when(clusters).clientAppId();
doNothing().when(clusters).validateSuperUserAccess();

Expand All @@ -160,11 +156,7 @@ public void setup() throws Exception {
doNothing().when(namespaces).validateAdminAccessForTenant("new-property");

brokers = spy(new Brokers());
brokers.setServletContext(new MockServletContext());
brokers.setPulsar(pulsar);
doReturn(mockZooKeeperGlobal).when(brokers).globalZk();
doReturn(mockZooKeeper).when(brokers).localZk();
doReturn(configurationCache.clustersListCache()).when(brokers).clustersListCache();
doReturn("test").when(brokers).clientAppId();
doNothing().when(brokers).validateSuperUserAccess();

Expand Down

0 comments on commit fa66a24

Please sign in to comment.