Skip to content

Commit

Permalink
Configure DLog Bookie, Pulsar, and Admin clients via pass through con…
Browse files Browse the repository at this point in the history
…fig (apache#15818)
  • Loading branch information
michaeljmarshall authored Jun 1, 2022
1 parent 554c15d commit aa67349
Show file tree
Hide file tree
Showing 26 changed files with 506 additions and 36 deletions.
14 changes: 12 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ brokerClientTlsCiphers=
# used by the internal client to authenticate with Pulsar brokers
brokerClientTlsProtocols=

# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
# and before the above brokerClient configurations named above.

### --- Metadata Store --- ###

Expand Down Expand Up @@ -950,8 +953,11 @@ managedLedgerDefaultAckQuorum=2
# in case of lack of enough bookies
#bookkeeper_opportunisticStriping=false

# you can add other configuration options for the BookKeeper client
# by prefixing them with bookkeeper_
# You can add other configuration options for the BookKeeper client
# by prefixing them with "bookkeeper_". These configurations are applied
# to all bookkeeper clients started by the broker (including the managed ledger bookkeeper clients as well as
# the BookkeeperPackagesStorage bookkeeper client), except the distributed log bookkeeper client.
# The dlog bookkeeper client is configured in the functions worker configuration file.

# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
# Default is 60 seconds
Expand Down Expand Up @@ -1429,6 +1435,10 @@ packagesReplicas=1
# The bookkeeper ledger root path
packagesManagementLedgerRootPath=/ledgers

# When using BookKeeperPackagesStorageProvider, you can configure the
# bookkeeper client by prefixing configurations with "bookkeeper_".
# This config applies to managed ledger bookkeeper clients, as well.

### --- Packages management service configuration variables (end) --- ###

#enable or disable strict bookie affinity
Expand Down
20 changes: 20 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,26 @@ validateConnectorConfig: false
# If it is set to true, you must ensure that it has been initialized by "bin/pulsar initialize-cluster-metadata" command.
initializedDlogMetadata: false

###########################
# Arbitrary Configuration
###########################
# When a configuration parameter is not explicitly named in the WorkerConfig class, it is only accessible from the
# properties map. This map can be configured by supplying values to the properties map in this config file.

# Configure the DLog bookkeeper client by prefixing configurations with "bookkeeper_". Because these are arbitrary, they
# must be added to the properties map to get correctly applied. This configuration applies to the Dlog bookkeeper client
# in both the standalone function workers and function workers initialized in the broker.

# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
# and before the above brokerClient configurations named above.

## For example, when using the token authentication provider (AuthenticationProviderToken), you must configure several
## custom configurations. Here is a sample for configuring one of the necessary configs:
#properties:
# tokenPublicKey: "file:///path/to/my/key"
# tokenPublicAlg: "RSA256"

### --- Deprecated settings --- ###
configurationStoreServers: localhost:2181

Expand Down
4 changes: 4 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ tlsEnabledWithBroker=false
# Tls cert refresh duration in seconds (set 0 to check on every new connection)
tlsCertRefreshCheckDurationSec=300

# You can add extra configuration options for the Pulsar Client
# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
# and before the above brokerClient configurations named above.

##### --- Rate Limiting --- #####

# Max concurrent inbound connections. The proxy will reject requests beyond that.
Expand Down
4 changes: 4 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=

# You can add extra configuration options for the Pulsar Client
# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
# and before the above brokerClient configurations named above.

# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
anonymousUserRole=

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
Expand All @@ -43,6 +42,7 @@
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.metadata.api.MetadataStore;
Expand Down Expand Up @@ -150,15 +150,11 @@ ClientConfiguration createBkClientConfiguration(MetadataStoreExtended store, Ser
conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
bkConf.setGetBookieInfoRetryIntervalSeconds(
conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
Properties allProps = conf.getProperties();
allProps.forEach((key, value) -> {
String sKey = key.toString();
if (sKey.startsWith("bookkeeper_") && value != null) {
String bkExtraConfigKey = sKey.substring(11);
log.info("Extra BookKeeper client configuration {}, setting {}={}", sKey, bkExtraConfigKey, value);
bkConf.setProperty(bkExtraConfigKey, value);
}
});
PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_")
.forEach((key, value) -> {
log.info("Applying BookKeeper client configuration setting {}={}", key, value);
bkConf.setProperty(key, value);
});
return bkConf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
Expand Down Expand Up @@ -1395,10 +1397,19 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
ClientConfigurationData conf = new ClientConfigurationData();
ClientConfigurationData initialConf = new ClientConfigurationData();

// Disable memory limit for broker client
conf.setMemoryLimitBytes(0);
// Disable memory limit for broker client and disable stats
initialConf.setMemoryLimitBytes(0);
initialConf.setStatsIntervalSeconds(0);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
Map<String, Object> overrides = PropertiesUtils
.filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
ClientConfigurationData conf =
ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);

conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
? this.brokerServiceUrlTls : this.brokerServiceUrl);
Expand Down Expand Up @@ -1427,8 +1438,6 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
this.getConfiguration().getBrokerClientAuthenticationParameters()));
}

conf.setStatsIntervalSeconds(0);
this.client = createClientImpl(conf);
} catch (Exception e) {
throw new PulsarServerException(e);
Expand All @@ -1448,10 +1457,16 @@ public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
+ ", webServiceAddressTls: " + webServiceAddressTls
+ ", webServiceAddress: " + webServiceAddress);
}
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
.authentication(//
conf.getBrokerClientAuthenticationPlugin(), //
conf.getBrokerClientAuthenticationParameters());
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));

builder.authentication(
conf.getBrokerClientAuthenticationPlugin(),
conf.getBrokerClientAuthenticationParameters());

if (conf.isBrokerClientTlsEnabled()) {
builder.tlsCiphers(config.getBrokerClientTlsCiphers())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.lookup.data.LookupData;
Expand Down Expand Up @@ -1264,6 +1265,11 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {
.enableTcpNoDelay(false)
.statsInterval(0, TimeUnit.SECONDS);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));

if (pulsar.getConfiguration().isAuthenticationEnabled()) {
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.BindAddress;
import org.apache.pulsar.common.configuration.FieldContext;
Expand Down Expand Up @@ -1181,6 +1182,12 @@ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> c
// Disable memory limit for replication client
clientBuilder.memoryLimit(0, SizeUnit.BYTES);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(pulsar.getConfiguration().getProperties(),
"brokerClient_"));

if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) {
clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
} else if (pulsar.getConfiguration().isAuthenticationEnabled()) {
Expand Down Expand Up @@ -1256,10 +1263,16 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> c

boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls());
String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl();
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl)
.authentication(
conf.getBrokerClientAuthenticationPlugin(),
conf.getBrokerClientAuthenticationParameters());
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_"));

builder.authentication(
conf.getBrokerClientAuthenticationPlugin(),
conf.getBrokerClientAuthenticationParameters());

if (isTlsUrl) {
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
Expand Down Expand Up @@ -107,6 +108,11 @@ public static void main(String[] args) throws Exception {
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(), "brokerClient_"));

if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
brokerConfig.getBrokerClientAuthenticationParameters());
Expand Down
Loading

0 comments on commit aa67349

Please sign in to comment.