Skip to content

Commit

Permalink
Corrected ByteBufferMessageSet to take in an initial offset and use t…
Browse files Browse the repository at this point in the history
…hat in the iterator to return the correct offset to consume the next message
  • Loading branch information
nehanarkhede committed Jul 21, 2011
1 parent d423896 commit 18009f3
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 32 deletions.
5 changes: 3 additions & 2 deletions config/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%

# Turn on all our debugging info
log4j.logger.kafka=INFO,stdout
log4j.logger.kafka.request.logger=TRACE,fileAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.consumer.PartitionTopicInfo=TRACE,stdout
#log4j.logger.kafka.request.logger=TRACE,fileAppender
#log4j.additivity.kafka.request.logger=false
#log4j.logger.kafka.network.Processor=TRACE,fileAppender
#log4j.additivity.kafka.network.Processor=false
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/api/MultiFetchResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package kafka.api

import java.nio._
import scala.collection.mutable
import collection.mutable
import kafka.utils.IteratorTemplate
import kafka.message._

class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int) extends Iterable[ByteBufferMessageSet] {
class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int, val offsets: Array[Long]) extends Iterable[ByteBufferMessageSet] {
private val messageSets = new mutable.ListBuffer[ByteBufferMessageSet]

for(i <- 0 until numSets) {
Expand All @@ -31,7 +31,7 @@ class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int) extends Itera
val payloadSize = size - 2
copy.limit(payloadSize)
buffer.position(buffer.position + payloadSize)
messageSets += new ByteBufferMessageSet(copy, errorCode, true)
messageSets += new ByteBufferMessageSet(copy, offsets(i), errorCode, true)
}

def iterator : Iterator[ByteBufferMessageSet] = {
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[consumer] class PartitionTopicInfo(val topic: String,
def resetFetchOffset(newFetchOffset: Long) = {
fetchedOffset.set(newFetchOffset)
if (logger.isDebugEnabled)
logger.debug("reset fetch offset of " + this + " to " + newFetchOffset)
logger.debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
}

/**
Expand All @@ -61,7 +61,7 @@ private[consumer] class PartitionTopicInfo(val topic: String,
logger.trace("Current consumed offset = " + consumedOffset.get)
val newOffset = consumedOffset.addAndGet(messageSize)
if (logger.isDebugEnabled)
logger.debug("updated consume offset of " + this + " to " + newOffset)
logger.debug("updated consume offset of ( %s ) to %d".format(this, newOffset))
}

/**
Expand All @@ -76,7 +76,7 @@ private[consumer] class PartitionTopicInfo(val topic: String,
logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
val newOffset = fetchedOffset.addAndGet(size)
if (logger.isDebugEnabled)
logger.debug("updated fetch offset of " + this + " to " + newOffset)
logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
}
size
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/consumer/SimpleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class SimpleConsumer(val host: String,
val endTime = SystemTime.nanoseconds
SimpleConsumerStats.recordFetchRequest(endTime - startTime)
SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
new ByteBufferMessageSet(response._1.buffer, response._2, true)
new ByteBufferMessageSet(response._1.buffer, request.offset, response._2, true)
}
}

Expand Down Expand Up @@ -137,7 +137,7 @@ class SimpleConsumer(val host: String,
SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)

// error code will be set on individual messageset inside MultiFetchResponse
new MultiFetchResponse(response._1.buffer, fetches.length)
new MultiFetchResponse(response._1.buffer, fetches.length, fetches.toArray.map(f => f.offset))
}
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/javaapi/Implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ private[javaapi] object Implicits {

implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
kafka.javaapi.message.ByteBufferMessageSet = {
new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getErrorCode, messageSet.getDeepIterate)
new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.initialOffset,
messageSet.getErrorCode, messageSet.getDeepIterate)
}

implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
Expand Down Expand Up @@ -121,5 +122,5 @@ private[javaapi] object Implicits {
response.underlying

implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse =
new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets)
new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets)
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import kafka.utils.IteratorTemplate
import java.nio.ByteBuffer
import message.ByteBufferMessageSet

class MultiFetchResponse(buffer: ByteBuffer, numSets: Int) extends java.lang.Iterable[ByteBufferMessageSet] {
class MultiFetchResponse(buffer: ByteBuffer, numSets: Int, offsets: Array[Long]) extends java.lang.Iterable[ByteBufferMessageSet] {
val underlyingBuffer = ByteBuffer.wrap(buffer.array)
// this has the side effect of setting the initial position of buffer correctly
val errorCode = underlyingBuffer.getShort

import Implicits._
val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets)
val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets, offsets)

override def toString() = underlying.toString

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ import org.apache.log4j.Logger
import kafka.message._

class ByteBufferMessageSet(val buffer: ByteBuffer,
val initialOffset: Long = 0L,
val errorCode: Int = ErrorMapping.NoError,
val deepIterate: Boolean = true) extends MessageSet {
private val logger = Logger.getLogger(getClass())
val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, errorCode, deepIterate)
// var buffer:ByteBuffer = null
// var errorCode:Int = ErrorMapping.NoError
// var deepIterate = false

def this(buffer: ByteBuffer) = this(buffer, ErrorMapping.NoError, true)
val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
initialOffset,
errorCode, deepIterate)
def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError, true)

def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
this(compressionCodec match {
Expand All @@ -49,7 +48,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
message.serializeTo(buffer)
buffer.rewind
buffer
}, ErrorMapping.NoError, true)
}, 0L, ErrorMapping.NoError, true)
}

def validBytes: Long = underlying.validBytes
Expand Down Expand Up @@ -77,13 +76,13 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
other match {
case that: ByteBufferMessageSet =>
(that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) &&
deepIterate == that.deepIterate
deepIterate == that.deepIterate && initialOffset == that.initialOffset
case _ => false
}
}

def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]

override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + deepIterate.hashCode
override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + deepIterate.hashCode + initialOffset.hashCode

}
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import kafka.utils.IteratorTemplate
*
*/
class ByteBufferMessageSet(val buffer: ByteBuffer,
val initialOffset: Long = 0L,
val errorCode: Int = ErrorMapping.NoError,
val deepIterate: Boolean = true) extends MessageSet {
private val logger = Logger.getLogger(getClass())
Expand All @@ -57,7 +58,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
message.serializeTo(buffer)
buffer.rewind
buffer
}, ErrorMapping.NoError, true)
}, 0L, ErrorMapping.NoError, true)
}

def this(compressionCodec: CompressionCodec, messages: Iterable[Message]) {
Expand All @@ -76,7 +77,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
message.serializeTo(buffer)
buffer.rewind
buffer
}, ErrorMapping.NoError, true)
}, 0L, ErrorMapping.NoError, true)
}

def getDeepIterate = deepIterate
Expand Down Expand Up @@ -154,7 +155,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageOffset] {
var topIter = buffer.slice()
var currValidBytes = 0L
var currValidBytes = initialOffset
var innerIter:Iterator[MessageOffset] = null
var lastMessageSize = 0L

Expand Down Expand Up @@ -232,12 +233,12 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
other match {
case that: ByteBufferMessageSet =>
(that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) &&
deepIterate == that.deepIterate
deepIterate == that.deepIterate && initialOffset == that.initialOffset
case _ => false
}
}

override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]

override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + deepIterate.hashCode
override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + deepIterate.hashCode + initialOffset.hashCode
}
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/message/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,14 @@ class Message(val buffer: ByteBuffer) {

def attributes: Byte = buffer.get(AttributeOffset)

def compressionCodec:CompressionCodec =
CompressionCodec.getCompressionCodec(buffer.get(AttributeOffset) & CompressionCodeMask)
def compressionCodec:CompressionCodec = {
magic match {
case 0 => NoCompressionCodec
case 1 => CompressionCodec.getCompressionCodec(buffer.get(AttributeOffset) & CompressionCodeMask)
case _ => throw new RuntimeException("Invalid magic byte " + magic)
}

}

def checksum: Long = Utils.getUnsignedInt(buffer, CrcOffset(magic))

Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/tools/ReplayLogProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ object ReplayLogProducer {
props.put("broker.list", brokerInfoList(1))
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("batch.size", config.batchSize.toString)

if(config.isAsync)
props.put("producer.type", "async")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness wit
}
catch {
case e: InvalidMessageSizeException => "This is good"
case e: Exception => "This is not bad too !"
}

zkConsumerConnector1.shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestC
val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
buffer.put(messageList.buffer)
buffer.putShort(4)
val messageListPlus = new ByteBufferMessageSet(buffer, 0, true)
val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0, true)
assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class ServerShutdownTest extends JUnitSuite {
fetched = null
while(fetched == null || fetched.validBytes == 0)
fetched = consumer.fetch(new FetchRequest(topic, 0, newOffset, 10000))
TestUtils.checkEquals(sent2.iterator, fetched.iterator)
TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetched.map(m => m.message).iterator)

server.shutdown()
Utils.rm(server.config.logDir)
Expand Down

0 comments on commit 18009f3

Please sign in to comment.