Skip to content

Commit

Permalink
Replaced references to MessageOffset with MessageAndOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
nehanarkhede committed Jul 29, 2011
1 parent 4b4c50d commit 08276e1
Show file tree
Hide file tree
Showing 70 changed files with 117 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageOffset;
import kafka.message.MessageAndOffset;
import kafka.message.MessageSet;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
Expand Down Expand Up @@ -44,7 +44,7 @@ public class KafkaETLContext {
protected long _count; /*current count*/

protected MultiFetchResponse _response = null; /*fetch response*/
protected Iterator<MessageOffset> _messageIt = null; /*message iterator*/
protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/

protected int _retry = 0;
protected long _requestTime = 0; /*accumulative request time*/
Expand Down Expand Up @@ -123,7 +123,7 @@ public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException
while ( !gotNext && iter.hasNext()) {
ByteBufferMessageSet msgSet = iter.next();
if ( hasError(msgSet)) return false;
_messageIt = (Iterator<MessageOffset>) msgSet.iterator();
_messageIt = (Iterator<MessageAndOffset>) msgSet.iterator();
gotNext = get(key, value);
}
}
Expand Down Expand Up @@ -172,7 +172,7 @@ public void close() throws IOException {

protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
if (_messageIt != null && _messageIt.hasNext()) {
MessageOffset msgAndOffset = _messageIt.next();
MessageAndOffset msgAndOffset = _messageIt.next();

ByteBuffer buf = msgAndOffset.message().payload();
int origSize = buf.remaining();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ public UndefinedPropertyException(String message) {
super(message);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ package kafka.common
*/
class InvalidConfigException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ package kafka.common
*/
class InvalidPartitionException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ package kafka.common
*/
class UnavailableProducerException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/consumer/ConsumerConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ object Consumer {
Utils.swallow(logger.warn, Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName))
consumerConnect
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/consumer/ConsumerIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import kafka.utils.IteratorTemplate
import org.apache.log4j.Logger
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.cluster.Partition
import kafka.message.{MessageOffset, MessageSet, Message}
import kafka.message.{MessageAndOffset, MessageSet, Message}

/**
* An iterator that blocks until a value can be read from the supplied queue.
Expand All @@ -31,7 +31,7 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
extends IteratorTemplate[Message] {

private val logger = Logger.getLogger(classOf[ConsumerIterator])
private var current: Iterator[MessageOffset] = null
private var current: Iterator[MessageAndOffset] = null
private var currentDataChunk: FetchedDataChunk = null
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/consumer/TopicCount.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,4 @@ private[consumer] class TopicCount(val consumerIdString: String, val topicCountM
builder.append(" }")
builder.toString
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ class MultiFetchResponse(buffer: ByteBuffer, numSets: Int, offsets: Array[Long])
}
}
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/javaapi/ProducerRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ class ProducerRequest(val topic: String,

override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,4 @@ class SimpleConsumer(val host: String,
def close() {
underlying.close
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def shutdown() {
underlying.shutdown
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,

def getErrorCode = errorCode

override def iterator: java.util.Iterator[MessageOffset] = new java.util.Iterator[MessageOffset] {
override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
val underlyingIterator = underlying.iterator
override def hasNext(): Boolean = {
underlyingIterator.hasNext
}

override def next(): MessageOffset = {
override def next(): MessageAndOffset = {
underlyingIterator.next
}

Expand All @@ -97,4 +97,4 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,

override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + deepIterate.hashCode + initialOffset.hashCode

}
}
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/javaapi/message/MessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package kafka.javaapi.message

import java.nio.channels.WritableByteChannel
import kafka.message.{MessageOffset, InvalidMessageException, Message}
import kafka.message.{MessageAndOffset, InvalidMessageException, Message}

/**
* A set of messages. A message set has a fixed serialized form, though the container
Expand All @@ -26,12 +26,12 @@ import kafka.message.{MessageOffset, InvalidMessageException, Message}
* 4 byte size containing an integer N
* N message bytes as described in the message class
*/
abstract class MessageSet extends java.lang.Iterable[MessageOffset] {
abstract class MessageSet extends java.lang.Iterable[MessageAndOffset] {

/**
* Provides an iterator over the messages in this set
*/
def iterator: java.util.Iterator[MessageOffset]
def iterator: java.util.Iterator[MessageAndOffset]

/**
* Gives the total size of this message set in bytes
Expand All @@ -50,4 +50,4 @@ abstract class MessageSet extends java.lang.Iterable[MessageOffset] {
throw new InvalidMessageException
}
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/javaapi/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,4 @@ class Producer[K,V](config: ProducerConfig,
* the zookeeper client connection if one exists
*/
def close = underlying.close
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ class ProducerData[K, V](private val topic: String,
def getKey: K = key

def getData: java.util.List[V] = data
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
def close() {
underlying.close
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
case false => -1
}
}
}
}
48 changes: 29 additions & 19 deletions core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,

def shallowValidBytes: Long = {
if(shallowValidByteCount < 0) {
val iter = shallowIterator
while(iter.hasNext)
iter.next()
val iter = deepIterator
while(iter.hasNext) {
val messageAndOffset = iter.next
shallowValidByteCount = messageAndOffset.offset
}
}
shallowValidByteCount
shallowValidByteCount - initialOffset
}

def deepValidBytes: Long = {
Expand All @@ -102,23 +104,29 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
channel.write(buffer.duplicate)

override def iterator: Iterator[MessageOffset] = deepIterate match {
override def iterator: Iterator[MessageAndOffset] = deepIterate match {
case true => deepIterator
case false => shallowIterator
}

private def shallowIterator(): Iterator[MessageOffset] = {

/**
* Applications won't require to use this API for shallow iteration
* of compressed message sets. But unit tests might want it. So we
* cannot get rid of this API all together.
*/
private def shallowIterator(): Iterator[MessageAndOffset] = {
ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageOffset] {
new IteratorTemplate[MessageAndOffset] {
var iter = buffer.slice()
var currValidBytes = 0L
override def makeNext(): MessageOffset = {
var currValidBytes = initialOffset

override def makeNext(): MessageAndOffset = {
// read the size of the item
if(iter.remaining < 4) {
shallowValidByteCount = currValidBytes
return allDone()
}

val size = iter.getInt()
if(size < 0 || iter.remaining < size) {
shallowValidByteCount = currValidBytes
Expand All @@ -128,27 +136,29 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
.format(size, iter.remaining, currValidBytes))
return allDone()
}

currValidBytes += 4 + size
val message = iter.slice()
message.limit(size)

iter.position(iter.position + size)
new MessageOffset(new Message(message), currValidBytes)
new MessageAndOffset(new Message(message), currValidBytes)
}
}
}


private def deepIterator(): Iterator[MessageOffset] = {
private def deepIterator(): Iterator[MessageAndOffset] = {
ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageOffset] {
new IteratorTemplate[MessageAndOffset] {
var topIter = buffer.slice()
var currValidBytes = initialOffset
var innerIter:Iterator[MessageOffset] = null
var innerIter:Iterator[MessageAndOffset] = null
var lastMessageSize = 0L

def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)

def makeNextOuter: MessageOffset = {
def makeNextOuter: MessageAndOffset = {
if (topIter.remaining < 4) {
deepValidByteCount = currValidBytes
return allDone()
Expand Down Expand Up @@ -178,7 +188,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
innerIter = null
currValidBytes += 4 + size
new MessageOffset(newMessage, currValidBytes)
new MessageAndOffset(newMessage, currValidBytes)
case _ =>
if(logger.isDebugEnabled)
logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
Expand All @@ -187,7 +197,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
}
}

override def makeNext(): MessageOffset = {
override def makeNext(): MessageAndOffset = {
if(logger.isDebugEnabled)
logger.debug("makeNext() in deepIterator: innerDone = " + innerDone)
innerDone match {
Expand All @@ -196,7 +206,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
val messageAndOffset = innerIter.next
if(!innerIter.hasNext)
currValidBytes += 4 + lastMessageSize
new MessageOffset(messageAndOffset.message, currValidBytes)
new MessageAndOffset(messageAndOffset.message, currValidBytes)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/message/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ case object DefaultCompressionCodec extends CompressionCodec { val codec = 1 }

case object GZIPCompressionCodec extends CompressionCodec { val codec = 1 }

case object NoCompressionCodec extends CompressionCodec { val codec = 0 }
case object NoCompressionCodec extends CompressionCodec { val codec = 0 }
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/message/CompressionUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,4 @@ object CompressionUtils {
case _ =>
throw new kafka.common.UnknownCodecException("Unknown Codec: " + message.compressionCodec)
}
}
}
16 changes: 8 additions & 8 deletions core/src/main/scala/kafka/message/FileMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import kafka.utils._
*/
@nonthreadsafe
class FileMessageSet private[kafka](private[message] val channel: FileChannel,
private[message] val offset: Long,
private[message] val limit: Long,
val mutable: Boolean,
val needRecover: AtomicBoolean) extends MessageSet {
private[message] val offset: Long,
private[message] val limit: Long,
val mutable: Boolean,
val needRecover: AtomicBoolean) extends MessageSet {

private val setSize = new AtomicLong()
private val setHighWaterMark = new AtomicLong()
Expand Down Expand Up @@ -108,11 +108,11 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
/**
* Get an iterator over the messages in the set
*/
override def iterator: Iterator[MessageOffset] = {
new IteratorTemplate[MessageOffset] {
override def iterator: Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
var location = offset

override def makeNext(): MessageOffset = {
override def makeNext(): MessageAndOffset = {
// read the size of the item
val sizeBuffer = ByteBuffer.allocate(4)
channel.read(sizeBuffer, location)
Expand All @@ -133,7 +133,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,

// increment the location and return the item
location += size + 4
new MessageOffset(new Message(buffer), location)
new MessageAndOffset(new Message(buffer), location)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/message/MessageAndOffset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ package kafka.message
/**
* Represents message and offset of the next message. This is used in the MessageSet to iterate over it
*/
case class MessageAndOffset(val message: Message, val offset: Long)
case class MessageAndOffset(val message: Message, val offset: Long)
Loading

0 comments on commit 08276e1

Please sign in to comment.