Skip to content

Commit

Permalink
Align configurations defaults between default file and Java object (b…
Browse files Browse the repository at this point in the history
…roker.conf, proxy.conf, websocket.conf) (apache#13272)

* Websocket and proxy

* update doc

* fix license header

* fix failing test

* * Revert numThreads to use available processor by default
* Revert 'brokerDeduplicationSnapshotFrequencyInSeconds' to 2min

* fix rebase
  • Loading branch information
nicoloboschi authored Dec 15, 2021
1 parent 1456f87 commit 4e7d788
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 18 deletions.
12 changes: 6 additions & 6 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ brokerDeduplicationEnabled=false
brokerDeduplicationMaxNumberOfProducers=10000

# How often is the thread pool scheduled to check whether a snapshot needs to be taken.(disable with value 0)
brokerDeduplicationSnapshotFrequencyInSeconds=10
brokerDeduplicationSnapshotFrequencyInSeconds=120
# If this time interval is exceeded, a snapshot will be taken.
# It will run simultaneously with `brokerDeduplicationEntriesInterval`
brokerDeduplicationSnapshotIntervalSeconds=120
Expand Down Expand Up @@ -426,7 +426,7 @@ maxConcurrentTopicLoadRequest=5000
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8
numWorkerThreadsForNonPersistentTopic=

# Enable broker to load persistent topics
enablePersistentTopics=true
Expand Down Expand Up @@ -926,10 +926,10 @@ managedLedgerStatsPeriodSeconds=60
managedLedgerDigestType=CRC32C

# Number of threads to be used for managed ledger tasks dispatching
managedLedgerNumWorkerThreads=8
managedLedgerNumWorkerThreads=

# Number of threads to be used for managed ledger scheduled tasks
managedLedgerNumSchedulerThreads=8
managedLedgerNumSchedulerThreads=

# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
Expand Down Expand Up @@ -1195,10 +1195,10 @@ bootstrapNamespaces=
webSocketServiceEnabled=false

# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=8
webSocketNumIoThreads=

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8
webSocketConnectionsPerBroker=

# Time in milliseconds that idle WebSocket session times out
webSocketSessionIdleTimeoutMillis=300000
Expand Down
4 changes: 2 additions & 2 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ bindAddress=0.0.0.0
clusterName=

# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=8
webSocketNumIoThreads=

# Number of threads to use in HTTP server. Default is Runtime.getRuntime().availableProcessors()
numHttpServerThreads=

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8
webSocketConnectionsPerBroker=

# Time in milliseconds that idle WebSocket session times out
webSocketSessionIdleTimeoutMillis=300000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "Reducing to lower value can give more accuracy while throttling publish but "
+ "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
)
private int topicPublisherThrottlingTickTimeMillis = 5;
private int topicPublisherThrottlingTickTimeMillis = 10;
@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable precise rate limit for topic publish"
Expand Down Expand Up @@ -816,7 +816,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Default dispatch-throttling is disabled for consumers which already caught-up with"
+ " published messages and don't have backlog. This enables dispatch-throttling for "
+ " non-backlog consumers as well.")
private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;
private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = true;

@FieldContext(
category = CATEGORY_POLICIES,
Expand Down Expand Up @@ -1596,7 +1596,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_STORAGE_ML,
doc = "Threshold to which bring down the cache level when eviction is triggered"
)
private double managedLedgerCacheEvictionWatermark = 0.9f;
private double managedLedgerCacheEvictionWatermark = 0.9;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
Expand Down Expand Up @@ -2005,7 +2005,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_REPLICATION,
doc = "Enable replication metrics"
)
private boolean replicationMetricsEnabled = false;
private boolean replicationMetricsEnabled = true;
@FieldContext(
category = CATEGORY_REPLICATION,
doc = "Max number of connections to open for each broker in a remote cluster.\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testLoadConfig() throws SecurityException, NoSuchMethodException, IO
assertTrue(serviceConfig.isBacklogQuotaCheckEnabled());
assertEquals(serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit(), 5.0);
assertEquals(serviceConfig.getReplicationProducerQueueSize(), 50);
assertFalse(serviceConfig.isReplicationMetricsEnabled());
assertTrue(serviceConfig.isReplicationMetricsEnabled());
assertTrue(serviceConfig.isBookkeeperClientHealthCheckEnabled());
assertEquals(serviceConfig.getBookkeeperClientHealthCheckErrorThresholdPerInterval(), 5);
assertTrue(serviceConfig.isBookkeeperClientRackawarePolicyEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import lombok.Cleanup;
Expand Down Expand Up @@ -210,4 +217,27 @@ public void testBookkeeperMetadataStore() throws Exception {
assertTrue(conf.isConfigurationStoreSeparated());
assertTrue(conf.isBookkeeperMetadataStoreSeparated());
}

@Test
public void testConfigFileDefaults() throws Exception {
try (FileInputStream stream = new FileInputStream("../conf/broker.conf")) {
final ServiceConfiguration javaConfig = PulsarConfigurationLoader.create(new Properties(), ServiceConfiguration.class);
final ServiceConfiguration fileConfig = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
List<String> toSkip = Arrays.asList("properties", "class");
int counter = 0;
for (PropertyDescriptor pd : Introspector.getBeanInfo(ServiceConfiguration.class).getPropertyDescriptors()) {
if (pd.getReadMethod() == null || toSkip.contains(pd.getName())) {
continue;
}
final String key = pd.getName();
final Object javaValue = pd.getReadMethod().invoke(javaConfig);
final Object fileValue = pd.getReadMethod().invoke(fileConfig);
assertTrue(Objects.equals(javaValue, fileValue), "property '"
+ key + "' conf/broker.conf default value doesn't match java default value\nConf: "+ fileValue + "\nJava: " + javaValue);
counter++;
}
assertEquals(counter, 378);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;


import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.testng.annotations.Test;

import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

@Test(groups = "broker")
public class ProxyConfigurationTest {

@Test
public void testConfigFileDefaults() throws Exception {
try (FileInputStream stream = new FileInputStream("../conf/proxy.conf")) {
final ProxyConfiguration javaConfig = PulsarConfigurationLoader.create(new Properties(), ProxyConfiguration.class);
final ProxyConfiguration fileConfig = PulsarConfigurationLoader.create(stream, ProxyConfiguration.class);
List<String> toSkip = Arrays.asList("properties", "class");
int counter = 0;
for (PropertyDescriptor pd : Introspector.getBeanInfo(ProxyConfiguration.class).getPropertyDescriptors()) {
if (pd.getReadMethod() == null || toSkip.contains(pd.getName())) {
continue;
}
final String key = pd.getName();
final Object javaValue = pd.getReadMethod().invoke(javaConfig);
final Object fileValue = pd.getReadMethod().invoke(fileConfig);
assertTrue(Objects.equals(javaValue, fileValue), "property '"
+ key + "' conf/proxy.conf default value doesn't match java default value\nConf: "+ fileValue + "\nJava: " + javaValue);
counter++;
}
assertEquals(79, counter);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
private Optional<Integer> webServicePortTls = Optional.empty();

@FieldContext(doc = "Hostname or IP address the service binds on, default is 0.0.0.0.")
private String bindAddress;
private String bindAddress = "0.0.0.0";

@FieldContext(doc = "Maximum size of a text message during parsing in WebSocket proxy")
private int webSocketMaxTextFrameSize = 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.service;

import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.testng.annotations.Test;

import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

@Test(groups = "broker")
public class WebSocketProxyConfigurationTest {

@Test
public void testConfigFileDefaults() throws Exception {
try (FileInputStream stream = new FileInputStream("../conf/websocket.conf")) {
final WebSocketProxyConfiguration javaConfig = PulsarConfigurationLoader.create(new Properties(), WebSocketProxyConfiguration.class);
final WebSocketProxyConfiguration fileConfig = PulsarConfigurationLoader.create(stream, WebSocketProxyConfiguration.class);
List<String> toSkip = Arrays.asList("properties", "class");
int counter = 0;
for (PropertyDescriptor pd : Introspector.getBeanInfo(WebSocketProxyConfiguration.class).getPropertyDescriptors()) {
if (pd.getReadMethod() == null || toSkip.contains(pd.getName())) {
continue;
}
final String key = pd.getName();
final Object javaValue = pd.getReadMethod().invoke(javaConfig);
final Object fileValue = pd.getReadMethod().invoke(fileConfig);
assertTrue(Objects.equals(javaValue, fileValue), "property '"
+ key + "' conf/websocket.conf default value doesn't match java default value\nConf: "+ fileValue + "\nJava: " + javaValue);
counter++;
}
assertEquals(36, counter);
}
}

}
9 changes: 5 additions & 4 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|webServicePort| Port to use to server HTTP request |8080|
|webServicePortTls| Port to use to server HTTPS request |8443|
|webSocketServiceEnabled| Enable the WebSocket API service in broker |false|
|webSocketNumIoThreads|The number of IO threads in Pulsar Client used in WebSocket proxy.|8|
|webSocketConnectionsPerBroker|The number of connections per Broker in Pulsar Client used in WebSocket proxy.|8|
|webSocketNumIoThreads|The number of IO threads in Pulsar Client used in WebSocket proxy.|Runtime.getRuntime().availableProcessors()|
|webSocketConnectionsPerBroker|The number of connections per Broker in Pulsar Client used in WebSocket proxy.|Runtime.getRuntime().availableProcessors()|
|webSocketSessionIdleTimeoutMillis|Time in milliseconds that idle WebSocket session times out.|300000|
|webSocketMaxTextFrameSize|The maximum size of a text message during parsing in WebSocket proxy.|1048576|
|exposeTopicLevelMetricsInPrometheus|Whether to enable topic level metrics.|true|
Expand All @@ -168,6 +168,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|brokerDeduplicationMaxNumberOfProducers| The maximum number of producers for which information will be stored for deduplication purposes. |10000|
|brokerDeduplicationEntriesInterval| The number of entries after which a deduplication informational snapshot is taken. A larger interval will lead to fewer snapshots being taken, though this would also lengthen the topic recovery time (the time required for entries published after the snapshot to be replayed). |1000|
|brokerDeduplicationProducerInactivityTimeoutMinutes| The time of inactivity (in minutes) after which the broker will discard deduplication information related to a disconnected producer. |360|
|brokerDeduplicationSnapshotFrequencyInSeconds| How often is the thread pool scheduled to check whether a snapshot needs to be taken. The value of `0` means it is disabled. |120|
|dispatchThrottlingRatePerReplicatorInMsg| The default messages per second dispatch throttling-limit for every replicator in replication. The value of `0` means disabling replication message dispatch-throttling| 0 |
|dispatchThrottlingRatePerReplicatorInByte| The default bytes per second dispatch throttling-limit for every replicator in replication. The value of `0` means disabling replication message-byte dispatch-throttling| 0 |
|zooKeeperSessionTimeoutMillis| Zookeeper session timeout in milliseconds |30000|
Expand Down Expand Up @@ -590,8 +591,8 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|managedLedgerDefaultWriteQuorum| |1|
|managedLedgerDefaultAckQuorum| |1|
| managedLedgerDigestType | Default type of checksum to use when writing to BookKeeper. | CRC32C |
| managedLedgerNumWorkerThreads | Number of threads to be used for managed ledger tasks dispatching. | 8 |
| managedLedgerNumSchedulerThreads | Number of threads to be used for managed ledger scheduled tasks. | 8 |
| managedLedgerNumWorkerThreads | Number of threads to be used for managed ledger tasks dispatching. | Runtime.getRuntime().availableProcessors() |
| managedLedgerNumSchedulerThreads | Number of threads to be used for managed ledger scheduled tasks. | Runtime.getRuntime().availableProcessors() |
|managedLedgerCacheSizeMB| |N/A|
|managedLedgerCacheCopyEntries| Whether to copy the entry payloads when inserting in cache.| false|
|managedLedgerCacheEvictionWatermark| |0.9|
Expand Down

0 comments on commit 4e7d788

Please sign in to comment.