SAMZA-21: Change KafkaSystemConsumer and BrokerProxy consumer defaults.
jghoman committed Jan 8, 2014
1 parent ba8ed30 commit 1df6792
Showing 5 changed files with 30 additions and 41 deletions.
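
The substance of the change: defaults that were hand-picked constants (an Int.MaxValue socket timeout, a 1024000-byte buffer, a hard-coded 256 KB fetch) now delegate to Kafka's own kafka.consumer.ConsumerConfig companion object. As a quick orientation, a minimal sketch of what those constants resolve to, assuming the Kafka 0.8-era client this patch builds against (values may differ in other Kafka versions):

```scala
import kafka.consumer.ConsumerConfig

// Hedged sketch, assuming Kafka 0.8-era constants; check your Kafka version.
object ConsumerDefaultsProbe extends App {
  println(ConsumerConfig.SocketTimeout)    // 30000 ms; BrokerProxy previously used Int.MaxValue
  println(ConsumerConfig.SocketBufferSize) // 65536 bytes; previously 1024000
  println(ConsumerConfig.FetchSize)        // 1048576 bytes; previously hard-coded to 256 * 1024
  println(ConsumerConfig.MinFetchBytes)    // 1 byte; a fetch returns as soon as any data arrives
  println(ConsumerConfig.MaxFetchWaitMs)   // 100 ms; broker-side wait when MinFetchBytes isn't met
}
```

The practical effect, assuming those values, is that the consumer fails fast on dead sockets (30 s instead of effectively never) and otherwise inherits Kafka's stock tuning.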
BrokerProxy.scala
@@ -30,6 +30,7 @@ import grizzled.slf4j.Logging
 import java.nio.channels.ClosedByInterruptException
 import java.util.Map.Entry
 import scala.collection.mutable
+import kafka.consumer.ConsumerConfig
 
 /**
  * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
@@ -41,8 +42,11 @@ abstract class BrokerProxy(
   val system: String,
   val clientID: String,
   val metrics: KafkaSystemConsumerMetrics,
-  val timeout: Int = Int.MaxValue,
-  val bufferSize: Int = 1024000,
+  val timeout: Int = ConsumerConfig.SocketTimeout,
+  val bufferSize: Int = ConsumerConfig.SocketBufferSize,
+  val fetchSize:Int = ConsumerConfig.FetchSize,
+  val consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
+  val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
   offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
 
   val messageSink: MessageSink
@@ -70,10 +74,7 @@
     val hostString = "%s:%d" format (host, port)
     info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
 
-    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID) {
-      val fetchSize: Int = 256 * 1024
-    }
-
+    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
     sc
   }
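
The widened BrokerProxy signature keeps call sites terse because every new parameter carries a default. A hedged sketch of selective overriding (the broker address is invented, and metrics and mySink are placeholders assumed to exist in the surrounding code, not part of the patch):

```scala
// Hedged sketch: override only the knob you care about, by name; timeout,
// bufferSize, fetchSize, and consumerMinSize ride on the new defaults.
// "metrics" and "mySink" are assumed placeholders.
val proxy = new BrokerProxy("broker-1.example.com", 9092, "kafka", "samza-consumer",
  metrics, consumerMaxWait = 500) {
  val messageSink: MessageSink = mySink
}
```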
DefaultFetchSimpleConsumer.scala
@@ -24,17 +24,15 @@ package org.apache.samza.system.kafka
 import kafka.consumer.SimpleConsumer
 import kafka.api._
 import kafka.common.TopicAndPartition
-import kafka.common.TopicAndPartition
+import kafka.consumer.ConsumerConfig
 
-abstract class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int, clientId: scala.Predef.String)
+class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int,
+  clientId: scala.Predef.String, fetchSize: Int = ConsumerConfig.FetchSize,
+  minBytes:Int = ConsumerConfig.MinFetchBytes, maxWait:Int = ConsumerConfig.MaxFetchWaitMs)
   extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) {
 
-  val maxWait:Int = Int.MaxValue
-  val minBytes:Int = 1
-  val fetchSize:Int
-
   def defaultFetch(fetches:(TopicAndPartition, Long)*) = {
-    val fbr = new FetchRequestBuilder().maxWait(1000)
+    val fbr = new FetchRequestBuilder().maxWait(maxWait)
       .minBytes(minBytes)
       .clientId(clientId)
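
Promoting DefaultFetchSimpleConsumer from abstract to concrete, with every fetch knob defaulted, means it can be instantiated directly instead of through an anonymous subclass pinning fetchSize. A hedged usage sketch (broker address and topic are invented; the FetchResponse accessors assume the Kafka 0.8 API):

```scala
import kafka.common.TopicAndPartition

// Hedged sketch: instantiate the now-concrete consumer, overriding only
// maxWait; fetchSize and minBytes ride on the new ConsumerConfig defaults.
val consumer = new DefaultFetchSimpleConsumer("broker-1.example.com", 9092,
  soTimeout = 30000, bufferSize = 64 * 1024, clientId = "samza-probe",
  maxWait = 500)
try {
  // Fetch from offset 0 of partition 0; defaultFetch builds the request and
  // returns a kafka.api.FetchResponse.
  val response = consumer.defaultFetch((TopicAndPartition("probe-topic", 0), 0L))
  println(response.messageSet("probe-topic", 0).size)
} finally {
  consumer.close()
}
```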
KafkaSystemConsumer.scala
@@ -32,6 +32,7 @@ import kafka.serializer.Decoder
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
+import kafka.consumer.ConsumerConfig
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -50,8 +51,11 @@ private[kafka] class KafkaSystemConsumer(
   brokerListString: String,
   metrics: KafkaSystemConsumerMetrics,
   clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
-  timeout: Int = Int.MaxValue,
-  bufferSize: Int = 1024000,
+  timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
+  bufferSize: Int = ConsumerConfig.SocketBufferSize,
+  fetchSize:Int = ConsumerConfig.MaxFetchSize,
+  consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
+  consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
   brokerMetadataFailureRefreshMs: Long = 10000,
   fetchThreshold: Int = 0,
   offsetGetter: GetOffset = new GetOffset("fail"),
@@ -114,9 +118,10 @@
 
     brokerOption match {
       case Some(broker) =>
-        val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, offsetGetter) {
+        val brokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) {
           val messageSink: MessageSink = sink
-        })
+        }
+        brokerProxies.getOrElseUpdate((broker.host, broker.port), brokerProxy)
 
         brokerProxy.addTopicPartition(head, Option(lastOffset))
       case None => warn("No such topic-partition: %s, dropping." format head)
KafkaSystemFactory.scala
@@ -22,20 +22,9 @@ package org.apache.samza.system.kafka
 import org.apache.samza.util.KafkaUtil
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.config.KafkaConfig
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.util.ClientUtilTopicMetadataStore
-import org.apache.samza.SamzaException
-import scala.collection.JavaConversions._
-import java.util.Properties
-import kafka.producer.Producer
-import kafka.producer.async.DefaultEventHandler
-import kafka.utils.Utils
 import org.apache.samza.util.Util
-import kafka.serializer.Decoder
-import kafka.serializer.DefaultDecoder
 import org.apache.samza.system.SystemFactory
 
 class KafkaSystemFactory extends SystemFactory {
@@ -52,26 +41,24 @@ class KafkaSystemFactory extends SystemFactory {
     // TODO could add stream-level overrides for timeout and buffer size
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val fetchSize = consumerConfig.fetchMessageMaxBytes
+    val consumerMinSize = consumerConfig.fetchMinBytes
+    val consumerMaxWait = consumerConfig.fetchWaitMaxMs
     val autoOffsetResetDefault = consumerConfig.autoOffsetReset
    val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
     val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt
     val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
-    val deserializer = config.getConsumerMsgDeserializerClass(systemName) match {
-      case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
-      case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]]
-    }
-    val keyDeserializer = config.getConsumerKeyDeserializerClass(systemName) match {
-      case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
-      case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]]
-    }
 
     new KafkaSystemConsumer(
       systemName = systemName,
       brokerListString = brokerListString,
       metrics = metrics,
       clientId = clientId,
       timeout = timeout,
       bufferSize = bufferSize,
+      fetchSize = fetchSize,
+      consumerMinSize = consumerMinSize,
+      consumerMaxWait = consumerMaxWait,
       fetchThreshold = fetchThreshold,
       offsetGetter = offsetGetter)
   }
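
Since the factory now threads all five knobs out of consumerConfig, which Samza builds from the job config, tuning moves from code into configuration. A hedged sketch of the per-system properties involved, assuming Kafka 0.8 consumer key names and Samza's systems.<name>.consumer.* pass-through (the stream-level overrides flagged in the TODO above would layer on top):

```scala
// Hedged sketch: per-system tuning that would feed the values above; key
// names assume Kafka 0.8 consumer properties passed through the job config.
val consumerTuning = Map(
  "systems.kafka.consumer.socket.timeout.ms"           -> "30000",   // timeout
  "systems.kafka.consumer.socket.receive.buffer.bytes" -> "65536",   // bufferSize
  "systems.kafka.consumer.fetch.message.max.bytes"     -> "2097152", // fetchSize
  "systems.kafka.consumer.fetch.min.bytes"             -> "1",       // consumerMinSize
  "systems.kafka.consumer.fetch.wait.max.ms"           -> "500"      // consumerMaxWait
)
```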
TestBrokerProxy.scala
@@ -87,9 +87,7 @@ class TestBrokerProxy extends Logging {
       }
       alreadyCreatedConsumer = true
 
-      new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b") {
-        val fetchSize: Int = 42
-
+      new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", 42) {
         val sc = Mockito.mock(classOf[SimpleConsumer])
         val mockOffsetResponse = {
           val offsetResponse = Mockito.mock(classOf[OffsetResponse])
@@ -243,7 +241,7 @@
 
     // So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset
 
-    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, Int.MaxValue, 1024000, mockOffsetGetter) {
+    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) {
       val messageSink: MessageSink = mockMessageSink
 
       override def createSimpleConsumer() = {
