From b530184464ce0afe8b002c14ca62fbd96ea4dfcf Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Mon, 17 Oct 2022 09:44:11 +0800 Subject: [PATCH] [fix][test] AdvertisedListenersTest.setup (#17869) --- .../mledger/impl/ManagedLedgerBkTest.java | 4 +- .../test/BookKeeperClusterTestCase.java | 9 ++- .../pulsar/broker/MultiBrokerBaseTest.java | 5 ++ .../loadbalance/AdvertisedListenersTest.java | 8 +-- .../SimpleProtocolHandlerTestsBase.java | 13 +++- .../worker/PulsarFunctionTlsTest.java | 11 ++- .../pulsar/common/util/PortManager.java | 69 +++++++++++++++++++ .../pulsar/common/util/PortManagerTest.java | 37 ++++++++++ .../test/BookKeeperClusterTestCase.java | 8 ++- .../SimpleProxyExtensionTestBase.java | 13 +++- 10 files changed, 158 insertions(+), 19 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 3ad521eeb79ec..5c42e7f46bda5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -117,7 +118,7 @@ public void testBookieFailure() throws Exception { metadataStore.unsetAlwaysFail(); bkc = new BookKeeperTestClient(baseClientConf); - startNewBookie(); + int port = startNewBookie(); // Reconnect a new bk client factory.shutdown(); @@ -147,6 +148,7 @@ public void testBookieFailure() throws Exception { assertEquals("entry-2", new String(entries.get(0).getData())); entries.forEach(Entry::release); factory.shutdown(); + releaseLockedPort(port); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index e4f1d470bf6de..5518838be37a9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -24,6 +24,7 @@ package org.apache.bookkeeper.test; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; @@ -62,7 +63,7 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.Auditor; import org.apache.bookkeeper.replication.ReplicationWorker; -import org.apache.bookkeeper.util.PortManager; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; @@ -113,6 +114,7 @@ public void handleTestMethodName(Method method) { private boolean isAutoRecoveryEnabled; protected ExecutorService executor; + private final List bookiePorts = new ArrayList<>(); SynchronousQueue asyncExceptions = new SynchronousQueue<>(); protected void captureThrowable(Runnable c) { @@ -264,7 +266,7 @@ protected void startBKCluster(String metadataServiceUri) throws Exception { // Create Bookie Servers (B1, B2, B3) for (int i = 0; i < numBookies; i++) { - startNewBookie(); + bookiePorts.add(startNewBookie()); } } @@ -283,6 +285,7 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { @@ -290,7 +293,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { int port; if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = PortManager.nextFreePort(); + port = nextLockedFreePort(); } else { port = 0; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java index c00ae8cd0d39d..5b78a32dc37e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java @@ -21,12 +21,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; + import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -124,6 +126,9 @@ protected void additionalBrokersCleanup() { try { pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L); pulsarService.close(); + pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); } catch (PulsarServerException e) { // ignore } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java index 489efa5755ba8..9ca4510a209b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -25,7 +26,6 @@ import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; @@ -66,9 +66,9 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke } private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { - int pulsarPort = PortManager.nextFreePort(); - int httpPort = PortManager.nextFreePort(); - int httpsPort = PortManager.nextFreePort(); + int pulsarPort = nextLockedFreePort(); + int httpPort = nextLockedFreePort(); + int httpsPort = nextLockedFreePort(); // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended conf.setAdvertisedAddress(advertisedAddress); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java index 639ccf7ecd01b..f08771934f666 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java @@ -22,12 +22,12 @@ import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -39,11 +39,14 @@ import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; @Slf4j @@ -54,6 +57,8 @@ public static final class MyProtocolHandler implements ProtocolHandler { private ServiceConfiguration conf; + private final List ports = new ArrayList<>(); + @Override public String protocolName() { return "test"; @@ -81,7 +86,9 @@ public void start(BrokerService service) { @Override public Map> newChannelInitializers() { - return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()), + int port = nextLockedFreePort(); + this.ports.add(port); + return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { @@ -106,7 +113,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - + ports.removeIf(PortManager::releaseLockedPort); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 3aaf914b9eb54..bfc9928a7dec7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -33,7 +34,6 @@ import java.util.Optional; import java.util.Set; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; @@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; @@ -86,8 +87,8 @@ void setup() throws Exception { // start brokers for (int i = 0; i < BROKER_COUNT; i++) { - int brokerPort = PortManager.nextFreePort(); - int webPort = PortManager.nextFreePort(); + int brokerPort = nextLockedFreePort(); + int webPort = nextLockedFreePort(); ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); @@ -196,6 +197,10 @@ void tearDown() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { if (pulsarServices[i] != null) { pulsarServices[i].close(); + pulsarServices[i].getConfiguration(). + getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarServices[i].getConfiguration() + .getWebServicePort().ifPresent(PortManager::releaseLockedPort); } } bkEnsemble.stop(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java new file mode 100644 index 0000000000000..b9df071fdbc7e --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import java.net.ServerSocket; +import java.util.HashSet; +import java.util.Set; + +public class PortManager { + + private static final Set PORTS = new HashSet<>(); + + /** + * Return a locked available port. + * + * @return locked available port. + */ + public static synchronized int nextLockedFreePort() { + int exceptionCount = 0; + while (true) { + try (ServerSocket ss = new ServerSocket(0)) { + int port = ss.getLocalPort(); + if (!checkPortIfLocked(port)) { + PORTS.add(port); + return port; + } + } catch (Exception e) { + exceptionCount++; + if (exceptionCount > 100) { + throw new RuntimeException("Unable to allocate socket port", e); + } + } + } + } + + /** + * Returns whether the port was released successfully. + * + * @return whether the release is successful. + */ + public static synchronized boolean releaseLockedPort(int lockedPort) { + return PORTS.remove(lockedPort); + } + + /** + * Check port if locked. + * + * @return whether the port is locked. + */ + public static synchronized boolean checkPortIfLocked(int lockedPort) { + return PORTS.contains(lockedPort); + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java new file mode 100644 index 0000000000000..88057ba943ab2 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import org.testng.annotations.Test; + +import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class PortManagerTest { + @Test + public void testCheckPortIfLockedAndRemove() { + int port = nextLockedFreePort(); + assertTrue(checkPortIfLocked(port)); + assertTrue(releaseLockedPort(port)); + assertFalse(checkPortIfLocked(port)); + } +} diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java index 7921e784f60e7..327562c77de4c 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java @@ -27,6 +27,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import java.io.File; @@ -83,7 +84,7 @@ import org.apache.bookkeeper.test.ZooKeeperCluster; import org.apache.bookkeeper.test.ZooKeeperClusterUtil; import org.apache.bookkeeper.util.DiskChecker; -import org.apache.bookkeeper.util.PortManager; +import org.apache.pulsar.common.util.PortManager; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -129,6 +130,8 @@ public void handleTestMethodName(Method method) { private boolean isAutoRecoveryEnabled; + private final List bookiePorts = new ArrayList<>(); + SynchronousQueue asyncExceptions = new SynchronousQueue<>(); protected void captureThrowable(Runnable c) { try { @@ -282,6 +285,7 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { @@ -289,7 +293,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { int port; if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = PortManager.nextFreePort(); + port = nextLockedFreePort(); } else { port = 0; } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index 0cc4bbb6bb500..c779acb6ebe90 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -22,12 +22,12 @@ import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; @@ -43,12 +43,15 @@ import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; @@ -60,6 +63,8 @@ public static final class MyProxyExtension implements ProxyExtension { private ProxyConfiguration conf; + private final List ports = new ArrayList<>(); + @Override public String extensionName() { return "test"; @@ -81,7 +86,9 @@ public void start(ProxyService service) { @Override public Map> newChannelInitializers() { - return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()), + int port = nextLockedFreePort(); + this.ports.add(port); + return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { @@ -106,7 +113,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - + ports.removeIf(PortManager::releaseLockedPort); } }