Skip to content

Commit

Permalink
[Functions] Provide an interface for functions worker service (apache…
Browse files Browse the repository at this point in the history
…#8560)

*Motivation*

Make the pulsar functions worker serve as an interface to allow plugin different functions of worker service implementations.
  • Loading branch information
sijie authored Dec 14, 2020
1 parent 3da2365 commit 26749a8
Show file tree
Hide file tree
Showing 69 changed files with 2,539 additions and 1,076 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean functionsWorkerEnabled = false;

@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "The nar package for the function worker service"
)
private String functionsWorkerServiceNarPackage = "";

/**** --- Broker Web Stats --- ****/
@FieldContext(
category = CATEGORY_METRICS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
Expand Down Expand Up @@ -129,6 +129,7 @@ private static class BrokerStarter {
private final StatsProvider bookieStatsProvider;
private final ServerConfiguration bookieConfig;
private final WorkerService functionsWorkerService;
private final WorkerConfig workerConfig;

BrokerStarter(String[] args) throws Exception{
StarterArguments starterArguments = new StarterArguments();
Expand Down Expand Up @@ -170,50 +171,18 @@ private static class BrokerStarter {

// init functions worker
if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) {
WorkerConfig workerConfig;
if (isBlank(starterArguments.fnWorkerConfigFile)) {
workerConfig = new WorkerConfig();
} else {
workerConfig = WorkerConfig.load(starterArguments.fnWorkerConfigFile);
}
// worker talks to local broker
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
brokerConfig.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerPort(brokerConfig.getWebServicePort().get());
workerConfig.setWorkerId(
"c-" + brokerConfig.getClusterName()
+ "-fw-" + hostname
+ "-" + workerConfig.getWorkerPort());
// inherit broker authorization setting
workerConfig.setAuthenticationEnabled(brokerConfig.isAuthenticationEnabled());
workerConfig.setAuthenticationProviders(brokerConfig.getAuthenticationProviders());

workerConfig.setAuthorizationEnabled(brokerConfig.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(brokerConfig.getAuthorizationProvider());
workerConfig.setConfigurationStoreServers(brokerConfig.getConfigurationStoreServers());
workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());

workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
workerConfig.setTlsEnableHostnameVerification(false);
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());

// client in worker will use this config to authenticate with broker
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
workerConfig.setBrokerClientAuthenticationParameters(
brokerConfig.getBrokerClientAuthenticationParameters());

// inherit super users
workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());

functionsWorkerService = new WorkerService(workerConfig);
workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(
brokerConfig, starterArguments.fnWorkerConfigFile
);
functionsWorkerService = WorkerServiceLoader.load(workerConfig);
} else {
workerConfig = null;
functionsWorkerService = null;
}

// init pulsar service
pulsarService = new PulsarService(brokerConfig,
workerConfig,
Optional.ofNullable(functionsWorkerService),
(exitCode) -> {
log.info("Halting broker process with code {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.pulsar;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
import static org.apache.pulsar.common.policies.data.Policies.getBundles;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand All @@ -35,7 +35,6 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -355,22 +354,5 @@ public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exc
return zkConnect;
}

private static BundlesData getBundles(int numBundles) {
Long maxVal = ((long) 1) << 32;
Long segSize = maxVal / numBundles;
List<String> partitions = Lists.newArrayList();
partitions.add(String.format("0x%08x", 0L));
Long curPartition = segSize;
for (int i = 0; i < numBundles; i++) {
if (i != numBundles - 1) {
partitions.add(String.format("0x%08x", curPartition));
} else {
partitions.add(String.format("0x%08x", maxVal - 1));
}
curPartition += segSize;
}
return new BundlesData(partitions);
}

private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar;

import static org.apache.commons.lang3.StringUtils.isBlank;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Sets;
import java.io.File;
Expand All @@ -28,7 +27,6 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -37,6 +35,7 @@
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
Expand All @@ -51,6 +50,7 @@ public class PulsarStandalone implements AutoCloseable {
LocalBookkeeperEnsemble bkEnsemble;
ServiceConfiguration config;
WorkerService fnWorkerService;
WorkerConfig workerConfig;

public void setBroker(PulsarService broker) {
this.broker = broker;
Expand Down Expand Up @@ -267,54 +267,23 @@ public void start() throws Exception {

// initialize the functions worker
if (!this.isNoFunctionsWorker()) {
WorkerConfig workerConfig;
if (isBlank(this.getFnWorkerConfigFile())) {
workerConfig = new WorkerConfig();
} else {
workerConfig = WorkerConfig.load(this.getFnWorkerConfigFile());
}
workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(
config, this.getFnWorkerConfigFile());
// worker talks to local broker
if (this.isNoStreamStorage()) {
// only set the state storage service url when state is enabled.
workerConfig.setStateStorageServiceUrl(null);
} else if (workerConfig.getStateStorageServiceUrl() == null) {
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + this.getStreamStoragePort());
}

String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerPort(config.getWebServicePort().get());
workerConfig.setWorkerId(
"c-" + config.getClusterName()
+ "-fw-" + hostname
+ "-" + workerConfig.getWorkerPort());
// inherit broker authorization setting
workerConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
workerConfig.setAuthenticationProviders(config.getAuthenticationProviders());

workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());
workerConfig.setConfigurationStoreServers(config.getConfigurationStoreServers());
workerConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
workerConfig.setZooKeeperOperationTimeoutSeconds(config.getZooKeeperOperationTimeoutSeconds());

workerConfig.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
workerConfig.setTlsEnableHostnameVerification(false);
workerConfig.setBrokerClientTrustCertsFilePath(config.getTlsTrustCertsFilePath());

// client in worker will use this config to authenticate with broker
workerConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin());
workerConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters());

// inherit super users
workerConfig.setSuperUserRoles(config.getSuperUserRoles());

fnWorkerService = new WorkerService(workerConfig);
fnWorkerService = WorkerServiceLoader.load(workerConfig);
} else {
workerConfig = new WorkerConfig();
}

// Start Broker
broker = new PulsarService(config,
workerConfig,
Optional.ofNullable(fnWorkerService),
(exitCode) -> {
log.info("Halting standalone process with code {}", exitCode);
Expand Down
Loading

0 comments on commit 26749a8

Please sign in to comment.