Commit b882ccc
More renaming of constants to conform to Scala style. Adding accessor methods to ByteBufferMessageSet class variables and making them private
Neha Narkhede committed Jul 18, 2011
1 parent f0c6cfd commit b882ccc
Showing 11 changed files with 101 additions and 57 deletions.
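The commit applies the standard Scala encapsulation pattern: public vars become private vars, and parameterless defs expose read-only access. As a minimal sketch of the pattern (the names mirror the commit, but this simplified class is illustrative, not code from the repository):

import java.nio.ByteBuffer

class MessageSetSketch {
  // Previously public vars: any caller could reassign them.
  private var buffer: ByteBuffer = null
  private var errorCode: Int = 0

  // Parameterless defs give callers read access without exposing the vars.
  def getBuffer = buffer
  def getErrorCode = errorCode
}

Call sites change from set.buffer to set.getBuffer, as the test diffs below show; callers keep read access but can no longer reassign the fields.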
1 change: 1 addition & 0 deletions contrib/hadoop-consumer/src/main/Main.iml
@@ -16,6 +16,7 @@
       <sourceFolder url="file://$MODULE_DIR$/../../../../core/src/main/scala" isTestSource="false" />
       <sourceFolder url="file://$MODULE_DIR$/../../../../examples/src/main/java" isTestSource="false" />
       <sourceFolder url="file://$MODULE_DIR$/../../../../perf/src/main/java" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/../../../../core/src/test/scala" isTestSource="true" />
     </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
25 changes: 25 additions & 0 deletions core/src/main/scala/kafka/common/UnknownCodecException.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that an unknown compression codec was specified for a message
+ */
+class UnknownCodecException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

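A hedged usage sketch for the new exception (the codec value 42 and the message list are illustrative; compress's signature appears in the CompressionUtils diff below):

import kafka.common.UnknownCodecException
import kafka.message.{CompressionUtils, Message}

val messages = List(new Message("hello".getBytes))
try {
  // 42 is not a recognized codec, so compress is expected to reject it.
  CompressionUtils.compress(messages, 42)
} catch {
  case e: UnknownCodecException => println("unsupported codec: " + e.getMessage)
}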
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/consumer/FetcherRunnable.scala
@@ -67,7 +67,7 @@ class FetcherRunnable(val name: String,
     for((messages, info) <- response.zip(partitionTopicInfos)) {
       try {
         var done = false
-        if(messages.errorCode == ErrorMapping.OffsetOutOfRangeCode) {
+        if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
           logger.info("offset " + info.getFetchOffset + " out of range")
           // see if we can fix this error
           val resetOffset = resetConsumerOffsets(info.topic, info.partition)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -29,7 +29,7 @@ private[javaapi] object Implicits {
 
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
      kafka.javaapi.message.ByteBufferMessageSet = {
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.errorCode, messageSet.deepIterate)
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getErrorCode, messageSet.getDeepIterate)
   }
 
   implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
@@ -41,9 +41,9 @@ class ByteBufferMessageSet protected () extends MessageSet {
   def this(compressionEnabled: Boolean = false, messages: java.util.List[Message]) {
     this()
     this.underlying = new kafka.message.ByteBufferMessageSet(compressionEnabled, asBuffer(messages))
-    this.buffer = underlying.buffer
-    this.errorCode = underlying.errorCode
-    this.deepIterate = underlying.deepIterate
+    this.buffer = underlying.getBuffer
+    this.errorCode = underlying.getErrorCode
+    this.deepIterate = underlying.getDeepIterate
   }
 
   def validBytes: Int = underlying.validBytes
27 changes: 16 additions & 11 deletions core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -35,23 +35,22 @@ import kafka.common.{InvalidMessageSizeException, ErrorMapping}
 class ByteBufferMessageSet protected () extends MessageSet {
   private val logger = Logger.getLogger(getClass())
   private var validByteCount = -1
-  var buffer:ByteBuffer = null
-  var errorCode:Int = ErrorMapping.NoError
+  private var buffer: ByteBuffer = null
+  private var errorCode: Int = ErrorMapping.NoError
   private var shallowValidByteCount = -1
   private var deepValidByteCount = -1
-  var deepIterate = false
-
-
-  def this(buffer: ByteBuffer, errorCode: Int, deepIterate: Boolean = false) = {
+  private var deepIterate = true
+
+  def this(buffer: ByteBuffer, errorCode: Int, deepIterate: Boolean = true) = {
     this()
     this.buffer = buffer
     this.errorCode = errorCode
     this.deepIterate = deepIterate
   }
 
-  def this(buffer: ByteBuffer) = this(buffer, ErrorMapping.NoError, false)
+  def this(buffer: ByteBuffer) = this(buffer, ErrorMapping.NoError, true)
 
-  def this(compressionEnabled:Boolean, messages: Message*) {
+  def this(compressionEnabled: Boolean, messages: Message*) {
     this()
     if (compressionEnabled) {
       val message = CompressionUtils.compress(messages)
@@ -68,7 +67,7 @@ class ByteBufferMessageSet protected () extends MessageSet {
     }
   }
 
-  def this(compressionEnabled:Boolean, messages: Iterable[Message]) {
+  def this(compressionEnabled: Boolean, messages: Iterable[Message]) {
     this()
     if (compressionEnabled) {
       val message = CompressionUtils.compress(messages)
@@ -92,8 +91,14 @@
   def disableDeepIteration() = {
     deepIterate = false
   }
 
-  def serialized():ByteBuffer = buffer
+  def getDeepIterate = deepIterate
+
+  def getBuffer = buffer
+
+  def getErrorCode = errorCode
+
+  def serialized(): ByteBuffer = buffer
 
   def validBytes: Int = deepIterate match {
     case true => deepValidBytes
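With the fields private, callers read state through the new accessors and toggle deep iteration through methods; note the default for deepIterate also flips from false to true in this hunk. A small illustrative sketch (not code from the repository):

import kafka.message.{ByteBufferMessageSet, Message}

val set = new ByteBufferMessageSet(false, new Message("hello".getBytes))

val buf = set.getBuffer      // was: set.buffer
val err = set.getErrorCode   // was: set.errorCode

// Deep iteration (descending into compressed message sets) is now
// controlled through methods instead of assigning the raw var.
set.disableDeepIteration()
println(set.getDeepIterate)  // false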
39 changes: 19 additions & 20 deletions core/src/main/scala/kafka/message/CompressionUtils.scala
@@ -9,17 +9,19 @@ import java.nio.ByteBuffer
 import org.apache.log4j.Logger
 
 object CompressionUtils {
-  var DEFAULT_COMPRESSION_CODEC = 1;
+  var DefaultCompressionCodec = 1;
   //0 is reserved to indicate no compression
-  val GZIP_COMPRESSION = 1;
+  val GzipCompression = 1;
   private val logger = Logger.getLogger(getClass)
-  def compress(messages:Iterable[Message]):Message = compress(messages, DEFAULT_COMPRESSION_CODEC)
-
-  def compress(messages:Iterable[Message], compressionCodec:Int):Message = compressionCodec match {
-    case GZIP_COMPRESSION =>
+
+  def compress(messages: Iterable[Message]): Message = compress(messages, DefaultCompressionCodec)
+
+  def compress(messages: Iterable[Message], compressionCodec: Int):Message = compressionCodec match {
+    case GzipCompression =>
       val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream()
       val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream)
-      logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
+      if(logger.isDebugEnabled)
+        logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
       val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
       for (message <- messages) {
         message.serializeTo(messageByteBuffer)
@@ -30,33 +32,30 @@ object CompressionUtils {
       outputStream.close();
       val oneCompressedMessage:Message = new Message(outputStream.toByteArray,compressionCodec)
       oneCompressedMessage
-    case _ =>
-      print("Unknown Codec: " + compressionCodec)
-      throw new Exception()
+    case _ =>
+      throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
   }
-  def decompress(message:Message):ByteBufferMessageSet = message.compressionCodec match {
-    case GZIP_COMPRESSION =>
+
+  def decompress(message: Message): ByteBufferMessageSet = message.compressionCodec match {
+    case GzipCompression =>
       val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream
-      val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
+      val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
       val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
       val intermediateBuffer = new Array[Byte](1024)
       var len=gzipIn.read(intermediateBuffer)
       while (len >0) {
         outputStream.write(intermediateBuffer,0,len)
-        len = gzipIn.read(intermediateBuffer)
+        len = gzipIn.read(intermediateBuffer)
       }
 
       gzipIn.close
       outputStream.close
       val outputBuffer = ByteBuffer.allocate(outputStream.size)
       outputBuffer.put(outputStream.toByteArray)
       outputBuffer.rewind
       val outputByteArray = outputStream.toByteArray
       new ByteBufferMessageSet(outputBuffer)
-    case _ =>
-      print("Unknown Codec: " + message.compressionCodec)
-      throw new Exception()
+    case _ =>
+      throw new kafka.common.UnknownCodecException("Unknown Codec: " + message.compressionCodec)
   }
 
 }
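Given only the signatures visible in this diff, a compress/decompress round trip might look like this hedged sketch (gzip is the default codec):

import kafka.message.{CompressionUtils, Message}

val original = List(new Message("hello".getBytes), new Message("there".getBytes))

// Pack several messages into a single gzip-compressed wrapper message...
val compressed: Message = CompressionUtils.compress(original)

// ...then unpack it back into a ByteBufferMessageSet for iteration.
val restored = CompressionUtils.decompress(compressed)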
36 changes: 25 additions & 11 deletions core/src/main/scala/kafka/message/Message.scala
@@ -38,7 +38,7 @@ object Message {
    * 0 for no compression
    * 1 for compression
    */
-  def CrcOffset(magic:Byte): Int = magic match {
+  def CrcOffset(magic: Byte): Int = magic match {
     case 0 => MagicOffset + MagicLength
     case _ => AttributeOffset + AttributeLength
   }
@@ -51,15 +51,15 @@
    * 0 for no compression
    * 1 for compression
    */
-  def PayloadOffset(magic:Byte): Int = CrcOffset(magic) + CrcLength
+  def PayloadOffset(magic: Byte): Int = CrcOffset(magic) + CrcLength
 
   /**
    * Computes the size of the message header based on the magic byte
    * @param magic Specifies the magic byte value. Possible values are 0 and 1
    * 0 for no compression
    * 1 for compression
    */
-  def HeaderSize(magic:Byte): Int = PayloadOffset(magic)
+  def HeaderSize(magic: Byte): Int = PayloadOffset(magic)
 
   /**
    * Size of the header for magic byte 0. This is the minimum size of any message header
@@ -70,26 +70,40 @@
    * Specifies the mask for the compression code. 2 bits to hold the compression codec.
    * 0 is reserved to indicate no compression
    */
-  val CompressionCodeMask:Int = 0x03 //
+  val CompressionCodeMask: Int = 0x03 //
 
 
   val NoCompression:Int = 0
 }
 
 /**
  * A message. The format of an N byte message is the following:
- * 1 byte "magic" identifier to allow format changes
- * 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
- * 4 byte CRC32 of the payload
- * N - 6 byte payload
+ *
+ * If magic byte is 0
+ *
+ * 1. 1 byte "magic" identifier to allow format changes
+ *
+ * 2. 4 byte CRC32 of the payload
+ *
+ * 3. N - 5 byte payload
+ *
+ * If magic byte is 1
+ *
+ * 1. 1 byte "magic" identifier to allow format changes
+ *
+ * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+ *
+ * 3. 4 byte CRC32 of the payload
+ *
+ * 4. N - 6 byte payload
 *
  */
 class Message(val buffer: ByteBuffer) {
 
   import kafka.message.Message._
 
 
-  private def this(checksum: Long, bytes: Array[Byte], compressionCodec:Int) = {
+  private def this(checksum: Long, bytes: Array[Byte], compressionCodec: Int) = {
     this(ByteBuffer.allocate(Message.HeaderSize(Message.CurrentMagicValue) + bytes.length))
     buffer.put(CurrentMagicValue)
     var attributes:Byte = 0
@@ -104,12 +118,12 @@ class Message(val buffer: ByteBuffer) {
 
   def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, Message.NoCompression)
 
-  def this(bytes: Array[Byte], compressionCodec:Int) = {
+  def this(bytes: Array[Byte], compressionCodec: Int) = {
     //Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there
     this(Utils.crc32(bytes), bytes, compressionCodec)
   }
 
-  def this(bytes:Array[Byte]) = this(bytes, Message.NoCompression)
+  def this(bytes: Array[Byte]) = this(bytes, Message.NoCompression)
 
   def size: Int = buffer.limit
 
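The doc comment above fixes the header arithmetic. Assuming the usual field widths (a 1-byte magic at offset 0, a 1-byte attributes field, a 4-byte CRC; these constants are defined elsewhere in Message.scala, so the values below are hypothetical), the offsets work out as follows:

// Hypothetical constants consistent with the doc comment above.
val MagicOffset = 0; val MagicLength = 1
val AttributeOffset = MagicOffset + MagicLength; val AttributeLength = 1
val CrcLength = 4

def CrcOffset(magic: Byte): Int = magic match {
  case 0 => MagicOffset + MagicLength         // = 1
  case _ => AttributeOffset + AttributeLength // = 2
}
def PayloadOffset(magic: Byte): Int = CrcOffset(magic) + CrcLength
def HeaderSize(magic: Byte): Int = PayloadOffset(magic)

// magic 0: HeaderSize = 5, so an N byte message carries an N - 5 byte payload
// magic 1: HeaderSize = 6, so an N byte message carries an N - 6 byte payload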
@@ -64,7 +64,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
     val topic = "test"
     val sent = new ByteBufferMessageSet(false, new Message("hello".getBytes()), new Message("there".getBytes()))
     producer.send(topic, sent)
-    sent.buffer.rewind
+    sent.getBuffer.rewind
     var fetched: ByteBufferMessageSet = null
     while(fetched == null || fetched.validBytes == 0)
       fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
@@ -93,7 +93,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
       val set = new ByteBufferMessageSet(false, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
       messages += topic -> set
       producer.send(topic, set)
-      set.buffer.rewind
+      set.getBuffer.rewind
       fetches += new FetchRequest(topic, 0, 0, 10000)
     }
 
@@ -139,7 +139,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
     producer.multiSend(produceList.toArray)
 
     for (messageSet <- messages.values)
-      messageSet.buffer.rewind
+      messageSet.getBuffer.rewind
 
     // wait a bit for produced message to be available
     Thread.sleep(200)
@@ -166,7 +166,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
     producer.multiSend(produceList.toArray)
 
     for (messageSet <- messages.values)
-      messageSet.buffer.rewind
+      messageSet.getBuffer.rewind
 
     // wait a bit for produced message to be available
     Thread.sleep(750)
@@ -93,7 +93,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
       val set = new ByteBufferMessageSet(false, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
       messages += topic -> set
       producer.send(topic, set)
-      set.buffer.rewind
+      set.getBuffer.rewind
       fetches += new FetchRequest(topic, 0, 0, 10000)
     }
 
@@ -156,7 +156,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
       set.enableDeepIteration
       messages += topic -> set
       producer.send(topic, set)
-      set.buffer.rewind
+      set.getBuffer.rewind
       fetches += new FetchRequest(topic, 0, 0, 10000)
     }
 
@@ -224,7 +224,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     producer.multiSend(produceList.toArray)
 
     for (messageSet <- messages.values)
-      messageSet.buffer.rewind
+      messageSet.getBuffer.rewind
 
     // wait a bit for produced message to be available
     Thread.sleep(200)
@@ -249,7 +249,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     producer.multiSend(produceList.toArray)
 
     for (messageSet <- messages.values)
-      messageSet.buffer.rewind
+      messageSet.getBuffer.rewind
 
     // wait a bit for produced message to be available
     Thread.sleep(200)
@@ -60,7 +60,7 @@ class ServerShutdownTest extends JUnitSuite {
 
     // send some messages
     producer.send(topic, sent1)
-    sent1.buffer.rewind
+    sent1.getBuffer.rewind
 
     Thread.sleep(200)
     // do a clean shutdown
@@ -93,7 +93,7 @@
 
     // send some more messages
     producer.send(topic, sent2)
-    sent2.buffer.rewind
+    sent2.getBuffer.rewind
 
     Thread.sleep(200)
 
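The rewinds in these tests exist because serializing a message set to the socket advances its ByteBuffer's position; rewinding resets the position to zero so the same set can be read again for assertions. The rename from buffer to getBuffer does not change that behavior. A minimal java.nio illustration (plain Java API, unrelated to the Kafka classes):

import java.nio.ByteBuffer

val buf = ByteBuffer.allocate(16)
buf.put("hello".getBytes)  // position advances to 5
buf.rewind                 // position back to 0; the buffer can be re-read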
