Skip to content

Commit

Permalink
handle SubscriptionBusyException in resetCursor api (apache#7335)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiazhai authored Jun 22, 2020
1 parent 06994ed commit aeee10f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,9 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn
"Unable to find position for position specified: " + t.getMessage()));
} else if (e instanceof WebApplicationException) {
asyncResponse.resume(e);
} else if (t instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Failed for Subscription Busy: " + t.getMessage()));
} else {
asyncResponse.resume(new RestException(e));
}
Expand Down Expand Up @@ -1811,6 +1814,9 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati
} else if (t instanceof SubscriptionInvalidCursorPosition) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage());
} else if (t instanceof SubscriptionBusyException) {
throw new RestException(Status.PRECONDITION_FAILED,
"Failed for SubscriptionBusy: " + t.getMessage());
} else {
throw new RestException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.annotations.AfterClass;
Expand All @@ -44,6 +48,7 @@

/**
*/
@Slf4j
public class SubscriptionSeekTest extends BrokerTestBase {
@BeforeClass
@Override
Expand Down Expand Up @@ -93,6 +98,71 @@ public void testSeek() throws Exception {
Thread.sleep(500);
consumer.seek(messageIds.get(5));
assertEquals(sub.getNumberOfEntriesInBacklog(false), 5);

MessageIdImpl messageId = (MessageIdImpl) messageIds.get(5);
MessageIdImpl beforeEarliest = new MessageIdImpl(
messageId.getLedgerId() - 1, messageId.getEntryId(), messageId.getPartitionIndex());
MessageIdImpl afterLatest = new MessageIdImpl(
messageId.getLedgerId() + 1, messageId.getEntryId(), messageId.getPartitionIndex());

log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", messageId, beforeEarliest, afterLatest);

Thread.sleep(500);
consumer.seek(beforeEarliest);
assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);

Thread.sleep(500);
consumer.seek(afterLatest);
assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
}

@Test
public void testConcurrentResetCursor() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
final String subscriptionName = "test-sub-name";

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

List<MessageId> messageIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
MessageId msgId = producer.send(message.getBytes());
messageIds.add(msgId);
}

List<PulsarAdminException> exceptions = Lists.newLinkedList();
class ResetCursorThread extends Thread {
public void run() {
try {
admin.topics().resetCursor(topicName, subscriptionName, messageIds.get(3));
} catch (PulsarAdminException e) {
exceptions.add(e);
}
}
}

List<ResetCursorThread> resetCursorThreads = Lists.newLinkedList();
for (int i = 0; i < 4; i ++) {
ResetCursorThread thread = new ResetCursorThread();
resetCursorThreads.add(thread);
}
for (int i = 0; i < 4; i ++) {
resetCursorThreads.get(i).start();
}
for (int i = 0; i < 4; i ++) {
resetCursorThreads.get(i).join();
}

for (int i = 0; i < exceptions.size(); i++) {
log.error("Meet Exception", exceptions.get(i));
assertTrue(exceptions.get(i).getMessage().contains("Failed to fence subscription"));
}
}

@Test
Expand Down

0 comments on commit aeee10f

Please sign in to comment.