SAMZA-1530; Bump up Kafka dependency to 0.11
Author: Dong Lin <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes apache#395 from lindong28/SAMZA-1530
lindong28 authored and xiliu committed Jan 10, 2018
1 parent 07f28e9 commit a6540b4
Showing 16 changed files with 86 additions and 72 deletions.
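The change that repeats across these 16 files is the error-handling shift in the Kafka 0.11 clients: responses no longer hand back raw Short error codes resolved through kafka.common.ErrorMapping, but the org.apache.kafka.common.protocol.Errors enum, which pairs each code with its exception. A minimal sketch of the new shape (illustration only, not code from this commit; ErrorApiSketch and maybeThrow are invented names):

```scala
// Illustration of the 0.10 -> 0.11 error-API shape this commit adapts to.
import org.apache.kafka.common.protocol.Errors

object ErrorApiSketch {
  // 0.11 style: the enum carries both the code and its exception,
  // so there is no ErrorMapping lookup table anymore.
  def maybeThrow(error: Errors): Unit = {
    if (error != Errors.NONE) throw error.exception()
  }

  def main(args: Array[String]): Unit = {
    maybeThrow(Errors.NONE)                          // no-op
    println(Errors.OFFSET_OUT_OF_RANGE.code())       // the wire code, as a Short
    println(Errors.OFFSET_OUT_OF_RANGE.exception())  // the paired OffsetOutOfRangeException
  }
}
```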
2 changes: 1 addition & 1 deletion bin/check-all.sh
@@ -21,7 +21,7 @@

set -e

SCALAs=( "2.10" "2.11" "2.12" )
SCALAs=( "2.11" "2.12" )
JDKs=( "JAVA8_HOME" )
YARNs=( "2.6.1" "2.7.1" )

2 changes: 1 addition & 1 deletion gradle/dependency-versions.gradle
@@ -33,7 +33,7 @@
jodaTimeVersion = "2.2"
joptSimpleVersion = "3.2"
junitVersion = "4.8.1"
kafkaVersion = "0.10.1.1"
kafkaVersion = "0.11.0.2"
log4jVersion = "1.2.17"
metricsVersion = "2.2.0"
mockitoVersion = "1.10.19"
@@ -182,7 +182,7 @@ class BrokerProxy(
firstCallBarrier.countDown()

// Split response into errors and non errors, processing the errors first
-val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error == ErrorMapping.NoError)
+val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError)

handleErrors(errorResponses, response)

@@ -219,18 +219,17 @@ class BrokerProxy(
immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
}

-def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response:FetchResponse) = {
+def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = {
// FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
-case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
+case class Error(tp: TopicAndPartition, code: Short, exception: Exception)

// Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)

// Convert FetchResponse into easier-to-work-with Errors
val errors = for (
(topicAndPartition, responseData) <- errorResponses;
-errorCode <- Option(response.errorCode(topicAndPartition.topic, topicAndPartition.partition)); // Scala's being cranky about referring to error.getKey values...
-exception <- Option(ErrorMapping.exceptionFor(errorCode))
-) yield new Error(topicAndPartition, errorCode, exception)
+error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values...
+) yield new Error(topicAndPartition, error.code(), error.exception())

val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
@@ -241,7 +240,7 @@
// handle the recoverable errors.
remainingErrors.foreach(e => {
warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
-KafkaUtil.maybeThrowException(e.code) })
+KafkaUtil.maybeThrowException(e.exception) })

notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp))

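The handleErrors flow above triages fetch errors three ways: abdicate the partition (not leader / unknown topic), refresh the offset (offset out of range), or treat the error as non-recoverable. A standalone sketch of that triage, restated against the Errors enum (FetchError and ErrorTriageSketch are invented, simplified types; the diff itself still compares ErrorMapping codes):

```scala
import org.apache.kafka.common.protocol.Errors

object ErrorTriageSketch {
  final case class FetchError(topic: String, partition: Int, error: Errors)

  def triage(errors: Set[FetchError]): (Set[FetchError], Set[FetchError], Set[FetchError]) = {
    // Not leader / unknown topic => abdicate the partition to another broker proxy.
    val (abdicate, rest) = errors.partition(e =>
      e.error == Errors.NOT_LEADER_FOR_PARTITION || e.error == Errors.UNKNOWN_TOPIC_OR_PARTITION)
    // Offset out of range => fetch a fresh starting offset and retry.
    val (refreshOffset, fatal) = rest.partition(_.error == Errors.OFFSET_OUT_OF_RANGE)
    (abdicate, refreshOffset, fatal)
  }

  def main(args: Array[String]): Unit = {
    val (a, r, f) = triage(Set(
      FetchError("t", 0, Errors.NOT_LEADER_FOR_PARTITION),
      FetchError("t", 1, Errors.OFFSET_OUT_OF_RANGE),
      FetchError("t", 2, Errors.CORRUPT_MESSAGE)))
    println(s"abdicate=$a refreshOffset=$r fatal=$f")
  }
}
```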
@@ -60,7 +60,7 @@ class GetOffset(
val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))

if (messages.hasError) {
-KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
+KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
}

info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
@@ -86,7 +86,7 @@ class GetOffset(
.get(topicAndPartition)
.getOrElse(toss("Unable to find offset information for %s" format topicAndPartition))

-KafkaUtil.maybeThrowException(partitionOffsetResponse.error)
+KafkaUtil.maybeThrowException(partitionOffsetResponse.error.exception())

partitionOffsetResponse
.offsets
@@ -164,7 +164,7 @@ class KafkaSystemAdmin(
metadataTTL)
val result = metadata.map {
case (topic, topicMetadata) => {
-KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+KafkaUtil.maybeThrowException(topicMetadata.error.exception())
val partitionsMap = topicMetadata.partitionsMetadata.map {
pm =>
new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
@@ -350,7 +350,7 @@ class KafkaSystemAdmin(
.values
// Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
.flatMap(topicMetadata => {
-KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+KafkaUtil.maybeThrowException(topicMetadata.error.exception())
topicMetadata
.partitionsMetadata
// Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
@@ -390,7 +390,7 @@ class KafkaSystemAdmin(
.getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
.partitionErrorAndOffsets
.mapValues(partitionErrorAndOffset => {
-KafkaUtil.maybeThrowException(partitionErrorAndOffset.error)
+KafkaUtil.maybeThrowException(partitionErrorAndOffset.error.exception())
partitionErrorAndOffset.offsets.head
})

@@ -480,7 +480,7 @@ class KafkaSystemAdmin(
val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL)
val topicMetadata = topicMetadataMap(topicName)
-KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+KafkaUtil.maybeThrowException(topicMetadata.error.exception())

val partitionCount = topicMetadata.partitionsMetadata.length
if (partitionCount != spec.getPartitionCount) {
@@ -73,6 +73,6 @@ object TopicMetadataCache extends Logging {
* partition's metadata has a bad errorCode.
*/
def hasBadErrorCode(streamMetadata: TopicMetadata) = {
-KafkaUtil.isBadErrorCode(streamMetadata.errorCode) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.errorCode))
+KafkaUtil.isBadErrorCode(streamMetadata.error.code()) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.error.code()))
}
}
@@ -29,7 +29,8 @@ import org.apache.samza.config.{ApplicationConfig, Config, ConfigException}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.execution.StreamManager
import org.apache.samza.system.OutgoingMessageEnvelope
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import kafka.common.ErrorMapping
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.samza.system.kafka.TopicMetadataCache

@@ -71,9 +72,10 @@ object KafkaUtil extends Logging {
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">protocol
* docs</a>, ReplicaNotAvailableException can be safely ignored.
*/
-def maybeThrowException(code: Short) {
+def maybeThrowException(e: Exception) {
try {
-ErrorMapping.maybeThrowException(code)
+if (e != null)
+  throw e
} catch {
case e: ReplicaNotAvailableException =>
debug("Got ReplicaNotAvailableException, but ignoring since it's safe to do so.")
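With this change, maybeThrowException receives the exception itself (callers now pass error.exception()) instead of a Short code: it rethrows the exception if non-null and swallows ReplicaNotAvailableException, which the Kafka protocol docs say is safe to ignore. A runnable sketch of that control flow (MaybeThrowSketch is a standalone stand-in, not the Samza object):

```scala
import org.apache.kafka.common.errors.ReplicaNotAvailableException

object MaybeThrowSketch {
  // Mirrors the diff above: null means "no error", ReplicaNotAvailableException
  // is logged and ignored, anything else propagates to the caller.
  def maybeThrowException(e: Exception): Unit = {
    try {
      if (e != null) throw e
    } catch {
      case _: ReplicaNotAvailableException =>
        println("Got ReplicaNotAvailableException, but ignoring since it's safe to do so.")
    }
  }

  def main(args: Array[String]): Unit = {
    maybeThrowException(null)                                       // no-op
    maybeThrowException(new ReplicaNotAvailableException("benign")) // swallowed
    // maybeThrowException(new IllegalStateException("boom"))       // would be rethrown
  }
}
```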
@@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import kafka.producer.ProducerClosedException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -42,6 +43,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.test.TestUtils;

public class MockKafkaProducer implements Producer<byte[], byte[]> {
@@ -98,7 +100,7 @@ public Future<RecordMetadata> send(ProducerRecord record) {
}

private RecordMetadata getRecordMetadata(ProducerRecord record) {
-return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), Record.NO_TIMESTAMP, -1, -1, -1);
+return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), -1L, -1, -1, -1);
}

@Override
@@ -190,6 +192,25 @@ public synchronized void flush () {
new FlushRunnable(0).run();
}

+public void initTransactions() {
+
+}
+
+public void abortTransaction() {
+
+}
+
+public void beginTransaction() {
+
+}
+
+public void commitTransaction() {
+
+}
+
+public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
+
+}

private static class FutureFailure implements Future<RecordMetadata> {

@@ -232,7 +253,7 @@ private static class FutureSuccess implements Future<RecordMetadata> {

public FutureSuccess(ProducerRecord record, int offset) {
this.record = record;
-this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, Record.NO_TIMESTAMP, -1, -1, -1);
+this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, RecordBatch.NO_TIMESTAMP, -1, -1, -1);
}

@Override
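The no-op stubs added above exist because the Producer interface in Kafka 0.11 gained the transactional API, so every implementer, this mock included, must now provide those methods. For context, a hedged sketch of how a real 0.11 producer exercises them (broker address, transactional id, and topic name are placeholders):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TransactionalProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("transactional.id", "sketch-txn-1")    // required before initTransactions()
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    producer.initTransactions()
    producer.beginTransaction()
    try {
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("sketch-topic", "hello".getBytes("UTF-8")))
      producer.commitTransaction()
    } catch {
      case e: Exception =>
        producer.abortTransaction() // roll back the uncommitted sends
        throw e
    } finally {
      producer.close()
    }
  }
}
```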
@@ -52,15 +52,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val checkpoint1 = new Checkpoint(ImmutableMap.of(ssp, "offset-1"))
val checkpoint2 = new Checkpoint(ImmutableMap.of(ssp, "offset-2"))
val taskName = new TaskName("Partition 0")

-var brokers: String = null
var config: Config = null

@Before
override def setUp {
super.setUp
TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
-brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
config = getConfig()
}

@@ -140,7 +137,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val defaultSerializer = classOf[ByteArraySerializer].getCanonicalName
val props = new Properties()
props.putAll(ImmutableMap.of(
-ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers,
+ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, defaultSerializer,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, defaultSerializer))
props
@@ -151,7 +148,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
.put(JobConfig.JOB_NAME, "some-job-name")
.put(JobConfig.JOB_ID, "i001")
.put(s"systems.$checkpointSystemName.samza.factory", classOf[KafkaSystemFactory].getCanonicalName)
.put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokers)
.put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokerList)
.put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)
.put("task.checkpoint.system", checkpointSystemName)
.build())
@@ -24,9 +24,10 @@ import java.nio.ByteBuffer
import java.util.concurrent.CountDownLatch

import kafka.api.{PartitionOffsetsResponse, _}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
+import org.apache.kafka.common.protocol.Errors
import org.apache.samza.SamzaException
import org.apache.samza.util.Logging
import org.junit.Assert._
@@ -165,7 +166,7 @@ class TestBrokerProxy extends Logging {
messageSet
}

-val fetchResponsePartitionData = FetchResponsePartitionData(0, 500, messageSet)
+val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet)
val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)

when(fetchResponse.data).thenReturn(map.toSeq)
@@ -257,12 +258,12 @@ class TestBrokerProxy extends Logging {
}
val mfr = mock(classOf[FetchResponse])
when(mfr.hasError).thenReturn(true)
-when(mfr.errorCode("topic", 42)).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)

val messageSet = mock(classOf[MessageSet])
when(messageSet.iterator).thenReturn(Iterator.empty)
val response = mock(classOf[FetchResponsePartitionData])
-when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
val responseMap = Map(tp -> response)
when(mfr.data).thenReturn(responseMap.toSeq)
invocationCount += 1
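The test migrations follow one pattern: Mockito stubs return Errors members such as Errors.OFFSET_OUT_OF_RANGE where they previously returned ErrorMapping Short codes. A minimal standalone sketch of that stubbing (FetchResult is an invented stand-in for FetchResponsePartitionData, not a Kafka type):

```scala
import org.apache.kafka.common.protocol.Errors
import org.mockito.Mockito.{mock, when}

object MockErrorStubSketch {
  trait FetchResult { def error: Errors } // invented stand-in

  def main(args: Array[String]): Unit = {
    val result = mock(classOf[FetchResult])
    // Stub the enum member directly instead of a raw Short code.
    when(result.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
    assert(result.error == Errors.OFFSET_OUT_OF_RANGE)
    println(result.error.exception()) // the exception paired with the enum member
  }
}
```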
@@ -24,7 +24,8 @@ package org.apache.samza.system.kafka
import java.util.{Properties, UUID}

import kafka.admin.AdminUtils
-import kafka.common.{ErrorMapping, LeaderNotAvailableException}
+import org.apache.kafka.common.errors.LeaderNotAvailableException
+import org.apache.kafka.common.protocol.Errors
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
@@ -68,19 +69,13 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
@BeforeClass
override def setUp {
super.setUp

val config = new java.util.HashMap[String, String]()

-brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")

config.put("bootstrap.servers", brokers)
config.put("bootstrap.servers", brokerList)
config.put("acks", "all")
config.put("serializer.class", "kafka.serializer.StringEncoder")

producerConfig = new KafkaProducerConfig("kafka", "i001", config)

producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
-metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
}


@@ -107,9 +102,8 @@
try {
val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
val topicMetadata = topicMetadataMap(topic)
-val errorCode = topicMetadata.errorCode

-KafkaUtil.maybeThrowException(errorCode)
+KafkaUtil.maybeThrowException(topicMetadata.error.exception())

done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
} catch {
@@ -137,11 +131,11 @@
}

def createSystemAdmin: KafkaSystemAdmin = {
-new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
}

def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
-new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
+new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
}

}
@@ -281,7 +275,7 @@ class TestKafkaSystemAdmin {
@Test
def testShouldCreateCoordinatorStream {
val topic = "test-coordinator-stream"
-val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)

val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
systemAdmin.createStream(spec)
@@ -294,14 +288,14 @@
assertEquals(3, partitionMetadata.replicas.size)
}

-class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
+class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
import kafka.api.TopicMetadata
var metadataCallCount = 0

// Simulate Kafka telling us that the leader for the topic is not available
override def getTopicMetadata(topics: Set[String]) = {
metadataCallCount += 1
-val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
+val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE)
Map("quux" -> topicMetadata)
}
}
@@ -25,7 +25,7 @@ import kafka.cluster.Broker
import kafka.common.TopicAndPartition
import kafka.message.Message
import kafka.message.MessageAndOffset
-
+import org.apache.kafka.common.protocol.Errors
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
@@ -68,7 +68,7 @@ class TestKafkaSystemConsumer {
// Lie and tell the store that the partition metadata is empty. We can't
// use partition metadata because it has Broker in its constructor, which
// is package private to Kafka.
-val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
+val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
var hosts = List[String]()
var getHostPortCount = 0
val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {