Skip to content

Commit

Permalink
[PIP-45] Remove ConfigurationCacheService from AuthorizationProvider (a…
Browse files Browse the repository at this point in the history
…pache#12064)

* [PIP-45] Remove ConfigurationCacheService from AuthorizationProvider

* Fixed DiscoveryService

* Fixed WebSocketService

* Fixed references in Worker

* Fixed BrokerService

* Fixed ServerCnxTest

* Fixed ProxyService
  • Loading branch information
merlimat authored Sep 16, 2021
1 parent 39ddf6d commit 23ffdb7
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -93,8 +94,28 @@ default CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, Ten
* pulsar zk configuration cache service
* @throws IOException
* if the initialization fails
*
* @deprecated ConfigurationCacheService is not supported anymore as a way to get access to metadata.
* @see #initialize(ServiceConfiguration, PulsarResources)
*/
void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
@Deprecated
default void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
}

/**
* Perform initialization for the authorization provider
*
* @param conf
* broker config object
* @param pulsarResources
* Resources component for access to metadata
* @throws IOException
* if the initialization fails
*/
default void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
// For compatibility, call the old deprecated initialize
initialize(conf, (ConfigurationCacheService) null);
}

/**
* Check if the specified role has permission to send messages to the specified fully qualified topic name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -55,14 +56,14 @@ public class AuthorizationService {
private AuthorizationProvider provider;
private final ServiceConfiguration conf;

public AuthorizationService(ServiceConfiguration conf, ConfigurationCacheService configCache)
public AuthorizationService(ServiceConfiguration conf, PulsarResources pulsarResources)
throws PulsarServerException {
this.conf = conf;
try {
final String providerClassname = conf.getAuthorizationProvider();
if (StringUtils.isNotBlank(providerClassname)) {
provider = (AuthorizationProvider) Class.forName(providerClassname).newInstance();
provider.initialize(conf, configCache);
provider.initialize(conf, pulsarResources);
log.info("{} has been loaded.", providerClassname);
} else {
throw new PulsarServerException("No authorization providers are present.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
Expand Down Expand Up @@ -67,7 +68,7 @@ public MultiRolesTokenAuthorizationProvider() {
}

@Override
public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
String prefix = (String) conf.getProperty(CONF_TOKEN_SETTING_PREFIX);
if (null == prefix) {
prefix = "";
Expand All @@ -78,7 +79,7 @@ public void initialize(ServiceConfiguration conf, ConfigurationCacheService conf
this.roleClaim = (String) tokenAuthClaim;
}

super.initialize(conf, configCache);
super.initialize(conf, pulsarResources);
}

private List<String> getRoles(AuthenticationDataSource authData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
public PulsarAuthorizationProvider() {
}

public PulsarAuthorizationProvider(ServiceConfiguration conf, ConfigurationCacheService configCache)
public PulsarAuthorizationProvider(ServiceConfiguration conf, PulsarResources resources)
throws IOException {
initialize(conf, configCache);
initialize(conf, resources);
}

@Override
public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
checkNotNull(conf, "ServiceConfiguration can't be null");
checkNotNull(configCache, "ConfigurationCacheService can't be null");
checkNotNull(pulsarResources, "PulsarResources can't be null");
this.conf = conf;
this.pulsarResources = configCache.getPulsarResources();
this.pulsarResources = pulsarResources;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar.getConfigurationCache());
pulsar.getConfiguration(), pulsar().getPulsarResources());

pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -63,10 +64,6 @@ public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, Tena
return roleAuthorizedAsync(role);
}

@Override
public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
}

@Override
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,14 @@ public void testNonExistentTopic() throws Exception {
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(matches(".*nonexistent.*"));

AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
svcConfig.setAuthorizationEnabled(true);
Field providerField = AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig,
pulsar.getPulsarResources()));
providerField.set(authorizationService, authorizationProvider);
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());

Expand All @@ -560,10 +561,11 @@ public void testNonExistentTopic() throws Exception {
@Test(timeOut = 30000)
public void testClusterAccess() throws Exception {
svcConfig.setAuthorizationEnabled(true);
AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
Field providerField = AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig,
pulsar.getPulsarResources()));
providerField.set(authorizationService, authorizationProvider);
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
Expand All @@ -590,12 +592,12 @@ public void testClusterAccess() throws Exception {

@Test(timeOut = 30000)
public void testNonExistentTopicSuperUserAccess() throws Exception {
AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
Field providerField = AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
providerField.set(authorizationService, authorizationProvider);
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -459,7 +460,7 @@ public CompletableFuture<Boolean> isSuperUser(String role,
}

@Override
public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
this.conf = conf;
// No-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
Expand Down Expand Up @@ -258,7 +259,7 @@ public void close() throws IOException {
}

@Override
public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
public void initialize(ServiceConfiguration conf, PulsarResources resources) throws IOException {
this.conf = conf;
// No-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.File;
Expand All @@ -43,7 +42,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -138,7 +137,7 @@ void setup(Method method) throws Exception {
functionsWorkerService.init(workerConfig, null, false);

AuthenticationService authenticationService = new AuthenticationService(config);
AuthorizationService authorizationService = new AuthorizationService(config, mock(ConfigurationCacheService.class));
AuthorizationService authorizationService = new AuthorizationService(config, mock(PulsarResources.class));
when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
when(functionsWorkerService.isInitialized()).thenReturn(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class DiscoveryService implements Closeable {
private final ServiceConfig config;
private String serviceUrl;
private String serviceUrlTls;
private ConfigurationMetadataCacheService configurationCacheService;
private AuthenticationService authenticationService;
private AuthorizationService authorizationService;
private BrokerDiscoveryProvider discoveryProvider;
Expand Down Expand Up @@ -96,10 +95,9 @@ public void start() throws Exception {
configMetadataStore = createConfigurationMetadataStore();
pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore);
discoveryProvider = new BrokerDiscoveryProvider(this.config, pulsarResources);
this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
authenticationService = new AuthenticationService(serviceConfiguration);
authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService);
authorizationService = new AuthorizationService(serviceConfiguration, pulsarResources);
startServer();
}

Expand Down Expand Up @@ -216,14 +214,6 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}

public ConfigurationCacheService getConfigurationCacheService() {
return configurationCacheService;
}

public void setConfigurationCacheService(ConfigurationMetadataCacheService configurationCacheService) {
this.configurationCacheService = configurationCacheService;
}

public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return PulsarResources.createMetadataStore(config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,19 @@
*/
package org.apache.pulsar.functions.worker;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;

Expand All @@ -51,7 +45,6 @@ public class Worker {
private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build();
private PulsarResources pulsarResources;
private MetadataStoreExtended configMetadataStore;
private ConfigurationMetadataCacheService configurationCacheService;
private final ErrorNotifier errorNotifier;

public Worker(WorkerConfig workerConfig) {
Expand Down Expand Up @@ -89,9 +82,7 @@ private AuthorizationService getAuthorizationService() throws PulsarServerExcept
throw new PulsarServerException(e);
}
pulsarResources = new PulsarResources(null, configMetadataStore);
this.configurationCacheService = new ConfigurationMetadataCacheService(this.pulsarResources,
this.workerConfig.getPulsarFunctionsCluster());
return new AuthorizationService(getServiceConfiguration(), this.configurationCacheService);
return new AuthorizationService(getServiceConfiguration(), this.pulsarResources);
}
return null;
}
Expand Down
Loading

0 comments on commit 23ffdb7

Please sign in to comment.