Skip to content

Commit

Permalink
Remove deprecated PersistentTopics in favor of Topics (apache#3943)
Browse files Browse the repository at this point in the history
The PersistentTopics was been deprecated in 2.0, which can be replaced with Topics completely. Remove it in favor of topics.
  • Loading branch information
Like authored and jiazhai committed Apr 3, 2019
1 parent 1e8b3d3 commit b51e435
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ protected void internalDeletePartitionedTopic(boolean authoritative, boolean for
try {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
pulsar().getAdminClient().persistentTopics().deleteAsync(topicNamePartition.toString(), force)
pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force)
.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
Expand Down Expand Up @@ -937,7 +937,7 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl
AtomicInteger failureCount = new AtomicInteger(0);

for (int i = 0; i < partitionMetadata.partitions; i++) {
admin.persistentTopics()
admin.topics()
.createSubscriptionAsync(topicName.getPartition(i).toString(), subscriptionName, messageId)
.handle((result, ex) -> {
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ private void testOffload(String topicName, String mlName) throws Exception {
ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
Assert.assertEquals(info.ledgers.size(), 2);

Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
Assert.assertEquals(admin.topics().offloadStatus(topicName).status,
LongRunningProcessStatus.Status.NOT_RUN);

admin.persistentTopics().triggerOffload(topicName, currentId);
admin.topics().triggerOffload(topicName, currentId);

Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
Assert.assertEquals(admin.topics().offloadStatus(topicName).status,
LongRunningProcessStatus.Status.RUNNING);

try {
admin.persistentTopics().triggerOffload(topicName, currentId);
admin.topics().triggerOffload(topicName, currentId);
Assert.fail("Should have failed");
} catch (ConflictException e) {
// expected
Expand All @@ -114,19 +114,19 @@ private void testOffload(String topicName, String mlName) throws Exception {
// fail first time
promise.completeExceptionally(new Exception("Some random failure"));

Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
Assert.assertEquals(admin.topics().offloadStatus(topicName).status,
LongRunningProcessStatus.Status.ERROR);
Assert.assertTrue(admin.persistentTopics().offloadStatus(topicName).lastError.contains("Some random failure"));
Assert.assertTrue(admin.topics().offloadStatus(topicName).lastError.contains("Some random failure"));

// Try again
doReturn(CompletableFuture.completedFuture(null))
.when(offloader).offload(anyObject(), anyObject(), anyObject());

admin.persistentTopics().triggerOffload(topicName, currentId);
admin.topics().triggerOffload(topicName, currentId);

Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
Assert.assertEquals(admin.topics().offloadStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
MessageIdImpl firstUnoffloaded = admin.persistentTopics().offloadStatus(topicName).firstUnoffloadedMessage;
MessageIdImpl firstUnoffloaded = admin.topics().offloadStatus(topicName).firstUnoffloadedMessage;
// First unoffloaded is the first entry of current ledger
Assert.assertEquals(firstUnoffloaded.getLedgerId(), info.ledgers.get(1).ledgerId);
Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testPersistentList() throws Exception {
policies.replication_clusters = ImmutableSet.of("test");
admin.namespaces().createNamespace("tenant/ns", policies);
try {
admin.persistentTopics().getList("tenant/ns");
admin.topics().getList("tenant/ns");
} catch (PulsarAdminException ex) {
ex.printStackTrace();
fail("Should not have thrown an exception");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testForcefullyTopicDeletion() throws Exception {
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
producer1.close();

admin1.persistentTopics().delete(topicName, true);
admin1.topics().delete(topicName, true);

MockedPulsarServiceBaseTest
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 5, 150);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -333,15 +333,6 @@ public Bookies bookies() {
return bookies;
}

/**
* @return the persistentTopics management object
* @deprecated Since 2.0. See {@link #topics()}
*/
@Deprecated
public PersistentTopics persistentTopics() {
return topics;
}

/**
* @return the persistentTopics management object
* @deprecated Since 2.0. See {@link #topics()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@

import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PersistentTopics;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.Topics;
Expand All @@ -78,8 +77,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("deprecation")
public class TopicsImpl extends BaseResource implements Topics, PersistentTopics {
public class TopicsImpl extends BaseResource implements Topics {
private final WebTarget adminTopics;
private final WebTarget adminV2Topics;
private final String BATCH_HEADER = "X-Pulsar-num-batch-message";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.NonPersistentTopics;
import org.apache.pulsar.client.admin.PersistentTopics;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ResourceQuotas;
import org.apache.pulsar.client.admin.Tenants;
Expand Down Expand Up @@ -648,8 +647,8 @@ public boolean matches(Object argument) {
@Test
void persistentTopics() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
PersistentTopics mockTopics = mock(PersistentTopics.class);
when(admin.persistentTopics()).thenReturn(mockTopics);
Topics mockTopics = mock(Topics.class);
when(admin.topics()).thenReturn(mockTopics);

CmdPersistentTopics topics = new CmdPersistentTopics(admin);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PersistentTopics;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
Expand All @@ -44,14 +44,14 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;

@SuppressWarnings("deprecation")
@Parameters(commandDescription = "Operations on persistent topics", hidden = true)
@Parameters(commandDescription = "Operations on persistent topics. The persistent-topics " +
"has been deprecated in favor of topics", hidden = true)
public class CmdPersistentTopics extends CmdBase {
private final PersistentTopics persistentTopics;
private final Topics persistentTopics;

public CmdPersistentTopics(PulsarAdmin admin) {
super("persistent", admin);
persistentTopics = admin.persistentTopics();
persistentTopics = admin.topics();

jcommander.addCommand("list", new ListCmd());
jcommander.addCommand("list-partitioned-topics", new PartitionedTopicListCmd());
Expand Down

0 comments on commit b51e435

Please sign in to comment.