Skip to content

Commit

Permalink
Expose broker URLs in loadbalancer registration node (apache#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 15, 2016
1 parent 5771aa4 commit 47a213c
Show file tree
Hide file tree
Showing 23 changed files with 87 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.stats.AllocatorStats;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.stats.AllocatorStatsGenerator;
import com.yahoo.pulsar.broker.stats.BookieClientStatsGenerator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import com.google.common.collect.Lists;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.stats.Metrics;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.util.Map;

import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;

/*
ResourceDescription is an abstraction to represent resources like memory, cpu, network and io combined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.ServiceRequest;
import com.yahoo.pulsar.broker.loadbalance.ServiceUnit;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;

public class PulsarLoadReportImpl implements LoadReport {
Expand All @@ -50,8 +50,8 @@ public static LoadReport parse(String loadReportJson) {
PulsarLoadReportImpl pulsarLoadReport = new PulsarLoadReportImpl();
ObjectMapper mapper = ObjectMapperFactory.create();
try {
com.yahoo.pulsar.broker.loadbalance.data.LoadReport report = mapper.readValue(loadReportJson,
com.yahoo.pulsar.broker.loadbalance.data.LoadReport.class);
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport report = mapper.readValue(loadReportJson,
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport.class);
SystemResourceUsage sru = report.getSystemResourceUsage();
String resourceUnitName = report.getName();
pulsarLoadReport.resourceDescription = new PulsarResourceDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Map;

import com.yahoo.pulsar.broker.loadbalance.ResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;

public class PulsarResourceDescription extends ResourceDescription {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.yahoo.pulsar.broker.loadbalance.LoadRanker;
import com.yahoo.pulsar.broker.loadbalance.ResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;

import java.util.Comparator;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUnitRanking;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.PulsarServerException;
Expand All @@ -68,11 +73,6 @@
import com.yahoo.pulsar.broker.loadbalance.LoadRanker;
import com.yahoo.pulsar.broker.loadbalance.PlacementStrategy;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUnitRanking;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.api.Authentication;
Expand Down Expand Up @@ -177,7 +177,8 @@ public SimpleLoadManagerImpl(PulsarService pulsar) {
this.realtimeResourceQuotas.set(new HashMap<>());
this.realtimeAvgResourceQuota = new ResourceQuota();
placementStrategy = new WRRPlacementStrategy();
lastLoadReport = new LoadReport();
lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
brokerHostUsage = new BrokerHostUsage(pulsar);
loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
@Override
Expand Down Expand Up @@ -279,7 +280,7 @@ public void start() throws PulsarServerException {
throw new PulsarServerException(e);
}
}

@Override
public void disableBroker() throws Exception {
if (isNotEmpty(brokerZnodePath)) {
Expand Down Expand Up @@ -625,7 +626,7 @@ public void writeResourceQuotasToZooKeeper() throws Exception {
* - Available capacity for weighted random selection (weightedRandomSelection): ranks ResourceUnits units based on
* estimation of their capacity which is basically how many bundles each ResourceUnit is able can handle with its
* available resources (CPU, memory, network, etc);
*
*
* - Load percentage for least loaded server (leastLoadedServer): ranks ResourceUnits units based on estimation of
* their load percentage which is basically how many percent of resource is allocated which is
* max(resource_actually_used, resource_quota)
Expand Down Expand Up @@ -1091,7 +1092,8 @@ public LoadReport generateLoadReport() throws Exception {
}

try {
LoadReport loadReport = new LoadReport();
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
loadReport.setName(String.format("%s:%s", pulsar.getHost(), pulsar.getConfiguration().getWebServicePort()));
SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
loadReport.setOverLoaded(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.authentication.AuthenticationService;
import com.yahoo.pulsar.broker.authorization.AuthorizationManager;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand All @@ -81,6 +80,7 @@
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.stats.BrokerOperabilityMetrics;
import com.yahoo.pulsar.broker.stats.ClusterReplicationMetrics;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.broker.stats.NamespaceStats;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.utils.StatsOutputStream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.PublisherStats;
import com.yahoo.pulsar.common.policies.data.Policies;
Expand All @@ -67,7 +68,6 @@
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.broker.cache.ConfigurationCacheService;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.broker.web.RestException;
Expand All @@ -83,6 +82,7 @@
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,12 @@
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.impl.ResourceAvailabilityRanker;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.admin.internal.NamespacesImpl;
import com.yahoo.pulsar.client.admin.internal.BrokerStatsImpl;
import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceName;
Expand All @@ -81,6 +76,10 @@
import com.yahoo.pulsar.common.policies.data.NamespaceIsolationData;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,6 @@
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.data.BrokerUsage;
import com.yahoo.pulsar.broker.loadbalance.data.JvmUsage;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUnitRanking;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarLoadReportImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.impl.ResourceAvailabilityRanker;
Expand All @@ -77,6 +71,12 @@
import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyType;
import com.yahoo.pulsar.common.policies.data.NamespaceIsolationData;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.BrokerUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.JvmUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUnitRanking;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
Expand Down Expand Up @@ -243,8 +243,8 @@ public void testPrimary() throws Exception {
LoadRanker lr = new ResourceAvailabilityRanker();

// inject the load report and rankings
Map<ResourceUnit, com.yahoo.pulsar.broker.loadbalance.data.LoadReport> loadReports = new HashMap<>();
com.yahoo.pulsar.broker.loadbalance.data.LoadReport loadReport = new com.yahoo.pulsar.broker.loadbalance.data.LoadReport();
Map<ResourceUnit, com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport> loadReports = new HashMap<>();
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport();
loadReport.setSystemResourceUsage(new SystemResourceUsage());
loadReports.put(ru1, loadReport);
setObjectField(SimpleLoadManagerImpl.class, loadManager, "currentLoadReports", loadReports);
Expand Down Expand Up @@ -349,7 +349,7 @@ public void testResourceDescription() {
public void testLoadReportParsing() throws Exception {

ObjectMapper mapper = ObjectMapperFactory.create();
com.yahoo.pulsar.broker.loadbalance.data.LoadReport reportData = new com.yahoo.pulsar.broker.loadbalance.data.LoadReport();
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport reportData = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport();
reportData.setName("b1");
SystemResourceUsage resource = new SystemResourceUsage();
ResourceUsage resourceUsage = new ResourceUsage();
Expand Down Expand Up @@ -398,11 +398,11 @@ public void testDoLoadShedding() throws Exception {
stats.put("property/cluster/namespace1/0x00000000_0xFFFFFFFF", nsb1);
stats.put("property/cluster/namespace2/0x00000000_0xFFFFFFFF", nsb2);

Map<ResourceUnit, com.yahoo.pulsar.broker.loadbalance.data.LoadReport> loadReports = new HashMap<>();
com.yahoo.pulsar.broker.loadbalance.data.LoadReport loadReport1 = new com.yahoo.pulsar.broker.loadbalance.data.LoadReport();
Map<ResourceUnit, com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport> loadReports = new HashMap<>();
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport1 = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport();
loadReport1.setSystemResourceUsage(systemResource);
loadReport1.setBundleStats(stats);
com.yahoo.pulsar.broker.loadbalance.data.LoadReport loadReport2 = new com.yahoo.pulsar.broker.loadbalance.data.LoadReport();
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport2 = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport();
loadReport2.setSystemResourceUsage(new SystemResourceUsage());
loadReport2.setBundleStats(stats);
loadReports.put(ru1, loadReport1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.stats.Metrics;
Expand All @@ -65,6 +64,7 @@
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;

/**
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.data;
package com.yahoo.pulsar.common.policies.data.loadbalancer;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.data;
package com.yahoo.pulsar.common.policies.data.loadbalancer;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.data;
package com.yahoo.pulsar.common.policies.data.loadbalancer;

import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -23,16 +23,24 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Maps;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;


/**
* This class represents the overall load of the broker - it includes overall {@link SystemResourceUsage} and
* {@link NamespaceUsage} for all the namespaces hosted by this broker.
*/
public class LoadReport {
private String name;

private final String webServiceUrl;
private final String webServiceUrlTls;

private final String pulsarServiceUrl;
private final String pulsarServieUrlTls;

private boolean isUnderLoaded;
private boolean isOverLoaded;
private String name;
private long timestamp;
private double msgRateIn;
private double msgRateOut;
Expand All @@ -42,6 +50,15 @@ public class LoadReport {
private long numBundles;

public LoadReport() {
this(null, null, null, null);
}

public LoadReport(String webServiceUrl, String webServiceUrlTls, String pulsarServiceUrl, String pulsarServieUrlTls) {
this.webServiceUrl = webServiceUrl;
this.webServiceUrlTls = webServiceUrlTls;
this.pulsarServiceUrl = pulsarServiceUrl;
this.pulsarServieUrlTls = pulsarServieUrlTls;

isUnderLoaded = false;
isOverLoaded = false;
timestamp = 0;
Expand Down Expand Up @@ -208,4 +225,20 @@ public TreeMap<String, NamespaceBundleStats> getSortedBundleStats(ResourceType r
sortedBundleStats.putAll(bundleStats);
return sortedBundleStats;
}

public String getWebServiceUrl() {
return webServiceUrl;
}

public String getWebServiceUrlTls() {
return webServiceUrlTls;
}

public String getPulsarServiceUrl() {
return pulsarServiceUrl;
}

public String getPulsarServieUrlTls() {
return pulsarServieUrlTls;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.data;
package com.yahoo.pulsar.common.policies.data.loadbalancer;

/**
*/
Expand Down
Loading

0 comments on commit 47a213c

Please sign in to comment.