Skip to content

Commit

Permalink
enable checkStyle plugin in pulsar-sql module (apache#13783)
Browse files Browse the repository at this point in the history
Motivation
enable checkStyle_plugin  in pulsar-websocket module.
  • Loading branch information
liudezhi2098 authored Jan 20, 2022
1 parent b0c7259 commit d9faf78
Show file tree
Hide file tree
Showing 30 changed files with 239 additions and 95 deletions.
14 changes: 14 additions & 0 deletions pulsar-websocket/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>checkstyle</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@
*/
package org.apache.pulsar.websocket;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Splitter;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
Expand All @@ -44,16 +52,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static com.google.common.base.Preconditions.checkArgument;

public abstract class AbstractWebSocketHandler extends WebSocketAdapter implements Closeable {

protected final WebSocketService service;
Expand All @@ -63,7 +61,8 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
protected final Map<String, String> queryParams;


public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request,
ServletUpgradeResponse response) {
this.service = service;
this.request = new WebSocketHttpServletRequestWrapper(request);
this.topic = extractTopicName(request);
Expand Down Expand Up @@ -218,20 +217,20 @@ private TopicName extractTopicName(HttpServletRequest request) {

final boolean isV2Format = parts.get(2).equals("v2");
final int domainIndex = isV2Format ? 4 : 3;
checkArgument(parts.get(domainIndex).equals("persistent") ||
parts.get(domainIndex).equals("non-persistent"));
checkArgument(parts.get(domainIndex).equals("persistent")
|| parts.get(domainIndex).equals("non-persistent"));


final String domain = parts.get(domainIndex);
final NamespaceName namespace = isV2Format ? NamespaceName.get(parts.get(5), parts.get(6)) :
NamespaceName.get( parts.get(4), parts.get(5), parts.get(6));
NamespaceName.get(parts.get(4), parts.get(5), parts.get(6));
//The topic name which contains slashes is also split , so it needs to be jointed
int startPosition = 7;
boolean isConsumer = "consumer".equals(parts.get(2)) || "consumer".equals(parts.get(3));
int endPosition = isConsumer ? parts.size() -1 : parts.size();
int endPosition = isConsumer ? parts.size() - 1 : parts.size();
StringBuilder topicName = new StringBuilder(parts.get(startPosition));
while (++startPosition < endPosition) {
if(StringUtils.isEmpty(parts.get(startPosition))){
if (StringUtils.isEmpty(parts.get(startPosition))){
continue;
}
topicName.append("/").append(parts.get(startPosition));
Expand All @@ -241,7 +240,8 @@ private TopicName extractTopicName(HttpServletRequest request) {
return TopicName.get(domain, namespace, name);
}

protected abstract Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception;
protected abstract Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData)
throws Exception;

private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
package org.apache.pulsar.websocket;

import static com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Enums;
import com.google.common.base.Splitter;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
Expand All @@ -33,9 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;

import javax.servlet.http.HttpServletRequest;

import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand Down Expand Up @@ -141,7 +137,8 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser

private void receiveMessage() {
if (log.isDebugEnabled()) {
log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(), topic, subscription);
log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(),
topic, subscription);
}

consumer.receiveAsync().thenAccept(msg -> {
Expand Down Expand Up @@ -480,8 +477,8 @@ public static String extractSubscription(HttpServletRequest request) {

final boolean isV2Format = parts.get(2).equals("v2");
final int domainIndex = isV2Format ? 4 : 3;
checkArgument(parts.get(domainIndex).equals("persistent") ||
parts.get(domainIndex).equals("non-persistent"));
checkArgument(parts.get(domainIndex).equals("persistent")
|| parts.get(domainIndex).equals("non-persistent"));
checkArgument(parts.get(8).length() > 0, "Empty subscription name");

return Codec.decode(parts.get(8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import static org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON;
import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
import static org.apache.pulsar.websocket.WebSocketError.UnknownError;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Enums;

import java.io.IOException;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
Expand All @@ -36,9 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;

import javax.servlet.http.HttpServletRequest;

import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
Expand All @@ -58,7 +54,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Websocket end-point url handler to handle incoming message coming from client. Websocket end-point url handler to
* handle incoming message coming from client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ public ReaderHandler(WebSocketService service, HttpServletRequest request, Servl
}
}

private void receiveMessage() {
private void receiveMessage() {
if (log.isDebugEnabled()) {
log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(), topic, subscription);
log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(),
topic, subscription);
}

reader.readNextAsync().thenAccept(msg -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void configure(WebSocketServletFactory factory) {
if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
}
factory.setCreator((request, response) -> new ProducerHandler(service, request.getHttpServletRequest(), response));
factory.setCreator((request, response) -> new ProducerHandler(service, request.getHttpServletRequest(),
response));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ && isNotBlank(config.getBrokerClientAuthenticationParameters())) {
}

if (config.isBrokerClientTlsEnabled()) {
if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls());
} else if (isNotBlank(clusterData.getServiceUrlTls())) {
clientBuilder.serviceUrl(clusterData.getServiceUrlTls());
}
if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls());
} else if (isNotBlank(clusterData.getServiceUrlTls())) {
clientBuilder.serviceUrl(clusterData.getServiceUrlTls());
}
} else if (isNotBlank(clusterData.getBrokerServiceUrl())) {
clientBuilder.serviceUrl(clusterData.getBrokerServiceUrl());
} else {
Expand Down Expand Up @@ -234,14 +234,16 @@ public ScheduledExecutorService getExecutor() {
}

public boolean isAuthenticationEnabled() {
if (this.config == null)
if (this.config == null) {
return false;
}
return this.config.isAuthenticationEnabled();
}

public boolean isAuthorizationEnabled() {
if (this.config == null)
if (this.config == null) {
return false;
}
return this.config.isAuthorizationEnabled();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import javax.ws.rs.core.Response.Status;

import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.RestException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;

import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RestException;
Expand Down Expand Up @@ -61,7 +60,7 @@ protected WebSocketService service() {
}

/**
* Gets a caller id (IP + role)
* Gets a caller id (IP + role).
*
* @return the web service caller identification
*/
Expand Down Expand Up @@ -109,7 +108,7 @@ protected void validateSuperUserAccess() {
}

/**
* Checks if user has super-user access or user is authorized to produce/consume on a given topic
* Checks if user has super-user access or user is authorized to produce/consume on a given topic.
*
* @param topic
* @throws RestException
Expand All @@ -134,7 +133,7 @@ protected void validateUserAccess(TopicName topic) {
}

/**
* Checks if user is authorized to produce/consume on a given topic
* Checks if user is authorized to produce/consume on a given topic.
*
* @param topic
* @return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* Helper class to access AirCompressor package private classes.
*/
package org.apache.pulsar.websocket.admin;
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@
package org.apache.pulsar.websocket.admin.v1;

import static org.apache.pulsar.common.util.Codec.decode;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

import java.util.Collection;
import java.util.Map;

import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.websocket.admin.WebSocketProxyStatsBase;
Expand All @@ -47,7 +43,8 @@ public class WebSocketProxyStatsV1 extends WebSocketProxyStatsBase {

@GET
@Path("/metrics")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent on each proxy to fetch the metrics", response = Metrics.class, responseContainer = "List")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent"
+ " on each proxy to fetch the metrics", response = Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public Collection<Metrics> internalGetMetrics() throws Exception {
return super.internalGetMetrics();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* Helper class to access AirCompressor package private classes.
*/
package org.apache.pulsar.websocket.admin.v1;
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@
package org.apache.pulsar.websocket.admin.v2;

import static org.apache.pulsar.common.util.Codec.decode;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

import java.util.Collection;
import java.util.Map;

import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.websocket.admin.WebSocketProxyStatsBase;
Expand All @@ -46,7 +42,8 @@
public class WebSocketProxyStatsV2 extends WebSocketProxyStatsBase {
@GET
@Path("/metrics")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent on each proxy to fetch the metrics", response = Metrics.class, responseContainer = "List")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent"
+ " on each proxy to fetch the metrics", response = Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public Collection<Metrics> internalGetMetrics() throws Exception {
return super.internalGetMetrics();
Expand Down
Loading

0 comments on commit d9faf78

Please sign in to comment.