Skip to content

Commit

Permalink
Enforce the validation of Pulsar configuration (apache#1084)
Browse files Browse the repository at this point in the history
* Enforce the validation of Pulsar configuration

* Fixed isEmpty logic

* Some tests fixes
  • Loading branch information
merlimat authored and hrsakai committed Jan 19, 2018
1 parent ee1b24e commit b5392be
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.common.configuration;

import org.apache.pulsar.broker.ServiceConfiguration;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.common.util.FieldParser.update;

Expand All @@ -31,16 +29,21 @@
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Loads ServiceConfiguration with properties
*
*
*
*/
public class PulsarConfigurationLoader {

/**
* Creates PulsarConfiguration and loads it with populated attribute values loaded from provided property file.
*
*
* @param configFile
* @throws IOException
* @throws IllegalArgumentException
Expand All @@ -54,7 +57,7 @@ public static <T extends PulsarConfiguration> T create(String configFile,
/**
* Creates PulsarConfiguration and loads it with populated attribute values loaded from provided inputstream
* property file.
*
*
* @param inStream
* @throws IOException
* if an error occurred when reading from the input stream.
Expand Down Expand Up @@ -94,26 +97,38 @@ protected static <T extends PulsarConfiguration> T create(Properties properties,
* Validates {@link FieldContext} annotation on each field of the class element. If element is annotated required
* and value of the element is null or number value is not in a provided (min,max) range then consider as incomplete
* object and throws exception with incomplete parameters
*
*
* @param object
* @return
* @throws IllegalArgumentException
* if object is field values are not completed according to {@link FieldContext} constraints.
* @throws IllegalAccessException
*/
public static boolean isComplete(Object obj) throws IllegalArgumentException, IllegalAccessException {
public static boolean isComplete(Object obj) throws IllegalArgumentException {
checkNotNull(obj);
Field[] fields = obj.getClass().getDeclaredFields();
StringBuilder error = new StringBuilder();
for (Field field : fields) {
if (field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
Object value = field.get(obj);
Object value;

try {
value = field.get(obj);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}

if (log.isDebugEnabled()) {
log.debug("Validating configuration field '{}' = '{}'", field.getName(), value);
}
boolean isRequired = ((FieldContext) field.getAnnotation(FieldContext.class)).required();
long minValue = ((FieldContext) field.getAnnotation(FieldContext.class)).minValue();
long maxValue = ((FieldContext) field.getAnnotation(FieldContext.class)).maxValue();
if (isRequired && value == null)
if (isRequired && isEmpty(value)) {
error.append(String.format("Required %s is null,", field.getName()));
}

if (value != null && Number.class.isAssignableFrom(value.getClass())) {
long fieldVal = ((Number) value).longValue();
boolean valid = fieldVal >= minValue && fieldVal <= maxValue;
Expand All @@ -130,6 +145,16 @@ public static boolean isComplete(Object obj) throws IllegalArgumentException, Il
return true;
}

private static boolean isEmpty(Object obj) {
if (obj == null) {
return true;
} else if (obj instanceof String) {
return StringUtils.isBlank((String) obj);
} else {
return false;
}
}

/**
* Converts a PulsarConfiguration object to a ServiceConfiguration object.
*
Expand Down Expand Up @@ -170,4 +195,5 @@ public static ServiceConfiguration convertFrom(PulsarConfiguration conf) throws
return convertFrom(conf, true);
}

private static final Logger log = LoggerFactory.getLogger(PulsarConfigurationLoader.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}

this.config = PulsarConfigurationLoader.create((new FileInputStream(configFile)), ServiceConfiguration.class);
PulsarConfigurationLoader.isComplete(config);

// Set ZK server's host to localhost
config.setZookeeperServers("127.0.0.1:" + zkPort);
config.setGlobalZookeeperServers("127.0.0.1:" + zkPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -130,6 +131,9 @@ public enum State {
private final Condition isClosedCondition = mutex.newCondition();

public PulsarService(ServiceConfiguration config) {
// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);

state = State.Init;
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
this.advertisedAddress = advertisedAddress(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ protected void resetConfig() {
this.conf.setManagedLedgerCacheSizeMB(8);
this.conf.setActiveConsumerFailoverDelayTimeMillis(0);
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setZookeeperServers("localhost:2181");
this.conf.setClusterName("mock");
}

protected final void internalSetup() throws Exception {
Expand Down Expand Up @@ -255,6 +257,6 @@ public static void retryStrategically(Predicate<Void> predicate, int retryCount,
Thread.sleep(intSleepTime + (intSleepTime * i));
}
}

private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU
config.setTlsTrustCertsFilePath(allowInsecure ? "" : TLS_CLIENT_CERT_FILE_PATH);
config.setClusterName("local");
config.setAdvertisedAddress("localhost"); // TLS certificate expects localhost
config.setZookeeperServers("localhost:2181");
pulsar = spy(new PulsarService(config));
doReturn(new MockedZooKeeperClientFactoryImpl()).when(pulsar).getZooKeeperClientFactory();
pulsar.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void testMultipleBrokerLookup() throws Exception {
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(conf.getClusterName());
conf2.setZookeeperServers("localhost:2181");
PulsarService pulsar2 = startBroker(conf2);
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();
Expand Down Expand Up @@ -227,6 +228,7 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception {
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(newCluster); // Broker2 serves newCluster
conf2.setZookeeperServers("localhost:2181");
String broker2ServiceUrl = "pulsar://localhost:" + conf2.getBrokerServicePort();

admin.clusters().createCluster(newCluster, new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT, null, broker2ServiceUrl, null));
Expand Down Expand Up @@ -319,6 +321,7 @@ public void testPartitionTopicLookup() throws Exception {
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(pulsar.getConfiguration().getClusterName());
conf2.setZookeeperServers("localhost:2181");
PulsarService pulsar2 = startBroker(conf2);
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();
Expand Down Expand Up @@ -397,6 +400,7 @@ public void testWebserviceServiceTls() throws Exception {
conf2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf2.setClusterName(conf.getClusterName());
conf2.setZookeeperServers("localhost:2181");
PulsarService pulsar2 = startBroker(conf2);

// restart broker1 with tls enabled
Expand Down Expand Up @@ -789,6 +793,7 @@ public void testSplitUnloadLookupTest() throws Exception {
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(conf.getClusterName());
conf2.setZookeeperServers("localhost:2181");
PulsarService pulsar2 = startBroker(conf2);
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();
Expand Down Expand Up @@ -856,23 +861,23 @@ public void testSplitUnloadLookupTest() throws Exception {
pulsar2.close();

}

/**
*
*
* <pre>
* When broker-1's Modular-load-manager splits the bundle and update local-policies, broker-2 should get watch of
* local-policies and update bundleCache so, new lookup can be redirected properly.
*
*
* (1) Start broker-1 and broker-2
* (2) Make sure broker-2 always assign bundle to broker1
* (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
* (4) Broker-1 will own topic-1
* (5) Broker-2 will be a leader and trigger Split the bundle for topic-1
* (6) Broker-2 should get the watch and update bundle cache
* (7) Make lookup request again to Broker-2 which should succeed.
*
*
* </pre>
*
*
* @throws Exception
*/
@Test(timeOut = 5000)
Expand All @@ -892,6 +897,7 @@ public void testModularLoadManagerSplitBundle() throws Exception {
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(conf.getClusterName());
conf2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
conf2.setZookeeperServers("localhost:2181");
PulsarService pulsar2 = startBroker(conf2);

// configure broker-1 with ModularLoadlManager
Expand Down

0 comments on commit b5392be

Please sign in to comment.