Skip to content

Commit

Permalink
KAFKA-39
Browse files Browse the repository at this point in the history
  • Loading branch information
Neha Narkhede committed May 3, 2011
1 parent 7ebbca4 commit eab4159
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package kafka.javaapi.consumer

import kafka.consumer.{KafkaMessageStream, ConsumerConfig, ZookeeperConsumerConnectorMBean}
import kafka.consumer.{KafkaMessageStream, ConsumerConfig}

/**
* This class handles the consumers interaction with zookeeper
Expand Down Expand Up @@ -51,7 +51,18 @@ import kafka.consumer.{KafkaMessageStream, ConsumerConfig, ZookeeperConsumerConn
* /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
* Each consumer tracks the offset of the latest message consumed for each partition.
*
*/

/**
* JMX interface for monitoring consumer
*/
trait ZookeeperConsumerConnectorMBean {
def getPartOwnerStats: String
def getConsumerGroup: String
def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
}

private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import kafka.utils.Utils
import kafka.{TestZKUtils, TestUtils}
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.consumer.{ConsumerConfig, KafkaMessageStream, ConsumerTimeoutException}
import scala.collection.JavaConversions._
import kafka.javaapi.message.ByteBufferMessageSet
import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream, ConsumerTimeoutException}
import javax.management.NotCompliantMBeanException

class ZookeeperConsumerConnectorTest extends JUnitSuite with KafkaServerTestHarness with ZooKeeperTestHarness {
private val logger = Logger.getLogger(getClass())
Expand Down Expand Up @@ -154,6 +155,16 @@ class ZookeeperConsumerConnectorTest extends JUnitSuite with KafkaServerTestHarn
messages.sortWith((s,t) => s.checksum < t.checksum)
}

@Test
def testJMX() {
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
try {
val consumer = Consumer.createJavaConsumerConnector(consumerConfig)
}catch {
case e: NotCompliantMBeanException => fail("Should not fail with NotCompliantMBeanException")
}
}

private def getMessageList(messages: Message*): java.util.List[Message] = {
val messageList = new java.util.ArrayList[Message]()
messages.foreach(m => messageList.add(m))
Expand Down

0 comments on commit eab4159

Please sign in to comment.