Skip to content

Commit

Permalink
[fix][flaky-test]LedgerOffloaderMetricsTest (apache#17106)
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Aug 17, 2022
1 parent 2c2b75e commit 5354153
Showing 1 changed file with 0 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,16 @@
*/
package org.apache.pulsar.broker.stats;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

Expand All @@ -67,30 +53,9 @@ public void testTopicLevelMetrics() throws Exception {
String []topics = new String[3];

LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats();

LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class);
Topic topic = Mockito.mock(PersistentTopic.class);
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
Optional<Topic> topicOptional = Optional.of(topic);
topicFuture.complete(topicOptional);
BrokerService brokerService = spy(pulsar.getBrokerService());
doReturn(brokerService).when(pulsar).getBrokerService();


for (int i = 0; i < 3; i++) {
String topicName = "persistent://prop/ns-abc1/testMetrics" + UUID.randomUUID();
topics[i] = topicName;
admin.topics().createNonPartitionedTopic(topicName);

doReturn(topicFuture).when(brokerService).getTopicIfExists(topicName);
assertTrue(topic instanceof PersistentTopic);

ManagedLedger ledgerM = Mockito.mock(ManagedLedger.class);
doReturn(ledgerM).when(((PersistentTopic) topic)).getManagedLedger();
ManagedLedgerConfig config = Mockito.mock(ManagedLedgerConfig.class);
doReturn(config).when(ledgerM).getConfig();
doReturn(offloader).when(config).getLedgerOffloader();

offloaderStats.recordOffloadError(topicName);
offloaderStats.recordOffloadError(topicName);
offloaderStats.recordOffloadBytes(topicName, 100);
Expand Down Expand Up @@ -123,15 +88,6 @@ public void testNamespaceLevelMetrics() throws Exception {
String ns2 = "prop/ns-abc2";

LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats();

LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class);
Topic topic = Mockito.mock(PersistentTopic.class);
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
Optional<Topic> topicOptional = Optional.of(topic);
topicFuture.complete(topicOptional);
BrokerService brokerService = spy(pulsar.getBrokerService());
doReturn(brokerService).when(pulsar).getBrokerService();
Queue<String> queue = new LinkedList<>();
Map<String, List<String>> namespace2Topics = new HashMap<>();
for (int s = 0; s < 2; s++) {
String nameSpace = ns1;
Expand All @@ -146,20 +102,6 @@ public void testNamespaceLevelMetrics() throws Exception {
String topicName = baseTopic1 + UUID.randomUUID();
List<String> topicList = namespace2Topics.get(nameSpace);
topicList.add(topicName);

queue.add(topicName);
admin.topics().createNonPartitionedTopic(topicName);
doReturn(topicFuture).when(brokerService).getTopicIfExists(topicName);
assertTrue(topic instanceof PersistentTopic);


ManagedLedger ledgerM = Mockito.mock(ManagedLedger.class);
doReturn(ledgerM).when(((PersistentTopic) topic)).getManagedLedger();
ManagedLedgerConfig config = Mockito.mock(ManagedLedgerConfig.class);
doReturn(config).when(ledgerM).getConfig();
doReturn(offloader).when(config).getLedgerOffloader();
Mockito.when(ledgerM.getName()).thenAnswer((Answer<String>) invocation -> queue.poll());

offloaderStats.recordOffloadError(topicName);
offloaderStats.recordOffloadBytes(topicName, 100);
offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS);
Expand All @@ -171,7 +113,6 @@ public void testNamespaceLevelMetrics() throws Exception {
}

for (Map.Entry<String, List<String>> entry : namespace2Topics.entrySet()) {
String namespace = entry.getKey();
List<String> topics = entry.getValue();
String topicName = topics.get(0);

Expand Down

0 comments on commit 5354153

Please sign in to comment.