Skip to content

Commit

Permalink
[PIP-82] [pulsar-broker] Add resource-group configuration listener. (a…
Browse files Browse the repository at this point in the history
…pache#10657)

* Add resource-group configuration listener.

* Fix ResourceGroupServiceTest unit test.

* Incorporate review feedback.

* Added javadoc comments.

Co-authored-by: Bharani Chadalavada <[email protected]>
  • Loading branch information
bharanic-dev and Bharani Chadalavada authored Jun 22, 2021
1 parent dbcaa98 commit b83ea98
Show file tree
Hide file tree
Showing 12 changed files with 960 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -91,6 +92,8 @@
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -190,6 +193,7 @@ public class PulsarService implements AutoCloseable {
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
private Compactor compactor;
private ResourceUsageTransportManager resourceUsageTransportManager;
private ResourceGroupService resourceGroupServiceManager;

private final ScheduledExecutorService executor;
private final ScheduledExecutorService cacheExecutor;
Expand Down Expand Up @@ -797,12 +801,14 @@ config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup
}

// Start the task to publish resource usage, if necessary
this.resourceUsageTransportManager = DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
if (isNotBlank(config.getResourceUsageTransportClassName())) {
Class<?> clazz = Class.forName(config.getResourceUsageTransportClassName());
Constructor<?> ctor = clazz.getConstructor(PulsarService.class);
Object object = ctor.newInstance(new Object[]{this});
this.resourceUsageTransportManager = (ResourceUsageTransportManager) object;
this.resourceUsageTransportManager = (ResourceUsageTopicTransportManager) object;
}
this.resourceGroupServiceManager = new ResourceGroupService(this);

long currentTimestamp = System.currentTimeMillis();
final long bootstrapTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - startTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ public enum ResourceGroupRefTypes {

// Default ctor: it is not expected that anything outside of this package will need to directly
// construct a ResourceGroup (i.e., without going through ResourceGroupService).
protected ResourceGroup(ResourceGroupService rgs, ResourceGroupConfigInfo rgConfig) {
protected ResourceGroup(ResourceGroupService rgs, String name,
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {
this.rgs = rgs;
this.resourceGroupName = rgConfig.getName();
this.resourceGroupName = name;
this.setResourceGroupMonitoringClassFields();
this.setResourceGroupConfigParameters(rgConfig);
this.setDefaultResourceUsageTransportHandlers();
Expand All @@ -81,10 +82,11 @@ protected ResourceGroup(ResourceGroupService rgs, ResourceGroupConfigInfo rgConf
// ctor for overriding the transport-manager fill/set buffer.
// It is not expected that anything outside of this package will need to directly
// construct a ResourceGroup (i.e., without going through ResourceGroupService).
protected ResourceGroup(ResourceGroupService rgs, ResourceGroupConfigInfo rgConfig,
protected ResourceGroup(ResourceGroupService rgs, String rgName,
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig,
ResourceUsagePublisher rgPublisher, ResourceUsageConsumer rgConsumer) {
this.rgs = rgs;
this.resourceGroupName = rgConfig.getName();
this.resourceGroupName = rgName;
this.setResourceGroupMonitoringClassFields();
this.setResourceGroupConfigParameters(rgConfig);
this.ruPublisher = rgPublisher;
Expand Down Expand Up @@ -128,7 +130,7 @@ public ResourceGroup(ResourceGroup other) {
}
}

protected void updateResourceGroup(ResourceGroupConfigInfo rgConfig) {
protected void updateResourceGroup(org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {
this.setResourceGroupConfigParameters(rgConfig);
}

Expand Down Expand Up @@ -179,7 +181,7 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
log.info("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
transportManager.unregisterResourceUsageConsumer(this.ruConsumer);
// ToDo: call the unregister for publisher after typo is fixed in transport manager
transportManager.unregisterResourceUsagePublisher(this.ruPublisher);
}
}

Expand Down Expand Up @@ -293,8 +295,6 @@ protected void updateLocalQuota(ResourceGroupMonitoringClass monClass, BytesAndM
} finally {
monEntity.localUsageStatsLock.unlock();
}
log.info("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}",
this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages);
}

private void checkMonitoringClass(ResourceGroupMonitoringClass monClass) throws PulsarAdminException {
Expand Down Expand Up @@ -420,16 +420,16 @@ private void setResourceGroupMonitoringClassFields() {
}
}

private void setResourceGroupConfigParameters(ResourceGroupConfigInfo rgConfig) {
private void setResourceGroupConfigParameters(org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {
int idx;

idx = ResourceGroupMonitoringClass.Publish.ordinal();
this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getPublishBytesPerPeriod();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getPublishMessagesPerPeriod();
this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getPublishRateInBytes();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getPublishRateInMsgs();

idx = ResourceGroupMonitoringClass.Dispatch.ordinal();
this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getDispatchBytesPerPeriod();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getDispatchMessagesPerPeriod();
this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getDispatchRateInBytes();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getDispatchRateInMsgs();
}

private void setDefaultResourceUsageTransportHandlers() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
import static org.apache.pulsar.common.policies.path.PolicyPath.path;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Resource Group Config Listener
*
* <P>Meta data store listener of updates to resource group config.
* <P>Listens to resource group configuration changes and updates internal datastructures.
*
* @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
*
*/
public class ResourceGroupConfigListener implements Consumer<Notification> {

private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class);
private final ResourceGroupService rgService;
private final PulsarService pulsarService;
private final ResourceGroupResources rgResources;
private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;

public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {
this.rgService = rgService;
this.pulsarService = pulsarService;
this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources();
loadAllResourceGroups();
this.rgResources.getStore().registerListener(this);
rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
rgService, pulsarService, this);
}

private void loadAllResourceGroups() {
rgResources.getChildrenAsync(path(RESOURCEGROUPS)).whenCompleteAsync((rgList, ex) -> {
if (ex != null) {
LOG.error("Exception when fetching resource groups", ex);
return;
}
final Set<String> existingSet = rgService.resourceGroupGetAll();
HashSet<String> newSet = new HashSet<>();

for (String rgName : rgList) {
newSet.add(rgName);
}

final Sets.SetView<String> deleteList = Sets.difference(existingSet, newSet);

for (String rgName: deleteList) {
deleteResourceGroup(rgName);
}

final Sets.SetView<String> addList = Sets.difference(newSet, existingSet);
for (String rgName: addList) {
final String resourceGroupPath = path(RESOURCEGROUPS, rgName);
pulsarService.getPulsarResources().getResourcegroupResources()
.getAsync(resourceGroupPath).thenAcceptAsync((optionalRg) -> {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}).exceptionally((ex1) -> {
LOG.error("Failed to fetch resourceGroup", ex1);
return null;
});
}
});
}

public synchronized void deleteResourceGroup(String rgName) {
try {
if (rgService.resourceGroupGet(rgName) != null) {
LOG.info("Deleting resource group {}", rgName);
rgService.resourceGroupDelete(rgName);
}
} catch (PulsarAdminException e) {
LOG.error("Got exception while deleting resource group {}, {}", rgName, e);
}
}

public synchronized void createResourceGroup(String rgName, ResourceGroup rg) {
if (rgService.resourceGroupGet(rgName) == null) {
LOG.info("Creating resource group {}, {}", rgName, rg.toString());
try {
rgService.resourceGroupCreate(rgName, rg);
} catch (PulsarAdminException ex1) {
LOG.error("Got an exception while creating RG {}", rgName, ex1);
}
}
}

private void updateResourceGroup(String notifyPath) {
String rgName = notifyPath.substring(notifyPath.lastIndexOf('/') + 1);

rgResources.getAsync(notifyPath).whenComplete((optionalRg, ex) -> {
if (ex != null) {
LOG.error("Exception when getting resource group {}", rgName, ex);
return;
}
ResourceGroup rg = optionalRg.get();
try {
LOG.info("Updating resource group {}, {}", rgName, rg.toString());
rgService.resourceGroupUpdate(rgName, rg);
} catch (PulsarAdminException ex1) {
LOG.error("Got an exception while creating resource group {}", rgName, ex1);
}
});
}

@Override
public void accept(Notification notification) {
String notifyPath = notification.getPath();

if (!notifyPath.startsWith(path(RESOURCEGROUPS))) {
return;
}
LOG.info("Metadata store notification: Path {}, Type {}", notifyPath, notification.getType());

String rgName = notifyPath.substring(notifyPath.lastIndexOf('/') + 1);
if (notification.getType() == NotificationType.ChildrenChanged) {
loadAllResourceGroups();
} else if (!RESOURCEGROUPS.equals(rgName)) {
switch (notification.getType()) {
case Modified:
updateResourceGroup(notifyPath);
break;
default:
break;
}
}
}
}
Loading

0 comments on commit b83ea98

Please sign in to comment.