Skip to content

Commit

Permalink
[Functions] Fixes function worker get superuser role (apache#9259)
Browse files Browse the repository at this point in the history
* Fixes function worker get superuser role
---

Fixes apache#7879

*Motivation*

Function worker should use authorization service to check
a role if a superuser.

*Modifications*

- Fix the isSuperuser method in the function

*Verify this change*

Please pick either of following options.

- Adjust the original test case to verify it

* Fix the tests
  • Loading branch information
zymap authored Jan 23, 2021
1 parent 65b3a62 commit d3f8440
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand All @@ -54,6 +59,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -67,6 +73,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
Expand Down Expand Up @@ -1515,15 +1522,30 @@ public void testAuthorization(boolean validRoleName) throws Exception {
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader()
.getResource("pulsar-functions-api-examples.jar").getFile();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
try {
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
assertTrue(validRoleName);
} catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
assertFalse(validRoleName);
if (!validRoleName) {
// create a non-superuser admin to test the api
admin = spy(
PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddressTls())
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.allowTlsInsecureConnection(true).build());
try {
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
} catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
assertFalse(validRoleName);
}
} else {
try {
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
assertTrue(validRoleName);
} catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
fail();
}
}

}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -1815,4 +1837,4 @@ static class Metric {
double value;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand Down Expand Up @@ -1530,9 +1531,25 @@ protected void componentInstanceStatusRequestValidate (final String tenant,
}

public boolean isSuperUser(String clientRole) {
return clientRole != null
&& worker().getWorkerConfig().getSuperUserRoles() != null
&& worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
if (clientRole != null) {
try {
if ((worker().getWorkerConfig().getSuperUserRoles() != null
&& worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) {
return true;
}
return worker().getAuthorizationService().isSuperUser(clientRole, null)
.get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking the role {} is a super user role ",
worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), clientRole);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (Exception e) {
log.warn("Admin-client with Role - failed to check the role {} is a super user role {} ", clientRole,
e.getMessage(), e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
return false;
}

public boolean allowFunctionOps(NamespaceName namespaceName, String role,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
Expand All @@ -64,6 +63,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.eq;
Expand Down Expand Up @@ -248,6 +248,12 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep
// test super user
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", superUser, authenticationDataSource));

// test pulsar super user
final String pulsarSuperUser = "pulsarSuperUser";
when(authorizationService.isSuperUser(pulsarSuperUser, null)).thenReturn(CompletableFuture.completedFuture(true));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", pulsarSuperUser, authenticationDataSource));
assertTrue(functionImpl.isSuperUser(pulsarSuperUser));

// test normal user
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
Expand All @@ -257,6 +263,7 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
when(authorizationService.isSuperUser("test-user", null)).thenReturn(CompletableFuture.completedFuture(false));
assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// if user is tenant admin
Expand All @@ -270,6 +277,7 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
when(authorizationService.isSuperUser("test-user", authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// test user allow function action
Expand Down Expand Up @@ -301,24 +309,31 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep
public void testIsSuperUser() throws PulsarAdminException {

FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
AuthorizationService authorizationService = mock(AuthorizationService.class);
doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService();
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setAuthorizationEnabled(true);
workerConfig.setSuperUserRoles(Collections.singleton(superUser));
doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
when(authorizationService.isSuperUser(anyString(), any()))
.thenAnswer((invocationOnMock) -> {
String role = invocationOnMock.getArgument(0, String.class);
return CompletableFuture.completedFuture(superUser.equals(role));
});

AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
assertTrue(functionImpl.isSuperUser(superUser));

assertFalse(functionImpl.isSuperUser("normal-user"));
assertFalse(functionImpl.isSuperUser( null));

// test super roles is null

// test super roles is null and it's not a pulsar super user
when(authorizationService.isSuperUser(superUser, null))
.thenReturn(CompletableFuture.completedFuture(false));
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
workerConfig = new WorkerConfig();
workerConfig.setAuthorizationEnabled(true);
doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();

assertFalse(functionImpl.isSuperUser(superUser));
}

Expand Down

0 comments on commit d3f8440

Please sign in to comment.