Skip to content

Commit

Permalink
[PIP-82] [pulsar-broker] Misc fixes: (apache#11821)
Browse files Browse the repository at this point in the history
- fix updateLocalQuota to not attempt Dispatch changes
    - add checks for totalUsed of 0, and unconfigured RG in ResourceQuotaCalculatorImpl
    - UT for above

Co-authored-by: Kaushik Ghosh <[email protected]>
  • Loading branch information
kaushik-develop and kaushikg-splunk authored Aug 31, 2021
1 parent a68853f commit 60a5698
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes

// If this is the first ref, register with the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) {
log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
if (log.isDebugEnabled()) {
log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
}
transportManager.registerResourceUsagePublisher(this.ruPublisher);
transportManager.registerResourceUsageConsumer(this.ruConsumer);
}
Expand All @@ -192,7 +194,9 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes

// If this was the last ref, unregister from the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
if (log.isDebugEnabled()) {
log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
}
transportManager.unregisterResourceUsageConsumer(this.ruConsumer);
transportManager.unregisterResourceUsagePublisher(this.ruPublisher);
}
Expand Down Expand Up @@ -321,6 +325,14 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass

protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount newQuota) throws PulsarAdminException {
// Only the Publish side is functional now; add the Dispatch side code when the consume side is ready.
if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) {
if (log.isDebugEnabled()) {
log.debug("Doing nothing for monClass={}; only Publish is functional", monClass);
}
return null;
}

this.checkMonitoringClass(monClass);
BytesAndMessagesCount oldBMCount;

Expand All @@ -333,8 +345,10 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo
} finally {
monEntity.localUsageStatsLock.unlock();
}
log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}",
this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages);
if (log.isDebugEnabled()) {
log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}",
this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages);
}

return oldBMCount;
}
Expand Down Expand Up @@ -434,11 +448,16 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas
double sentCount = sendReport ? 1 : 0;
rgLocalUsageReportCount.labels(rgName, monClass.name()).inc(sentCount);
if (sendReport) {
log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}",
rgName, monClass, bytesUsed, messagesUsed);
if (log.isDebugEnabled()) {
log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}",
rgName, monClass, bytesUsed, messagesUsed);
}
} else {
log.debug("fillResourceUsage for RG={}: report for {} suppressed (suppressions={} since last sent report)",
rgName, monClass, numSuppressions);
if (log.isDebugEnabled()) {
log.debug("fillResourceUsage for RG={}: report for {} suppressed "
+ "(suppressions={} since last sent report)",
rgName, monClass, numSuppressions);
}
}

return sendReport;
Expand Down Expand Up @@ -479,11 +498,13 @@ private void getUsageFromMonitoredEntity(ResourceGroupMonitoringClass monClass,
oldMessageCount = oldUsageStats.usedValues.messages;
}

log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} "
+ "with bytes={} (old ={}), messages={} (old={})",
this.resourceGroupName, monClass, broker,
newByteCount, oldByteCount,
newMessageCount, oldMessageCount);
if (log.isDebugEnabled()) {
log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} "
+ "with bytes={} (old ={}), messages={} (old={})",
this.resourceGroupName, monClass, broker,
newByteCount, oldByteCount,
newMessageCount, oldMessageCount);
}
}

private void setResourceGroupMonitoringClassFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,12 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n

try {
boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff);
log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; "
+ "by {} bytes, {} mesgs",
topicName, monClass, statsUpdated, tenantString, nsString,
bmDiff.bytes, bmDiff.messages);
if (log.isDebugEnabled()) {
log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; "
+ "by {} bytes, {} mesgs",
topicName, monClass, statsUpdated, tenantString, nsString,
bmDiff.bytes, bmDiff.messages);
}
hm.put(topicName, bmNewCount);
} catch (Throwable t) {
log.error("updateStatsWithDiff: got ex={} while aggregating for {} side",
Expand Down Expand Up @@ -553,7 +555,9 @@ protected void aggregateResourceGroupLocalUsages() {
ResourceGroupMonitoringClass.Dispatch);
}
double diffTimeSeconds = aggrUsageTimer.observeDuration();
log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
if (log.isDebugEnabled()) {
log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
}

// Check any re-scheduling requirements for next time.
// Use the same period as getResourceUsagePublishIntervalInSecs;
Expand Down Expand Up @@ -610,20 +614,40 @@ protected void calculateQuotaForAllResourceGroups() {
globUsageMessagesArray);

BytesAndMessagesCount oldBMCount = resourceGroup.updateLocalQuota(monClass, updatedQuota);
rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
log.debug("calculateQuota for RG {} [class {}]: bytes incremented by {}, messages by {}",
rgName, monClass, messagesIncrement, bytesIncrement);
// Guard against unconfigured quota settings, for which computeLocalQuota will return negative.
if (updatedQuota.messages >= 0) {
rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
}
if (updatedQuota.bytes >= 0) {
rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
}
if (oldBMCount != null) {
long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
if (log.isDebugEnabled()) {
log.debug("calculateQuota for RG={} [class {}]: "
+ "updatedlocalBytes={}, updatedlocalMesgs={}; "
+ "old bytes={}, old mesgs={}; incremented bytes by {}, messages by {}",
rgName, monClass, updatedQuota.bytes, updatedQuota.messages,
oldBMCount.bytes, oldBMCount.messages,
bytesIncrement, messagesIncrement);
}
} else {
if (log.isDebugEnabled()) {
log.debug("calculateQuota for RG={} [class {}]: got back null from updateLocalQuota",
rgName, monClass);
}
}
} catch (Throwable t) {
log.error("Got exception={} while calculating new quota for monitoring-class={} of RG={}",
t.getMessage(), monClass, rgName);
}
}
});
double diffTimeSeconds = quotaCalcTimer.observeDuration();
log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000);
if (log.isDebugEnabled()) {
log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000);
}

// Check any re-scheduling requirements for next time.
// Use the same period as getResourceUsagePublishIntervalInSecs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) th

if (confUsage < 0) {
// This can happen if the RG is not configured with this particular limit (message or byte count) yet.
// It is safe to return a high value (so we don't limit) for the quota.
log.debug("Configured usage {} is not set; returning a high calculated quota", confUsage);
return Long.MAX_VALUE;
val retVal = -1;
if (log.isDebugEnabled()) {
log.debug("Configured usage ({}) is not set; returning a special value ({}) for calculated quota",
confUsage, retVal);
}
return retVal;
}

if (myUsage < 0 || totalUsage < 0) {
Expand All @@ -51,6 +54,15 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) th
throw new PulsarAdminException(errMesg);
}

// If the total usage is zero (which may happen during initial transients), just return the configured value.
// The caller is expected to check the value returned, or not call here with a zero global usage.
// [This avoids a division by zero when calculating the local share.]
if (totalUsage == 0) {
log.warn("computeLocalQuota: totalUsage is zero; returning the configured usage ({}) as new local quota",
confUsage);
return confUsage;
}

if (myUsage > totalUsage) {
String errMesg = String.format("Local usage (%d) is greater than total usage (%d)",
myUsage, totalUsage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ protected void cleanup() throws Exception {
public void testRQCalcNegativeConfTest() throws PulsarAdminException {
final long[] allUsage = { 0 };
long calculatedQuota = this.rqCalc.computeLocalQuota(-1, 0, allUsage);
Assert.assertEquals(calculatedQuota, Long.MAX_VALUE);
long expectedQuota = -1;
Assert.assertEquals(calculatedQuota, expectedQuota);
}

@Test
Expand Down Expand Up @@ -102,5 +103,14 @@ public void testRQCalcProportionalIncrementTest() throws PulsarAdminException {
Assert.assertEquals(initialUsageRatio, proposedUsageRatio);
}

@Test
public void testRQCalcGlobUsedZeroTest() throws PulsarAdminException {
final long config = 10; // don't care
final long localUsed = 0; // don't care
final long[] allUsage = { 0 };
final long newQuota = this.rqCalc.computeLocalQuota(config, localUsed, allUsage);
Assert.assertTrue(newQuota == config);
}

private ResourceQuotaCalculatorImpl rqCalc;
}

0 comments on commit 60a5698

Please sign in to comment.