Skip to content

Commit

Permalink
[pulsar-common] add option that field can be empty (apache#3543)
Browse files Browse the repository at this point in the history
Revert "[pulsar-common] add option that field can be empty"

This reverts commit e1e62bbaf0448867d9ab6af1efb23a46e4de947a.

set optional param

fix tests

fix configs

fix test

fix test
  • Loading branch information
rdhabalia authored May 20, 2019
1 parent 6e51237 commit 93d3c5e
Show file tree
Hide file tree
Showing 61 changed files with 412 additions and 299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -225,8 +226,8 @@ void testAuthentication() throws Exception {

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(servicePort);
proxyConfig.setWebServicePort(webServicePort);
proxyConfig.setServicePort(Optional.of(servicePort));
proxyConfig.setWebServicePort(Optional.of(webServicePort));
proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,22 +109,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_SERVER,
doc = "The port for serving binary protobuf requests"
)
private Integer brokerServicePort = 6650;
private Optional<Integer> brokerServicePort = Optional.of(6650);
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving tls secured binary protobuf requests"
)
private Integer brokerServicePortTls = null;
private Optional<Integer> brokerServicePortTls = Optional.empty();
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving http requests"
)
private Integer webServicePort = 8080;
private Optional<Integer> webServicePort = Optional.of(8080);
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving https requests"
)
private Integer webServicePortTls = null;
private Optional<Integer> webServicePortTls = Optional.empty();

@FieldContext(
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -1274,18 +1274,18 @@ public int getBookkeeperHealthCheckIntervalSec() {
}

public Optional<Integer> getBrokerServicePort() {
return Optional.ofNullable(brokerServicePort);
return brokerServicePort;
}

public Optional<Integer> getBrokerServicePortTls() {
return Optional.ofNullable(brokerServicePortTls);
return brokerServicePortTls;
}

public Optional<Integer> getWebServicePort() {
return Optional.ofNullable(webServicePort);
return webServicePort;
}

public Optional<Integer> getWebServicePortTls() {
return Optional.ofNullable(webServicePortTls);
return webServicePortTls;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.isComplete;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -32,6 +33,7 @@
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Optional;
import java.util.Properties;

import org.apache.bookkeeper.client.api.DigestType;
Expand All @@ -44,10 +46,10 @@ public class MockConfiguration implements PulsarConfiguration {

private String zookeeperServers = "localhost:2181";
private String configurationStoreServers = "localhost:2184";
private int brokerServicePort = 7650;
private int brokerServicePortTls = 7651;
private int webServicePort = 9080;
private int webServicePortTls = 9443;
private Optional<Integer> brokerServicePort = Optional.of(7650);
private Optional<Integer> brokerServicePortTls = Optional.of(7651);
private Optional<Integer> webServicePort = Optional.of(9080);
private Optional<Integer> webServicePortTls = Optional.of(9443);
private int notExistFieldInServiceConfig = 0;

@Override
Expand Down Expand Up @@ -102,6 +104,9 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception {
printWriter.println("brokerClientAuthenticationParameters=role:my-role");
printWriter.println("superUserRoles=appid1,appid2");
printWriter.println("brokerServicePort=7777");
printWriter.println("brokerServicePortTls=8777");
printWriter.println("webServicePort=");
printWriter.println("webServicePortTls=");
printWriter.println("managedLedgerDefaultMarkDeleteRateLimit=5.0");
printWriter.println("managedLedgerDigestType=CRC32C");
printWriter.println("managedLedgerCacheSizeMB=");
Expand All @@ -116,6 +121,9 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception {
assertEquals(serviceConfig.getClusterName(), "usc");
assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role");
assertEquals(serviceConfig.getBrokerServicePort().get(), new Integer(7777));
assertEquals(serviceConfig.getBrokerServicePortTls().get(), new Integer(8777));
assertFalse(serviceConfig.getWebServicePort().isPresent());
assertFalse(serviceConfig.getWebServicePortTls().isPresent());
assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C);
assertTrue(serviceConfig.getManagedLedgerCacheSizeMB() > 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -81,12 +82,12 @@ void setup() throws Exception {
brokerNativeBrokerPorts[i] = PortManager.nextFreePort();

ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerServicePort(brokerNativeBrokerPorts[i]);
config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i]));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setWebServicePort(brokerWebServicePorts[i]);
config.setWebServicePort(Optional.ofNullable(brokerWebServicePorts[i]));
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setBrokerServicePort(brokerNativeBrokerPorts[i]);
config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i]));
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
configurations[i] = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;

import java.util.List;
import java.util.Optional;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -41,16 +53,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import java.security.cert.X509Certificate;
import java.util.List;

@Slf4j
public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest {

Expand All @@ -62,8 +64,8 @@ private static String getTLSFile(String name) {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;

import java.util.Optional;

import static org.testng.Assert.fail;

import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -35,10 +41,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Method;

import static org.testng.Assert.fail;

@Slf4j
public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
protected String methodName;
Expand All @@ -55,8 +57,8 @@ private static String getTLSFile(String name) {
@BeforeMethod
@Override
public void setup() throws Exception {
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
buildConf(conf);
super.internalSetup();
}
Expand Down Expand Up @@ -109,10 +111,10 @@ public void testPersistentList() throws Exception {

/***** Start Broker 2 ******/
ServiceConfiguration conf = new ServiceConfiguration();
conf.setBrokerServicePort(PortManager.nextFreePort());
conf.setBrokerServicePortTls(PortManager.nextFreePort());
conf.setWebServicePort(PortManager.nextFreePort());
conf.setWebServicePortTls(PortManager.nextFreePort());
conf.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
conf.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
conf.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort()));
conf.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
conf.setAdvertisedAddress("localhost");
conf.setClusterName(this.conf.getClusterName());
conf.setZookeeperServers("localhost:2181");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -91,8 +92,10 @@ public MockedPulsarServiceBaseTest() {

protected void resetConfig() {
this.conf = new ServiceConfiguration();
this.conf.setBrokerServicePort(BROKER_PORT);
this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT);
this.conf.setAdvertisedAddress("localhost");
this.conf.setBrokerServicePort(Optional.ofNullable(BROKER_PORT));
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.ofNullable(BROKER_WEBSERVICE_PORT));
this.conf.setClusterName(configClusterName);
this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
this.conf.setManagedLedgerCacheSizeMB(8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -119,9 +120,9 @@ void setup() throws Exception {
ServiceConfiguration config1 = new ServiceConfiguration();
config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config1.setClusterName("use");
config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT);
config1.setWebServicePort(Optional.ofNullable(PRIMARY_BROKER_WEBSERVICE_PORT));
config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
config1.setBrokerServicePort(Optional.ofNullable(PRIMARY_BROKER_PORT));
config1.setFailureDomainsEnabled(true);
config1.setLoadBalancerEnabled(true);
config1.setAdvertisedAddress("localhost");
Expand All @@ -138,9 +139,9 @@ void setup() throws Exception {
ServiceConfiguration config2 = new ServiceConfiguration();
config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config2.setClusterName("use");
config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
config2.setWebServicePort(Optional.ofNullable(SECONDARY_BROKER_WEBSERVICE_PORT));
config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
config2.setBrokerServicePort(Optional.ofNullable(SECONDARY_BROKER_PORT));
config2.setFailureDomainsEnabled(true);
pulsar2 = new PulsarService(config2);
secondaryHost = String.format("%s:%d", "localhost",
Expand Down
Loading

0 comments on commit 93d3c5e

Please sign in to comment.