From 8c91197d13fdef1df0d43c05eb39c547bd5632e9 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Thu, 27 Aug 2020 16:16:38 -0700 Subject: [PATCH] convertFromConfig should support an overloading of cluster Name (#7890) Co-authored-by: Sanjeev Kulkarni --- .../org/apache/pulsar/functions/worker/Worker.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 0b9eb7750bd86..968b8b4031eb7 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -179,13 +180,13 @@ private AuthorizationService getAuthorizationService() throws PulsarServerExcept } this.configurationCacheService = new ConfigurationCacheService(this.globalZkCache, this.workerConfig.getPulsarFunctionsCluster()); - return new AuthorizationService(PulsarConfigurationLoader.convertFrom(workerConfig), this.configurationCacheService); + return new AuthorizationService(getServiceConfiguration(), this.configurationCacheService); } return null; } private AuthenticationService getAuthenticationService() throws PulsarServerException { - return new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)); + return new AuthenticationService(getServiceConfiguration()); } public ZooKeeperClientFactory getZooKeeperClientFactory() { @@ -223,4 +224,10 @@ public Optional getListenPortHTTP() { public Optional getListenPortHTTPS() { return this.server.getListenPortHTTPS(); } + + private ServiceConfiguration getServiceConfiguration() { + ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(workerConfig); + serviceConfiguration.setClusterName(workerConfig.getPulsarFunctionsCluster()); + return serviceConfiguration; + } }