diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 7533439c2e8b6..9bf7c44bed148 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -224,8 +224,23 @@
-
-
+
+ com.github.spotbugs
+ spotbugs-maven-plugin
+ ${spotbugs-maven-plugin.version}
+
+
+ spotbugs
+ verify
+
+ check
+
+
+
+
+ ${basedir}/src/main/resources/findbugsExclude.xml
+
+
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
index 6dbfd73fa4e98..7b27d1fc6452d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
@@ -27,6 +27,7 @@
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
+import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -76,7 +77,7 @@ public void connect() throws IOException {
if (matcher.group("base64") == null) {
// Support Urlencode but not decode here because already decoded by URI class.
- this.data = matcher.group("data").getBytes();
+ this.data = matcher.group("data").getBytes(StandardCharsets.UTF_8);
} else {
this.data = Base64.getDecoder().decode(matcher.group("data"));
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 014cc992625e3..41907fbd7351e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -22,6 +22,7 @@
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
@@ -155,7 +156,7 @@ public Optional getKeyBytes() {
if (hasBase64EncodedKey()) {
return Optional.of(Unpooled.wrappedBuffer(Base64.getDecoder().decode(getKey().get())));
} else {
- return Optional.of(Unpooled.wrappedBuffer(getKey().get().getBytes()));
+ return Optional.of(Unpooled.wrappedBuffer(getKey().get().getBytes(StandardCharsets.UTF_8)));
}
}
return Optional.empty();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
index cc84dc6ccc33b..cc677302ed802 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
@@ -181,7 +181,8 @@ public static void deleteFilesInDirectory(
}
if (ingestFile.isDirectory() && recurse) {
FileUtils.deleteFilesInDirectory(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
- if (deleteEmptyDirectories && ingestFile.list().length == 0) {
+ String[] ingestFileList = ingestFile.list();
+ if (deleteEmptyDirectories && ingestFileList != null && ingestFileList.length == 0) {
FileUtils.deleteFile(ingestFile, logger, 3);
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index bc6af797e813c..d1c478ab6cb1e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -26,18 +26,25 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
+import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
+
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
@@ -151,11 +158,13 @@ public static NarClassLoader getFromArchive(File narPath, Set additional
String narExtractionDirectory)
throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
- try {
- return new NarClassLoader(unpacked, additionalJars, parent);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IOException(e);
- }
+ return AccessController.doPrivileged(new PrivilegedAction() {
+ @SneakyThrows
+ @Override
+ public NarClassLoader run() {
+ return new NarClassLoader(unpacked, additionalJars, parent);
+ }
+ });
}
private static File getNarExtractionDirectory(String configuredDirectory) {
@@ -208,7 +217,7 @@ public File getWorkingDirectory() {
*/
public String getServiceDefinition(String serviceName) throws IOException {
String serviceDefPath = narWorkingDirectory + "/META-INF/services/" + serviceName;
- return new String(Files.readAllBytes(Paths.get(serviceDefPath)));
+ return new String(Files.readAllBytes(Paths.get(serviceDefPath)), StandardCharsets.UTF_8);
}
public List getServiceImplementation(String serviceName) throws IOException {
@@ -216,7 +225,8 @@ public List getServiceImplementation(String serviceName) throws IOExcept
String serviceDefPath = narWorkingDirectory + "/META-INF/services/" + serviceName;
- try (BufferedReader reader = new BufferedReader(new FileReader(serviceDefPath))) {
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(new FileInputStream(serviceDefPath), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
line = line.trim();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
index 3c72d8a6a3367..28667559e4485 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
@@ -26,6 +26,7 @@
import com.google.common.base.Strings;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.AccessLevel;
@@ -220,7 +221,8 @@ private static int getServicePort(String serviceName, String[] serviceInfos) {
} else if (serviceInfos.length == 1 && serviceInfos[0].toLowerCase().equals(SSL_SERVICE)) {
port = BINARY_TLS_PORT;
} else {
- throw new IllegalArgumentException("Invalid pulsar service : " + serviceName + "+" + serviceInfos);
+ throw new IllegalArgumentException("Invalid pulsar service : " + serviceName + "+"
+ + Arrays.toString(serviceInfos));
}
break;
case HTTP_SERVICE:
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerStatus.java
index c7c2a8e0a8621..bd7a338677ba7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerStatus.java
@@ -19,10 +19,12 @@
package org.apache.pulsar.common.policies.data;
import com.google.common.collect.ComparisonChain;
+import lombok.EqualsAndHashCode;
/**
* Information about the broker status.
*/
+@EqualsAndHashCode
public class BrokerStatus implements Comparable {
private String brokerAddress;
private boolean active;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
index 8ccf07168aac9..94eb9905e970a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
@@ -120,6 +121,7 @@ public static class FunctionInstanceStatsDataBase {
/**
* Function instance statistics data.
*/
+ @EqualsAndHashCode(callSuper = true)
@Data
@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal",
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 4c5058b88cf5e..da9db187517dd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -60,7 +60,7 @@ public class OffloadPolicies implements Serializable {
public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB
public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
- public final static String[] DRIVER_NAMES = {
+ final static String[] DRIVER_NAMES = {
"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss"
};
public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentOfflineTopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentOfflineTopicStats.java
index 87e858f97a08d..0880bae8f594f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentOfflineTopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentOfflineTopicStats.java
@@ -72,7 +72,7 @@ public void reset() {
/**
* Details about a cursor.
*/
- public class CursorDetails {
+ public static class CursorDetails {
public long cursorBacklog;
public long cursorLedgerId;
@@ -93,7 +93,7 @@ public void addLedgerDetails(long entries, long timestamp, long size, long ledge
/**
* Details about a ledger.
*/
- public class LedgerDetails {
+ public static class LedgerDetails {
public long entries;
public long timestamp;
public long size;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a5e218441f6a2..6a748d3ea039b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -240,15 +240,14 @@ public String toString() {
.add("offload_policies", offload_policies).toString();
}
-
private static final long MAX_BUNDLES = ((long) 1) << 32;
public static BundlesData getBundles(int numBundles) {
- if (numBundles <= 0 || numBundles > MAX_BUNDLES) {
+ if (numBundles <= 0) {
throw new RestException(Status.BAD_REQUEST,
- "Invalid number of bundles. Number of numbles has to be in the range of (0, 2^32].");
+ "Invalid number of bundles. Number of numbles has to be in the range of (0, 2^32].");
}
- Long maxVal = ((long) 1) << 32;
+ Long maxVal = MAX_BUNDLES;
Long segSize = maxVal / numBundles;
List partitions = Lists.newArrayList();
partitions.add(String.format("0x%08x", 0L));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index f7b6ed40e458f..d8fcd04960d76 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -99,10 +99,8 @@ public static ByteBuf newReplicatedSubscriptionsSnapshotRequest(String snapshotI
public static ReplicatedSubscriptionsSnapshotRequest parseReplicatedSubscriptionsSnapshotRequest(ByteBuf payload)
throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
- ReplicatedSubscriptionsSnapshotRequest.Builder builder = null;
-
+ ReplicatedSubscriptionsSnapshotRequest.Builder builder = ReplicatedSubscriptionsSnapshotRequest.newBuilder();
try {
- builder = ReplicatedSubscriptionsSnapshotRequest.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
@@ -148,10 +146,8 @@ public static ByteBuf newReplicatedSubscriptionsSnapshotResponse(String snapshot
public static ReplicatedSubscriptionsSnapshotResponse parseReplicatedSubscriptionsSnapshotResponse(ByteBuf payload)
throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
- ReplicatedSubscriptionsSnapshotResponse.Builder builder = null;
-
+ ReplicatedSubscriptionsSnapshotResponse.Builder builder = ReplicatedSubscriptionsSnapshotResponse.newBuilder();
try {
- builder = ReplicatedSubscriptionsSnapshotResponse.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
@@ -198,10 +194,8 @@ public static ByteBuf newReplicatedSubscriptionsSnapshot(String snapshotId, Stri
public static ReplicatedSubscriptionsSnapshot parseReplicatedSubscriptionsSnapshot(ByteBuf payload)
throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
- ReplicatedSubscriptionsSnapshot.Builder builder = null;
-
+ ReplicatedSubscriptionsSnapshot.Builder builder = ReplicatedSubscriptionsSnapshot.newBuilder();
try {
- builder = ReplicatedSubscriptionsSnapshot.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
@@ -243,10 +237,8 @@ public static ByteBuf newReplicatedSubscriptionsUpdate(String subscriptionName,
public static ReplicatedSubscriptionsUpdate parseReplicatedSubscriptionsUpdate(ByteBuf payload)
throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
- ReplicatedSubscriptionsUpdate.Builder builder = null;
-
+ ReplicatedSubscriptionsUpdate.Builder builder = ReplicatedSubscriptionsUpdate.newBuilder();
try {
- builder = ReplicatedSubscriptionsUpdate.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
@@ -280,11 +272,8 @@ public static ByteBuf newTxnAbortMarker(long sequenceId, long txnMostBits,
public static PulsarMarkers.TxnCommitMarker parseCommitMarker(ByteBuf payload) throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
-
- PulsarMarkers.TxnCommitMarker.Builder builder = null;
-
+ PulsarMarkers.TxnCommitMarker.Builder builder = PulsarMarkers.TxnCommitMarker.newBuilder();
try {
- builder = PulsarMarkers.TxnCommitMarker.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 28c1fe1bed835..804b4ec7ca242 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -486,6 +486,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
handleEndTxnOnSubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse());
cmd.getEndTxnOnSubscriptionResponse().recycle();
break;
+ default:
+ break;
}
} finally {
if (cmdBuilder != null) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
index c3e7fee4415aa..d543c68fae240 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.protocol.schema;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.TreeMap;
import lombok.experimental.UtilityClass;
@@ -61,7 +62,7 @@ public static SchemaInfo newSchemaInfo(Schema schema) {
public static SchemaInfo newSchemaInfo(String name, GetSchemaResponse schema) {
SchemaInfo si = new SchemaInfo();
si.setName(name);
- si.setSchema(schema.getData().getBytes());
+ si.setSchema(schema.getData().getBytes(StandardCharsets.UTF_8));
si.setType(schema.getType());
si.setProperties(schema.getProperties());
return si;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java
index a97123a0ea19f..d9d71fbe3f7c1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.stats;
import static com.google.common.base.Preconditions.checkArgument;
+import java.math.BigDecimal;
import java.util.concurrent.atomic.LongAdder;
/**
@@ -29,7 +30,7 @@ public class Rate {
private final LongAdder countAdder = new LongAdder();
// Computed stats
- private long count = 0;
+ private long count = 0L;
private double rate = 0.0d;
private double valueRate = 0.0d;
private double averageValue = 0.0d;
@@ -60,7 +61,7 @@ public void calculateRate(double period) {
count = countAdder.sumThenReset();
long sum = valueAdder.sumThenReset();
- averageValue = count != 0 ? sum / count : 0.0d;
+ averageValue = count != 0 ? Long.valueOf(sum).doubleValue() / Long.valueOf(count).doubleValue() : 0.0d;
rate = count / period;
valueRate = sum / period;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
index 9af547e922b4a..2cb32a4d35121 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -23,6 +23,8 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
/**
* Helper methods wrt Classloading.
@@ -37,7 +39,8 @@ public class ClassLoaderUtils {
*/
public static ClassLoader loadJar(File jar) throws MalformedURLException {
java.net.URL url = jar.toURI().toURL();
- return new URLClassLoader(new URL[]{url});
+ return AccessController.doPrivileged(
+ (PrivilegedAction) () -> new URLClassLoader(new URL[]{url}));
}
public static ClassLoader extractClassLoader(Path archivePath, File packageFile) throws Exception {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NamespaceBundleStatsComparator.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NamespaceBundleStatsComparator.java
index 588f0baa9689a..275ea659eb885 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NamespaceBundleStatsComparator.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NamespaceBundleStatsComparator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.util;
+import java.io.Serializable;
import java.util.Comparator;
import java.util.Map;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
@@ -25,7 +26,7 @@
/**
*/
-public class NamespaceBundleStatsComparator implements Comparator {
+public class NamespaceBundleStatsComparator implements Comparator, Serializable {
Map map;
ResourceType resType;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RestException.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RestException.java
index e0a72d81af410..3d947b73072ac 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RestException.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RestException.java
@@ -39,7 +39,9 @@ public static String getExceptionData(Throwable t) {
}
writer.append("Stacktrace:\n\n");
- t.printStackTrace(new PrintWriter(writer));
+ if (t != null) {
+ t.printStackTrace(new PrintWriter(writer));
+ }
return writer.toString();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 8438f139a5782..41fb1e4136a2f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -29,6 +29,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyManagementException;
@@ -341,7 +342,7 @@ public static PrivateKey loadPrivateKeyFromPemStream(InputStream inStream) throw
return privateKey;
}
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(inStream))) {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(inStream, StandardCharsets.UTF_8))) {
if (inStream.markSupported()) {
inStream.reset();
}
@@ -349,7 +350,7 @@ public static PrivateKey loadPrivateKeyFromPemStream(InputStream inStream) throw
String currentLine = null;
// Jump to the first line after -----BEGIN [RSA] PRIVATE KEY-----
- while (!reader.readLine().startsWith("-----BEGIN")) {
+ while ((currentLine = reader.readLine()) != null && !currentLine.startsWith("-----BEGIN")) {
reader.readLine();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
index 24decbce6e309..ab8aaa6b4c5d9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.common.util.collections;
+import lombok.EqualsAndHashCode;
+
import java.util.BitSet;
import java.util.concurrent.locks.StampedLock;
/**
* Safe multithreaded version of {@code BitSet}.
*/
+@EqualsAndHashCode(callSuper = true)
public class ConcurrentBitSet extends BitSet {
private static final long serialVersionUID = 1L;
@@ -164,4 +167,4 @@ public boolean isEmpty() {
}
return isEmpty;
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
index 21ee42bb9c043..d096567877cb7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
@@ -20,11 +20,14 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import lombok.EqualsAndHashCode;
+
import java.util.BitSet;
/**
* Safe multithreaded version of {@code BitSet} and leverage netty recycler.
*/
+@EqualsAndHashCode(callSuper = true)
public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
private final Handle recyclerHandle;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
index d278c2984756d..0005aade69e23 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
@@ -85,7 +85,7 @@ public synchronized void add(long item1, long item2) {
loc = parent(loc);
}
- this.size++;
+ SIZE_UPDATER.incrementAndGet(this);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index 2335599512a30..92ae3a659201b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -23,6 +23,8 @@
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
+import lombok.EqualsAndHashCode;
+
import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -148,6 +150,7 @@ public interface RangeProcessor> {
/**
* This class is a simple key-value data structure.
*/
+ @EqualsAndHashCode
class LongPair implements Comparable {
@SuppressWarnings("checkstyle:ConstantName")
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
index cc902a123d365..651396c751665 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
@@ -141,7 +141,9 @@ public SSLContext createSSLContext() throws GeneralSecurityException, IOExceptio
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(kmfAlgorithm);
KeyStore keyStore = KeyStore.getInstance(keyStoreTypeString);
char[] passwordChars = keyStorePassword.toCharArray();
- keyStore.load(new FileInputStream(keyStorePath), passwordChars);
+ try (FileInputStream inputStream = new FileInputStream(keyStorePath)) {
+ keyStore.load(inputStream, passwordChars);
+ }
keyManagerFactory.init(keyStore, passwordChars);
keyManagers = keyManagerFactory.getKeyManagers();
}
@@ -154,7 +156,9 @@ public SSLContext createSSLContext() throws GeneralSecurityException, IOExceptio
trustManagerFactory = TrustManagerFactory.getInstance(tmfAlgorithm);
KeyStore trustStore = KeyStore.getInstance(trustStoreTypeString);
char[] passwordChars = trustStorePassword.toCharArray();
- trustStore.load(new FileInputStream(trustStorePath), passwordChars);
+ try (FileInputStream inputStream = new FileInputStream(trustStorePath)) {
+ trustStore.load(inputStream, passwordChars);
+ }
trustManagerFactory.init(trustStore);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JvmUsage.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JvmUsage.java
index 824d6db137a9e..4861c94d2ac7a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JvmUsage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JvmUsage.java
@@ -45,7 +45,7 @@ public static JvmUsage populateFrom(Map metrics) {
JvmUsage jvmUsage = null;
if (metrics.containsKey("jvm_thread_cnt")) {
jvmUsage = new JvmUsage();
- jvmUsage.threadCount = ((Long) metrics.get("jvm_thread_cnt")).longValue();
+ jvmUsage.threadCount = Long.valueOf(metrics.get("jvm_thread_cnt").toString());
}
return jvmUsage;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
index daf5db93cee2d..9b7763ed0fafc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
@@ -18,9 +18,14 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
+import lombok.EqualsAndHashCode;
+
+import java.io.Serializable;
+
/**
*/
-public class NamespaceBundleStats implements Comparable {
+@EqualsAndHashCode
+public class NamespaceBundleStats implements Comparable, Serializable {
public double msgRateIn;
public double msgThroughputIn;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
index b46f0e976b0b7..0dda36c64969f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
@@ -19,11 +19,14 @@
package org.apache.pulsar.policies.data.loadbalancer;
import java.util.Set;
+
+import lombok.EqualsAndHashCode;
import org.apache.pulsar.common.policies.data.ResourceQuota;
/**
* The class containing information about system resources, allocated quota, and loaded bundles.
*/
+@EqualsAndHashCode
public class ResourceUnitRanking implements Comparable {
private static final long KBITS_TO_BYTES = 1024 / 8;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
index b97e3145a48e3..752f0f066d4db 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
@@ -18,9 +18,12 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
+import lombok.EqualsAndHashCode;
+
/**
* POJO used to represents any system specific resource usage this is the format that load manager expects it in.
*/
+@EqualsAndHashCode
public class ResourceUsage {
public double usage;
public double limit;
diff --git a/pulsar-common/src/main/resources/findbugsExclude.xml b/pulsar-common/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000000000..1706bca8422c0
--- /dev/null
+++ b/pulsar-common/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,53 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
index e9371767f2a33..6d57a25710a42 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
@@ -40,21 +40,21 @@ public void testRecycle() {
public void testGenerateByBitSet() {
BitSet bitSet = new BitSet();
ConcurrentBitSetRecyclable bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
- Assert.assertEquals(bitSet, bitSetRecyclable);
+ Assert.assertEquals(bitSet.toByteArray(), bitSetRecyclable.toByteArray());
bitSet.set(0, 10);
bitSetRecyclable.recycle();
bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
- Assert.assertEquals(bitSet, bitSetRecyclable);
+ Assert.assertEquals(bitSet.toByteArray(), bitSetRecyclable.toByteArray());
bitSet.clear(5);
bitSetRecyclable.recycle();
bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
- Assert.assertEquals(bitSet, bitSetRecyclable);
+ Assert.assertEquals(bitSet.toByteArray(), bitSetRecyclable.toByteArray());
bitSet.clear();
bitSetRecyclable.recycle();
bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
- Assert.assertEquals(bitSet, bitSetRecyclable);
+ Assert.assertEquals(bitSet.toByteArray(), bitSetRecyclable.toByteArray());
}
}