Skip to content

Commit

Permalink
[Tests] Remove ".atMost" definitions for Awaitility in most cases (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Apr 25, 2021
1 parent 217aed9 commit 306a448
Show file tree
Hide file tree
Showing 38 changed files with 307 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testDisableDelayedDelivery() throws Exception {
DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, false);
admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
//zk update takes time
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() ->
Awaitility.await().until(() ->
admin.namespaces().getDelayedDelivery(namespace) != null);
assertFalse(admin.namespaces().getDelayedDelivery(namespace).isActive());
assertEquals(2000, admin.namespaces().getDelayedDelivery(namespace).getTickTime());
Expand Down Expand Up @@ -125,11 +125,11 @@ public void testNamespaceDelayedDeliveryPolicyApi() throws Exception {
assertNull(admin.namespaces().getDelayedDelivery(namespace));
DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(3, true);
admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertEquals(admin.namespaces().getDelayedDelivery(namespace), delayedDeliveryPolicies));

admin.namespaces().removeDelayedDeliveryMessages(namespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNull(admin.namespaces().getDelayedDelivery(namespace)));
}

Expand All @@ -143,7 +143,7 @@ public void testDelayedDeliveryApplied() throws Exception {
final String topic = "persistent://" + namespace + "/test" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(3, TimeUnit.SECONDS)
Awaitility.await()
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
//namespace-level default value is null
assertNull(admin.namespaces().getDelayedDelivery(namespace));
Expand All @@ -158,26 +158,26 @@ public void testDelayedDeliveryApplied() throws Exception {
DelayedDeliveryPolicies namespaceLevelPolicy =
new DelayedDeliveryPolicies(100, true);
admin.namespaces().setDelayedDeliveryMessages(namespace, namespaceLevelPolicy);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.namespaces().getDelayedDelivery(namespace)));
DelayedDeliveryPolicies policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
assertEquals(policyFromBroker.getTickTime(), 100);
assertTrue(policyFromBroker.isActive());
// set topic-level policy
DelayedDeliveryPolicies topicLevelPolicy = new DelayedDeliveryPolicies(200, true);
admin.topics().setDelayedDeliveryPolicy(topic, topicLevelPolicy);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.topics().getDelayedDeliveryPolicy(topic)));
policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
assertEquals(policyFromBroker.getTickTime(), 200);
assertTrue(policyFromBroker.isActive());
//remove topic-level policy
admin.topics().removeDelayedDeliveryPolicy(topic);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), namespaceLevelPolicy));
//remove namespace-level policy
admin.namespaces().removeDelayedDeliveryMessages(namespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), brokerLevelPolicy));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ public void testMaxUnackedMessagesOnSubscription() throws Exception {
String namespace = "max-unacked-messages/default-on-subscription";
assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace));
admin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, 2*200000);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertEquals(2*200000, admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace).intValue()));

admin.namespaces().removeMaxUnackedMessagesPerSubscription(namespace);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,20 +185,20 @@ public void testOffloadPoliciesApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
pulsarClient.newProducer().topic(topicName).create().close();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
Awaitility.await()
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
OffloadPolicies offloadPolicies = admin.topics().getOffloadPolicies(topicName);
assertNull(offloadPolicies);
OffloadPolicies offload = new OffloadPolicies();
String path = "fileSystemPath";
offload.setFileSystemProfilePath(path);
admin.topics().setOffloadPolicies(topicName, offload);
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.topics().getOffloadPolicies(topicName)));
assertEquals(admin.topics().getOffloadPolicies(topicName), offload);
assertEquals(admin.topics().getOffloadPolicies(topicName).getFileSystemProfilePath(), path);
admin.topics().removeOffloadPolicies(topicName);
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNull(admin.topics().getOffloadPolicies(topicName)));
assertNull(admin.topics().getOffloadPolicies(topicName));
}
Expand All @@ -208,7 +208,7 @@ public void testOffloadPoliciesAppliedApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
pulsarClient.newProducer().topic(topicName).create().close();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
Awaitility.await()
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
OffloadPolicies offloadPolicies = admin.topics().getOffloadPolicies(topicName, true);
OffloadPolicies brokerPolicies = OffloadPolicies
Expand Down Expand Up @@ -270,7 +270,7 @@ private void testOffload(boolean isPartitioned) throws Exception {
admin.topics().createNonPartitionedTopic(topicName);
}
pulsarClient.newProducer().topic(topicName).enableBatching(false).create().close();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
Awaitility.await()
.until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
//2 namespace level policy should use NullLedgerOffloader by default
if (isPartitioned) {
Expand Down Expand Up @@ -299,12 +299,12 @@ private void testOffload(boolean isPartitioned) throws Exception {
when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any());

Awaitility.await().atMost(3, TimeUnit.SECONDS)
Awaitility.await()
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));

//4 set topic level offload policies
admin.topics().setOffloadPolicies(topicName, offloadPolicies);
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.topics().getOffloadPolicies(topicName)));
//5 name of offload should become "mock"
if (isPartitioned) {
Expand All @@ -330,7 +330,7 @@ private void testOffload(boolean isPartitioned) throws Exception {
doReturn(map).when(pulsar).getLedgerOffloaderMap();

admin.topics().removeOffloadPolicies(topicName);
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNull(admin.topics().getOffloadPolicies(topicName)));
// topic level offloader should be closed
if (isPartitioned) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2194,7 +2194,7 @@ public void testPersistentTopicsExpireMessages() throws Exception {
Thread.sleep(1000);
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub1", 1);
// Wait at most 2 seconds for sub1's message to expire.
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
Awaitility.await().untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub1").lastMarkDeleteAdvancedTimestamp > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
Expand All @@ -2209,7 +2209,7 @@ public void testPersistentTopicsExpireMessages() throws Exception {
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub2",
messageIds.get(4), false);
// Wait at most 2 seconds for sub2's message to expire.
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
Awaitility.await().untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub2").lastMarkDeleteAdvancedTimestamp > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
subStats1 = topicStats.subscriptions.get("my-sub1");
Expand All @@ -2229,7 +2229,7 @@ public void testPersistentTopicsExpireMessages() throws Exception {
assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic"));
}
// Wait at most 2 seconds for sub3's message to expire.
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
Awaitility.await().untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
subStats1 = topicStats.subscriptions.get("my-sub1");
Expand Down Expand Up @@ -2672,7 +2672,7 @@ public void testTopicStatsLastExpireTimestampForSubscription() throws PulsarAdmi
Thread.sleep(10000);
// Update policy to trigger message expiry check.
admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 5);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L);
Awaitility.await().until(() -> admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L);
}

@Test(timeOut = 150000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ public void testMaxSubPerTopicApi() throws Exception {
admin.namespaces().setMaxSubscriptionsPerTopicAsync(myNamespace,200).get();
assertEquals(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get().intValue(),200);
admin.namespaces().removeMaxSubscriptionsPerTopicAsync(myNamespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
Awaitility.await().untilAsserted(()
-> assertNull(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get()));

try {
Expand All @@ -1696,7 +1696,7 @@ public void testMaxSubPerTopic() throws Exception {
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
field.setAccessible(true);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == maxSub);
Awaitility.await().until(() -> (int) field.get(persistentTopic) == maxSub);

List<Consumer<?>> consumerList = new ArrayList<>(maxSub);
for (int i = 0; i < maxSub; i++) {
Expand All @@ -1713,7 +1713,7 @@ public void testMaxSubPerTopic() throws Exception {
}
//After removing the restriction, it should be able to create normally
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null);
Awaitility.await().until(() -> field.get(persistentTopic) == null);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
Expand Down Expand Up @@ -1760,14 +1760,14 @@ public void testMaxSubPerTopicPriority() throws Exception {
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
field.setAccessible(true);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub);
Awaitility.await().until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
assertEquals(consumerList.size(), 3);
//After removing the restriction, it should fail again
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null);
Awaitility.await().until(() -> field.get(persistentTopic) == null);
try {
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
Expand Down Expand Up @@ -1796,7 +1796,7 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited";
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0);
List<Producer<byte[]>> producers = new ArrayList<>();
for (int i = 0; i < maxProducersPerTopic + 1; i++) {
Expand All @@ -1805,7 +1805,7 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
}

admin.namespaces().removeMaxProducersPerTopic(myNamespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null);
try {
pulsarClient.newProducer().topic(topic).create();
Expand All @@ -1815,7 +1815,7 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
}
//set the limit to 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3);
// should success
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
Expand Down Expand Up @@ -1851,7 +1851,7 @@ public void testMaxConsumersPerTopicUnlimited() throws Exception {
assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
Expand All @@ -1861,7 +1861,7 @@ public void testMaxConsumersPerTopicUnlimited() throws Exception {
}

admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null);
try {
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Expand All @@ -1871,7 +1871,7 @@ public void testMaxConsumersPerTopicUnlimited() throws Exception {
}
//set the limit to 3
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3);
// should success
Consumer<byte[]> consumer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ public void testIncrementPartitionsWithNoSubscriptions() throws Exception {

admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
//zk update takes some time
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2));

admin.topics().updatePartitionedTopic(partitionedTopicName, 10);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10));

admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20));
}

Expand Down
Loading

0 comments on commit 306a448

Please sign in to comment.