Skip to content

Commit

Permalink
Avoid bundle-split with old load-report (apache#826)
Browse files Browse the repository at this point in the history
* Avoid bundle-split with old load-report

* Remove old split-bundle from stats so load-manager can read before stats update
  • Loading branch information
rdhabalia authored and merlimat committed Oct 17, 2017
1 parent a26c1e5 commit b7bbe90
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
// update LoadReport at most every 5 seconds
public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
// last LoadReport stored in ZK
private LoadReport lastLoadReport;
private volatile LoadReport lastLoadReport;
// last timestamp resource usage was checked
private long lastResourceUsageTimestamp = -1;
// flag to force update load report
Expand Down Expand Up @@ -1098,11 +1098,14 @@ public SystemResourceUsage getSystemResourceUsage() throws IOException {

@Override
public LoadReport generateLoadReport() throws Exception {
if (!isLoadReportGenerationIntervalPassed()) {
return lastLoadReport;
}
return generateLoadReportForcefully();
}

private LoadReport generateLoadReportForcefully() throws Exception {
synchronized (bundleGainsCache) {
long timeSinceLastGenMillis = System.currentTimeMillis() - lastLoadReport.getTimestamp();
if (timeSinceLastGenMillis <= LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL) {
return lastLoadReport;
}
try {
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
Expand Down Expand Up @@ -1262,7 +1265,7 @@ public void writeLoadReportOnZookeeper() throws Exception {
}

if (needUpdate) {
LoadReport lr = generateLoadReport();
LoadReport lr = generateLoadReportForcefully();
pulsar.getZkClient().setData(brokerZnodePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr),
-1);
this.lastLoadReport = lr;
Expand All @@ -1272,6 +1275,17 @@ public void writeLoadReportOnZookeeper() throws Exception {
}
}

/**
* Check if last generated load-report time passed the minimum time for load-report update.
*
* @return true: if last load-report generation passed the minimum interval and load-report can be generated false:
* if load-report generation has not passed minimum interval to update load-report again
*/
private boolean isLoadReportGenerationIntervalPassed() {
long timeSinceLastGenMillis = System.currentTimeMillis() - lastLoadReport.getTimestamp();
return timeSinceLastGenMillis > LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
}

// todo: changeme: this can be optimized, we don't have to iterate through everytime
private boolean isBrokerAvailableForRebalancing(String bundleName, long maxLoadLevel) {
NamespaceName namespaceName = new NamespaceName(LoadManagerShared.getNamespaceNameFromBundleName(bundleName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,7 @@ public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) {
// remove old bundle from the map
synchronized (multiLayerTopicsMap) {
multiLayerTopicsMap.get(oldBundle.getNamespaceObject().toString()).remove(oldBundle.toString());
pulsarStats.invalidBundleStats(oldBundle.toString());
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public synchronized void updateStats(
}
}

public NamespaceBundleStats invalidBundleStats(String bundleName) {
return bundleStats.remove(bundleName);
}

public void getDimensionMetrics(Consumer<ByteBuf> consumer) {
bufferLock.readLock().lock();
try {
Expand Down

0 comments on commit b7bbe90

Please sign in to comment.