From 93d3c5ef52ec65fcec5bd92c872726ae827aacbf Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Mon, 20 May 2019 10:45:04 -0700 Subject: [PATCH] [pulsar-common] add option that field can be empty (#3543) Revert "[pulsar-common] add option that field can be empty" This reverts commit e1e62bbaf0448867d9ab6af1efb23a46e4de947a. set optional param fix tests fix configs fix test fix test --- .../ProxySaslAuthenticationTest.java | 5 +- .../pulsar/broker/ServiceConfiguration.java | 18 +++--- .../PulsarConfigurationLoaderTest.java | 16 +++-- .../pulsar/broker/SLAMonitoringTest.java | 7 ++- .../pulsar/broker/admin/AdminApiTest.java | 4 +- .../broker/admin/AdminApiTlsAuthTest.java | 26 ++++---- .../admin/BrokerAdminClientTlsAuthTest.java | 22 ++++--- .../broker/admin/v1/V1_AdminApiTest.java | 4 +- .../auth/MockedPulsarServiceBaseTest.java | 7 ++- .../AntiAffinityNamespaceGroupTest.java | 9 +-- .../broker/loadbalance/LoadBalancerTest.java | 61 ++++++++++--------- .../ModularLoadManagerImplTest.java | 42 +++++++------ .../SimpleLoadManagerImplTest.java | 8 +-- .../broker/service/AdvertisedAddressTest.java | 6 +- .../service/BacklogQuotaManagerTest.java | 5 +- .../service/BrokerBkEnsemblesTests.java | 5 +- .../broker/service/BrokerServiceTest.java | 16 ++--- .../broker/service/MaxMessageSizeTest.java | 6 +- .../broker/service/ReplicatorTestBase.java | 25 ++++---- .../pulsar/broker/web/WebServiceTest.java | 5 +- .../AuthenticatedProducerConsumerTest.java | 5 +- ...enticationTlsHostnameVerificationTest.java | 9 +-- .../client/api/BrokerServiceLookupTest.java | 50 +++++++-------- .../client/api/NonPersistentTopicTest.java | 13 ++-- .../client/api/ServiceUrlProviderTest.java | 12 ++-- .../client/api/TlsProducerConsumerBase.java | 5 +- .../service/web/DiscoveryServiceWebTest.java | 3 +- .../worker/PulsarFunctionE2ESecurityTest.java | 4 +- .../worker/PulsarFunctionPublishTest.java | 8 +-- .../worker/PulsarFunctionStateTest.java | 8 +-- .../worker/PulsarWorkerAssignmentTest.java | 4 +- .../pulsar/io/PulsarFunctionAdminTest.java | 8 +-- .../pulsar/io/PulsarFunctionE2ETest.java | 8 +-- .../proxy/ProxyAuthenticationTest.java | 3 +- .../proxy/ProxyAuthorizationTest.java | 3 +- .../proxy/ProxyConfigurationTest.java | 4 +- .../proxy/ProxyPublishConsumeTest.java | 3 +- .../proxy/ProxyPublishConsumeTlsTest.java | 5 +- .../ProxyPublishConsumeWithoutZKTest.java | 3 +- .../proxy/v1/V1_ProxyAuthenticationTest.java | 3 +- .../pulsar/common/util/FieldParser.java | 59 +++++++++++++++--- .../service/server/ServiceConfig.java | 24 ++++---- .../service/BaseDiscoveryTestSetup.java | 5 +- .../service/web/DiscoveryServiceWebTest.java | 7 ++- .../proxy/server/ProxyConfiguration.java | 16 ++--- .../server/AuthedAdminProxyHandlerTest.java | 14 +++-- ...roxyAuthenticatedProducerConsumerTest.java | 13 ++-- .../proxy/server/ProxyAuthenticationTest.java | 5 +- .../server/ProxyConnectionThrottlingTest.java | 4 +- .../server/ProxyForwardAuthDataTest.java | 5 +- .../server/ProxyLookupThrottlingTest.java | 4 +- .../pulsar/proxy/server/ProxyParserTest.java | 2 +- .../server/ProxyRolesEnforcementTest.java | 5 +- .../apache/pulsar/proxy/server/ProxyTest.java | 3 +- .../pulsar/proxy/server/ProxyTlsTest.java | 9 +-- .../server/ProxyWithAuthorizationNegTest.java | 13 ++-- .../server/ProxyWithAuthorizationTest.java | 21 ++++--- .../ProxyWithoutServiceDiscoveryTest.java | 13 ++-- .../SuperUserAuthedAdminProxyHandlerTest.java | 14 +++-- .../server/UnauthedAdminProxyHandlerTest.java | 5 +- .../service/WebSocketProxyConfiguration.java | 12 ++-- 61 files changed, 412 insertions(+), 299 deletions(-) diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java index 2d9d51d3a7329..544ce887b66be 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java @@ -24,6 +24,7 @@ import java.nio.file.Files; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -225,8 +226,8 @@ void testAuthentication() throws Exception { ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); - proxyConfig.setServicePort(servicePort); - proxyConfig.setWebServicePort(webServicePort); + proxyConfig.setServicePort(Optional.of(servicePort)); + proxyConfig.setWebServicePort(Optional.of(webServicePort)); proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*"); proxyConfig.setSaslJaasServerSectionName("PulsarProxy"); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f047703942258..60d5a9cfa3920 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -109,22 +109,22 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_SERVER, doc = "The port for serving binary protobuf requests" ) - private Integer brokerServicePort = 6650; + private Optional brokerServicePort = Optional.of(6650); @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving tls secured binary protobuf requests" ) - private Integer brokerServicePortTls = null; + private Optional brokerServicePortTls = Optional.empty(); @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving http requests" ) - private Integer webServicePort = 8080; + private Optional webServicePort = Optional.of(8080); @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving https requests" ) - private Integer webServicePortTls = null; + private Optional webServicePortTls = Optional.empty(); @FieldContext( category = CATEGORY_SERVER, @@ -1274,18 +1274,18 @@ public int getBookkeeperHealthCheckIntervalSec() { } public Optional getBrokerServicePort() { - return Optional.ofNullable(brokerServicePort); + return brokerServicePort; } public Optional getBrokerServicePortTls() { - return Optional.ofNullable(brokerServicePortTls); + return brokerServicePortTls; } public Optional getWebServicePort() { - return Optional.ofNullable(webServicePort); + return webServicePort; } public Optional getWebServicePortTls() { - return Optional.ofNullable(webServicePortTls); + return webServicePortTls; } -} +} \ No newline at end of file diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java index feff2acb94e51..9e0bfa94c22b6 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java @@ -20,6 +20,7 @@ import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.isComplete; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -32,6 +33,7 @@ import java.io.InputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.util.Optional; import java.util.Properties; import org.apache.bookkeeper.client.api.DigestType; @@ -44,10 +46,10 @@ public class MockConfiguration implements PulsarConfiguration { private String zookeeperServers = "localhost:2181"; private String configurationStoreServers = "localhost:2184"; - private int brokerServicePort = 7650; - private int brokerServicePortTls = 7651; - private int webServicePort = 9080; - private int webServicePortTls = 9443; + private Optional brokerServicePort = Optional.of(7650); + private Optional brokerServicePortTls = Optional.of(7651); + private Optional webServicePort = Optional.of(9080); + private Optional webServicePortTls = Optional.of(9443); private int notExistFieldInServiceConfig = 0; @Override @@ -102,6 +104,9 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception { printWriter.println("brokerClientAuthenticationParameters=role:my-role"); printWriter.println("superUserRoles=appid1,appid2"); printWriter.println("brokerServicePort=7777"); + printWriter.println("brokerServicePortTls=8777"); + printWriter.println("webServicePort="); + printWriter.println("webServicePortTls="); printWriter.println("managedLedgerDefaultMarkDeleteRateLimit=5.0"); printWriter.println("managedLedgerDigestType=CRC32C"); printWriter.println("managedLedgerCacheSizeMB="); @@ -116,6 +121,9 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception { assertEquals(serviceConfig.getClusterName(), "usc"); assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role"); assertEquals(serviceConfig.getBrokerServicePort().get(), new Integer(7777)); + assertEquals(serviceConfig.getBrokerServicePortTls().get(), new Integer(8777)); + assertFalse(serviceConfig.getWebServicePort().isPresent()); + assertFalse(serviceConfig.getWebServicePortTls().isPresent()); assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C); assertTrue(serviceConfig.getManagedLedgerCacheSizeMB() > 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index cddf3abb5524e..6b56a29555d87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -81,12 +82,12 @@ void setup() throws Exception { brokerNativeBrokerPorts[i] = PortManager.nextFreePort(); ServiceConfiguration config = new ServiceConfiguration(); - config.setBrokerServicePort(brokerNativeBrokerPorts[i]); + config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i])); config.setClusterName("my-cluster"); config.setAdvertisedAddress("localhost"); - config.setWebServicePort(brokerWebServicePorts[i]); + config.setWebServicePort(Optional.ofNullable(brokerWebServicePorts[i])); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerNativeBrokerPorts[i]); + config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i])); config.setDefaultNumberOfNamespaceBundles(1); config.setLoadBalancerEnabled(false); configurations[i] = config; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c8ee9caaf9722..16a34671b4746 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -140,8 +140,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java index e6623e5db6224..6a5302e553095 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java @@ -19,6 +19,18 @@ package org.apache.pulsar.broker.admin; import com.google.common.collect.ImmutableSet; + +import java.util.List; +import java.util.Optional; +import java.security.cert.X509Certificate; +import javax.net.ssl.SSLContext; +import javax.ws.rs.NotAuthorizedException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; + import lombok.extern.slf4j.Slf4j; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -41,16 +53,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import javax.net.ssl.SSLContext; -import javax.ws.rs.NotAuthorizedException; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.GenericType; -import javax.ws.rs.core.MediaType; -import java.security.cert.X509Certificate; -import java.util.List; - @Slf4j public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest { @@ -62,8 +64,8 @@ private static String getTLSFile(String name) { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsCertificateFilePath(getTLSFile("broker.cert")); conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8")); conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java index a9cea1f157231..7cd234115dfe2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java @@ -19,6 +19,12 @@ package org.apache.pulsar.broker.admin; import com.google.common.collect.ImmutableSet; + +import java.util.Optional; + +import static org.testng.Assert.fail; + +import java.lang.reflect.Method; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; @@ -35,10 +41,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.lang.reflect.Method; - -import static org.testng.Assert.fail; - @Slf4j public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest { protected String methodName; @@ -55,8 +57,8 @@ private static String getTLSFile(String name) { @BeforeMethod @Override public void setup() throws Exception { - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); buildConf(conf); super.internalSetup(); } @@ -109,10 +111,10 @@ public void testPersistentList() throws Exception { /***** Start Broker 2 ******/ ServiceConfiguration conf = new ServiceConfiguration(); - conf.setBrokerServicePort(PortManager.nextFreePort()); - conf.setBrokerServicePortTls(PortManager.nextFreePort()); - conf.setWebServicePort(PortManager.nextFreePort()); - conf.setWebServicePortTls(PortManager.nextFreePort()); + conf.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + conf.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); conf.setAdvertisedAddress("localhost"); conf.setClusterName(this.conf.getClusterName()); conf.setZookeeperServers("localhost:2181"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 42ea2ce995dee..3d32716664386 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -139,8 +139,8 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index e65ce0f548dc8..b987503704bfa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -30,6 +30,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -91,8 +92,10 @@ public MockedPulsarServiceBaseTest() { protected void resetConfig() { this.conf = new ServiceConfiguration(); - this.conf.setBrokerServicePort(BROKER_PORT); - this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT); + this.conf.setAdvertisedAddress("localhost"); + this.conf.setBrokerServicePort(Optional.ofNullable(BROKER_PORT)); + this.conf.setAdvertisedAddress("localhost"); + this.conf.setWebServicePort(Optional.ofNullable(BROKER_WEBSERVICE_PORT)); this.conf.setClusterName(configClusterName); this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate this.conf.setManagedLedgerCacheSizeMB(8); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index befc46da0dcf6..1672bdf6c819d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -26,6 +26,7 @@ import java.lang.reflect.Field; import java.net.URL; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -119,9 +120,9 @@ void setup() throws Exception { ServiceConfiguration config1 = new ServiceConfiguration(); config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config1.setClusterName("use"); - config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT); + config1.setWebServicePort(Optional.ofNullable(PRIMARY_BROKER_WEBSERVICE_PORT)); config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config1.setBrokerServicePort(PRIMARY_BROKER_PORT); + config1.setBrokerServicePort(Optional.ofNullable(PRIMARY_BROKER_PORT)); config1.setFailureDomainsEnabled(true); config1.setLoadBalancerEnabled(true); config1.setAdvertisedAddress("localhost"); @@ -138,9 +139,9 @@ void setup() throws Exception { ServiceConfiguration config2 = new ServiceConfiguration(); config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config2.setClusterName("use"); - config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT); + config2.setWebServicePort(Optional.ofNullable(SECONDARY_BROKER_WEBSERVICE_PORT)); config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config2.setBrokerServicePort(SECONDARY_BROKER_PORT); + config2.setBrokerServicePort(Optional.ofNullable(SECONDARY_BROKER_PORT)); config2.setFailureDomainsEnabled(true); pulsar2 = new PulsarService(config2); secondaryHost = String.format("%s:%d", "localhost", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 3b5c5779e82d9..3c15a8d311114 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -23,6 +23,34 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.lang.reflect.Field; +import java.net.InetAddress; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; @@ -63,32 +91,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.lang.reflect.Field; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - /** * Start two brokers in the same cluster and have them connect to the same zookeeper. When the PulsarService starts, it * will do the leader election and one of the brokers will become the leader. Then kill that broker and check if the @@ -132,12 +134,13 @@ void setup() throws Exception { brokerNativeBrokerPorts[i] = PortManager.nextFreePort(); ServiceConfiguration config = new ServiceConfiguration(); - config.setBrokerServicePort(brokerNativeBrokerPorts[i]); + config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i])); config.setClusterName("use"); config.setAdvertisedAddress(localhost); - config.setWebServicePort(brokerWebServicePorts[i]); + config.setAdvertisedAddress("localhost"); + config.setWebServicePort(Optional.ofNullable(brokerWebServicePorts[i])); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerNativeBrokerPorts[i]); + config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i])); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress(localhost+i); config.setLoadBalancerEnabled(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index 6d25b7dfc67ec..50b7229e86438 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -23,6 +23,22 @@ import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.URL; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.BrokerData; @@ -61,19 +77,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.net.URL; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -151,10 +154,11 @@ void setup() throws Exception { ServiceConfiguration config1 = new ServiceConfiguration(); config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config1.setClusterName("use"); - config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT); + config1.setWebServicePort(Optional.ofNullable(PRIMARY_BROKER_WEBSERVICE_PORT)); config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config1.setBrokerServicePort(PRIMARY_BROKER_PORT); + config1.setAdvertisedAddress("localhost"); + config1.setBrokerServicePort(Optional.ofNullable(PRIMARY_BROKER_PORT)); pulsar1 = new PulsarService(config1); pulsar1.start(); @@ -167,10 +171,10 @@ void setup() throws Exception { ServiceConfiguration config2 = new ServiceConfiguration(); config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config2.setClusterName("use"); - config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT); + config2.setWebServicePort(Optional.ofNullable(SECONDARY_BROKER_WEBSERVICE_PORT)); config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config2.setBrokerServicePort(SECONDARY_BROKER_PORT); config2.setAdvertisedAddress("localhost"); + config2.setBrokerServicePort(Optional.ofNullable(SECONDARY_BROKER_PORT)); pulsar2 = new PulsarService(config2); secondaryHost = String.format("%s:%d", "localhost", SECONDARY_BROKER_WEBSERVICE_PORT); @@ -599,9 +603,9 @@ public void testOwnBrokerZnodeByMultipleBroker() throws Exception { config.setClusterName("use"); int brokerWebServicePort = PortManager.nextFreePort(); int brokerServicePort = PortManager.nextFreePort(); - config.setWebServicePort(brokerWebServicePort); + config.setWebServicePort(Optional.ofNullable(brokerWebServicePort)); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerServicePort); + config.setBrokerServicePort(Optional.ofNullable(brokerServicePort)); PulsarService pulsar = new PulsarService(config); // create znode using different zk-session final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":" diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index aaa60794e7793..69e2864c704f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -133,9 +133,9 @@ void setup() throws Exception { // Start broker 1 ServiceConfiguration config1 = spy(new ServiceConfiguration()); config1.setClusterName("use"); - config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT); + config1.setWebServicePort(Optional.ofNullable(PRIMARY_BROKER_WEBSERVICE_PORT)); config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config1.setBrokerServicePort(PRIMARY_BROKER_PORT); + config1.setBrokerServicePort(Optional.ofNullable(PRIMARY_BROKER_PORT)); config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); pulsar1 = new PulsarService(config1); @@ -150,9 +150,9 @@ void setup() throws Exception { // Start broker 2 ServiceConfiguration config2 = new ServiceConfiguration(); config2.setClusterName("use"); - config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT); + config2.setWebServicePort(Optional.ofNullable(SECONDARY_BROKER_WEBSERVICE_PORT)); config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config2.setBrokerServicePort(SECONDARY_BROKER_PORT); + config2.setBrokerServicePort(Optional.ofNullable(SECONDARY_BROKER_PORT)); config2.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); pulsar2 = new PulsarService(config2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java index 3990168aa22fc..55d136a411fe0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java @@ -27,6 +27,8 @@ import com.google.gson.JsonObject; import java.nio.charset.StandardCharsets; +import java.util.Optional; + import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -49,10 +51,10 @@ public void setup() throws Exception { bkEnsemble.start(); ServiceConfiguration config = new ServiceConfiguration(); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setWebServicePort(BROKER_WEBSERVICE_PORT); + config.setWebServicePort(Optional.ofNullable(BROKER_WEBSERVICE_PORT)); config.setClusterName("usc"); config.setAdvertisedAddress("localhost"); - config.setBrokerServicePort(BROKER_SERVICE_PORT); + config.setBrokerServicePort(Optional.ofNullable(BROKER_SERVICE_PORT)); config.setAdvertisedAddress(advertisedAddress); config.setManagedLedgerMaxEntriesPerLedger(5); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index a2fb3b202f3ca..fdbc0fc851982 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertTrue; import java.net.URL; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -79,9 +80,9 @@ void setup() throws Exception { config = new ServiceConfiguration(); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); config.setAdvertisedAddress("localhost"); - config.setWebServicePort(BROKER_WEBSERVICE_PORT); + config.setWebServicePort(Optional.ofNullable(BROKER_WEBSERVICE_PORT)); config.setClusterName("usc"); - config.setBrokerServicePort(BROKER_SERVICE_PORT); + config.setBrokerServicePort(Optional.ofNullable(BROKER_SERVICE_PORT)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index b7303c65aa5cc..940002d13b8e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -27,6 +27,7 @@ import java.net.URL; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -95,9 +96,9 @@ protected void setup() throws Exception { config = new ServiceConfiguration(); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); config.setAdvertisedAddress("localhost"); - config.setWebServicePort(BROKER_WEBSERVICE_PORT); + config.setWebServicePort(Optional.ofNullable(BROKER_WEBSERVICE_PORT)); config.setClusterName("usc"); - config.setBrokerServicePort(BROKER_SERVICE_PORT); + config.setBrokerServicePort(Optional.ofNullable(BROKER_SERVICE_PORT)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); config.setManagedLedgerMaxEntriesPerLedger(5); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 418bebf122288..79d6c79040b6f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -446,8 +446,8 @@ public void testTlsEnabled() throws Exception { final String subName = "newSub"; conf.setAuthenticationEnabled(false); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); restartBroker(); @@ -525,8 +525,8 @@ public void testTlsAuthAllowInsecure() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(true); @@ -585,8 +585,8 @@ public void testTlsAuthDisallowInsecure() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(false); @@ -644,8 +644,8 @@ public void testTlsAuthUseTrustCert() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java index 4fb3183ab5a39..626da84654326 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.service; import com.google.common.collect.Sets; + +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.test.PortManager; @@ -60,9 +62,9 @@ void setup() { configuration = new ServiceConfiguration(); configuration.setZookeeperServers("127.0.0.1:" + ZOOKEEPER_PORT); configuration.setAdvertisedAddress("localhost"); - configuration.setWebServicePort(BROKER_WEBSERVER_PORT); + configuration.setWebServicePort(Optional.of(BROKER_WEBSERVER_PORT)); configuration.setClusterName("max_message_test"); - configuration.setBrokerServicePort(BROKER_SERVICE_PORT); + configuration.setBrokerServicePort(Optional.of(BROKER_SERVICE_PORT)); configuration.setAuthorizationEnabled(false); configuration.setAuthenticationEnabled(false); configuration.setManagedLedgerMaxEntriesPerLedger(5); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 73b9b917294b6..e503c9017e90e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -28,6 +28,7 @@ import java.net.URL; import java.util.Set; import java.util.TreeSet; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -117,15 +118,15 @@ void setup() throws Exception { // independent config objects instead of referring to the same properties object config1.setClusterName("r1"); config1.setAdvertisedAddress("localhost"); - config1.setWebServicePort(webServicePort1); - config1.setWebServicePortTls(webServicePortTls1); + config1.setWebServicePort(Optional.ofNullable(webServicePort1)); + config1.setWebServicePortTls(Optional.ofNullable(webServicePortTls1)); config1.setZookeeperServers("127.0.0.1:" + zkPort1); config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config1.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config1.setBrokerServicePort(PortManager.nextFreePort()); - config1.setBrokerServicePortTls(PortManager.nextFreePort()); + config1.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + config1.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); @@ -150,15 +151,15 @@ void setup() throws Exception { int webServicePortTls2 = PortManager.nextFreePort(); config2.setClusterName("r2"); config2.setAdvertisedAddress("localhost"); - config2.setWebServicePort(webServicePort2); - config2.setWebServicePortTls(webServicePortTls2); + config2.setWebServicePort(Optional.ofNullable(webServicePort2)); + config2.setWebServicePortTls(Optional.ofNullable(webServicePortTls2)); config2.setZookeeperServers("127.0.0.1:" + zkPort2); config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config2.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config2.setBrokerServicePort(PortManager.nextFreePort()); - config2.setBrokerServicePortTls(PortManager.nextFreePort()); + config2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + config2.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); @@ -183,15 +184,15 @@ void setup() throws Exception { int webServicePortTls3 = PortManager.nextFreePort(); config3.setClusterName("r3"); config3.setAdvertisedAddress("localhost"); - config3.setWebServicePort(webServicePort3); - config3.setWebServicePortTls(webServicePortTls3); + config3.setWebServicePort(Optional.ofNullable(webServicePort3)); + config3.setWebServicePortTls(Optional.ofNullable(webServicePortTls3)); config3.setZookeeperServers("127.0.0.1:" + zkPort3); config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config3.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config3.setBrokerServicePort(PortManager.nextFreePort()); - config3.setBrokerServicePortTls(PortManager.nextFreePort()); + config3.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + config3.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); config3.setTlsEnabled(true); config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 38cf4cc8aac7a..22e0819a09d7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import javax.net.ssl.HttpsURLConnection; @@ -242,9 +243,9 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU ServiceConfiguration config = new ServiceConfiguration(); config.setAdvertisedAddress("localhost"); - config.setWebServicePort(BROKER_WEBSERVICE_PORT); + config.setWebServicePort(Optional.ofNullable(BROKER_WEBSERVICE_PORT)); if (enableTls) { - config.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + config.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); } config.setClientLibraryVersionCheckEnabled(enableFilter); config.setAuthenticationEnabled(enableAuth); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index af1281e9bc3f1..f328edcfaf688 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -71,8 +72,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java index 27f0c615ae6bf..37db6ac67d46d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -154,8 +155,8 @@ public void testTlsSyncProducerAndConsumerWithInvalidBrokerHost(boolean hostname this.hostnameVerificationEnabled = hostnameVerificationEnabled; // setup broker cert which has CN = "pulsar" different than broker's hostname="localhost" - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH); @@ -195,8 +196,8 @@ public void testTlsSyncProducerAndConsumerCorrectBrokerHost() throws Exception { log.info("-- Starting {} test --", methodName); // setup broker cert which has CN = "localhost" - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index b56781b3c25bd..c9d92b669da97 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -141,8 +141,8 @@ public void testMultipleBrokerLookup() throws Exception { /**** start broker-2 ****/ ServiceConfiguration conf2 = new ServiceConfiguration(); - conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setWebServicePort(PortManager.nextFreePort()); + conf2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf2.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(conf.getClusterName()); conf2.setZookeeperServers("localhost:2181"); @@ -220,8 +220,8 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception { final String property = "my-property2"; ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); - conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setWebServicePort(PortManager.nextFreePort()); + conf2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf2.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(newCluster); // Broker2 serves newCluster conf2.setZookeeperServers("localhost:2181"); @@ -308,8 +308,8 @@ public void testPartitionTopicLookup() throws Exception { /**** start broker-2 ****/ ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); - conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setWebServicePort(PortManager.nextFreePort()); + conf2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf2.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(pulsar.getConfiguration().getClusterName()); conf2.setZookeeperServers("localhost:2181"); @@ -382,10 +382,10 @@ public void testWebserviceServiceTls() throws Exception { /**** start broker-2 ****/ ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); - conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setBrokerServicePortTls(PortManager.nextFreePort()); - conf2.setWebServicePort(PortManager.nextFreePort()); - conf2.setWebServicePortTls(PortManager.nextFreePort()); + conf2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf2.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + conf2.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf2.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); conf2.setAdvertisedAddress("localhost"); conf2.setTlsAllowInsecureConnection(true); conf2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); @@ -395,8 +395,8 @@ public void testWebserviceServiceTls() throws Exception { PulsarService pulsar2 = startBroker(conf2); // restart broker1 with tls enabled - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.ofNullable(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsAllowInsecureConnection(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -476,7 +476,7 @@ public void testDiscoveryLookup() throws Exception { // (1) start discovery service ServiceConfig config = new ServiceConfig(); - config.setServicePort(nextFreePort()); + config.setServicePort(Optional.of(nextFreePort())); config.setBindOnLocalhost(true); DiscoveryService discoveryService = spy(new DiscoveryService(config)); doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); @@ -527,8 +527,8 @@ public void testDiscoveryLookupTls() throws Exception { final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; // (1) restart broker1 with tls enabled - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.ofNullable(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsAllowInsecureConnection(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -537,8 +537,8 @@ public void testDiscoveryLookupTls() throws Exception { // (2) start discovery service ServiceConfig config = new ServiceConfig(); - config.setServicePort(nextFreePort()); - config.setServicePortTls(nextFreePort()); + config.setServicePort(Optional.of(nextFreePort())); + config.setServicePortTls(Optional.of(nextFreePort())); config.setBindOnLocalhost(true); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -589,7 +589,7 @@ public void testDiscoveryLookupAuthAndAuthSuccess() throws Exception { // (1) start discovery service ServiceConfig config = new ServiceConfig(); - config.setServicePort(nextFreePort()); + config.setServicePort(Optional.of(nextFreePort())); config.setBindOnLocalhost(true); // add Authentication Provider Set providersClassNames = Sets.newHashSet(MockAuthenticationProvider.class.getName()); @@ -662,7 +662,7 @@ public void testDiscoveryLookupAuthenticationFailure() throws Exception { // (1) start discovery service ServiceConfig config = new ServiceConfig(); - config.setServicePort(nextFreePort()); + config.setServicePort(Optional.of(nextFreePort())); config.setBindOnLocalhost(true); // set Authentication provider which fails authentication Set providersClassNames = Sets.newHashSet(MockAuthenticationProviderFail.class.getName()); @@ -721,7 +721,7 @@ public void testDiscoveryLookupAuthorizationFailure() throws Exception { // (1) start discovery service ServiceConfig config = new ServiceConfig(); - config.setServicePort(nextFreePort()); + config.setServicePort(Optional.of(nextFreePort())); config.setBindOnLocalhost(true); // set Authentication provider which returns "invalid" appid so, authorization fails Set providersClassNames = Sets.newHashSet(MockAuthorizationProviderFail.class.getName()); @@ -803,8 +803,8 @@ public void testSplitUnloadLookupTest() throws Exception { // (1) Start broker-1 ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); - conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setWebServicePort(PortManager.nextFreePort()); + conf2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf2.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(conf.getClusterName()); conf2.setZookeeperServers("localhost:2181"); @@ -907,8 +907,8 @@ public void testModularLoadManagerSplitBundle() throws Exception { // (1) Start broker-1 ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); - conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setWebServicePort(PortManager.nextFreePort()); + conf2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf2.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(conf.getClusterName()); conf2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); @@ -1182,4 +1182,4 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat return "invalid"; } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 2e7a44407f274..9d7392d3f4910 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -30,6 +30,7 @@ import java.lang.reflect.Field; import java.net.URL; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -882,13 +883,13 @@ void setupReplicationCluster() throws Exception { ServiceConfiguration config1 = new ServiceConfiguration(); config1.setClusterName(configClusterName); config1.setAdvertisedAddress("localhost"); - config1.setWebServicePort(webServicePort1); + config1.setWebServicePort(Optional.ofNullable(webServicePort1)); config1.setZookeeperServers("127.0.0.1:" + zkPort1); config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config1.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config1.setBrokerServicePort(PortManager.nextFreePort()); + config1.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); pulsar1 = new PulsarService(config1); pulsar1.start(); @@ -907,14 +908,14 @@ void setupReplicationCluster() throws Exception { int webServicePort2 = PortManager.nextFreePort(); config2 = new ServiceConfiguration(); config2.setClusterName("r2"); - config2.setWebServicePort(webServicePort2); + config2.setWebServicePort(Optional.ofNullable(webServicePort2)); config2.setAdvertisedAddress("localhost"); config2.setZookeeperServers("127.0.0.1:" + zkPort2); config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config2.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config2.setBrokerServicePort(PortManager.nextFreePort()); + config2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); pulsar2 = new PulsarService(config2); pulsar2.start(); @@ -933,14 +934,14 @@ void setupReplicationCluster() throws Exception { int webServicePort3 = PortManager.nextFreePort(); config3 = new ServiceConfiguration(); config3.setClusterName("r3"); - config3.setWebServicePort(webServicePort3); + config3.setWebServicePort(Optional.ofNullable(webServicePort3)); config3.setAdvertisedAddress("localhost"); config3.setZookeeperServers("127.0.0.1:" + zkPort3); config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config3.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config3.setBrokerServicePort(PortManager.nextFreePort()); + config3.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); pulsar3 = new PulsarService(config3); pulsar3.start(); ns3 = pulsar3.getBrokerService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java index 1badfa2166414..167fc7a85f39d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.api; -import lombok.extern.slf4j.Slf4j; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -29,7 +31,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; @Slf4j public class ServiceUrlProviderTest extends ProducerConsumerBase { @@ -102,8 +104,8 @@ public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception .subscribe(); PulsarService pulsarService1 = pulsar; - conf.setBrokerServicePort(PortManager.nextFreePort()); - conf.setWebServicePort(PortManager.nextFreePort()); + conf.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + conf.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); startBroker(); PulsarService pulsarService2 = pulsar; @@ -167,4 +169,4 @@ public void onServiceUrlChanged(String newServiceUrl) throws PulsarClientExcepti this.getPulsarClient().updateServiceUrl(newServiceUrl); } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java index a496531aecbbc..9a66712c60a77 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -58,8 +59,8 @@ protected void cleanup() throws Exception { } protected void internalSetUpForBroker() throws Exception { - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java index da28ee86be679..3300f6cc7e2c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.fail; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; @@ -80,7 +81,7 @@ public void testRedirectUrlWithServerStarted() throws Exception { // 1. start server int port = PortManager.nextFreePort(); ServiceConfig config = new ServiceConfig(); - config.setWebServicePort(port); + config.setWebServicePort(Optional.of(port)); ServerManager server = new ServerManager(config); DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper; Map params = new TreeMap<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index 775a4bfa5a316..84d627f4f9639 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -127,9 +127,9 @@ void setup(Method method) throws Exception { config.setClusterName("use"); Set superUsers = Sets.newHashSet(ADMIN_SUBJECT); config.setSuperUserRoles(superUsers); - config.setWebServicePort(brokerWebServicePort); + config.setWebServicePort(Optional.of(brokerWebServicePort)); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerServicePort); + config.setBrokerServicePort(Optional.of(brokerServicePort)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index e3b41313188b6..ccd912bd45530 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -148,11 +148,11 @@ public boolean accept(File dir, String name) { config.setClusterName("use"); Set superUsers = Sets.newHashSet("superUser"); config.setSuperUserRoles(superUsers); - config.setWebServicePort(brokerWebServicePort); - config.setWebServicePortTls(brokerWebServiceTlsPort); + config.setWebServicePort(Optional.of(brokerWebServicePort)); + config.setWebServicePortTls(Optional.of(brokerWebServiceTlsPort)); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerServicePort); - config.setBrokerServicePortTls(brokerServiceTlsPort); + config.setBrokerServicePort(Optional.of(brokerServicePort)); + config.setBrokerServicePortTls(Optional.of(brokerServiceTlsPort)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setTlsAllowInsecureConnection(true); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java index 69f19a072b4c4..7f590dfaae386 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java @@ -171,11 +171,11 @@ public boolean accept(File dir, String name) { config.setClusterName("use"); Set superUsers = Sets.newHashSet("superUser"); config.setSuperUserRoles(superUsers); - config.setWebServicePort(brokerWebServicePort); - config.setWebServicePortTls(brokerWebServiceTlsPort); + config.setWebServicePort(Optional.of(brokerWebServicePort)); + config.setWebServicePortTls(Optional.of(brokerWebServiceTlsPort)); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerServicePort); - config.setBrokerServicePortTls(brokerServiceTlsPort); + config.setBrokerServicePort(Optional.of(brokerServicePort)); + config.setBrokerServicePortTls(Optional.of(brokerServiceTlsPort)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setTlsAllowInsecureConnection(true); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index febdadd832edc..55e4a70dcc812 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -97,9 +97,9 @@ void setup(Method method) throws Exception { config.setClusterName("use"); final Set superUsers = Sets.newHashSet("superUser"); config.setSuperUserRoles(superUsers); - config.setWebServicePort(brokerWebServicePort); + config.setWebServicePort(Optional.ofNullable(brokerWebServicePort)); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerServicePort); + config.setBrokerServicePort(Optional.ofNullable(brokerServicePort)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index 6b65f9ca04261..a019e8b2105ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -105,11 +105,11 @@ void setup(Method method) throws Exception { config.setClusterName("use"); Set superUsers = Sets.newHashSet("superUser"); config.setSuperUserRoles(superUsers); - config.setWebServicePort(brokerWebServicePort); - config.setWebServicePortTls(brokerWebServiceTlsPort); + config.setWebServicePort(Optional.ofNullable(brokerWebServicePort)); + config.setWebServicePortTls(Optional.ofNullable(brokerWebServiceTlsPort)); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerServicePort); - config.setBrokerServicePortTls(brokerServiceTlsPort); + config.setBrokerServicePort(Optional.ofNullable(brokerServicePort)); + config.setBrokerServicePortTls(Optional.ofNullable(brokerServiceTlsPort)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 40b1783e6db7c..f35aec8d9141a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -170,11 +170,11 @@ public boolean accept(File dir, String name) { config.setClusterName("use"); Set superUsers = Sets.newHashSet("superUser"); config.setSuperUserRoles(superUsers); - config.setWebServicePort(brokerWebServicePort); - config.setWebServicePortTls(brokerWebServiceTlsPort); + config.setWebServicePort(Optional.ofNullable(brokerWebServicePort)); + config.setWebServicePortTls(Optional.ofNullable(brokerWebServiceTlsPort)); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); - config.setBrokerServicePort(brokerServicePort); - config.setBrokerServicePortTls(brokerServiceTlsPort); + config.setBrokerServicePort(Optional.ofNullable(brokerServicePort)); + config.setBrokerServicePortTls(Optional.ofNullable(brokerServiceTlsPort)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setTlsAllowInsecureConnection(true); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java index c59bf47ba0567..725cad5c53cee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.Sets; import java.net.URI; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -64,7 +65,7 @@ public void setup() throws Exception { port = PortManager.nextFreePort(); WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); - config.setWebServicePort(port); + config.setWebServicePort(Optional.of(port)); config.setClusterName("test"); config.setAuthenticationEnabled(true); // If this is not set, 500 error occurs. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index b79862f3d9e64..9df17c7175353 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -23,6 +23,7 @@ import static org.testng.Assert.assertEquals; import java.util.EnumSet; +import java.util.Optional; import java.util.Set; import org.apache.bookkeeper.test.PortManager; @@ -62,7 +63,7 @@ protected void setup() throws Exception { config.setConfigurationStoreServers("dummy-zk-servers"); config.setSuperUserRoles(superUser); config.setClusterName("c1"); - config.setWebServicePort(TEST_PORT); + config.setWebServicePort(Optional.of(TEST_PORT)); service = spy(new WebSocketService(config)); doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); service.start(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java index 7a8a29909090e..a640776cd8664 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import java.util.Optional; + import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -41,7 +43,7 @@ public void setup() throws Exception { super.producerBaseSetup(); config = new WebSocketProxyConfiguration(); - config.setWebServicePort(PortManager.nextFreePort()); + config.setWebServicePort(Optional.of(PortManager.nextFreePort())); config.setClusterName("test"); config.setConfigurationStoreServers("dummy-zk-servers"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 6f08cc8595fe3..9a260362ec79e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -29,6 +29,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -84,7 +85,7 @@ public void setup() throws Exception { port = PortManager.nextFreePort(); WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); - config.setWebServicePort(port); + config.setWebServicePort(Optional.of(port)); config.setClusterName("test"); config.setConfigurationStoreServers("dummy-zk-servers"); service = spy(new WebSocketService(config)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index 96f3c758e9f79..86e27ab8dcc23 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -26,6 +26,7 @@ import java.security.GeneralSecurityException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -65,8 +66,8 @@ public void setup() throws Exception { port = PortManager.nextFreePort(); tlsPort = PortManager.nextFreePort(); WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); - config.setWebServicePort(port); - config.setWebServicePortTls(tlsPort); + config.setWebServicePort(Optional.of(port)); + config.setWebServicePortTls(Optional.of(tlsPort)); config.setBrokerClientTlsEnabled(true); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index 0f70cc2e6becc..debe0f0c3f36b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.spy; import java.net.URI; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -56,7 +57,7 @@ public void setup() throws Exception { port = PortManager.nextFreePort(); WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); - config.setWebServicePort(port); + config.setWebServicePort(Optional.of(port)); config.setClusterName("test"); config.setServiceUrl(pulsar.getWebServiceAddress()); config.setServiceUrlTls(pulsar.getWebServiceAddressTls()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java index b13a6c64cfddf..48ff28ce32dea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import java.net.URI; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -69,7 +70,7 @@ public void setup() throws Exception { port = PortManager.nextFreePort(); WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); - config.setWebServicePort(port); + config.setWebServicePort(Optional.of(port)); config.setClusterName("use"); config.setAuthenticationEnabled(true); // If this is not set, 500 error occurs. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java index d344c30ce5b3d..d81cd75b60af4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java @@ -21,20 +21,24 @@ import static com.google.common.base.Preconditions.checkNotNull; import static java.lang.String.format; -import com.fasterxml.jackson.databind.util.EnumResolver; - import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import com.fasterxml.jackson.databind.util.EnumResolver; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import io.netty.util.internal.StringUtil; /** @@ -139,6 +143,8 @@ public static void update(Map properties, T obj) throws Ille String v = (String) properties.get(f.getName()); if (!StringUtils.isBlank(v)) { f.set(obj, value(v, f)); + } else { + setEmptyValue(v, f, obj); } } catch (Exception e) { throw new IllegalArgumentException(format("failed to initialize %s field while setting value %s", @@ -160,22 +166,61 @@ public static void update(Map properties, T obj) throws Ille public static Object value(String strValue, Field field) { checkNotNull(field); // if field is not primitive type - if (field.getGenericType() instanceof ParameterizedType) { + Type fieldType = field.getGenericType(); + if (fieldType instanceof ParameterizedType) { Class clazz = (Class) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[0]; // convert to list - if (field.getType().equals(List.class)) + if (field.getType().equals(List.class)) { return stringToList(strValue, clazz); - // convert to set - else if (field.getType().equals(Set.class)) + } // convert to set + else if (field.getType().equals(Set.class)) { return stringToSet(strValue, clazz); - else + } else if (field.getType().equals(Optional.class)) { + Type typeClazz = ((ParameterizedType) fieldType).getActualTypeArguments()[0]; + if (typeClazz instanceof ParameterizedType) { + throw new IllegalArgumentException(format("unsupported non-primitive Optional<%s> for %s", + typeClazz.getClass(), field.getName())); + } + return Optional.ofNullable(convert(strValue, (Class) typeClazz)); + } else { throw new IllegalArgumentException( format("unsupported field-type %s for %s", field.getType(), field.getName())); + } } else { return convert(strValue, field.getType()); } } + /** + * Sets the empty/null value if field is allowed to be set empty + * + * @param strValue + * @param field + * @param obj + * @throws IllegalArgumentException + * @throws IllegalAccessException + */ + public static void setEmptyValue(String strValue, Field field, T obj) + throws IllegalArgumentException, IllegalAccessException { + checkNotNull(field); + // if field is not primitive type + Type fieldType = field.getGenericType(); + if (fieldType instanceof ParameterizedType) { + if (field.getType().equals(List.class)) { + field.set(obj, Lists.newArrayList()); + } else if (field.getType().equals(Set.class)) { + field.set(obj, Sets.newHashSet()); + } else if (field.getType().equals(Optional.class)) { + field.set(obj, Optional.empty()); + } else { + throw new IllegalArgumentException( + format("unsupported field-type %s for %s", field.getType(), field.getName())); + } + } else if (Number.class.isAssignableFrom(field.getType()) || fieldType.getClass().equals(String.class)) { + field.set(obj, null); + } + } + private static Class wrap(Class type) { return WRAPPER_TYPES.containsKey(type) ? WRAPPER_TYPES.get(type) : type; } diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java index 5f89527a8d241..fa750ff010b69 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java @@ -46,13 +46,13 @@ public class ServiceConfig implements PulsarConfiguration { private int zookeeperSessionTimeoutMs = 30_000; // Port to use to server binary-proto request - private Integer servicePort = 5000; + private Optional servicePort = Optional.ofNullable(5000); // Port to use to server binary-proto-tls request - private Integer servicePortTls; + private Optional servicePortTls = Optional.empty(); // Port to use to server HTTP request - private Integer webServicePort = 8080; + private Optional webServicePort = Optional.ofNullable(8080); // Port to use to server HTTPS request - private Integer webServicePortTls; + private Optional webServicePortTls = Optional.empty(); // Control whether to bind directly on localhost rather than on normal // hostname private boolean bindOnLocalhost = false; @@ -135,34 +135,34 @@ public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) { } public Optional getServicePort() { - return Optional.ofNullable(servicePort); + return servicePort; } - public void setServicePort(int servicePort) { + public void setServicePort(Optional servicePort) { this.servicePort = servicePort; } public Optional getServicePortTls() { - return Optional.ofNullable(servicePortTls); + return servicePortTls; } - public void setServicePortTls(int servicePortTls) { + public void setServicePortTls(Optional servicePortTls) { this.servicePortTls = servicePortTls; } public Optional getWebServicePort() { - return Optional.ofNullable(webServicePort); + return webServicePort; } - public void setWebServicePort(int webServicePort) { + public void setWebServicePort(Optional webServicePort) { this.webServicePort = webServicePort; } public Optional getWebServicePortTls() { - return Optional.ofNullable(webServicePortTls); + return webServicePortTls; } - public void setWebServicePortTls(int webServicePortTls) { + public void setWebServicePortTls(Optional webServicePortTls) { this.webServicePortTls = webServicePortTls; } diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java index 88e4a7fe0d7e6..668f423deb7fc 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java @@ -21,6 +21,7 @@ import static org.apache.bookkeeper.test.PortManager.nextFreePort; import static org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.util.ZkUtils; @@ -49,8 +50,8 @@ public class BaseDiscoveryTestSetup { protected void setup() throws Exception { config = new ServiceConfig(); - config.setServicePort(nextFreePort()); - config.setServicePortTls(nextFreePort()); + config.setServicePort(Optional.ofNullable(nextFreePort())); + config.setServicePortTls(Optional.ofNullable(nextFreePort())); config.setBindOnLocalhost(true); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java index 452daaae88b4c..ece064b89d107 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java @@ -33,6 +33,7 @@ import java.security.SecureRandom; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; import java.util.stream.Collectors; @@ -136,7 +137,7 @@ public void testRiderectUrlWithServerStarted() throws Exception { // 1. start server int port = nextFreePort(); ServiceConfig config = new ServiceConfig(); - config.setWebServicePort(port); + config.setWebServicePort(Optional.ofNullable(port)); ServerManager server = new ServerManager(config); DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper; Map params = new TreeMap<>(); @@ -192,8 +193,8 @@ public void testTlsEnable() throws Exception { int port = nextFreePort(); int tlsPort = nextFreePort(); ServiceConfig config = new ServiceConfig(); - config.setWebServicePort(port); - config.setWebServicePortTls(tlsPort); + config.setWebServicePort(Optional.ofNullable(port)); + config.setWebServicePortTls(Optional.ofNullable(tlsPort)); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); ServerManager server = new ServerManager(config); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 1ca1159c772c3..389e1d297ce06 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -128,23 +128,23 @@ public class ProxyConfiguration implements PulsarConfiguration { category = CATEGORY_SERVER, doc = "The port for serving binary protobuf request" ) - private Integer servicePort = 6650; + private Optional servicePort = Optional.ofNullable(6650); @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving tls secured binary protobuf request" ) - private Integer servicePortTls; + private Optional servicePortTls = Optional.empty(); @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving http requests" ) - private Integer webServicePort = 8080; + private Optional webServicePort = Optional.ofNullable(8080); @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving https requests" ) - private Integer webServicePortTls; + private Optional webServicePortTls = Optional.empty(); @FieldContext( category = CATEGORY_SERVER, @@ -371,7 +371,7 @@ public Properties getProperties() { } public Optional getServicePort() { - return Optional.ofNullable(servicePort); + return servicePort; } public Optional getproxyLogLevel() { @@ -382,15 +382,15 @@ public void setProxyLogLevel(int proxyLogLevel) { } public Optional getServicePortTls() { - return Optional.ofNullable(servicePortTls); + return servicePortTls; } public Optional getWebServicePort() { - return Optional.ofNullable(webServicePort); + return webServicePort; } public Optional getWebServicePortTls() { - return Optional.ofNullable(webServicePortTls); + return webServicePortTls; } public void setProperties(Properties properties) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index 7faf848389b48..6c2cabe287484 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -21,6 +21,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import java.util.Optional; + import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableMap; @@ -63,8 +65,8 @@ protected void setup() throws Exception { // enable tls and auth&auth at broker conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.ofNullable(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(getTlsFile("ca.cert")); conf.setTlsCertificateFilePath(getTlsFile("broker.cert")); conf.setTlsKeyFilePath(getTlsFile("broker.key-pk8")); @@ -78,10 +80,10 @@ protected void setup() throws Exception { // start proxy service proxyConfig.setAuthenticationEnabled(true); proxyConfig.setAuthorizationEnabled(true); - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setServicePortTls(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index fe6f9c2cd635f..cd76e163564bf 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -74,8 +75,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.ofNullable(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -102,10 +103,10 @@ protected void setup() throws Exception { proxyConfig.setAuthenticationEnabled(true); proxyConfig.setAuthenticationEnabled(true); - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setServicePortTls(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 694eb750aa24b..27f8d6e9528c5 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Map.Entry; import java.util.Set; @@ -216,8 +217,8 @@ void testAuthentication() throws Exception { // Step 2: Try to use proxy Client as a normal Client - expect exception ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); - proxyConfig.setServicePort(servicePort); - proxyConfig.setWebServicePort(webServicePort); + proxyConfig.setServicePort(Optional.ofNullable(servicePort)); + proxyConfig.setWebServicePort(Optional.ofNullable(webServicePort)); proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 4813615a4bcc7..1946ad4be2de7 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -20,6 +20,8 @@ import static org.mockito.Mockito.doReturn; +import java.util.Optional; + import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -48,7 +50,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { protected void setup() throws Exception { internalSetup(); - proxyConfig.setServicePort(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index 4997de0165612..fc5260f72eaab 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Sets; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import org.apache.bookkeeper.test.PortManager; @@ -105,8 +106,8 @@ void testForwardAuthData() throws Exception { ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); - proxyConfig.setServicePort(servicePort); - proxyConfig.setWebServicePort(webServicePort); + proxyConfig.setServicePort(Optional.ofNullable(servicePort)); + proxyConfig.setWebServicePort(Optional.ofNullable(webServicePort)); proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 9c8ea850c3b6f..a950a7cdafe10 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -21,6 +21,8 @@ import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertTrue; +import java.util.Optional; + import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -47,7 +49,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { protected void setup() throws Exception { internalSetup(); - proxyConfig.setServicePort(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index b9ecd41c90bcf..8f5dfdaf0fef0 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -66,7 +66,7 @@ protected void setup() throws Exception { - proxyConfig.setServicePort(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.of(PortManager.nextFreePort())); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); //enable full parsing feature diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index 7d93ca6d0d76f..cd388f3093444 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Map.Entry; import java.util.Set; @@ -213,8 +214,8 @@ void testIncorrectRoles() throws Exception { ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); - proxyConfig.setServicePort(servicePort); - proxyConfig.setWebServicePort(webServicePort); + proxyConfig.setServicePort(Optional.ofNullable(servicePort)); + proxyConfig.setWebServicePort(Optional.ofNullable(webServicePort)); proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 94cb83ece07ac..42a990e5824ff 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals; import java.util.concurrent.ExecutionException; +import java.util.Optional; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -90,7 +91,7 @@ public static class Foo { protected void setup() throws Exception { internalSetup(); - proxyConfig.setServicePort(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index 2bf544d92c7b3..4710a4ba558a0 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.mockito.Mockito.doReturn; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.test.PortManager; @@ -54,10 +55,10 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { protected void setup() throws Exception { internalSetup(); - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setServicePortTls(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setTlsEnabledWithBroker(false); proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index d1df9461091c9..6224f520c04b4 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -79,8 +80,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.ofNullable(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH); @@ -108,10 +109,10 @@ protected void setup() throws Exception { proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS); - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setServicePortTls(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index dd1da64e8e2d9..0ec4fd3b3e644 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -143,8 +144,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.ofNullable(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH); @@ -172,10 +173,10 @@ protected void setup() throws Exception { proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS); - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setServicePortTls(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy @@ -390,10 +391,10 @@ public void tlsCiphersAndProtocols(Set tlsCiphers, Set tlsProtoc proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS); - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setServicePortTls(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java index 78d0f0e52b330..211d8e8752f86 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -71,8 +72,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(false); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.ofNullable(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -100,10 +101,10 @@ protected void setup() throws Exception { proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS); - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setServicePortTls(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index 0f4c4dd789169..ace09f31ac27b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -21,6 +21,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import java.util.Optional; + import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableMap; @@ -65,8 +67,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setBrokerServicePortTls(BROKER_PORT_TLS); - conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + conf.setBrokerServicePortTls(Optional.ofNullable(BROKER_PORT_TLS)); + conf.setWebServicePortTls(Optional.ofNullable(BROKER_WEBSERVICE_PORT_TLS)); conf.setTlsTrustCertsFilePath(getTlsFile("ca.cert")); conf.setTlsCertificateFilePath(getTlsFile("broker.cert")); conf.setTlsKeyFilePath(getTlsFile("broker.key-pk8")); @@ -80,10 +82,10 @@ protected void setup() throws Exception { // start proxy service proxyConfig.setAuthenticationEnabled(true); proxyConfig.setAuthorizationEnabled(true); - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setServicePortTls(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java index 492300538aac9..8bc937883ac41 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import javax.servlet.http.HttpServletRequest; @@ -65,8 +66,8 @@ protected void setup() throws Exception { super.init(); // start proxy service - proxyConfig.setServicePort(PortManager.nextFreePort()); - proxyConfig.setWebServicePort(PortManager.nextFreePort()); + proxyConfig.setServicePort(Optional.ofNullable(PortManager.nextFreePort())); + proxyConfig.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort())); proxyConfig.setBrokerWebServiceURL(brokerUrl.toString()); proxyConfig.setStatusFilePath(STATUS_FILE_PATH); proxyConfig.setZookeeperServers(DUMMY_VALUE); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index 8b212d88dc8ef..f550e76ac1043 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -59,9 +59,9 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { private long zooKeeperSessionTimeoutMillis = 30000; // Port to use to server HTTP request - private Integer webServicePort = 8080; + private Optional webServicePort = Optional.of(8080); // Port to use to server HTTPS request - private Integer webServicePortTls; + private Optional webServicePortTls = Optional.empty(); // Hostname or IP address the service binds on, default is 0.0.0.0. private String bindAddress; // --- Authentication --- @@ -199,18 +199,18 @@ public void setZooKeeperSessionTimeoutMillis(long zooKeeperSessionTimeoutMillis) } public Optional getWebServicePort() { - return Optional.ofNullable(webServicePort); + return webServicePort; } - public void setWebServicePort(int webServicePort) { + public void setWebServicePort(Optional webServicePort) { this.webServicePort = webServicePort; } public Optional getWebServicePortTls() { - return Optional.ofNullable(webServicePortTls); + return webServicePortTls; } - public void setWebServicePortTls(int webServicePortTls) { + public void setWebServicePortTls(Optional webServicePortTls) { this.webServicePortTls = webServicePortTls; }