Skip to content

Commit

Permalink
Fix unit test using deprecated API (apache#13015)
Browse files Browse the repository at this point in the history
  • Loading branch information
315157973 authored Nov 29, 2021
1 parent d3d580f commit 3d54beb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
.setRequestId(1)
.setSubType(CommandSubscribe.SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
SubscriptionOption subscriptionOption = getSubscriptionOption(cmd);

Future<Consumer> f1 = topic.subscribe(subscriptionOption);
f1.get();

final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -189,9 +189,9 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
.setRequestId(1)
.setSubType(CommandSubscribe.SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
SubscriptionOption subscriptionOption = getSubscriptionOption(cmd);

Future<Consumer> f1 = topic.subscribe(subscriptionOption);
f1.get();

final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -257,9 +257,9 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
.setRequestId(1)
.setSubType(CommandSubscribe.SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
SubscriptionOption subscriptionOption = getSubscriptionOption(cmd);

Future<Consumer> f1 = topic.subscribe(subscriptionOption);
f1.get();

final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -319,9 +319,9 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception {
.setRequestId(1)
.setSubType(CommandSubscribe.SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
SubscriptionOption subscriptionOption = getSubscriptionOption(cmd);

Future<Consumer> f1 = topic.subscribe(subscriptionOption);
f1.get();

final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -369,4 +369,15 @@ public void run() {
counter.await();
assertFalse(gotException.get());
}

private SubscriptionOption getSubscriptionOption(CommandSubscribe cmd) {
return SubscriptionOption.builder().cnx(serverCnx)
.subscriptionName(cmd.getSubscription()).consumerId(cmd.getConsumerId()).subType(cmd.getSubType())
.priorityLevel(0).consumerName(cmd.getConsumerName()).isDurable(cmd.isDurable()).startMessageId(null)
.metadata(Collections.emptyMap()).readCompacted(cmd.isReadCompacted())
.subscriptionProperties(java.util.Optional.empty())
.initialPosition(InitialPosition.Latest)
.startMessageRollbackDurationSec(0).replicatedSubscriptionStateArg(false).keySharedMeta(null)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -684,9 +684,9 @@ public void testSubscribeFail() throws Exception {
.setRequestId(1)
.setSubType(SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false, null);
SubscriptionOption subscriptionOption = getSubscriptionOption(cmd);

Future<Consumer> f1 = topic.subscribe(subscriptionOption);
try {
f1.get();
fail("should fail with exception");
Expand All @@ -696,6 +696,17 @@ public void testSubscribeFail() throws Exception {
}
}

private SubscriptionOption getSubscriptionOption(CommandSubscribe cmd) {
SubscriptionOption subscriptionOption = SubscriptionOption.builder().cnx(serverCnx)
.subscriptionName(cmd.getSubscription()).consumerId(cmd.getConsumerId()).subType(cmd.getSubType())
.priorityLevel(0).consumerName(cmd.getConsumerName()).isDurable(cmd.isDurable()).startMessageId(null)
.metadata(Collections.emptyMap()).readCompacted(false)
.initialPosition(InitialPosition.Latest).subscriptionProperties(Optional.empty())
.startMessageRollbackDurationSec(0).replicatedSubscriptionStateArg(false).keySharedMeta(null)
.build();
return subscriptionOption;
}

@Test
public void testSubscribeUnsubscribe() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
Expand All @@ -710,15 +721,11 @@ public void testSubscribeUnsubscribe() throws Exception {
.setSubType(SubType.Exclusive);

// 1. simple subscribe
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false, null);
Future<Consumer> f1 = topic.subscribe(getSubscriptionOption(cmd));
f1.get();

// 2. duplicate subscribe
Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false, null);
Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd));
try {
f2.get();
fail("should fail with exception");
Expand Down Expand Up @@ -1254,9 +1261,7 @@ public void testDeleteTopic() throws Exception {
.setRequestId(1)
.setSubType(SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f1 = topic.subscribe(getSubscriptionOption(cmd));
f1.get();

assertTrue(topic.delete().isCompletedExceptionally());
Expand All @@ -1278,9 +1283,7 @@ public void testDeleteAndUnsubscribeTopic() throws Exception {
.setReadCompacted(false)
.setSubType(SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f1 = topic.subscribe(getSubscriptionOption(cmd));
f1.get();

final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -1337,21 +1340,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
.setRequestId(1)
.setSubType(SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(
serverCnx,
cmd.getSubscription(),
cmd.getConsumerId(),
cmd.getSubType(),
0,
cmd.getConsumerName(),
cmd.isDurable(),
null,
Collections.emptyMap(),
cmd.isReadCompacted(),
InitialPosition.Latest,
0 /*avoid reseting cursor*/,
false /* replicated */,
null);
Future<Consumer> f1 = topic.subscribe(getSubscriptionOption(cmd));

f1.get();

Expand Down Expand Up @@ -1445,9 +1434,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
.setRequestId(1)
.setSubType(SubType.Exclusive);

Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f = topic.subscribe(getSubscriptionOption(cmd));
try {
f.get();
fail("should have failed");
Expand Down Expand Up @@ -1588,10 +1575,7 @@ public void testFailoverSubscription() throws Exception {
.setSubType(SubType.Failover);

// 1. Subscribe with non partition topic
Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.isDurable(), null, Collections.emptyMap(),
cmd1.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f1 = topic1.subscribe(getSubscriptionOption(cmd1));
f1.get();

// 2. Subscribe with partition topic
Expand All @@ -1606,10 +1590,7 @@ public void testFailoverSubscription() throws Exception {
.setRequestId(1)
.setSubType(SubType.Failover);

Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.isDurable(), null, Collections.emptyMap(),
cmd2.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f2 = topic2.subscribe(getSubscriptionOption(cmd2));
f2.get();

// 3. Subscribe and create second consumer
Expand All @@ -1622,10 +1603,7 @@ public void testFailoverSubscription() throws Exception {
.setRequestId(1)
.setSubType(SubType.Failover);

Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.isDurable(), null, Collections.emptyMap(),
cmd3.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f3 = topic2.subscribe(getSubscriptionOption(cmd3));
f3.get();

assertEquals(
Expand All @@ -1649,10 +1627,7 @@ public void testFailoverSubscription() throws Exception {
.setRequestId(1)
.setSubType(SubType.Failover);

Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.isDurable(), null, Collections.emptyMap(),
cmd4.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f4 = topic2.subscribe(getSubscriptionOption(cmd4));
f4.get();

assertEquals(
Expand Down Expand Up @@ -1681,10 +1656,7 @@ public void testFailoverSubscription() throws Exception {
.setRequestId(1)
.setSubType(SubType.Exclusive);

Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.isDurable(), null, Collections.emptyMap(),
cmd5.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f5 = topic2.subscribe(getSubscriptionOption(cmd5));
try {
f5.get();
fail("should fail with exception");
Expand All @@ -1703,10 +1675,7 @@ public void testFailoverSubscription() throws Exception {
.setRequestId(1)
.setSubType(SubType.Exclusive);

Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.isDurable(), null, Collections.emptyMap(),
cmd6.isReadCompacted(), InitialPosition.Latest,
0 /*avoid reseting cursor*/, false /* replicated */, null);
Future<Consumer> f6 = topic2.subscribe(getSubscriptionOption(cmd6));
f6.get();

// 7. unsubscribe exclusive sub
Expand Down Expand Up @@ -2192,9 +2161,7 @@ public void testGetDurableSubscription() throws Exception {
.setRequestId(1)
.setSubType(SubType.Exclusive);

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), InitialPosition.Latest,
cmd.getStartMessageRollbackDurationSec(), false, null);
Future<Consumer> f1 = topic.subscribe(getSubscriptionOption(cmd));
f1.get();

Future<Void> f2 = topic.unsubscribe(successSubName);
Expand Down

0 comments on commit 3d54beb

Please sign in to comment.