Skip to content

Commit

Permalink
KAFKA-4894; Fix findbugs "default character set in use" warnings
Browse files Browse the repository at this point in the history
Author: Colin P. Mccabe <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#2683 from cmccabe/KAFKA-4894
  • Loading branch information
cmccabe authored and junrao committed Mar 20, 2017
1 parent 783900c commit 5a2fcdd
Show file tree
Hide file tree
Showing 24 changed files with 168 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

public class ErrorLoggingCallback implements Callback {
private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
private String topic;
Expand All @@ -44,9 +46,9 @@ public ErrorLoggingCallback(String topic, byte[] key, byte[] value, boolean logA
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
String keyString = (key == null) ? "null" :
logAsString ? new String(key) : key.length + " bytes";
logAsString ? new String(key, StandardCharsets.UTF_8) : key.length + " bytes";
String valueString = (valueLength == -1) ? "null" :
logAsString ? new String(value) : valueLength + " bytes";
logAsString ? new String(value, StandardCharsets.UTF_8) : valueLength + " bytes";
log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
topic, keyString, valueString, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -87,8 +88,10 @@ private void runCommand() throws IOException {
//One time scheduling.
timeoutTimer.schedule(new ShellTimeoutTimerTask(this), timeout);
}
final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
final BufferedReader errReader = new BufferedReader(
new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8));
BufferedReader inReader = new BufferedReader(
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8));
final StringBuffer errMsg = new StringBuffer();

// read error and input streams as this would free up the buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;

Expand Down Expand Up @@ -60,8 +62,9 @@ public void start(Map<String, String> props) {
outputStream = System.out;
} else {
try {
outputStream = new PrintStream(new FileOutputStream(filename, true));
} catch (FileNotFoundException e) {
outputStream = new PrintStream(new FileOutputStream(filename, true), false,
StandardCharsets.UTF_8.name());
} catch (FileNotFoundException | UnsupportedEncodingException e) {
throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void start(Map<String, String> props) {
stream = System.in;
// Tracking offset for stdin doesn't make sense
streamOffset = null;
reader = new BufferedReader(new InputStreamReader(stream));
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
}
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null)
Expand Down Expand Up @@ -99,7 +100,7 @@ public List<SourceRecord> poll() throws InterruptedException {
} else {
streamOffset = 0L;
}
reader = new BufferedReader(new InputStreamReader(stream));
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
log.debug("Opened {} for reading", logFilename());
} catch (FileNotFoundException e) {
log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -339,8 +340,8 @@ protected Connector getConnector(String connType) {

private String trace(Throwable t) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
t.printStackTrace(new PrintStream(output));
try {
t.printStackTrace(new PrintStream(output, false, StandardCharsets.UTF_8.name()));
return output.toString("UTF-8");
} catch (UnsupportedEncodingException e) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -213,7 +214,7 @@ public static <T> HttpResponse<T> httpRequest(String url, String method, Object
connection.setDoOutput(true);

OutputStream os = connection.getOutputStream();
os.write(serializedBody.getBytes());
os.write(serializedBody.getBytes(StandardCharsets.UTF_8));
os.flush();
os.close();
}
Expand Down
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka.coordinator

import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
Expand Down Expand Up @@ -1045,10 +1046,10 @@ object GroupMetadataManager {
val formattedValue =
if (value == null) "NULL"
else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
output.write(groupTopicPartition.toString.getBytes)
output.write("::".getBytes)
output.write(formattedValue.getBytes)
output.write("\n".getBytes)
output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
}
Expand All @@ -1066,10 +1067,10 @@ object GroupMetadataManager {
val formattedValue =
if (value == null) "NULL"
else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
output.write(groupId.getBytes)
output.write("::".getBytes)
output.write(formattedValue.getBytes)
output.write("\n".getBytes)
output.write(groupId.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/OffsetCheckpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection._
import kafka.utils.{Exit, Logging}
import kafka.common._
import java.io._
import java.nio.charset.StandardCharsets

import org.apache.kafka.common.TopicPartition

Expand All @@ -47,7 +48,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
lock synchronized {
// write to temp file and then swap with the existing file
val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
try {
writer.write(CurrentVersion.toString)
writer.newLine()
Expand Down Expand Up @@ -83,7 +84,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
new IOException(s"Malformed line in offset checkpoint file: $line'")

lock synchronized {
val reader = new BufferedReader(new FileReader(file))
val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
var line: String = null
try {
line = reader.readLine()
Expand Down
24 changes: 14 additions & 10 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package kafka.tools

import java.io.PrintStream
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
import java.util.{Locale, Properties, Random}

import joptsimple._
import kafka.api.OffsetRequest
import kafka.common.{MessageFormatter, StreamEndException}
Expand All @@ -33,6 +35,7 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.utils.Utils
import org.apache.log4j.Logger

import scala.collection.JavaConverters._

/**
Expand Down Expand Up @@ -412,8 +415,8 @@ object ConsoleConsumer extends Logging {
class DefaultMessageFormatter extends MessageFormatter {
var printKey = false
var printTimestamp = false
var keySeparator = "\t".getBytes
var lineSeparator = "\n".getBytes
var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)

var keyDeserializer: Option[Deserializer[_]] = None
var valueDeserializer: Option[Deserializer[_]] = None
Expand All @@ -424,9 +427,9 @@ class DefaultMessageFormatter extends MessageFormatter {
if (props.containsKey("print.key"))
printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator").getBytes
keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
if (props.containsKey("line.separator"))
lineSeparator = props.getProperty("line.separator").getBytes
lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("key.deserializer"))
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
Expand All @@ -438,8 +441,9 @@ class DefaultMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)
val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes)
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
output.write(convertedBytes)
output.write(separator)
}
Expand All @@ -448,9 +452,9 @@ class DefaultMessageFormatter extends MessageFormatter {

if (printTimestamp) {
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
output.write(s"$timestampType:$timestamp".getBytes)
output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8))
else
output.write(s"NO_TIMESTAMP".getBytes)
output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8))
output.write(keySeparator)
}

Expand All @@ -470,8 +474,8 @@ class LoggingMessageFormatter extends MessageFormatter {
defaultWriter.writeTo(consumerRecord, output)
if (logger.isInfoEnabled)
logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
s"key:${if (key == null) "null" else new String(key)}, " +
s"value:${if (value == null) "null" else new String(value)}")
s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " +
s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}")
}
}

Expand Down
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/tools/ConsoleProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
import kafka.producer.{NewShinyProducer, OldProducer}
import java.util.Properties
import java.io._
import java.nio.charset.StandardCharsets

import joptsimple._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
Expand Down Expand Up @@ -301,7 +302,7 @@ object ConsoleProducer {
keySeparator = props.getProperty("key.separator")
if (props.containsKey("ignore.error"))
ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
reader = new BufferedReader(new InputStreamReader(inputStream))
reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
}

override def readMessage() = {
Expand All @@ -312,14 +313,14 @@ object ConsoleProducer {
case (line, true) =>
line.indexOf(keySeparator) match {
case -1 =>
if (ignoreError) new ProducerRecord(topic, line.getBytes)
if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
else throw new KafkaException(s"No key found on line $lineNumber: $line")
case n =>
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
new ProducerRecord(topic, line.substring(0, n).getBytes, value)
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
}
case (line, false) =>
new ProducerRecord(topic, line.getBytes)
new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/tools/EndToEndLatency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.tools

import java.nio.charset.StandardCharsets
import java.util.{Arrays, Collections, Properties}

import kafka.utils.Exit
Expand Down Expand Up @@ -113,8 +114,8 @@ object EndToEndLatency {
}

//Check result matches the original record
val sent = new String(message)
val read = new String(recordIter.next().value())
val sent = new String(message, StandardCharsets.UTF_8)
val read = new String(recordIter.next().value(), StandardCharsets.UTF_8)
if (!read.equals(sent)) {
finalise()
throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/tools/ExportZkOffsets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package kafka.tools

import java.io.FileWriter
import java.io.{FileOutputStream, FileWriter, OutputStreamWriter}
import java.nio.charset.StandardCharsets

import joptsimple._
import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
Expand Down Expand Up @@ -76,7 +77,8 @@ object ExportZkOffsets extends Logging {
val outfile = options.valueOf(outFileOpt)

var zkUtils : ZkUtils = null
val fileWriter : FileWriter = new FileWriter(outfile)
val fileWriter : OutputStreamWriter =
new OutputStreamWriter(new FileOutputStream(outfile), StandardCharsets.UTF_8)

try {
zkUtils = ZkUtils(zkConnect,
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/tools/ImportZkOffsets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package kafka.tools

import java.io.BufferedReader
import java.io.FileReader
import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import joptsimple._
import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils}
Expand Down Expand Up @@ -77,7 +77,7 @@ object ImportZkOffsets extends Logging {
}

private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
val fr = new FileReader(filename)
val fr = new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8)
val br = new BufferedReader(fr)
var partOffsetsMap: Map[String,String] = Map()

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/tools/ProducerPerformance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util._
import java.text.SimpleDateFormat
import java.math.BigInteger
import java.nio.charset.StandardCharsets

import org.apache.kafka.common.utils.Utils
import org.apache.log4j.Logger
Expand Down Expand Up @@ -245,7 +246,7 @@ object ProducerPerformance extends Logging {

val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
debug(seqMsgString)
seqMsgString.getBytes()
seqMsgString.getBytes(StandardCharsets.UTF_8)
}

private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
Expand Down Expand Up @@ -276,7 +277,7 @@ object ProducerPerformance extends Logging {
Thread.sleep(config.messageSendGapMs)
})
} catch {
case e: Throwable => error("Error when sending message " + new String(message), e)
case e: Throwable => error("Error when sending message " + new String(message, StandardCharsets.UTF_8), e)
}
i += 1
}
Expand Down
Loading

0 comments on commit 5a2fcdd

Please sign in to comment.