Skip to content

Commit

Permalink
Support topicl level offload (apache#7883)
Browse files Browse the repository at this point in the history
### Motivation
Support set Offload Policy on topic level

### Modifications
Support set/get/remove Offload policy on topic level.

### Verifying this change
Added Unit test to verify set/get/remove Offload policy at Topic level work as expected when Topic level policy is enabled/disabled
`org.apache.pulsar.broker.admin.AdminApiOffloadTest#testOffloadPoliciesApi`
`org.apache.pulsar.broker.admin.AdminApiOffloadTest#testTopicLevelOffload`
  • Loading branch information
315157973 authored Sep 4, 2020
1 parent 1632dca commit 62e3df3
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand Down Expand Up @@ -97,6 +98,9 @@
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -801,6 +805,70 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
}
}

protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies offloadPolicies) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setOffloadPolicies(offloadPolicies);
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.thenCompose((res) -> {
//The policy update is asynchronous. Cache at this step may not be updated yet.
//So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
if (metadata.partitions > 0) {
List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
for (int i = 0; i < metadata.partitions; i++) {
futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
}
return FutureUtil.waitForAll(futures);
} else {
return internalUpdateOffloadPolicies(offloadPolicies, topicName);
}
})
.whenComplete((result, e) -> {
if (e != null) {
completableFuture.completeExceptionally(e);
} else {
completableFuture.complete(null);
}
});
return completableFuture;
}

private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName topicName) {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(optionalTopic -> {
try {
if (!optionalTopic.isPresent() || !topicName.isPersistent()) {
return;
}
PersistentTopic persistentTopic = (PersistentTopic) optionalTopic.get();
ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
if (offloadPolicies == null) {
LedgerOffloader namespaceOffloader = pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
LedgerOffloader topicOffloader = managedLedgerConfig.getLedgerOffloader();
if (topicOffloader != null && topicOffloader != namespaceOffloader) {
topicOffloader.close();
}
managedLedgerConfig.setLedgerOffloader(namespaceOffloader);
} else {
managedLedgerConfig.setLedgerOffloader(pulsar().createManagedLedgerOffloader(offloadPolicies));
}
persistentTopic.getManagedLedger().setConfig(managedLedgerConfig);
} catch (PulsarServerException e) {
throw new RestException(e);
}
});
}

protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
TopicPolicies topicPolicies = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand Down Expand Up @@ -250,6 +251,69 @@ public void createNonPartitionedTopic(
internalCreateNonPartitionedTopic(authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
@ApiOperation(value = "Get offload policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isOffloadPoliciesSet()) {
asyncResponse.resume(topicPolicies.getOffloadPolicies());
} else {
asyncResponse.resume(Response.noContent().build());
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
@ApiOperation(value = "Set offload policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Offload policies for the specified topic")
OffloadPolicies offloadPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(tenant);
validatePoliciesReadOnlyAccess();
checkTopicLevelPolicyEnable();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
internalSetOffloadPolicies(offloadPolicies).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set offloadPolicies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set offloadPolicies", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
@ApiOperation(value = "Delete offload policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void removeOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setOffloadPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand Down Expand Up @@ -1069,6 +1070,7 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t

PersistencePolicies persistencePolicies = null;
RetentionPolicies retentionPolicies = null;
OffloadPolicies topicLevelOffloadPolicies = null;

if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
TopicName cloneTopicName = topicName;
Expand All @@ -1080,6 +1082,7 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
if (topicPolicies != null) {
persistencePolicies = topicPolicies.getPersistence();
retentionPolicies = topicPolicies.getRetentionPolicies();
topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.warn("Topic {} policies cache have not init.", topicName);
Expand Down Expand Up @@ -1158,11 +1161,23 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
if (topicLevelOffloadPolicies != null) {
try {
LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(topicLevelOffloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
} catch (PulsarServerException e) {
future.completeExceptionally(e);
return;
}
} else {
//If the topic level policy is null, use the namespace level
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}

managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());


future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));

Expand Down
Loading

0 comments on commit 62e3df3

Please sign in to comment.