Skip to content

Commit

Permalink
More code cleanup. Changed the visibility of variables in ByteBufferMessageSet to private with accessor methods. Deleted unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
nehanarkhede committed Jul 22, 2011
1 parent 2715ec7 commit 4c49c4e
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 38 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/consumer/ConsumerIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
private var current: Iterator[MessageOffset] = null
private var currentDataChunk: FetchedDataChunk = null
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = Long.MaxValue
private var consumedOffset: Long = -1L

override def next(): Message = {
val message = super.next
if(consumedOffset < 0)
throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
currentTopicInfo.resetConsumeOffset(consumedOffset)
if(logger.isTraceEnabled)
logger.trace("Setting consumed offset to %d".format(consumedOffset))
Expand All @@ -63,8 +65,8 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
} else {
currentTopicInfo = currentDataChunk.topicInfo
if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
logger.error("consumed offset: " + currentTopicInfo.getConsumeOffset + " doesn't match fetch offset: " +
currentDataChunk.fetchOffset + " for " + currentTopicInfo + "; consumer may lose data")
logger.error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
.format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
}
current = currentDataChunk.messages.iterator
Expand Down
11 changes: 0 additions & 11 deletions core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,6 @@ private[consumer] class PartitionTopicInfo(val topic: String,
logger.debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
}

/**
* Record the given number of bytes as having been consumed
*/
def consumed(messageSize: Long): Unit = {
if(logger.isTraceEnabled)
logger.trace("Current consumed offset = " + consumedOffset.get)
val newOffset = consumedOffset.addAndGet(messageSize)
if (logger.isDebugEnabled)
logger.debug("updated consume offset of ( %s ) to %d".format(this, newOffset))
}

/**
* Enqueue a message set for processing
* @return the number of valid bytes
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/javaapi/Implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ private[javaapi] object Implicits {

implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
kafka.javaapi.message.ByteBufferMessageSet = {
new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.initialOffset,
new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
messageSet.getErrorCode, messageSet.getDeepIterate)
}

implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
logger.debug("Implicit instantiation of Java Sync Producer")
if(logger.isDebugEnabled)
logger.debug("Implicit instantiation of Java Sync Producer")
new kafka.javaapi.producer.SyncProducer(producer)
}

implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
logger.debug("Implicit instantiation of Sync Producer")
if(logger.isDebugEnabled)
logger.debug("Implicit instantiation of Sync Producer")
producer.underlying
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import kafka.common.ErrorMapping
import org.apache.log4j.Logger
import kafka.message._

class ByteBufferMessageSet(val buffer: ByteBuffer,
val initialOffset: Long = 0L,
val errorCode: Int = ErrorMapping.NoError,
val deepIterate: Boolean = true) extends MessageSet {
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
private val errorCode: Int = ErrorMapping.NoError,
private val deepIterate: Boolean = true) extends MessageSet {
private val logger = Logger.getLogger(getClass())
val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
initialOffset,
Expand Down Expand Up @@ -55,6 +55,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,

def serialized():ByteBuffer = underlying.serialized

def getInitialOffset = initialOffset

def getDeepIterate = deepIterate

def getBuffer = buffer

def getErrorCode = errorCode

override def iterator: java.util.Iterator[MessageOffset] = new java.util.Iterator[MessageOffset] {
val underlyingIterator = underlying.iterator
override def hasNext(): Boolean = {
Expand Down
12 changes: 7 additions & 5 deletions core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import kafka.utils.IteratorTemplate
* Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
*
*/
class ByteBufferMessageSet(val buffer: ByteBuffer,
val initialOffset: Long = 0L,
val errorCode: Int = ErrorMapping.NoError,
val deepIterate: Boolean = true) extends MessageSet {
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
private val errorCode: Int = ErrorMapping.NoError,
private val deepIterate: Boolean = true) extends MessageSet {
private val logger = Logger.getLogger(getClass())
private var validByteCount = -1L
private var shallowValidByteCount = -1L
Expand All @@ -61,6 +61,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
}, 0L, ErrorMapping.NoError, true)
}

def getInitialOffset = initialOffset

def getDeepIterate = deepIterate

def getBuffer = buffer
Expand Down Expand Up @@ -105,7 +107,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageOffset] {
var iter = buffer.slice()
var currValidBytes = 0
var currValidBytes = 0L

override def makeNext(): MessageOffset = {
// read the size of the item
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
messages = getMessageList(Seq.empty[Message]: _*))
producer.send(topic, sent2)
Thread.sleep(200)
sent2.buffer.rewind
sent2.getBuffer.rewind
var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent2.iterator, fetched2.iterator)

Expand All @@ -61,7 +61,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
producer.send(topic, sent3)

Thread.sleep(200)
sent3.buffer.rewind
sent3.getBuffer.rewind
var fetched3: ByteBufferMessageSet = null
while(fetched3 == null || fetched3.validBytes == 0)
fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
Expand Down Expand Up @@ -93,7 +93,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
messages = getMessageList(Seq.empty[Message]: _*))
producer.send(topic, sent2)
Thread.sleep(200)
sent2.buffer.rewind
sent2.getBuffer.rewind
var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent2.iterator, fetched2.iterator)

Expand All @@ -105,7 +105,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
producer.send(topic, sent3)

Thread.sleep(200)
sent3.buffer.rewind
sent3.getBuffer.rewind
var fetched3: ByteBufferMessageSet = null
while(fetched3 == null || fetched3.validBytes == 0)
fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
Expand Down Expand Up @@ -140,7 +140,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.buffer.rewind
set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
}

Expand Down Expand Up @@ -213,7 +213,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.buffer.rewind
set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
}

Expand Down Expand Up @@ -286,7 +286,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.buffer.rewind
set.getBuffer.rewind
fetches.add(new FetchRequest(topic, 0, 0, 10000))
}

Expand Down Expand Up @@ -317,7 +317,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.buffer.rewind
set.getBuffer.rewind
fetches.add(new FetchRequest(topic, 0, 0, 10000))
}

Expand Down Expand Up @@ -353,7 +353,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
producer.multiSend(produceList.toArray)

for (messageSet <- messages.values)
messageSet.buffer.rewind
messageSet.getBuffer.rewind

// wait a bit for produced message to be available
Thread.sleep(200)
Expand Down Expand Up @@ -386,7 +386,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
producer.multiSend(produceList.toArray)

for (messageSet <- messages.values)
messageSet.buffer.rewind
messageSet.getBuffer.rewind

// wait a bit for produced message to be available
Thread.sleep(200)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestC
messages = getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
buffer.put(messageList.buffer)
buffer.put(messageList.getBuffer)
buffer.putShort(4)
val messageListPlus = new ByteBufferMessageSet(buffer)
assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
Expand All @@ -46,7 +46,7 @@ class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestC
messages = getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
buffer.put(messageList.buffer)
buffer.put(messageList.getBuffer)
buffer.putShort(4)
val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0, true)
assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
Expand Down

0 comments on commit 4c49c4e

Please sign in to comment.