Skip to content

Commit

Permalink
admin-cli-support-terminate-partitioned-topic (apache#11893)
Browse files Browse the repository at this point in the history
Co-authored-by: gavingaozhangmin <[email protected]>
  • Loading branch information
gaozhangmin and gavingaozhangmin authored Dec 21, 2021
1 parent d3d4b89 commit 2adb666
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -3224,23 +3225,29 @@ protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.TERMINATE);

List<MessageId> messageIds = new ArrayList<>();
Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>();

PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);

if (partitionMetadata.partitions == 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a non-partitioned topic is "
+ "not allowed using partitioned-terminate, please use terminate commands.");
}
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<MessageId>> futures = Lists.newArrayList();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
int finalI = i;
futures.add(pulsar().getAdminClient().topics()
.terminateTopicAsync(topicNamePartition.toString()).whenComplete((messageId, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition,
throwable);
asyncResponse.resume(new RestException(throwable));
}
messageIds.add(messageId);
messageIds.put(finalI, messageId);
}));
} catch (Exception e) {
log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2587,7 +2587,7 @@ public MessageId terminate(
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Termination of a partitioned topic is not allowed"),
@ApiResponse(code = 405, message = "Termination of a non-partitioned topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -336,7 +336,9 @@ public void testTerminatePartitionedTopic() {
// 9) terminate partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.terminatePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true);
verify(response, timeout(5000).times(1)).resume(Arrays.asList(new MessageIdImpl(3, -1, -1)));
Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>();
messageIds.put(0, new MessageIdImpl(3, -1, -1));
verify(response, timeout(5000).times(1)).resume(messageIds);
}

@Test
Expand Down Expand Up @@ -1126,4 +1128,24 @@ public void testDeleteTopic() throws Exception {
Assert.assertEquals(e.getResponse().getStatus(), 404);
}
}

public void testAdminTerminatePartitionedTopic() throws Exception{
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
admin.namespaces().createNamespace("prop-xyz/ns12", Sets.newHashSet("test"));
final String topicName = "persistent://prop-xyz/ns12/testTerminatePartitionedTopic";

admin.topics().createPartitionedTopic(topicName, 1);
Map<Integer, MessageId> results = new HashMap<>();
results.put(0, new MessageIdImpl(3, -1, -1));
Assert.assertEquals(admin.topics().terminatePartitionedTopic(topicName), results);

// Check examine message not allowed on non-partitioned topic.
admin.topics().createNonPartitionedTopic("persistent://prop-xyz/ns12/test");
try {
admin.topics().terminatePartitionedTopic(topicName);
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getMessage(), "Termination of a non-partitioned topic is not allowed using partitioned-terminate, please use terminate commands.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,26 @@ default CompletableFuture<Void> deleteAsync(String topic, boolean force) {
*/
CompletableFuture<MessageId> terminateTopicAsync(String topic);

/**
* Terminate the partitioned topic and prevent any more messages being published on it.
* <p/>
*
* @param topic
* topic name
* @return the message id of the last message that was published in the each partition of topic
*/
Map<Integer, MessageId> terminatePartitionedTopic(String topic) throws PulsarAdminException;

/**
* Terminate the partitioned topic and prevent any more messages being published on it.
* <p/>
*
* @param topic
* topic name
* @return the message id of the last message that was published in the each partition of topic
*/
CompletableFuture<Map<Integer, MessageId>> terminatePartitionedTopicAsync(String topic);

/**
* Get the list of subscriptions.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -663,6 +664,54 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public Map<Integer, MessageId> terminatePartitionedTopic(String topic) throws PulsarAdminException {
try {
return terminatePartitionedTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Map<Integer, MessageId>> terminatePartitionedTopicAsync(String topic) {
TopicName tn = validateTopic(topic);

final CompletableFuture<Map<Integer, MessageId>> future = new CompletableFuture<>();
try {
final WebTarget path = topicPath(tn, "terminate", "partitions");

request(path).async().post(Entity.entity("", MediaType.APPLICATION_JSON),
new InvocationCallback<Map<Integer, MessageIdImpl>>() {

@Override
public void completed(Map<Integer, MessageIdImpl> messageId) {
Map<Integer, MessageId> messageIdImpl = new HashMap<>();
for (Map.Entry<Integer, MessageIdImpl> entry: messageId.entrySet()) {
messageIdImpl.put(entry.getKey(), entry.getValue());
}
future.complete(messageIdImpl);
}

@Override
public void failed(Throwable throwable) {
log.warn("[{}] Failed to perform http post request: {}", path.getUri(),
throwable.getMessage());
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
} catch (PulsarAdminException cae) {
future.completeExceptionally(cae);
}

return future;
}

@Override
public List<String> getSubscriptions(String topic) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -1276,6 +1277,12 @@ public boolean matches(Long timestamp) {
cmdTopics.run(split("terminate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).terminateTopicAsync("persistent://myprop/clust/ns1/ds1");

Map<Integer, MessageId> results = new HashMap<>();
results.put(0, new MessageIdImpl(1, 1, 0));
when(mockTopics.terminatePartitionedTopic("persistent://myprop/clust/ns1/ds1")).thenReturn(results);
cmdTopics.run(split("partitioned-terminate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).terminatePartitionedTopic("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("compact persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).triggerCompaction("persistent://myprop/clust/ns1/ds1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -125,6 +126,7 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("get-message-id", new GetMessageId());
jcommander.addCommand("reset-cursor", new ResetCursor());
jcommander.addCommand("terminate", new Terminate());
jcommander.addCommand("partitioned-terminate", new PartitionedTerminate());
jcommander.addCommand("compact", new Compact());
jcommander.addCommand("compaction-status", new CompactionStatusCmd());
jcommander.addCommand("offload", new Offload());
Expand Down Expand Up @@ -874,6 +876,22 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Terminate a partitioned topic and don't allow any more messages to be published")
private class PartitionedTerminate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException, TimeoutException {
String persistentTopic = validatePersistentTopic(params);
Map<Integer, MessageId> messageIds = getTopics().terminatePartitionedTopic(persistentTopic);
for (Map.Entry<Integer, MessageId> entry: messageIds.entrySet()) {
String topicName = persistentTopic + "-partition-" + entry.getKey();
System.out.println("Topic " + topicName + " succesfully terminated at " + entry.getValue());
}
}
}

@Parameters(commandDescription = "Peek some messages for the subscription")
private class PeekMessages extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down
8 changes: 8 additions & 0 deletions site2/docs/reference-pulsar-admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,14 @@ Usage
$ pulsar-admin topics terminate persistent://tenant/namespace/topic
```

### `partitioned-terminate`
Terminate a persistent topic (disallow further messages from being published on the topic)

Usage
```bash
$ pulsar-admin topics partitioned-terminate persistent://tenant/namespace/topic
```

### `permissions`
Get the permissions on a topic. Retrieve the effective permissions for a destination. These permissions are defined by the permissions set at the namespace level combined (union) with any eventual specific permissions set on the topic.

Expand Down

0 comments on commit 2adb666

Please sign in to comment.