Skip to content

Commit

Permalink
Renamed MessageOffset to MessageAndOffset. Cleaned up code. Changed m…
Browse files Browse the repository at this point in the history
…ethod names to camel case in Message.scala. Moved compression config to ProducerConfig
  • Loading branch information
nehanarkhede committed Jul 29, 2011
1 parent c227dd3 commit 4b4c50d
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 42 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,9 @@ private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int,
startIndex = 0
case _ =>
var isFound = false
println("Offset time array = " + offsetTimeArray.foreach(o => println(o._1 + ", " + o._2)))
if(logger.isDebugEnabled) {
logger.debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
}
startIndex = offsetTimeArray.length - 1
while (startIndex >= 0 && !isFound) {
if (offsetTimeArray(startIndex)._2 <= request.time)
Expand Down
35 changes: 17 additions & 18 deletions core/src/main/scala/kafka/message/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,22 @@ object Message {
val MagicLength = 1
val AttributeOffset = MagicOffset + MagicLength
val AttributeLength = 1
/**
* Specifies the mask for the compression code. 2 bits to hold the compression codec.
* 0 is reserved to indicate no compression
*/
val CompressionCodeMask: Int = 0x03 //


val NoCompression:Int = 0

/**
* Computes the CRC value based on the magic byte
* @param magic Specifies the magic byte value. Possible values are 0 and 1
* 0 for no compression
* 1 for compression
*/
def CrcOffset(magic: Byte): Int = magic match {
def crcOffset(magic: Byte): Int = magic match {
case MagicVersion1 => MagicOffset + MagicLength
case MagicVersion2 => AttributeOffset + AttributeLength
case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic))
Expand All @@ -55,29 +63,20 @@ object Message {
* 0 for no compression
* 1 for compression
*/
def PayloadOffset(magic: Byte): Int = CrcOffset(magic) + CrcLength
def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength

/**
* Computes the size of the message header based on the magic byte
* @param magic Specifies the magic byte value. Possible values are 0 and 1
* 0 for no compression
* 1 for compression
*/
def HeaderSize(magic: Byte): Int = PayloadOffset(magic)
def headerSize(magic: Byte): Int = payloadOffset(magic)

/**
* Size of the header for magic byte 0. This is the minimum size of any message header
*/
val MinHeaderSize = HeaderSize(0);

/**
* Specifies the mask for the compression code. 2 bits to hold the compression codec.
* 0 is reserved to indicate no compression
*/
val CompressionCodeMask: Int = 0x03 //


val NoCompression:Int = 0
val MinHeaderSize = headerSize(0);
}

/**
Expand Down Expand Up @@ -108,7 +107,7 @@ class Message(val buffer: ByteBuffer) {


private def this(checksum: Long, bytes: Array[Byte], compressionCodec: CompressionCodec) = {
this(ByteBuffer.allocate(Message.HeaderSize(Message.CurrentMagicValue) + bytes.length))
this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length))
buffer.put(CurrentMagicValue)
var attributes:Byte = 0
if (compressionCodec.codec > 0) {
Expand All @@ -131,7 +130,7 @@ class Message(val buffer: ByteBuffer) {

def size: Int = buffer.limit

def payloadSize: Int = size - HeaderSize(magic)
def payloadSize: Int = size - headerSize(magic)

def magic: Byte = buffer.get(MagicOffset)

Expand All @@ -146,19 +145,19 @@ class Message(val buffer: ByteBuffer) {

}

def checksum: Long = Utils.getUnsignedInt(buffer, CrcOffset(magic))
def checksum: Long = Utils.getUnsignedInt(buffer, crcOffset(magic))

def payload: ByteBuffer = {
var payload = buffer.duplicate
payload.position(HeaderSize(magic))
payload.position(headerSize(magic))
payload = payload.slice()
payload.limit(payloadSize)
payload.rewind()
payload
}

def isValid: Boolean =
checksum == Utils.crc32(buffer.array, buffer.position + buffer.arrayOffset + PayloadOffset(magic), payloadSize)
checksum == Utils.crc32(buffer.array, buffer.position + buffer.arrayOffset + payloadOffset(magic), payloadSize)

def serializedSize: Int = 4 /* int size*/ + buffer.limit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ package kafka.message
/**
* Represents message and offset of the next message. This is used in the MessageSet to iterate over it
*/
case class MessageOffset(val message: Message, val offset: Long)
case class MessageAndOffset(val message: Message, val offset: Long)
19 changes: 19 additions & 0 deletions core/src/main/scala/kafka/producer/ProducerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,23 @@ class ProducerConfig(val props: Properties) extends ZKConfig(props)
* or not. Valid values are - async for asynchronous send *
* sync for synchronous send */
val producerType = Utils.getString(props, "producer.type", "sync")

/**
* This parameter allows you to specify the compression codec for all data generated *
* by this producer. The default is NoCompressionCodec
*/
val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")

/** This parameter allows you to set whether compression should be turned *
* on for particular topics
*
* If the compression codec is anything other than NoCompressionCodec,
*
* Enable compression only for specified topics if any
*
* If the list of compressed topics is empty, then enable the specified compression codec for all topics
*
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
}
19 changes: 0 additions & 19 deletions core/src/main/scala/kafka/producer/SyncProducerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,4 @@ trait SyncProducerConfigShared {
val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)

val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)

/**
* This parameter allows you to specify the compression codec for all data generated *
* by this producer. The default is NoCompressionCodec
*/
val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")

/** This parameter allows you to set whether compression should be turned *
* on for particular topics
*
* If the compression codec is anything other than NoCompressionCodec,
*
* Enable compression only for specified topics if any
*
* If the list of compressed topics is empty, then enable the specified compression codec for all topics
*
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[kafka] class AsyncProducer[T](config: AsyncProducerConfig,
cbkHandler.init(cbkHandlerProps)
private val sendThread = new ProducerSendThread("ProducerSendThread-" + AsyncProducer.Random.nextInt, queue,
serializer, producer,
if(eventHandler != null) eventHandler else new DefaultEventHandler[T](config, cbkHandler),
if(eventHandler != null) eventHandler else new DefaultEventHandler[T](new ProducerConfig(config.props), cbkHandler),
cbkHandler, config.queueTime, config.batchSize, AsyncProducer.Shutdown)
sendThread.setDaemon(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import kafka.api.ProducerRequest
import kafka.serializer.Encoder
import java.util.Properties
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
import kafka.producer.{SyncProducerConfigShared, SyncProducerConfig, SyncProducer}
import kafka.producer.{ProducerConfig, SyncProducerConfigShared, SyncProducerConfig, SyncProducer}

private[kafka] class DefaultEventHandler[T](val config: SyncProducerConfigShared,
private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {

private val logger = Logger.getLogger(classOf[DefaultEventHandler[T]])
Expand Down

0 comments on commit 4b4c50d

Please sign in to comment.