Skip to content

Commit

Permalink
Check for null arguments in Namespaces Rest API (apache#7247)
Browse files Browse the repository at this point in the history
* Check for null arguments

* Fix test

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Jun 12, 2020
1 parent eda3526 commit 134a8a2
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.Codec.decode;

Expand Down Expand Up @@ -397,7 +396,7 @@ protected void validateBrokerName(String broker) throws MalformedURLException {
if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, "Invalid broker url %s", broker);
checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
String host = parts[0];
int port = Integer.parseInt(parts[1]);

Expand Down Expand Up @@ -844,4 +843,16 @@ protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Thr
asyncResponse.resume(new RestException(throwable));
}
}

protected void checkNotNull(Object o, String errorMessage) {
if (o == null) {
throw new RestException(Status.BAD_REQUEST, errorMessage);
}
}

protected void checkArgument(boolean b, String errorMessage) {
if (!b) {
throw new RestException(Status.BAD_REQUEST, errorMessage);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
Expand Down Expand Up @@ -110,6 +108,7 @@ public abstract class NamespacesBase extends AdminResource {
private static final long MAX_BUNDLES = ((long) 1) << 32;

protected List<String> internalGetTenantNamespaces(String tenant) {
checkNotNull(tenant, "Tenant should not be null");
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);

try {
Expand Down Expand Up @@ -380,6 +379,8 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori

protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
checkNotNull(role, "Role should not be null");
checkNotNull(actions, "Actions should not be null");

try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
Expand Down Expand Up @@ -411,6 +412,8 @@ protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> a

protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(roles, "Roles should not be null");

try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
Expand Down Expand Up @@ -442,6 +445,7 @@ protected void internalGrantPermissionOnSubscription(String subscription, Set<St
protected void internalRevokePermissionsOnNamespace(String role) {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
checkNotNull(role, "Role should not be null");

try {
Stat nodeStat = new Stat();
Expand Down Expand Up @@ -472,6 +476,8 @@ protected void internalRevokePermissionsOnNamespace(String role) {
protected void internalRevokePermissionsOnSubscription(String subscriptionName, String role) {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
checkNotNull(subscriptionName, "SubscriptionName should not be null");
checkNotNull(role, "Role should not be null");

AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
Expand All @@ -497,6 +503,7 @@ protected Set<String> internalGetNamespaceReplicationClusters() {
protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) {
validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
checkNotNull(clusterIds, "ClusterIds should not be null");

Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
if (!namespaceName.isGlobal()) {
Expand Down Expand Up @@ -1021,6 +1028,7 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
@SuppressWarnings("deprecation")
public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
validateSuperUserAccess();
checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);

Policies policies = getNamespacePolicies(namespaceName);
Expand Down Expand Up @@ -1069,6 +1077,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat
@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
validateSuperUserAccess();
checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);

Policies policies = getNamespacePolicies(namespaceName);
Expand Down Expand Up @@ -1581,6 +1590,7 @@ protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolea
@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

Expand All @@ -1602,6 +1612,7 @@ protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean a
protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
checkNotNull(subscription, "Subscription should not be null");

final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
Expand Down Expand Up @@ -1645,6 +1656,8 @@ protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncR
protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

Expand All @@ -1666,6 +1679,7 @@ protected void internalClearNamespaceBundleBacklogForSubscription(String subscri
protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
checkNotNull(subscription, "Subscription should not be null");

final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
Expand Down Expand Up @@ -1708,6 +1722,8 @@ protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String
@SuppressWarnings("deprecation")
protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

Expand Down Expand Up @@ -1845,6 +1861,7 @@ protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliver

protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
validatePoliciesReadOnlyAccess();

log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);
Expand Down Expand Up @@ -1922,6 +1939,9 @@ protected void internalRemoveNamespaceAntiAffinityGroup() {
protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
String tenant) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
checkNotNull(cluster, "Cluster should not be null");
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
checkNotNull(tenant, "Tenant should not be null");

log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster);

Expand Down Expand Up @@ -1952,24 +1972,21 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
}

private void validatePersistencePolicies(PersistencePolicies persistence) {
try {
checkNotNull(persistence);
final ServiceConfiguration config = pulsar().getConfiguration();
checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
"Bookkeeper-Ensemble must be <= %s", config.getManagedLedgerMaxEnsembleSize());
checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
"Bookkeeper-WriteQuorum must be <= %s", config.getManagedLedgerMaxWriteQuorum());
checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
"Bookkeeper-AckQuorum must be <= %s", config.getManagedLedgerMaxAckQuorum());
checkArgument(
(persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
"Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
persistence.getBookkeeperAckQuorum());
} catch (NullPointerException | IllegalArgumentException e) {
throw new RestException(Status.PRECONDITION_FAILED, e.getMessage());
}
checkNotNull(persistence, "persistence policies should not be null");
final ServiceConfiguration config = pulsar().getConfiguration();
checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
"Bookkeeper-Ensemble must be <= " + config.getManagedLedgerMaxEnsembleSize());
checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
"Bookkeeper-WriteQuorum must be <= " + config.getManagedLedgerMaxWriteQuorum());
checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
"Bookkeeper-AckQuorum must be <= " + config.getManagedLedgerMaxAckQuorum());
checkArgument(
(persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
String.format("Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
persistence.getBookkeeperAckQuorum()));

}

protected RetentionPolicies internalGetRetention() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import static org.apache.pulsar.common.util.Codec.decode;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.zafarkhaja.semver.Version;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -1404,11 +1404,11 @@ protected void internalSkipMessages(String subName, int numMessages, boolean aut
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
Preconditions.checkNotNull(repl);
repl.skipMessages(numMessages).get();
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
Preconditions.checkNotNull(sub);
sub.skipMessages(numMessages).get();
}
log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, topicName, subName);
Expand Down Expand Up @@ -1798,7 +1798,7 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
Preconditions.checkNotNull(sub);
sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
Expand Down Expand Up @@ -1907,7 +1907,7 @@ private void verifyReadOperation(boolean authoritative) {
}

private Response generateResponseWithEntry(Entry entry) throws IOException {
checkNotNull(entry);
Preconditions.checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

Expand Down Expand Up @@ -2151,11 +2151,11 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
Preconditions.checkNotNull(repl);
repl.expireMessages(expireTimeInSeconds);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
Preconditions.checkNotNull(sub);
sub.expireMessages(expireTimeInSeconds);
}
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName,
Expand Down Expand Up @@ -2374,7 +2374,7 @@ private Topic getOrCreateTopic(TopicName topicName) {
private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
try {
Subscription sub = topic.getSubscription(subName);
return checkNotNull(sub);
return Preconditions.checkNotNull(sub);
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
}
Expand All @@ -2387,7 +2387,7 @@ private PersistentReplicator getReplicatorReference(String replName, PersistentT
try {
String remoteCluster = PersistentReplicator.getRemoteCluster(replName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
return checkNotNull(repl);
return Preconditions.checkNotNull(repl);
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Replicator not found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,19 +301,19 @@ public void testSetPersistencepolicies() throws Exception {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 4, 3, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
assertEquals(e.getStatusCode(), 400);
}
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 4, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
assertEquals(e.getStatusCode(), 400);
}
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(6, 3, 1, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
assertEquals(e.getStatusCode(), 400);
}

// make sure policies has not been changed
Expand Down

0 comments on commit 134a8a2

Please sign in to comment.