Skip to content

Commit

Permalink
make module pulsar-broker gradually conform checkstyle (apache#8821)
Browse files Browse the repository at this point in the history
Main issue: https://github.com/streamnative/pulsar/issues/1768

This PR changes code style related things only
  • Loading branch information
Renkai authored Dec 7, 2020
1 parent f62f12a commit d418e02
Show file tree
Hide file tree
Showing 11 changed files with 975 additions and 694 deletions.
2 changes: 1 addition & 1 deletion buildtools/src/main/resources/pulsar/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<suppress checks="ConstantName" files="MessageId.java"/>
<suppress checks="MethodName" files="TopicsImpl.java"/>
<suppress checks="MemberName" files="TopicsImpl.java"/>
<suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/admin/.*.java"/>
<suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/admin/impl/.*.java"/>
<suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/cache/.*.java"/>
<suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/delayed/.*.java"/>
<suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/intercept/.*.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception
ZkUtils.createFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

protected void zkCreateOptimisticAsync(ZooKeeper zk, String path, byte[] content, AsyncCallback.StringCallback callback) {
protected void zkCreateOptimisticAsync(ZooKeeper zk, String path,
byte[] content, AsyncCallback.StringCallback callback) {
ZkUtils.asyncCreateFullPathOptimistic(zk, path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT, callback, null);
}
Expand Down Expand Up @@ -148,7 +149,7 @@ protected void zkSync(String path) throws Exception {
}

/**
* Get the domain of the topic (whether it's persistent or non-persistent)
* Get the domain of the topic (whether it's persistent or non-persistent).
*/
protected String domain() {
if (uri.getPath().startsWith("persistent/")) {
Expand Down Expand Up @@ -226,10 +227,9 @@ public void validatePoliciesReadOnlyAccess() {
}

/**
* Get the list of namespaces (on every cluster) for a given property
* Get the list of namespaces (on every cluster) for a given property.
*
* @param property
* the property name
* @param property the property name
* @return the list of namespaces
*/
protected List<String> getListOfNamespaces(String property) throws Exception {
Expand All @@ -239,7 +239,8 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
for (String clusterOrNamespace : globalZk().getChildren(path(POLICIES, property), false)) {
// Then get the list of namespaces
try {
final List<String> children = globalZk().getChildren(path(POLICIES, property, clusterOrNamespace), false);
final List<String> children = globalZk().getChildren(
path(POLICIES, property, clusterOrNamespace), false);
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(property, clusterOrNamespace).toString();
// if the length is 0 then this is probably a leftover cluster from namespace created
Expand Down Expand Up @@ -274,18 +275,19 @@ protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {

private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
result.complete(null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
result.complete(null);
zkCreateOptimisticAsync(localZk(),
ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
result.complete(null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
result.complete(null);
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
Expand Down Expand Up @@ -354,7 +356,8 @@ protected void validatePartitionedTopicName(String tenant, String namespace, Str
validateTopicName(tenant, namespace, encodedTopic);
// second, "-partition-" is not allowed
if (encodedTopic.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
throw new RestException(Status.PRECONDITION_FAILED, "Partitioned Topic Name should not contain '-partition-'");
throw new RestException(Status.PRECONDITION_FAILED,
"Partitioned Topic Name should not contain '-partition-'");
}
}

Expand All @@ -365,8 +368,9 @@ protected void validatePartitionedTopicMetadata(String tenant, String namespace,
if (partitionedTopicMetadata.partitions < 1) {
throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
}
} catch ( InterruptedException | ExecutionException e) {
log.error("Failed to validate partitioned topic metadata {}://{}/{}/{}", domain(), tenant, namespace, topicName, e);
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to validate partitioned topic metadata {}://{}/{}/{}",
domain(), tenant, namespace, topicName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic partition meta failed.");
}
}
Expand All @@ -385,7 +389,7 @@ protected void validateTopicName(String property, String cluster, String namespa
}

/**
* Redirect the call to the specified broker
* Redirect the call to the specified broker.
*
* @param broker
* Broker name
Expand Down Expand Up @@ -527,14 +531,14 @@ protected Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
}

protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
if (retention == null || retention.getRetentionSizeInMB() == 0 ||
retention.getRetentionSizeInMB() == -1) {
if (retention == null || retention.getRetentionSizeInMB() == 0
|| retention.getRetentionSizeInMB() == -1) {
return true;
}
if (quota == null) {
quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
}
if (quota.getLimit() >= ( retention.getRetentionSizeInMB() * 1024 * 1024)) {
if (quota.getLimit() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
return false;
}
return true;
Expand Down Expand Up @@ -765,7 +769,8 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
List<String> partitionedTopics = Lists.newArrayList();

try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), topicDomain.value());
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), topicDomain.value());
List<String> topics = globalZk().getChildren(partitionedTopicPath, false);
partitionedTopics = topics.stream()
.map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
Expand All @@ -792,11 +797,13 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
return;
}
if (numPartitions <= 0) {
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be more than 0"));
return;
}
if (maxPartitions > 0 && numPartitions > maxPartitions) {
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions));
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions));
return;
}
checkTopicExistsAsync(topicName).thenAccept(exists -> {
Expand All @@ -812,31 +819,42 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
if (KeeperException.Code.OK.intValue() == rc) {
globalZk().sync(path, (rc2, s2, ctx) -> {
if (KeeperException.Code.OK.intValue() == rc2) {
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
log.info("[{}] Successfully created partitioned topic {}",
clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
log.info("[{}] Successfully created partitions for topic {}",
clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
log.error("[{}] Failed to create partitions for topic {}",
clientAppId(), topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName,
KeeperException.create(KeeperException.Code.get(rc2)));
asyncResponse.resume(
new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
}
}, null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT,
"Partitioned topic already exists"));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(),
topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
asyncResponse.resume(
new RestException(KeeperException.create(KeeperException.Code.get(rc))));
}
});
} catch (Exception e) {
Expand Down Expand Up @@ -865,7 +883,8 @@ protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName)
.thenCompose(topics -> {
boolean exists = false;
for (String topic : topics) {
if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) {
if (topicName.getPartitionedTopicName().equals(
TopicName.get(topic).getPartitionedTopicName())) {
exists = true;
break;
}
Expand Down
Loading

0 comments on commit d418e02

Please sign in to comment.