Skip to content

Commit

Permalink
Consumer Iterator modified to update consumed offset one message at a time for uncompressed messages and one compressed message at a time for compressed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Neha Narkhede committed Jul 21, 2011
1 parent ebf9b1c commit d423896
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 24 deletions.
14 changes: 4 additions & 10 deletions core/src/main/scala/kafka/consumer/ConsumerIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
private val logger = Logger.getLogger(classOf[ConsumerIterator])
private var current: Iterator[MessageOffset] = null
private var currentDataChunk: FetchedDataChunk = null
private var setConsumedOffset: Boolean = false
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = Long.MaxValue

override def next(): Message = {
val message = super.next
if(setConsumedOffset) {
currentTopicInfo.consumed(currentDataChunk.messages.shallowValidBytes)
setConsumedOffset = false
}
currentTopicInfo.resetConsumeOffset(consumedOffset)
logger.info("Setting consumed offset to %d".format(consumedOffset))
message
}

Expand Down Expand Up @@ -71,11 +69,7 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
}
}
val item = current.next
if(!current.hasNext) {
// the iterator in this data chunk is exhausted. Update the consumed offset now
setConsumedOffset = true
}

consumedOffset = item.offset
item.message
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/consumer/FetcherRunnable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class FetcherRunnable(val name: String,

val response = simpleConsumer.multifetch(fetches : _*)

var read = 0
var read = 0L

for((messages, info) <- response.zip(partitionTopicInfos)) {
try {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[consumer] class PartitionTopicInfo(val topic: String,
/**
* Record the given number of bytes as having been consumed
*/
def consumed(messageSize: Int): Unit = {
def consumed(messageSize: Long): Unit = {
if(logger.isTraceEnabled)
logger.trace("Current consumed offset = " + consumedOffset.get)
val newOffset = consumedOffset.addAndGet(messageSize)
Expand All @@ -68,7 +68,7 @@ private[consumer] class PartitionTopicInfo(val topic: String,
* Enqueue a message set for processing
* @return the number of valid bytes
*/
def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Int = {
def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
val size = messages.shallowValidBytes
if(size > 0) {
// update fetched offset to the compressed data chunk size, not the decompressed message set size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
}, ErrorMapping.NoError, true)
}

def validBytes: Int = underlying.validBytes
def validBytes: Long = underlying.validBytes

def serialized():ByteBuffer = underlying.serialized

Expand Down
22 changes: 12 additions & 10 deletions core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
val errorCode: Int = ErrorMapping.NoError,
val deepIterate: Boolean = true) extends MessageSet {
private val logger = Logger.getLogger(getClass())
private var validByteCount = -1
private var shallowValidByteCount = -1
private var deepValidByteCount = -1
private var validByteCount = -1L
private var shallowValidByteCount = -1L
private var deepValidByteCount = -1L

def this(compressionCodec: CompressionCodec, messages: Message*) {
this(
Expand Down Expand Up @@ -87,12 +87,12 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,

def serialized(): ByteBuffer = buffer

def validBytes: Int = deepIterate match {
def validBytes: Long = deepIterate match {
case true => deepValidBytes
case false => shallowValidBytes
}

def shallowValidBytes: Int = {
def shallowValidBytes: Long = {
if(shallowValidByteCount < 0) {
val iter = shallowIterator
while(iter.hasNext)
Expand All @@ -101,7 +101,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
shallowValidByteCount
}

def deepValidBytes: Int = {
def deepValidBytes: Long = {
if (deepValidByteCount < 0) {
val iter = deepIterator
while (iter.hasNext)
Expand Down Expand Up @@ -154,8 +154,9 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageOffset] {
var topIter = buffer.slice()
var currValidBytes = 0
var currValidBytes = 0L
var innerIter:Iterator[MessageOffset] = null
var lastMessageSize = 0L

def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)

Expand All @@ -165,6 +166,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
return allDone()
}
val size = topIter.getInt()
lastMessageSize = size

if(logger.isTraceEnabled) {
logger.trace("Remaining bytes in iterator = " + topIter.remaining)
logger.trace("size of data = " + size)
Expand Down Expand Up @@ -203,9 +206,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
case true => makeNextOuter
case false => {
val messageAndOffset = innerIter.next
//TODO: Alternatively check if messageAndOffset.offset can be used
// currValidBytes += messageAndOffset.message.serializedSize
currValidBytes += messageAndOffset.offset
if(!innerIter.hasNext)
currValidBytes += 4 + lastMessageSize
new MessageOffset(messageAndOffset.message, currValidBytes)
}
}
Expand Down

0 comments on commit d423896

Please sign in to comment.