Enforcing the compressed.topics config option
Neha Narkhede committed Jul 19, 2011
1 parent 849d3a8 commit cb83183
Showing 8 changed files with 49 additions and 17 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -37,7 +37,7 @@ class ProducerPool[V](private val config: ProducerConfig,
   private val logger = Logger.getLogger(classOf[ProducerPool[V]])
   private var eventHandler = inputEventHandler
   if(eventHandler == null)
-    eventHandler = new DefaultEventHandler(config.compressionCodec, cbkHandler)
+    eventHandler = new DefaultEventHandler(config, cbkHandler)

   if(serializer == null)
     throw new InvalidConfigException("serializer passed in is null!")
14 changes: 11 additions & 3 deletions core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -43,13 +43,21 @@ trait SyncProducerConfigShared {
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)

   /**
-   * This parameter allows you to enable compression for all data generated *
-   * by this producer. The default is false.
+   * This parameter allows you to specify the compression codec for all data generated *
+   * by this producer. The default is NoCompressionCodec
    */
   val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")

   /** This parameter allows you to set whether compression should be turned *
    * on for particular topics
+   *
+   * If the compression codec is anything other than NoCompressionCodec,
+   *
+   * Enable compression only for specified topics if any
+   *
+   * If the list of compressed topics is empty, then enable the specified compression codec for all topics
+   *
+   * If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
-  val compressedTopics = Utils.getString(props, "compressed.topics", null)
+  val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
 }
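
For illustration, a minimal sketch of how a producer might opt into per-topic compression with these two options. The property names come from the diff above; the topic names are hypothetical, and the codec id "1" is assumed to select GZIP in this codebase.

    import java.util.Properties
    import kafka.producer.ProducerConfig

    val props = new Properties
    props.put("compression.codec", "1")                // assumption: non-zero codec id enables compression (1 = GZIP)
    props.put("compressed.topics", "tracking,metrics") // hypothetical topic names; only these get compressed
    val producerConfig = new ProducerConfig(props)
    // producerConfig.compressedTopics is parsed into Seq("tracking", "metrics") by Utils.getCSVList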
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/producer/async/AsyncProducer.scala
@@ -50,7 +50,7 @@ private[kafka] class AsyncProducer[T](config: AsyncProducerConfig,
     cbkHandler.init(cbkHandlerProps)
   private val sendThread = new ProducerSendThread("ProducerSendThread-" + AsyncProducer.Random.nextInt, queue,
                                                   serializer, producer,
-                                                  if(eventHandler != null) eventHandler else new DefaultEventHandler[T](config.compressionCodec, cbkHandler),
+                                                  if(eventHandler != null) eventHandler else new DefaultEventHandler[T](config, cbkHandler),
                                                   cbkHandler, config.queueTime, config.batchSize, AsyncProducer.Shutdown)
   sendThread.setDaemon(false)
21 changes: 17 additions & 4 deletions core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -22,10 +22,10 @@ import org.apache.log4j.Logger
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
 import java.util.Properties
-import kafka.producer.{SyncProducerConfig, ProducerConfig, SyncProducer}
-import kafka.message.{CompressionCodec, ByteBufferMessageSet}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+import kafka.producer.{SyncProducerConfigShared, SyncProducerConfig, SyncProducer}

-private[kafka] class DefaultEventHandler[T](val compression: CompressionCodec,
+private[kafka] class DefaultEventHandler[T](val config: SyncProducerConfigShared,
                                             val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {

   private val logger = Logger.getLogger(classOf[DefaultEventHandler[T]])
@@ -52,7 +52,20 @@ private[kafka] class DefaultEventHandler[T](val compression: CompressionCodec,
                         serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
     import scala.collection.JavaConversions._
     val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
-    eventsPerTopicMap.map(e => ((e._1._1, e._1._2) , new ByteBufferMessageSet(compression, e._2: _*)))
+    val topicsAndPartitions = eventsPerTopic.map(e => e._1)
+    /** enforce the compressed.topics config here.
+     *  If the compression codec is anything other than NoCompressionCodec,
+     *  Enable compression only for specified topics if any
+     *  If the list of compressed topics is empty, then enable the specified compression codec for all topics
+     *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+     */
+    val messages = eventsPerTopicMap.map(e => {
+      if(config.compressedTopics.contains(e._1._1))
+        new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
+      else
+        new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
+    })
+    topicsAndPartitions.zip(messages)
   }

   private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
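
The enforcement above reduces to a per-topic codec choice. A hypothetical standalone helper (not part of the commit) that mirrors the check in serialize:

    import kafka.message.{CompressionCodec, NoCompressionCodec}

    // Mirrors the enforcement above: a topic is compressed with the configured
    // codec only when it appears in compressed.topics; every other topic falls
    // back to NoCompressionCodec. If the configured codec is already
    // NoCompressionCodec, both branches produce uncompressed message sets.
    def codecForTopic(topic: String,
                      configuredCodec: CompressionCodec,
                      compressedTopics: Seq[String]): CompressionCodec =
      if(compressedTopics.contains(topic)) configuredCodec
      else NoCompressionCodec

Note that, as written, an empty compressed.topics list makes the contains check fail for every topic, so nothing is compressed, whereas the doc comment in SyncProducerConfig describes the empty-list case as enabling the codec for all topics.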
10 changes: 6 additions & 4 deletions core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -156,8 +156,9 @@ object ProducerPerformance {
       props.put("reconnect.interval", Integer.MAX_VALUE.toString)
       props.put("buffer.size", (64*1024).toString)

-      val producer = new Producer[String, String](new ProducerConfig(props), new StringEncoder,
-        new DefaultEventHandler(config.compressionCodec, null), null, new DefaultPartitioner)
+      val producerConfig = new ProducerConfig(props)
+      val producer = new Producer[String, String](producerConfig, new StringEncoder,
+        new DefaultEventHandler(producerConfig, null), null, new DefaultPartitioner)

       override def run {
         var bytesSent = 0L
@@ -219,8 +220,9 @@ object ProducerPerformance {
       props.put("reconnect.interval", Integer.MAX_VALUE.toString)
       props.put("buffer.size", (64*1024).toString)

-      val producer = new Producer[String, String](new ProducerConfig(props), new StringEncoder,
-        new DefaultEventHandler(config.compressionCodec, null), null, new DefaultPartitioner)
+      val producerConfig = new ProducerConfig(props)
+      val producer = new Producer[String, String](producerConfig, new StringEncoder,
+        new DefaultEventHandler(producerConfig, null), null, new DefaultPartitioner)

       override def run {
         var bytesSent = 0L
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -160,8 +160,9 @@ object ReplayLogProducer {
       if(config.isAsync)
         props.put("producer.type", "async")

-      val producer = new Producer[Message, Message](new ProducerConfig(props), new DefaultEncoder,
-        new DefaultEventHandler(config.compressionCodec, null),
+      val producerConfig = new ProducerConfig(props)
+      val producer = new Producer[Message, Message](producerConfig, new DefaultEncoder,
+        new DefaultEventHandler(producerConfig, null),
         null, new DefaultPartitioner)

       override def run() {
8 changes: 8 additions & 0 deletions core/src/main/scala/kafka/utils/Utils.scala
@@ -533,6 +533,14 @@ object Utils {
     map
   }

+  def getCSVList(csvList: String): Seq[String] = {
+    if(csvList == null)
+      Seq.empty[String]
+    else {
+      csvList.split(",").filter(v => !v.equals(""))
+    }
+  }
+
   def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
     val successMsg = "The retention hour for "
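
For illustration, hypothetical call sites (not part of the commit) showing how the new helper behaves:

    val topics = Utils.getCSVList("tracking,,metrics") // empty tokens are filtered out: Seq("tracking", "metrics")
    val none   = Utils.getCSVList(null)                // a null property value yields an empty Seq

Tokens are not trimmed, so a value like "tracking, metrics" keeps the leading space on the second topic, which would then fail the contains check in DefaultEventHandler.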
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/message/CompressionUtilTest.scala
@@ -35,8 +35,8 @@ class CompressionUtilTest extends TestCase {

     TestUtils.checkLength(decompressedMessages.iterator,2)

-    TestUtils.checkLength(decompressedMessages.elements,3)
+    TestUtils.checkLength(decompressedMessages.iterator,3)

-    TestUtils.checkEquals(messages.iterator, decompressedMessages.elements)
+    TestUtils.checkEquals(messages.iterator, decompressedMessages.iterator)
   }
 }
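
(The elements-to-iterator change tracks the Scala 2.8 collection API, where Iterable.elements was deprecated in favor of iterator; the test's assertions are otherwise unchanged.)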
