Skip to content

Commit

Permalink
[cleanup][broker] Override close method to avoid caching exception (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored May 11, 2022
1 parent dd6d372 commit 526979a
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1051,9 +1051,7 @@ public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
}

// attach the resource-group level rate limiters, if set
String rgName = policies.resource_group_name != null
? policies.resource_group_name
: null;
String rgName = policies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public boolean tryAcquire(int numbers, long bytes) {
}

@Override
public void close() throws Exception {
public void close() {
rateLimitFunction.apply();
replaceLimiters(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@ public interface PublishRateLimiter extends AutoCloseable {
* @param bytes
*/
boolean tryAcquire(int numbers, long bytes);

/**
* Close the limiter.
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public boolean tryAcquire(int numbers, long bytes) {
}

@Override
public void close() throws Exception {
public void close() {
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public boolean tryAcquire(int numbers, long bytes) {
}

@Override
public void close() throws Exception {
public void close() {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
topicPublishRateLimiter.close();
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1281,11 +1281,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
topicPublishRateLimiter.close();
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
Expand Down

0 comments on commit 526979a

Please sign in to comment.