Skip to content

Commit

Permalink
Add compaction threshold for topic level (apache#7881)
Browse files Browse the repository at this point in the history
Fix apache#7826 

### Motivation
Support compaction threshold on topic level.
Based on the system topic function.

### Modifications
Support set compaction threshold on topic level.
Support get compaction threshold on topic level.
Support remove compaction threshold on topic level.
  • Loading branch information
hangc0276 authored Aug 30, 2020
1 parent b06ea5e commit 3ff753c
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3100,4 +3100,46 @@ protected CompletableFuture<Void> internalRemoveDispatchRate() {

}

protected Optional<Long> internalGetCompactionThreshold() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
return getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
}

protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
if (compactionThreshold != null && compactionThreshold < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
}
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();

TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
topicPolicies.setCompactionThreshold(compactionThreshold);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.get().setCompactionThreshold(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1784,5 +1784,92 @@ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Get compaction threshold configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
Optional<Long> compactionThreshold = internalGetCompactionThreshold();
if (!compactionThreshold.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(compactionThreshold.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Set compaction threshold configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetCompactionThreshold(compactionThreshold).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic dispatch rate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic dispatch rate");
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic compaction threshold: tenant={}, namespace={}, topic={}, compactionThreshold={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(compactionThreshold));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Remove compaction threshold configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalRemoveCompactionThreshold().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic dispatch rate", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1155,12 +1155,18 @@ public void checkMessageDeduplicationInfo() {
public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
Long compactionThreshold = Optional.ofNullable(getTopicPolicies(name))
.map(TopicPolicies::getCompactionThreshold)
.orElse(null);
if (compactionThreshold == null) {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
compactionThreshold = policies.compaction_threshold;
}


if (isSystemTopic() || policies.compaction_threshold != 0
if (isSystemTopic() || compactionThreshold != 0
&& currentCompaction.isDone()) {

long backlogEstimate = 0;
Expand All @@ -1173,13 +1179,13 @@ public void checkCompaction() {
backlogEstimate = ledger.getEstimatedBacklogSize();
}

if (backlogEstimate > policies.compaction_threshold) {
if (backlogEstimate > compactionThreshold) {
try {
triggerCompaction();
} catch (AlreadyRunningException are) {
log.debug("[{}] Compaction already running, so don't trigger again, "
+ "even though backlog({}) is over threshold({})",
name, backlogEstimate, policies.compaction_threshold);
name, backlogEstimate, compactionThreshold);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,24 @@ public void testDispatchRateDisabled() throws Exception {
Assert.assertEquals(e.getStatusCode(), 405);
}
}

@Test
public void testCompactionThresholdDisabled() {
Long compactionThreshold = 10000L;
log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);

try {
admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}

try {
admin.topics().getCompactionThreshold(testTopic);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -377,4 +377,42 @@ public void testRemoveDispatchRate() throws Exception {

admin.topics().deletePartitionedTopic(testTopic, true);
}

@Test
public void testGetSetCompactionThreshold() throws Exception {
long compactionThreshold = 100000;
log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);

admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
log.info("Compaction threshold set success on topic: {}", testTopic);

Thread.sleep(3000);
long getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
log.info("Compaction threshold: {} get on topic: {}", getCompactionThreshold, testTopic);
Assert.assertEquals(getCompactionThreshold, compactionThreshold);

admin.topics().deletePartitionedTopic(testTopic, true);
}

@Test
public void testRemoveCompactionThreshold() throws Exception {
Long compactionThreshold = 100000L;
log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);

admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
log.info("Compaction threshold set success on topic: {}", testTopic);

Thread.sleep(3000);
Long getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
log.info("Compaction threshold: {} get on topic: {}", getCompactionThreshold, testTopic);
Assert.assertEquals(getCompactionThreshold, compactionThreshold);

admin.topics().removeCompactionThreshold(testTopic);
Thread.sleep(3000);
log.info("Compaction threshold get on topic: {} after remove", getCompactionThreshold, testTopic);
getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
Assert.assertNull(getCompactionThreshold);

admin.topics().deletePartitionedTopic(testTopic, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1913,4 +1913,98 @@ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
*/
CompletableFuture<Void> removeDispatchRateAsync(String topic) throws PulsarAdminException;

/**
* Get the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* <p/>
* Response example:
*
* <pre>
* <code>10000000</code>
* </pre>
*
* @param topic
* Topic name
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
Long getCompactionThreshold(String topic) throws PulsarAdminException;

/**
* Get the compactionThreshold for a topic asynchronously. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* <p/>
* Response example:
*
* <pre>
* <code>10000000</code>
* </pre>
*
* @param topic
* Topic name
*/
CompletableFuture<Long> getCompactionThresholdAsync(String topic);

/**
* Set the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* <p/>
* Request example:
*
* <pre>
* <code>10000000</code>
* </pre>
*
* @param topic
* Topic name
* @param compactionThreshold
* maximum number of backlog bytes before compaction is triggered
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException;

/**
* Set the compactionThreshold for a topic asynchronously. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* <p/>
* Request example:
*
* <pre>
* <code>10000000</code>
* </pre>
*
* @param topic
* Topic name
* @param compactionThreshold
* maximum number of backlog bytes before compaction is triggered
*/
CompletableFuture<Void> setCompactionThresholdAsync(String topic, long compactionThreshold);

/**
* Remove the compactionThreshold for a topic.
* @param topic
* Topic name
* @throws PulsarAdminException
* Unexpected error
*/
void removeCompactionThreshold(String topic) throws PulsarAdminException;

/**
* Remove the compactionThreshold for a topic asynchronously.
* @param topic
* Topic name
*/
CompletableFuture<Void> removeCompactionThresholdAsync(String topic);

}
Loading

0 comments on commit 3ff753c

Please sign in to comment.