
Commit

Added a system test for producer with compression enabled. Corrected the logic for enforcing compressed.topics. Changed message.isValid to check the magic byte and throw UnknownMagicByteException if the magic byte is unknown.
nehanarkhede committed Jul 22, 2011
1 parent 18009f3 commit fab1263
Showing 12 changed files with 188 additions and 75 deletions.
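For readers following the compressed.topics change, a minimal producer configuration sketch is below. The property names (broker.list, compression.codec, compressed.topics) appear in this commit's diffs; the broker string, topic name, and the assumption that codec id 1 selects GZIP are illustrative only.

import java.util.Properties
import kafka.producer.ProducerConfig

val props = new Properties
props.put("broker.list", "0:localhost:9092")   // hypothetical brokerid:host:port entry
props.put("compression.codec", "1")            // assumed: 1 selects GZIP in this version
props.put("compressed.topics", "test-topic")   // only this hypothetical topic is compressed
val producerConfig = new ProducerConfig(props)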
11 changes: 6 additions & 5 deletions config/log4j.properties
@@ -4,15 +4,16 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.fileAppender=org.apache.log4j.FileAppender
log4j.appender.fileAppender.File=kafka-request.log
log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
#log4j.appender.fileAppender=org.apache.log4j.FileAppender
#log4j.appender.fileAppender.File=kafka-request.log
#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n


# Turn on all our debugging info
log4j.logger.kafka=INFO,stdout
log4j.logger.kafka.consumer.PartitionTopicInfo=TRACE,stdout
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG,stdout
#log4j.logger.kafka.consumer.PartitionTopicInfo=TRACE,stdout
#log4j.logger.kafka.request.logger=TRACE,fileAppender
#log4j.additivity.kafka.request.logger=false
#log4j.logger.kafka.network.Processor=TRACE,fileAppender
25 changes: 25 additions & 0 deletions core/src/main/scala/kafka/common/UnknownMagicByteException.scala
@@ -0,0 +1,25 @@
/*
* Copyright 2010 LinkedIn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.common

/**
* Indicates that a message carries a magic byte value that is not recognized
*/
class UnknownMagicByteException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
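
A sketch, not part of this commit, of how the new exception surfaces: isValid now resolves offsets through the magic byte, so a message with an unrecognized magic value fails loudly instead of being mis-read. The buffer below is deliberately bogus.

import java.nio.ByteBuffer
import kafka.message.Message
import kafka.common.UnknownMagicByteException

val bogus = new Message(ByteBuffer.wrap(Array[Byte](99, 0, 0, 0, 0, 0)))   // 99 is not a known magic byte
try {
  println(bogus.isValid)
} catch {
  case e: UnknownMagicByteException =>
    println("Rejected message with unknown magic byte: " + e.getMessage)
}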

6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -130,7 +130,7 @@ object ConsoleConsumer {
formatter.writeTo(message, System.out)
messageCount += 1
}
println("Received " + messageCount + " messages")
logger.info("Received " + messageCount + " messages")
System.out.flush()
formatter.close()
connector.shutdown()
@@ -150,7 +150,7 @@
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
logger.error("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
@@ -160,7 +160,7 @@
def tryParseFormatterArgs(args: Iterable[String]): Properties = {
val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
if(!splits.forall(_.length == 2)) {
System.err.println("Invalid parser arguments: " + args.mkString(" "))
logger.error("Invalid parser arguments: " + args.mkString(" "))
System.exit(1)
}
val props = new Properties
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -39,7 +39,8 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
override def next(): Message = {
val message = super.next
currentTopicInfo.resetConsumeOffset(consumedOffset)
logger.info("Setting consumed offset to %d".format(consumedOffset))
if(logger.isTraceEnabled)
logger.trace("Setting consumed offset to %d".format(consumedOffset))
message
}

@@ -55,7 +56,8 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
}
}
if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
logger.debug("Received the shutdown command")
if(logger.isDebugEnabled)
logger.debug("Received the shutdown command")
channel.offer(currentDataChunk)
return allDone
} else {
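One note on the logging change above: the isTraceEnabled / isDebugEnabled guards matter because the message string is built eagerly by format(); the guard keeps that work off the per-message path when the level is disabled.

// Without the guard, the argument is constructed for every message even at INFO level:
//   logger.trace("Setting consumed offset to %d".format(consumedOffset))
// With the guard, format() runs only when TRACE logging is actually enabled.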
35 changes: 18 additions & 17 deletions core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
@@ -5,21 +5,22 @@ import java.nio.ByteBuffer
import scala.Math

class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
override def read():Int = {
if (!buffer.hasRemaining())
{
-1
}
(buffer.get() & 0xFF)
}

override def read(bytes:Array[Byte], off:Int, len:Int):Int = {
if (!buffer.hasRemaining()) {
-1
}
// Read only what's left
val realLen = math.min(len, buffer.remaining())
buffer.get(bytes, off, realLen)
realLen
}
override def read():Int = {
buffer.hasRemaining match {
case true =>
(buffer.get() & 0xFF)
case false => -1
}
}

override def read(bytes:Array[Byte], off:Int, len:Int):Int = {
buffer.hasRemaining match {
case true =>
// Read only what's left
val realLen = math.min(len, buffer.remaining())
buffer.get(bytes, off, realLen)
realLen
case false => -1
}
}
}
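
The rewrite above matters because the old read() evaluated -1 without returning it and then called buffer.get() on an exhausted buffer; the match form returns -1 as InputStream requires. A minimal usage sketch, mirroring how CompressionUtils (next file) drains a compressed payload; compressedMessage is assumed to be a GZIP-compressed kafka.message.Message:

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream

val gzipIn = new GZIPInputStream(new ByteBufferBackedInputStream(compressedMessage.payload))
val out = new ByteArrayOutputStream()
val chunk = new Array[Byte](1024)
// Copy until read() signals end of stream.
Stream.continually(gzipIn.read(chunk)).takeWhile(_ > 0).foreach(n => out.write(chunk, 0, n))
gzipIn.close()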
50 changes: 30 additions & 20 deletions core/src/main/scala/kafka/message/CompressionUtils.scala
@@ -35,29 +35,41 @@ object CompressionUtils {
val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream)
if(logger.isDebugEnabled)
logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))

val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) {
message.serializeTo(messageByteBuffer)
}
messages.foreach(m => m.serializeTo(messageByteBuffer))
messageByteBuffer.rewind
gzipOutput.write(messageByteBuffer.array)
gzipOutput.close();
outputStream.close();

try {
gzipOutput.write(messageByteBuffer.array)
} catch {
case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
} finally {
gzipOutput.close();
outputStream.close();
}

val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
oneCompressedMessage
case GZIPCompressionCodec =>
val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream()
val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream)
if(logger.isDebugEnabled)
logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))

val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) {
message.serializeTo(messageByteBuffer)
}
messages.foreach(m => m.serializeTo(messageByteBuffer))
messageByteBuffer.rewind
gzipOutput.write(messageByteBuffer.array)
gzipOutput.close();
outputStream.close();

try {
gzipOutput.write(messageByteBuffer.array)
} catch {
case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
} finally {
gzipOutput.close();
outputStream.close();
}

val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
oneCompressedMessage
case _ =>
@@ -70,10 +82,9 @@ object CompressionUtils {
val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
val intermediateBuffer = new Array[Byte](1024)
var len=gzipIn.read(intermediateBuffer)
while (len >0) {
outputStream.write(intermediateBuffer,0,len)
len = gzipIn.read(intermediateBuffer)

Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
outputStream.write(intermediateBuffer, 0, dataRead)
}

gzipIn.close
@@ -88,10 +99,9 @@
val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
val intermediateBuffer = new Array[Byte](1024)
var len=gzipIn.read(intermediateBuffer)
while (len >0) {
outputStream.write(intermediateBuffer,0,len)
len = gzipIn.read(intermediateBuffer)

Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
outputStream.write(intermediateBuffer, 0, dataRead)
}

gzipIn.close
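Taken together, the compress and decompress paths above round-trip a batch of messages; the sketch below restates that flow. The parameter lists are assumed from the hunks in this file rather than verified against the full source.

import kafka.message.{CompressionUtils, GZIPCompressionCodec, Message, NoCompressionCodec}

val original = List(new Message("hello".getBytes, NoCompressionCodec),
                    new Message("world".getBytes, NoCompressionCodec))
// One compressed Message wrapping the whole batch (assumed signature: compress(messages, codec)).
val compressed = CompressionUtils.compress(original, GZIPCompressionCodec)
// Decompression yields the original batch back as a message set (assumed signature: decompress(message)).
val restored = CompressionUtils.decompress(compressed)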
18 changes: 11 additions & 7 deletions core/src/main/scala/kafka/message/Message.scala
@@ -21,11 +21,14 @@ import java.nio.channels._
import java.util.zip.CRC32
import java.util.UUID
import kafka.utils._
import kafka.common.UnknownMagicByteException

/**
* Message byte offsets
*/
object Message {
val MagicVersion1: Byte = 0
val MagicVersion2: Byte = 1
val CurrentMagicValue: Byte = 1
val MagicOffset = 0
val MagicLength = 1
@@ -39,8 +42,9 @@
* 1 for compression
*/
def CrcOffset(magic: Byte): Int = magic match {
case 0 => MagicOffset + MagicLength
case _ => AttributeOffset + AttributeLength
case MagicVersion1 => MagicOffset + MagicLength
case MagicVersion2 => AttributeOffset + AttributeLength
case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic))
}

val CrcLength = 4
@@ -153,9 +157,9 @@ class Message(val buffer: ByteBuffer) {
payload
}

def isValid: Boolean =
def isValid: Boolean =
checksum == Utils.crc32(buffer.array, buffer.position + buffer.arrayOffset + PayloadOffset(magic), payloadSize)

def serializedSize: Int = 4 /* int size*/ + buffer.limit

def serializeTo(serBuffer:ByteBuffer) = {
@@ -164,12 +168,12 @@
}

override def toString(): String =
"message(magic = " + magic + ", attributes = " + attributes + ", crc = " + checksum +
", payload = " + payload + ")"
"message(magic = %d, attributes = %d, crc = %d, payload = %s)".format(magic, attributes, checksum, payload)

override def equals(any: Any): Boolean = {
any match {
case that: Message => size == that.size && attributes == that.attributes && checksum == that.checksum && payload == that.payload && magic == that.magic
case that: Message => size == that.size && attributes == that.attributes && checksum == that.checksum &&
payload == that.payload && magic == that.magic
case _ => false
}
}
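The CrcOffset change above fixes the set of recognized layouts. Summarizing the format implied by the constants in this file, and assuming PayloadOffset(magic) = CrcOffset(magic) + CrcLength as the isValid hunk suggests:

// magic = 0 (MagicVersion1): | magic (1 byte) | crc (4 bytes) | payload ... |
// magic = 1 (MagicVersion2): | magic (1 byte) | attributes (1 byte) | crc (4 bytes) | payload ... |
// The payload therefore starts at byte 5 for magic 0 and byte 6 for magic 1; any other
// magic byte now raises UnknownMagicByteException instead of being read with the magic-1 offsets.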
35 changes: 28 additions & 7 deletions core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -43,8 +43,8 @@ private[kafka] class DefaultEventHandler[T](val config: SyncProducerConfigShared
if(messagesPerTopic.size > 0) {
val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
syncProducer.multiSend(requests)
if(logger.isDebugEnabled)
logger.debug("kafka producer sent messages for topics " + messagesPerTopic)
if(logger.isTraceEnabled)
logger.trace("kafka producer sent messages for topics " + messagesPerTopic)
}
}

@@ -60,11 +60,32 @@ private[kafka] class DefaultEventHandler[T](val config: SyncProducerConfigShared
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
val messages = eventsPerTopicMap.map(e => {
if(config.compressedTopics.contains(e._1._1))
new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
else
new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
})
config.compressionCodec match {
case NoCompressionCodec =>
if(logger.isDebugEnabled)
logger.debug("Sending %d messages with no compression".format(e._2.size))
new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
case _ =>
config.compressedTopics.size match {
case 0 =>
if(logger.isDebugEnabled)
logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
case _ =>
if(config.compressedTopics.contains(e._1._1)) {
if(logger.isDebugEnabled)
logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
}
else {
if(logger.isDebugEnabled)
logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
.format(e._2.size, e._1._1, config.compressedTopics.toString))
new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
}
}
}
})
topicsAndPartitions.zip(messages)
}

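Condensed, the nested match above applies a per-topic rule; the helper below is a restatement for readability, not code from the commit, and assumes config is the producer config instance used above:

def codecFor(topic: String): CompressionCodec = config.compressionCodec match {
  case NoCompressionCodec => NoCompressionCodec                    // compression disabled globally
  case codec if config.compressedTopics.isEmpty => codec           // no whitelist: compress every topic
  case codec if config.compressedTopics.contains(topic) => codec   // topic is whitelisted
  case _ => NoCompressionCodec                                     // codec set, but topic not whitelisted
}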
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -150,12 +150,14 @@ object ProducerPerformance {
else
props.put("broker.list", brokerInfoList(1))

props.put("compression.codec", config.compressionCodec.toString)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("producer.type","async")
props.put("batch.size", config.batchSize.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)

logger.info("Producer properties = " + props.toString)

val producerConfig = new ProducerConfig(props)
val producer = new Producer[String, String](producerConfig, new StringEncoder,
new DefaultEventHandler(producerConfig, null), null, new DefaultPartitioner)
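The functional change above is the switch to config.compressionCodec.codec.toString; a short note on why, with the codec ids assumed for this version:

// compressionCodec is a case object, so toString yields a name such as "GZIPCompressionCodec",
// which the integer-valued "compression.codec" property cannot use; .codec is the numeric id
// (assumed: 0 = none, 1 = GZIP), so the property is now set to, e.g., "1".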
21 changes: 10 additions & 11 deletions core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -50,15 +50,8 @@ object ReplayLogProducer {
for (thread <- threadList)
thread.start

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
consumerConnector.shutdown
threadList.foreach(_.shutdown)
logger.info("consumer threads shutted down")
}
})

threadList.foreach(_.shutdown)
consumerConnector.shutdown
}

class Config(args: Array[String]) {
@@ -178,8 +171,14 @@ object ReplayLogProducer {
else
stream
for (message <- iter) {
producer.send(new ProducerData[Message, Message](config.outputTopic, message))
messageCount += 1
try {
producer.send(new ProducerData[Message, Message](config.outputTopic, message))
if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
Thread.sleep(config.delayedMSBtwSend)
messageCount += 1
}catch {
case ie: Exception => logger.error("Skipping this message", ie)
}
}
}catch {
case e: ConsumerTimeoutException => logger.error("consumer thread timing out", e)
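The reworked send loop above also adds simple throttling; a note on the effective rate, with field names taken from this hunk:

// After every config.batchSize sends the thread sleeps config.delayedMSBtwSend milliseconds,
// capping each thread at roughly batchSize messages per delayedMSBtwSend ms; when
// delayedMSBtwSend <= 0, no sleep occurs at all.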

0 comments on commit fab1263
