Skip to content

Commit

Permalink
Add an REST endpoint to trigger backlogQuotaCheck (apache#8045)
Browse files Browse the repository at this point in the history
Fixes apache#4980

### Motivation
Expose an REST endpoint to trigger backlogQuotaCheck. That will make integration tests easier in future without forcing it to sleep and wait the block to kick in backlog quota check.

### Modifications
Expose an REST endpoint

### Verifying this change
unit test
org.apache.pulsar.broker.service.BacklogQuotaManagerTest#testTriggerBacklogQuotaWithReader
  • Loading branch information
315157973 authored Sep 23, 2020
1 parent fa3407b commit 36d4c68
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.apache.bookkeeper.conf.ClientConfiguration;
Expand Down Expand Up @@ -255,6 +256,26 @@ public InternalConfigurationData getInternalConfigurationData() {
return pulsar().getInternalConfigurationData();
}

@GET
@Path("/backlog-quota-check")
@ApiOperation(value = "An REST endpoint to trigger backlogQuotaCheck")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 500, message = "Internal server error")})
public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccess();
pulsar().getBrokerService().executor().execute(()->{
try {
pulsar().getBrokerService().monitorBacklogQuota();
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
LOG.error("trigger backlogQuotaCheck fail", e);
asyncResponse.resume(new RestException(e));
}
});
}

@GET
@Path("/health")
@ApiOperation(value = "Run a healthcheck against the broker")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ public BacklogQuotaManager getBacklogQuotaManager() {
return this.backlogQuotaManager;
}

public void monitorBacklogQuota() {
public synchronized void monitorBacklogQuota() {
forEachTopic(topic -> {
if (topic instanceof PersistentTopic) {
PersistentTopic persistentTopic = (PersistentTopic) topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.net.URL;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -200,6 +201,65 @@ public void testBacklogQuotaWithReader() throws Exception {
}
}

@Test
public void testTriggerBacklogQuotaWithReader() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
final String topic1 = "persistent://prop/ns-quota/topic1" + UUID.randomUUID();
final int numMsgs = 20;
Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
Producer<byte[]> producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create();
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
content[0] = (byte) (content[0] + 1);
producer.send(content);
}
admin.brokers().backlogQuotaCheck();
rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
// overall backlogSize should be zero because we only have readers
assertEquals(stats.backlogSize, 0, "backlog size is [" + stats.backlogSize + "]");
// non-durable mes should still
assertEquals(stats.subscriptions.size(), 1);
long nonDurableSubscriptionBacklog = stats.subscriptions.values().iterator().next().msgBacklog;
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;
try {
// try to send over backlog quota and make sure it fails
for (int i = 0; i < numMsgs; i++) {
content[0] = (byte) (content[0] + 1);
producer.send(content);
}
} catch (PulsarClientException ce) {
fail("Should not have gotten exception: " + ce.getMessage());
}

// make sure ledgers are trimmed
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1);

// check there is only one ledger left
assertEquals(internalStats.ledgers.size(), 1);

// check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER
assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1);

// check reader can still read with out error

while (true) {
Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
if (msg == null) {
break;
}
LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]);
}
producer.close();
reader.close();
}
}

@Test
public void testConsumerBacklogEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,19 @@ Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster, String
*/
CompletableFuture<InternalConfigurationData> getInternalConfigurationDataAsync();

/**
* Manually trigger backlogQuotaCheck.
*
* @throws PulsarAdminException
*/
void backlogQuotaCheck() throws PulsarAdminException;

/**
* Manually trigger backlogQuotaCheck asynchronously.
* @return
*/
CompletableFuture<Void> backlogQuotaCheckAsync();

/**
* Run a healthcheck on the broker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,38 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public void backlogQuotaCheck() throws PulsarAdminException {
try {
backlogQuotaCheckAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> backlogQuotaCheckAsync() {
WebTarget path = adminBrokers.path("backlogQuotaCheck");
final CompletableFuture<Void> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<Void>() {
@Override
public void completed(Void unused) {
future.complete(null);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(throwable);
}
});
return future;
}

@Override
public void healthcheck() throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ void run() throws Exception {

}

@Parameters(commandDescription = "Manually trigger backlogQuotaCheck")
private class BacklogQuotaCheckCmd extends CliCommand {

@Override
void run() throws Exception {
admin.brokers().backlogQuotaCheckAsync();
System.out.println("ok");
}

}

public CmdBrokers(PulsarAdmin admin) {
super("brokers", admin);
jcommander.addCommand("list", new List());
Expand All @@ -135,5 +146,6 @@ public CmdBrokers(PulsarAdmin admin) {
jcommander.addCommand("get-internal-config", new GetInternalConfigurationCmd());
jcommander.addCommand("get-runtime-config", new GetRuntimeConfigCmd());
jcommander.addCommand("healthcheck", new HealthcheckCmd());
jcommander.addCommand("backlog-quota-check", new BacklogQuotaCheckCmd());
}
}

0 comments on commit 36d4c68

Please sign in to comment.