From 492c7df1579c12701da6a77af1986d3aa7ace840 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 17 Sep 2022 17:27:34 +0800 Subject: [PATCH] [improve][test] remove powermock-reflect dependency (#17696) --- pom.xml | 13 --- .../broker/namespace/OwnershipCache.java | 6 +- .../persistent/PersistentSubscription.java | 5 + .../service/persistent/PersistentTopic.java | 5 + .../pendingack/impl/PendingAckHandleImpl.java | 6 ++ .../BookKeeperClientFactoryImplTest.java | 8 +- .../pulsar/broker/PulsarServiceCloseTest.java | 21 ++-- .../admin/AdminApiGetLastMessageIdTest.java | 19 +--- .../broker/admin/PersistentTopicsTest.java | 7 +- .../broker/admin/TopicAutoCreationTest.java | 28 +++--- .../pulsar/broker/admin/TopicsTest.java | 4 +- .../broker/loadbalance/LoadBalancerTest.java | 78 +++++++-------- .../ModularLoadManagerImplTest.java | 30 ++---- .../namespace/NamespaceServiceTest.java | 9 +- .../broker/service/ExclusiveProducerTest.java | 9 +- .../pulsar/broker/service/TopicOwnerTest.java | 97 +------------------ .../stats/ManagedCursorMetricsTest.java | 5 +- .../systopic/PartitionedSystemTopicTest.java | 25 ++--- .../broker/transaction/TransactionTest.java | 52 +++++----- .../buffer/TopicTransactionBufferTest.java | 4 +- .../buffer/TransactionLowWaterMarkTest.java | 12 +-- .../PendingAckInMemoryDeleteTest.java | 34 +++---- .../api/SimpleProducerConsumerTest.java | 24 ++++- .../api/v1/V1_ProducerConsumerTest.java | 25 ++++- .../pulsar/client/impl/NegativeAcksTest.java | 23 ++--- .../pulsar/client/impl/ConsumerImpl.java | 6 ++ .../client/impl/NegativeAcksTracker.java | 7 ++ .../pulsar/client/impl/PulsarClientImpl.java | 2 +- .../client/impl/schema/AvroSchemaTest.java | 2 - 29 files changed, 246 insertions(+), 320 deletions(-) diff --git a/pom.xml b/pom.xml index 5618e95b6603e..5f147915ff16f 100644 --- a/pom.xml +++ b/pom.xml @@ -241,7 +241,6 @@ flexible messaging model and an intuitive client API. 1.1.1 7.3.0 3.12.4 - 2.0.9 3.25.0-GA 1.5.0 3.1 @@ -340,12 +339,6 @@ flexible messaging model and an intuitive client API. ${mockito.version} - - org.powermock - powermock-reflect - ${powermock.version} - - org.apache.zookeeper zookeeper @@ -1331,12 +1324,6 @@ flexible messaging model and an intuitive client API. test - - org.powermock - powermock-reflect - test - - org.assertj assertj-core diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 67e986b804cba..9c2030997fbdb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -106,7 +106,7 @@ public CompletableFuture asyncLoad(NamespaceBundle namespaceBundle, .thenRun(() -> { log.info("Resource lock for {} has expired", rl.getPath()); namespaceService.unloadNamespaceBundle(namespaceBundle); - ownedBundlesCache.synchronous().invalidate(namespaceBundle); + invalidateLocalOwnerCache(namespaceBundle); namespaceService.onNamespaceBundleUnload(namespaceBundle); }); return new OwnedBundle(namespaceBundle); @@ -330,6 +330,10 @@ public void invalidateLocalOwnerCache() { this.ownedBundlesCache.synchronous().invalidateAll(); } + public void invalidateLocalOwnerCache(NamespaceBundle namespaceBundle) { + this.ownedBundlesCache.synchronous().invalidate(namespaceBundle); + } + public synchronized boolean refreshSelfOwnerInfo() { this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 12359d5c413e3..e5d6251d177de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1281,6 +1281,11 @@ public ManagedCursor getCursor() { return cursor; } + @VisibleForTesting + public PendingAckHandle getPendingAckHandle() { + return pendingAckHandle; + } + public void syncBatchPositionBitSetForPendingAck(PositionImpl position) { this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0893ecf9ea895..484120cce245a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -382,6 +382,11 @@ private void initializeDispatchRateLimiterIfNeeded() { } } + @VisibleForTesting + public AtomicLong getPendingWriteOps() { + return pendingWriteOps; + } + private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, boolean replicated, Map subscriptionProperties) { checkNotNull(compactedTopic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 283bc038d764d..a565a20b6649d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -21,6 +21,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1027,6 +1028,11 @@ public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl pos } } + @VisibleForTesting + public Map> getIndividualAckPositions() { + return individualAckPositions; + } + @Override public boolean checkIfPendingAckStoreInit() { return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java index e26b0aa756162..b9c32e91e4c18 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java @@ -37,10 +37,10 @@ import org.apache.bookkeeper.net.CachedDNSToSwitchMapping; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; -import org.powermock.reflect.Whitebox; import org.testng.annotations.Test; /** @@ -281,7 +281,7 @@ public void testOpportunisticStripingConfiguration() { } @Test - public void testBookKeeperIoThreadsConfiguration() { + public void testBookKeeperIoThreadsConfiguration() throws Exception { BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl(); ServiceConfiguration conf = new ServiceConfiguration(); assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf) @@ -292,11 +292,11 @@ public void testBookKeeperIoThreadsConfiguration() { EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf, eventLoopGroup, mock(StatsLogger.class), mock(ClientConfiguration.class)); - assertEquals(Whitebox.getInternalState(builder, "eventLoopGroup"), eventLoopGroup); + assertEquals(FieldUtils.readField(builder, "eventLoopGroup", true), eventLoopGroup); conf.setBookkeeperClientSeparatedIoThreadsEnabled(true); builder = factory.getBookKeeperBuilder(conf, eventLoopGroup, mock(StatsLogger.class), mock(ClientConfiguration.class)); - assertNull(Whitebox.getInternalState(builder, "eventLoopGroup")); + assertNull(FieldUtils.readField(builder, "eventLoopGroup", true)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java index c424132855b6d..1fbb40a6a5614 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java @@ -24,9 +24,9 @@ import java.util.concurrent.ScheduledFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; -import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -62,18 +62,21 @@ protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration con @Test(timeOut = 30_000) public void closeInTimeTest() throws Exception { LoadSheddingTask task = pulsar.getLoadSheddingTask(); - boolean isCancel = WhiteboxImpl.getInternalState(task, "isCancel"); - assertFalse(isCancel); - ScheduledFuture loadSheddingFuture = WhiteboxImpl.getInternalState(task, "future"); - assertFalse(loadSheddingFuture.isCancelled()); + + { + assertFalse((boolean) FieldUtils.readField(task, "isCancel", true)); + ScheduledFuture loadSheddingFuture = (ScheduledFuture) FieldUtils.readField(task, "future", true); + assertFalse(loadSheddingFuture.isCancelled()); + } // The pulsar service is not used, so it should be closed gracefully in short time. pulsar.close(); - isCancel = WhiteboxImpl.getInternalState(task, "isCancel"); - assertTrue(isCancel); - loadSheddingFuture = WhiteboxImpl.getInternalState(task, "future"); - assertTrue(loadSheddingFuture.isCancelled()); + { + assertTrue((boolean) FieldUtils.readField(task, "isCancel", true)); + ScheduledFuture loadSheddingFuture = (ScheduledFuture) FieldUtils.readField(task, "future", true); + assertTrue(loadSheddingFuture.isCancelled()); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java index 2a97bc4f8f2ea..d919958618185 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import java.lang.reflect.Field; import java.util.Collection; import java.util.Date; import java.util.Map; @@ -32,13 +31,11 @@ import java.util.concurrent.TimeUnit; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.TimeoutHandler; -import javax.ws.rs.core.UriInfo; import org.apache.pulsar.broker.admin.v2.PersistentTopics; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -51,26 +48,16 @@ import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-admin") public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest { - private PersistentTopics persistentTopics; - private final String testTenant = "my-tenant"; - private final String testLocalCluster = "use"; - private final String testNamespace = "my-namespace"; - protected Field uriField; - protected UriInfo uriInfo; + private static final String testTenant = "my-tenant"; + private static final String testNamespace = "my-namespace"; - @BeforeClass - public void initPersistentTopics() throws Exception { - uriField = PulsarWebResource.class.getDeclaredField("uri"); - uriField.setAccessible(true); - uriInfo = mock(UriInfo.class); - } + private PersistentTopics persistentTopics; @Override @BeforeMethod diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index de555c1715d61..3a9bd21245bf0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.admin; -import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; @@ -94,7 +93,6 @@ import org.apache.zookeeper.KeeperException; import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; -import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -149,9 +147,8 @@ protected void setup() throws Exception { PulsarResources resources = spy(new PulsarResources(pulsar.getLocalMetadataStore(), pulsar.getConfigurationMetadataStore())); - doReturn(spyWithClassAndConstructorArgs(TopicResources.class, pulsar.getLocalMetadataStore())).when(resources) - .getTopicResources(); - Whitebox.setInternalState(pulsar, "pulsarResources", resources); + doReturn(spy(new TopicResources(pulsar.getLocalMetadataStore()))).when(resources).getTopicResources(); + doReturn(resources).when(pulsar).getPulsarResources(); admin.clusters().createCluster("use", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build()); admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 7bd15992f640d..e8c0683569f95 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -38,9 +38,9 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.powermock.reflect.Whitebox; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -122,15 +122,14 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() }); - LookupService original = Whitebox.getInternalState(pulsarClient, "lookup"); + LookupService original = ((PulsarClientImpl) pulsarClient).getLookup(); try { // we want to skip the "lookup" phase, because it is blocked by the HTTP API LookupService mockLookup = mock(LookupService.class); - Whitebox.setInternalState(pulsarClient, "lookup", mockLookup); - when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> { - return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); - }); + ((PulsarClientImpl) pulsarClient).setLookup(mockLookup); + when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer( + i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); when(mockLookup.getBroker(any())).thenAnswer(i -> { InetSocketAddress brokerAddress = new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get()); @@ -139,20 +138,20 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() // Creating a producer and creating a Consumer may trigger automatic topic // creation, let's try to create a Producer and a Consumer - try (Producer producer = pulsarClient.newProducer() + try (Producer ignored = pulsarClient.newProducer() .sendTimeout(1, TimeUnit.SECONDS) .topic(topic) - .create();) { + .create()) { } catch (PulsarClientException.LookupException expected) { String msg = "Namespace bundle for topic (%s) not served by this instance"; log.info("Expected error", expected); assertTrue(expected.getMessage().contains(String.format(msg, topic))); } - try (Consumer consumer = pulsarClient.newConsumer() + try (Consumer ignored = pulsarClient.newConsumer() .topic(topic) .subscriptionName("test") - .subscribe();) { + .subscribe()) { } catch (PulsarClientException.LookupException expected) { String msg = "Namespace bundle for topic (%s) not served by this instance"; log.info("Expected error", expected); @@ -170,17 +169,16 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() admin.topics().getList(namespaceName).isEmpty(); // create now the topic using auto creation - Whitebox.setInternalState(pulsarClient, "lookup", original); - - try (Consumer consumer = pulsarClient.newConsumer() + ((PulsarClientImpl) pulsarClient).setLookup(original); + try (Consumer ignored = pulsarClient.newConsumer() .topic(topic) .subscriptionName("test") - .subscribe();) { + .subscribe()) { } admin.topics().getList(namespaceName).contains(topic); } finally { - Whitebox.setInternalState(pulsarClient, "lookup", original); + ((PulsarClientImpl) pulsarClient).setLookup(original); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java index e7dfff6de15df..49ead1c5fcbf2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java @@ -49,6 +49,7 @@ import org.apache.avro.io.JsonEncoder; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.util.Utf8; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; @@ -88,7 +89,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -320,7 +320,7 @@ public void testLookUpWithRedirect() throws Exception { doReturn(false).when(topics).isRequestHttps(); UriInfo uriInfo = mock(UriInfo.class); doReturn(requestPath).when(uriInfo).getRequestUri(); - Whitebox.setInternalState(topics, "uri", uriInfo); + FieldUtils.writeField(topics, "uri", uriInfo, true); //do produce on another broker topics.setPulsar(pulsar2); AsyncResponse asyncResponse = mock(AsyncResponse.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 8be88350ea653..5d62ec2c58c84 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -26,7 +25,6 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -35,13 +33,17 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import lombok.SneakyThrows; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.reflect.MethodUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription; @@ -68,7 +70,8 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.awaitility.Awaitility; -import org.powermock.reflect.Whitebox; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -92,12 +95,12 @@ public class LoadBalancerTest { private static final int MAX_RETRIES = 15; private static final int BROKER_COUNT = 5; - private int[] brokerWebServicePorts = new int[BROKER_COUNT]; - private int[] brokerNativeBrokerPorts = new int[BROKER_COUNT]; - private URL[] brokerUrls = new URL[BROKER_COUNT]; - private String[] lookupAddresses = new String[BROKER_COUNT]; - private PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT]; - private PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT]; + private final int[] brokerWebServicePorts = new int[BROKER_COUNT]; + private final int[] brokerNativeBrokerPorts = new int[BROKER_COUNT]; + private final URL[] brokerUrls = new URL[BROKER_COUNT]; + private final String[] lookupAddresses = new String[BROKER_COUNT]; + private final PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT]; + private final PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT]; @BeforeMethod void setup() throws Exception { @@ -222,7 +225,7 @@ public void testLoadReportsWrittenOnMetadataStore() throws Exception { } /* - * tests rankings get updated when we write write the new load reports to the zookeeper on loadbalance root node + * tests rankings get updated when we write the new load reports to the zookeeper on load-balance root node * tests writing pre-configured load report on the zookeeper translates the pre-calculated rankings */ @Test @@ -237,18 +240,15 @@ public void testUpdateLoadReportAndCheckUpdatedRanking() throws Exception { sru.setCpu(new ResourceUsage(5, 400)); lr.setSystemResourceUsage(sru); - Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr); - ResourceLock lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(), - "brokerLock"); - lock.updateValue(lr).join(); + FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true); + updateLastReport(pulsarServices[i].getLoadManager().get(), lr); } for (int i = 0; i < BROKER_COUNT; i++) { - Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking"); - updateRanking.invoke(pulsarServices[0].getLoadManager().get()); + MethodUtils.invokeMethod(pulsarServices[0].getLoadManager().get(), true, "updateRanking"); } - // do lookup for bunch of bundles + // do lookup for a bunch of bundles int totalNamespaces = 200; Map namespaceOwner = new HashMap<>(); for (int i = 0; i < totalNamespaces; i++) { @@ -275,6 +275,13 @@ public void testUpdateLoadReportAndCheckUpdatedRanking() throws Exception { } } + @SuppressWarnings("unchecked") + @SneakyThrows + private void updateLastReport(LoadManager lm, LoadReport lr){ + ResourceLock lock = (ResourceLock) FieldUtils.readField(lm, "brokerLock", true); + lock.updateValue(lr).join(); + } + private AtomicReference>> getSortedRanking(PulsarService pulsar) throws NoSuchFieldException, IllegalAccessException { Field ranking = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass() @@ -312,17 +319,14 @@ public void testBrokerRanking() throws Exception { sru.setCpu(new ResourceUsage(60, 400)); lr.setSystemResourceUsage(sru); - ResourceLock lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(), - "brokerLock"); - lock.updateValue(lr).join(); + updateLastReport(pulsarServices[i].getLoadManager().get(), lr); } for (int i = 0; i < BROKER_COUNT; i++) { - Method method = Whitebox.getMethod(SimpleLoadManagerImpl.class, "getUpdateRankingHandle"); LoadManager loadManager = pulsarServices[i].getLoadManager().get(); Awaitility.await().until(() -> { - Object invoke = method.invoke(loadManager); - return invoke != null && ((Future) invoke).isDone(); + Future f = ((SimpleLoadManagerImpl) loadManager).getUpdateRankingHandle(); + return f != null && f.isDone(); }); } @@ -376,15 +380,12 @@ public void testTopicAssignmentWithExistingBundles() throws Exception { } lr.setBundleStats(bundleStats); - Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr); - ResourceLock lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(), - "brokerLock"); - lock.updateValue(lr).join(); + FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true); + updateLastReport(pulsarServices[i].getLoadManager().get(), lr); } for (int i = 0; i < BROKER_COUNT; i++) { - Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking"); - updateRanking.invoke(pulsarServices[0].getLoadManager().get()); + MethodUtils.invokeMethod(pulsarServices[0].getLoadManager().get(), true, "updateRanking"); } // print ranking @@ -469,10 +470,7 @@ private void writeLoadReportsForDynamicQuota(long timestamp) throws Exception { bundleStats.put(bundleName, stats); } lr.setBundleStats(bundleStats); - - ResourceLock lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(), - "brokerLock"); - lock.updateValue(lr).join(); + updateLastReport(pulsarServices[i].getLoadManager().get(), lr); } } @@ -597,8 +595,12 @@ public void testNamespaceBundleAutoSplit() throws Exception { } // fake Namespaces Admin - NamespacesImpl namespaceAdmin = mock(NamespacesImpl.class); - Whitebox.setInternalState(pulsarServices[0].getAdminClient(), "namespaces", namespaceAdmin); + CompletableFuture namespaceAdminFuture = new CompletableFuture<>(); + try (MockedConstruction ignore = Mockito.mockConstruction( + NamespacesImpl.class, (allocator, context) -> namespaceAdminFuture.complete(allocator))) { + pulsarServices[0].getAdminClient(); + } + NamespacesImpl namespaceAdmin = namespaceAdminFuture.get(); // create load report // namespace 01~09 need to be split @@ -631,10 +633,8 @@ public void testNamespaceBundleAutoSplit() throws Exception { newBundleStats(maxTopics + 1, 0, 0, 0, 0, 0, 0)); lr.setBundleStats(bundleStats); - Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr); - ResourceLock lock = Whitebox.getInternalState(pulsarServices[0].getLoadManager().get(), - "brokerLock"); - lock.updateValue(lr).join(); + FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true); + updateLastReport(pulsarServices[0].getLoadManager().get(), lr); // sleep to wait load ranking be triggered and trigger bundle split Thread.sleep(5000); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index a889bedb7c1c8..9b7ea315f59b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -85,7 +85,6 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.mockito.Mockito; -import org.powermock.reflect.Whitebox; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -450,20 +449,16 @@ public void testNeedBrokerDataUpdate() throws Exception { /** * It verifies that deletion of broker-znode on broker-stop will invalidate availableBrokerCache list - * - * @throws Exception */ @Test public void testBrokerStopCacheUpdate() throws Exception { ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get(); - ModularLoadManagerImpl lm = Whitebox.getInternalState(loadManagerWrapper, "loadManager"); + ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager(); assertEquals(lm.getAvailableBrokers().size(), 2); pulsar2.close(); - Awaitility.await().untilAsserted(() -> { - assertEquals(lm.getAvailableBrokers().size(), 1); - }); + Awaitility.await().untilAsserted(() -> assertEquals(lm.getAvailableBrokers().size(), 1)); } /** @@ -481,8 +476,6 @@ public void testBrokerStopCacheUpdate() throws Exception { * b. available-brokers: broker2, broker3 => result: broker2 * c. available-brokers: broker3 => result: NULL * - * - * @throws Exception */ @Test public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws Exception { @@ -664,27 +657,20 @@ public void testOwnBrokerZnodeByMultipleBroker() throws Exception { @Test public void testRemoveDeadBrokerTimeAverageData() throws Exception { ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get(); - ModularLoadManagerImpl lm = Whitebox.getInternalState(loadManagerWrapper, "loadManager"); + ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager(); assertEquals(lm.getAvailableBrokers().size(), 2); pulsar2.close(); - Awaitility.await().untilAsserted(() -> { - assertEquals(lm.getAvailableBrokers().size(), 1); - }); + Awaitility.await().untilAsserted(() -> assertEquals(lm.getAvailableBrokers().size(), 1)); lm.updateAll(); List data = pulsar1.getLocalMetadataStore() - .getMetadataCache(TimeAverageBrokerData.class).getChildren(TIME_AVERAGE_BROKER_ZPATH).join(); - - Awaitility.await().untilAsserted(() -> { - assertTrue(pulsar1.getLeaderElectionService().isLeader()); - }); + .getMetadataCache(TimeAverageBrokerData.class) + .getChildren(TIME_AVERAGE_BROKER_ZPATH) + .join(); + Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader())); assertEquals(data.size(), 1); - - - - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 6f5462c1cea5d..cb806d3ccfe5d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -49,6 +49,7 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.loadbalance.LoadData; import org.apache.pulsar.broker.loadbalance.LoadManager; @@ -87,7 +88,6 @@ import org.apache.pulsar.policies.data.loadbalancer.BundleData; import org.awaitility.Awaitility; import org.mockito.stubbing.Answer; -import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -822,11 +822,10 @@ private void waitResourceDataUpdateToZK(LoadManager loadManager) throws Exceptio // Wait until "ModularLoadManager" completes processing the ZK notification. ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) loadManager; ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager(); - ScheduledExecutorService scheduler = Whitebox.getInternalState(modularLoadManager, "scheduler"); + ScheduledExecutorService scheduler = (ScheduledExecutorService) FieldUtils.readField( + modularLoadManager, "scheduler", true); CompletableFuture waitForNoticeHandleFinishByLoadManager = new CompletableFuture<>(); - scheduler.execute(() -> { - waitForNoticeHandleFinishByLoadManager.complete(null); - }); + scheduler.execute(() -> waitForNoticeHandleFinishByLoadManager.complete(null)); waitForNoticeHandleFinishByLoadManager.join(); // Manually trigger "LoadResourceQuotaUpdaterTask" loadManager.writeResourceQuotasToZooKeeper(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 604abd8d7095f..124372ea15a90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -39,7 +39,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; -import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -294,17 +293,13 @@ public void producerFenced(ProducerAccessMode accessMode, boolean partitioned) t // Simulate a producer that takes over and fences p1 through the topic epoch if (!partitioned) { Topic t = pulsar.getBrokerService().getTopic(topic, false).get().get(); - CompletableFuture f = (CompletableFuture) Whitebox - .getMethod(AbstractTopic.class, "incrementTopicEpoch", Optional.class) - .invoke(t, Optional.of(0L)); + CompletableFuture f = ((AbstractTopic) t).incrementTopicEpoch(Optional.of(0L)); f.get(); } else { for (int i = 0; i < 3; i++) { String name = TopicName.get(topic).getPartition(i).toString(); Topic t = pulsar.getBrokerService().getTopic(name, false).get().get(); - CompletableFuture f = (CompletableFuture) Whitebox - .getMethod(AbstractTopic.class, "incrementTopicEpoch", Optional.class) - .invoke(t, Optional.of(0L)); + CompletableFuture f = ((AbstractTopic) t).incrementTopicEpoch(Optional.of(0L)); f.get(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index d8dadaf8b5bd3..d8485f3f051c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -19,11 +19,8 @@ package org.apache.pulsar.broker.service; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; - -import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.google.common.collect.Sets; import java.util.LinkedHashMap; import java.util.List; @@ -31,15 +28,14 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.Cleanup; -import org.apache.commons.lang3.mutable.MutableBoolean; +import lombok.SneakyThrows; import org.apache.commons.lang3.mutable.MutableObject; -import org.apache.jute.Record; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.namespace.OwnedBundle; import org.apache.pulsar.broker.namespace.OwnershipCache; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; @@ -48,16 +44,8 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; -import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.proto.ReplyHeader; -import org.apache.zookeeper.server.Request; -import org.apache.zookeeper.server.ServerCnxn; -import org.apache.zookeeper.server.ZooKeeperServer; import org.mockito.stubbing.Answer; -import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -136,6 +124,7 @@ void tearDown() throws Exception { } @SuppressWarnings("unchecked") + @SneakyThrows(IllegalAccessException.class) private MutableObject spyLeaderNamespaceServiceForAuthorizedBroker() { // Spy leader namespace service to inject authorized broker for namespace-bundle from leader, // this is a safe operation since it is just an recommendation if namespace-bundle has no owner @@ -157,85 +146,10 @@ private MutableObject spyLeaderNamespaceServiceForAuthorizedBroke return CompletableFuture.completedFuture(Optional.of(lookupResult)); }; doAnswer(answer).when(spyLeaderNamespaceService).getBrokerServiceUrlAsync(any(TopicName.class), any(LookupOptions.class)); - Whitebox.setInternalState(leaderPulsar, "nsService", spyLeaderNamespaceService); + FieldUtils.writeField(leaderPulsar, "nsService", spyLeaderNamespaceService, true); return leaderAuthorizedBroker; } - private CompletableFuture watchMetadataStoreReconnect(MetadataStoreExtended store) { - CompletableFuture reconnectedFuture = new CompletableFuture<>(); - store.registerSessionListener(event -> { - if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) { - reconnectedFuture.complete(null); - } - }); - - return reconnectedFuture; - } - - @FunctionalInterface - interface RequestMatcher { - boolean match(Request request) throws Exception; - } - - private void spyZookeeperToDisconnectBeforePersist(ZooKeeper zooKeeper, RequestMatcher matcher) { - ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer(); - ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper); - ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer); - - // Spy zk server connection to close connection to cause connection loss after namespace-bundle - // deleted successfully. This mimics crash of connected zk server after committing requested operation. - Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer); - doAnswer(invocation -> { - Request request = invocation.getArgument(0); - if (request.sessionId != zooKeeper.getSessionId()) { - return invocation.callRealMethod(); - } - if (!matcher.match(request)) { - return invocation.callRealMethod(); - } - Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer); - bkEnsemble.disconnectZookeeper(zooKeeper); - return null; - }).when(spyZooKeeperServer).submitRequest(any(Request.class)); - } - - private void spyZookeeperToDisconnectAfterPersist(ZooKeeper zooKeeper, RequestMatcher matcher) { - ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer(); - ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper); - ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer); - - // Spy zk server connection to close connection to cause connection loss after namespace-bundle - // deleted successfully. This mimics crash of connected zk server after committing requested operation. - Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer); - MutableBoolean disconnected = new MutableBoolean(); - doAnswer(invocation -> { - Request request = invocation.getArgument(0); - if (request.sessionId != zooKeeper.getSessionId()) { - return invocation.callRealMethod(); - } - if (!matcher.match(request)) { - return invocation.callRealMethod(); - } - - ServerCnxn spyZkServerConnection1 = spy(zkServerConnection); - doAnswer(responseInvocation -> { - synchronized (disconnected) { - ReplyHeader replyHeader = responseInvocation.getArgument(0); - if (replyHeader.getXid() == request.cxid && replyHeader.getErr() == 0) { - Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer); - disconnected.setTrue(); - bkEnsemble.disconnectZookeeper(zooKeeper); - } else if (disconnected.isFalse()) { - return responseInvocation.callRealMethod(); - } - return null; - } - }).when(spyZkServerConnection1).sendResponse(any(ReplyHeader.class), nullable(Record.class), any(String.class)); - Whitebox.setInternalState(request, "cnxn", spyZkServerConnection1); - return invocation.callRealMethod(); - }).when(spyZooKeeperServer).submitRequest(any(Request.class)); - } - @Test public void testReestablishOwnershipAfterInvalidateCache() throws Exception { String topic1 = "persistent://my-tenant/my-ns/topic-1"; @@ -254,12 +168,11 @@ public void testReestablishOwnershipAfterInvalidateCache() throws Exception { Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); OwnershipCache ownershipCache1 = pulsar1.getNamespaceService().getOwnershipCache(); - AsyncLoadingCache ownedBundlesCache1 = Whitebox.getInternalState(ownershipCache1, "ownedBundlesCache"); leaderAuthorizedBroker.setValue(null); Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); - ownedBundlesCache1.synchronous().invalidate(namespaceBundle); + ownershipCache1.invalidateLocalOwnerCache(namespaceBundle); Assert.assertNull(ownershipCache1.getOwnedBundle(namespaceBundle)); // pulsar1 is still owner in zk. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java index 72435aa265e54..6ad17b7d27eb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedCursorMXBean; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; @@ -41,7 +42,6 @@ import org.apache.pulsar.client.impl.PulsarTestClient; import org.apache.pulsar.common.stats.Metrics; import org.awaitility.Awaitility; -import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -171,7 +171,8 @@ public void testManagedCursorMetrics() throws Exception { producer.send(message.getBytes()); consumer.acknowledge(consumer.receive().getMessageId()); // Make BK error. - LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger"); + LedgerHandle ledgerHandle = (LedgerHandle) FieldUtils.readField( + managedCursor, "cursorLedger", true); ledgerHandle.close(); return managedCursorMXBean.getPersistLedgerErrors() > 0 && managedCursorMXBean.getPersistZookeeperSucceed() > 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 4e03f8392137d..b13c8ff2cb2c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -19,12 +19,19 @@ package org.apache.pulsar.broker.systopic; import com.google.common.collect.Sets; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang3.reflect.MethodUtils; import org.apache.pulsar.broker.admin.impl.BrokersBase; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -33,6 +40,7 @@ import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -40,26 +48,18 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; -import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicVersion; +import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.mockito.Mockito; -import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; @Test(groups = "broker") public class PartitionedSystemTopicTest extends BrokerTestBase { @@ -207,7 +207,10 @@ private void testSetBacklogCausedCreatingProducerFailure() throws Exception { config.setMinimumRolloverTime(1, TimeUnit.SECONDS); config.setMaximumRolloverTime(1, TimeUnit.SECONDS); persistentTopic.getManagedLedger().setConfig(config); - Whitebox.invokeMethod(persistentTopic.getManagedLedger(), "updateLastLedgerCreatedTimeAndScheduleRolloverTask"); + MethodUtils.invokeMethod( + persistentTopic.getManagedLedger(), + true, + "updateLastLedgerCreatedTimeAndScheduleRolloverTask"); String msg1 = "msg-1"; producer.send(msg1); Thread.sleep(3 * 1000); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index d0674721c00be..ffc351e941353 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -60,6 +60,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; +import lombok.Lombok; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.Bytes; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -71,6 +72,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.reflect.MethodUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor; @@ -138,7 +141,6 @@ import org.awaitility.Awaitility; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -1244,8 +1246,8 @@ public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception { for (int i = 0; i < 10; i++) { producer.newMessage().value(Bytes.toBytes(i)).send(); } - ClientCnx cnx = Whitebox.invokeMethod(consumer, "cnx"); - Whitebox.invokeMethod(consumer, "connectionClosed", cnx); + ClientCnx cnx = (ClientCnx) MethodUtils.invokeMethod(consumer, true, "cnx"); + MethodUtils.invokeMethod(consumer, true, "connectionClosed", cnx); Message message = consumer.receive(); Transaction transaction = pulsarClient @@ -1391,39 +1393,37 @@ public Object answer(InvocationOnMock invocation) throws Throwable { */ @Test public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException { - final AtomicReference persistentTopic = new AtomicReference(); - AtomicInteger atomicInteger = new AtomicInteger(1); + final AtomicReference persistentTopic = new AtomicReference<>(); // Create Executor - ScheduledExecutorService executorService_recover = mock(ScheduledExecutorService.class); + ScheduledExecutorService executorServiceRecover = mock(ScheduledExecutorService.class); // Mock serviceConfiguration. ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false); when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true); // Mock executorProvider. ExecutorProvider executorProvider = mock(ExecutorProvider.class); - when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService_recover); + when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorServiceRecover); // Mock pendingAckStore. PendingAckStore pendingAckStore = mock(PendingAckStore.class); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - new Thread(() -> { - TopicTransactionBuffer.TopicTransactionBufferRecover recover - = (TopicTransactionBuffer.TopicTransactionBufferRecover)invocation.getArguments()[0]; - TopicTransactionBufferRecoverCallBack callBack - = Whitebox.getInternalState(recover, "callBack");; - try { - persistentTopic.get().getTransactionBuffer().closeAsync().get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); + doAnswer(invocation -> { + new Thread(() -> { + TopicTransactionBuffer.TopicTransactionBufferRecover recover + = (TopicTransactionBuffer.TopicTransactionBufferRecover) invocation.getArguments()[0]; + TopicTransactionBufferRecoverCallBack callBack = null; + try { + callBack = (TopicTransactionBufferRecoverCallBack) FieldUtils.readField( + recover, "callBack", true); + persistentTopic.get().getTransactionBuffer().closeAsync().get(); + } catch (Exception e) { + throw Lombok.sneakyThrow(e); + } finally { + if (callBack != null) { + callBack.recoverComplete(); } - callBack.recoverComplete(); - }).start(); - return null; - } - }).when(executorService_recover).execute(any()); + } + }).start(); + return null; + }).when(executorServiceRecover).execute(any()); // Mock executorProvider. TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class); when(pendingAckStoreProvider.checkInitializedBefore(any())) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 576ef647248d4..ba7b8b6b40f36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.transaction.buffer; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.client.api.Producer; @@ -29,7 +30,6 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; -import org.powermock.reflect.Whitebox; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -83,7 +83,7 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception { producer.newMessage(txn).value("test".getBytes()).send(); PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get(); - Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed); + FieldUtils.writeField(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed, true); txn.commit().get(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index 4cc29f396cfa9..0922c9b801b6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -29,7 +29,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -61,7 +60,6 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; -import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -324,9 +322,11 @@ public void testTBLowWaterMarkEndToEnd() throws Exception { field.setAccessible(true); field.set(txn1, TransactionImpl.State.OPEN); - AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0) - .getBrokerService().getTopic(TopicName.get(TOPIC).toString(), - false).get().get(), "pendingWriteOps"); + PersistentTopic t = (PersistentTopic) getPulsarServiceList().get(0) + .getBrokerService() + .getTopic(TopicName.get(TOPIC).toString(), false) + .get() + .orElseThrow(); try { producer.newMessage(txn1).send(); fail(); @@ -334,7 +334,7 @@ public void testTBLowWaterMarkEndToEnd() throws Exception { // no-op } - assertEquals(pendingWriteOps.get(), 0); + assertEquals(t.getPendingWriteOps().get(), 0); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index c35d15d96da05..58be651a7d183 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -19,6 +19,15 @@ package org.apache.pulsar.broker.transaction.pendingack; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; @@ -40,23 +49,11 @@ import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; -import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.TimeUnit; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - @Slf4j @Test(groups = "broker") public class PendingAckInMemoryDeleteTest extends TransactionTestBase { @@ -309,12 +306,15 @@ public void testPendingAckClearPositionIsSmallerThanMarkDelete() throws Exceptio consumer.acknowledgeAsync(consumer.receive().getMessageId(), commitTxn).get(); - PendingAckHandle pendingAckHandle = Whitebox.getInternalState(getPulsarServiceList().get(0) - .getBrokerService().getTopic("persistent://" + normalTopic, false).get().get() - .getSubscription(subscriptionName), "pendingAckHandle"); - + Topic t = getPulsarServiceList().get(0) + .getBrokerService() + .getTopic("persistent://" + normalTopic, false) + .get() + .orElseThrow(); + PersistentSubscription subscription = (PersistentSubscription) t.getSubscription(subscriptionName); + PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) subscription.getPendingAckHandle(); Map> individualAckPositions = - Whitebox.getInternalState(pendingAckHandle, "individualAckPositions"); + pendingAckHandle.getIndividualAckPositions(); // one message in pending ack state assertEquals(1, individualAckPositions.size()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 2f572a841b09a..669161f67c058 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; @@ -75,10 +76,14 @@ import lombok.EqualsAndHashCode; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -104,7 +109,6 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; -import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -1032,6 +1036,20 @@ public void testSendBigMessageSizeButCompressed() throws Exception { } + @Override + protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception { + super.beforePulsarStartMocks(pulsar); + doAnswer(i0 -> { + ManagedLedgerFactory factory = (ManagedLedgerFactory) spy(i0.callRealMethod()); + doAnswer(i1 -> { + EntryCacheManager manager = (EntryCacheManager) spy(i1.callRealMethod()); + doAnswer(i2 -> spy(i2.callRealMethod())).when(manager).getEntryCache(any()); + return manager; + }).when(factory).getEntryCacheManager(); + return factory; + }).when(pulsar).getManagedLedgerFactory(); + } + /** * Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages - * EntryCache should be cleaned : Once active subscription consumes messages @@ -1068,8 +1086,8 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); - EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache")); - Whitebox.setInternalState(ledger, "entryCache", entryCache); + + EntryCache entryCache = (EntryCache) FieldUtils.readField(ledger, "entryCache", true); Message msg; // 2. Produce messages diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index c241b6e6cc76c..4f9c9080646f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; @@ -47,8 +48,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -71,7 +76,6 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; -import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -608,6 +612,20 @@ public void testSendBigMessageSize() throws Exception { } } + @Override + protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception { + super.beforePulsarStartMocks(pulsar); + doAnswer(i0 -> { + ManagedLedgerFactory factory = (ManagedLedgerFactory) spy(i0.callRealMethod()); + doAnswer(i1 -> { + EntryCacheManager manager = (EntryCacheManager) spy(i1.callRealMethod()); + doAnswer(i2 -> spy(i2.callRealMethod())).when(manager).getEntryCache(any()); + return manager; + }).when(factory).getEntryCacheManager(); + return factory; + }).when(pulsar).getManagedLedgerFactory(); + } + /** * Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages - * EntryCache should be cleaned : Once active subscription consumes messages @@ -648,10 +666,9 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); - EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache")); - Whitebox.setInternalState(ledger, "entryCache", entryCache); + EntryCache entryCache = (EntryCache) FieldUtils.readField(ledger, "entryCache", true); - Messagemsg = null; + Message msg = null; // 2. Produce messages for (int i = 0; i < 30; i++) { String message = "my-message-" + i; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 25a2539582e06..c5d030380393a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -20,16 +20,11 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; - -import java.lang.reflect.Field; -import java.util.HashMap; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; - import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -38,7 +33,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; -import org.powermock.reflect.Whitebox; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -267,7 +261,7 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { String topic = BrokerTestUtil.newUniqueName("testNegativeAcksDeleteFromUnackedTracker"); @Cleanup - Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("sub1") .acknowledgmentGroupTime(0, TimeUnit.SECONDS) @@ -282,18 +276,15 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1); BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(3, 1, 0, 2); - UnAckedMessageTracker unAckedMessageTracker = ((ConsumerImpl) consumer).getUnAckedMessageTracker(); + UnAckedMessageTracker unAckedMessageTracker = consumer.getUnAckedMessageTracker(); unAckedMessageTracker.add(topicMessageId); - Field fieldNegativeAcksTracker = Whitebox.getField(ConsumerImpl.class, "negativeAcksTracker"); - NegativeAcksTracker negativeAcksTracker = (NegativeAcksTracker) fieldNegativeAcksTracker.get(((ConsumerImpl) consumer)); - Field fieldNackedMessages = Whitebox.getField(NegativeAcksTracker.class, "nackedMessages"); // negative topic message id consumer.negativeAcknowledge(topicMessageId); - HashMap nackedMessages = (HashMap)fieldNackedMessages.get(negativeAcksTracker); - assertEquals(nackedMessages.size(), 1); + NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker(); + assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1); assertEquals(unAckedMessageTracker.size(), 0); - nackedMessages.clear(); + negativeAcksTracker.close(); // negative batch message id unAckedMessageTracker.add(batchMessageId); unAckedMessageTracker.add(batchMessageId2); @@ -301,9 +292,9 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { consumer.negativeAcknowledge(batchMessageId); consumer.negativeAcknowledge(batchMessageId2); consumer.negativeAcknowledge(batchMessageId3); - assertEquals(nackedMessages.size(), 1); + assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1); assertEquals(unAckedMessageTracker.size(), 0); - nackedMessages.clear(); + negativeAcksTracker.close(); } @Test(timeOut = 10000) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5e84a30e8676b..38251a7ad8659 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.common.protocol.Commands.hasChecksum; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; import com.google.common.collect.Iterables; import com.scurrilous.circe.checksum.Crc32cIntChecksum; @@ -391,6 +392,11 @@ public UnAckedMessageTracker getUnAckedMessageTracker() { return unAckedMessageTracker; } + @VisibleForTesting + NegativeAcksTracker getNegativeAcksTracker() { + return negativeAcksTracker; + } + @Override public CompletableFuture unsubscribeAsync() { if (getState() == State.Closing || getState() == State.Closed) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 6273f4d582e18..86121dd2c34c7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -19,11 +19,13 @@ package org.apache.pulsar.client.impl; import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.Timeout; import io.netty.util.Timer; import java.io.Closeable; import java.util.HashMap; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; @@ -123,6 +125,11 @@ private synchronized void add(MessageId messageId, int redeliveryCount) { } } + @VisibleForTesting + Optional getNackedMessagesCount() { + return Optional.ofNullable(nackedMessages).map(HashMap::size); + } + @Override public synchronized void close() { if (timeout != null && !timeout.isCancelled()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index b0e78273dc6b3..6f0b75fb54064 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -994,7 +994,7 @@ public EventLoopGroup eventLoopGroup() { } @VisibleForTesting - void setLookup(LookupService lookup) { + public void setLookup(LookupService lookup) { this.lookup = lookup; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java index cc2a351956569..7f3d5bfd5d54a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java @@ -396,8 +396,6 @@ public void discardBufferIfBadAvroData() { // Because data does not conform to schema expect a crash Assert.assertThrows( SchemaSerializationException.class, () -> avroWriter.write(badNasaMissionData)); - // Get the buffered data using powermock - // Assert that the buffer position is reset to zero Assert.assertEquals(avroWriter.getEncoder().bytesBuffered(), 0); }