KAFKA-3771; Improving Kafka core code
- Used flatMap instead of map followed by flatten
- Used isEmpty, nonEmpty and isDefined as appropriate
- Used head, keys and keySet where appropriate
- Used contains, diff and find where appropriate
- Removed redundant val modifiers from case class constructor parameters
- Used toString consistently without parentheses, since it takes no parameters and has no side effects
- Removed unnecessary return statements, parentheses and semicolons
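
The idioms above, gathered into one minimal sketch (hypothetical names and values, not code from this commit):

```scala
object IdiomSketch extends App {
  val nested: List[List[Int]] = List(List(1, 2), List(3))
  val leader: Option[Int] = None

  // flatMap instead of map followed by flatten
  val flat: List[Int] = nested.flatMap(identity)    // same result as nested.map(identity).flatten

  // isEmpty / nonEmpty instead of negated isDefined or size comparisons
  if (leader.isEmpty) println("no leader")          // instead of !leader.isDefined
  if (nested.nonEmpty) println(nested.head)         // instead of nested.size > 0

  // Parameterless, side-effect-free methods are declared without ()
  class Broker(val id: Int) {
    override def toString: String = s"Broker($id)"  // not: override def toString(): String
    def isValid: Boolean = id >= 0                  // no redundant parentheses or return
  }
  println(new Broker(1).isValid)
}
```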

Author: Joshi <[email protected]>
Author: Rekha Joshi <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes apache#1451 from rekhajoshm/KAFKA-3771
rekhajoshm authored and ijuma committed Jun 6, 2016
1 parent 2c7fae0 commit 79aaf19
Showing 104 changed files with 401 additions and 392 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -216,7 +216,7 @@ object TopicCommand extends Logging {
val leader = zkUtils.getLeaderForPartition(topic, partitionId)
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
(reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
- (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
+ (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {
print("\tTopic: " + topic)
print("\tPartition: " + partitionId)
print("\tLeader: " + (if(leader.isDefined) leader.get else "none"))
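For what it's worth, the new condition on leader collapses further with Option.exists; a hedged alternative with stand-in values, not a change this commit makes:

```scala
val liveBrokers: Set[Int] = Set(1, 2, 3)   // hypothetical live broker ids
val leader: Option[Int] = Some(4)          // hypothetical leader lookup result

// Equivalent to: leader.isEmpty || !liveBrokers.contains(leader.get)
val unavailable: Boolean = !leader.exists(liveBrokers.contains)
```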
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/ApiVersion.scala
@@ -72,7 +72,7 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
override def compare(that: ApiVersion): Int =
ApiVersion.orderingByVersion.compare(this, that)

- override def toString(): String = version
+ override def toString: String = version
}

// Keep the IDs in order of versions
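The parentheses-free form relies on Scala's uniform access principle: a def declared without an empty parameter list still overrides java.lang.Object's toString(), and signals a pure accessor. A small sketch with a hypothetical class:

```scala
class Version(val id: String) {
  override def toString: String = id   // declared without (), per convention for side-effect-free methods
}

println(new Version("0.10.1-IV0"))     // println invokes toString like any other Object method
```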
@@ -63,7 +63,7 @@ case class ControlledShutdownRequest(versionId: Short,
4 /* broker id */
}

- override def toString(): String = {
+ override def toString: String = {
describe(true)
}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/FetchRequest.scala
@@ -141,7 +141,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,

def numPartitions = requestInfo.size

- override def toString(): String = {
+ override def toString: String = {
describe(true)
}

@@ -39,7 +39,7 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
body.sizeOf()
}

- override def toString(): String = {
+ override def toString: String = {
describe(true)
}

@@ -32,7 +32,7 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
body.sizeOf()
}

- override def toString(): String = {
+ override def toString: String = {
describe(true)
}

4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -35,7 +35,7 @@ object LeaderAndIsr {
case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)

- override def toString(): String = {
+ override def toString: String = {
Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
}
}
@@ -83,7 +83,7 @@ case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
size
}

- override def toString(): String = {
+ override def toString: String = {
val partitionStateInfo = new StringBuilder
partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -109,7 +109,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId
def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId

- override def toString(): String = {
+ override def toString: String = {
describe(true)
}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -46,7 +46,7 @@ object OffsetResponse {


case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
- override def toString(): String = {
+ override def toString: String = {
new String("error: " + Errors.forCode(error).exceptionName + " offsets: " + offsets.mkString)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -124,7 +124,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,

def numPartitions = data.size

- override def toString(): String = {
+ override def toString: String = {
describe(true)
}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -26,7 +26,7 @@ object Request {
val DebuggingConsumerId: Int = -2

// Broker ids are non-negative int.
- def isValidBrokerId(brokerId: Int): Boolean = (brokerId >= 0)
+ def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
}


4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -57,7 +57,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata],
partitionsMetadata.foreach(m => m.writeTo(buffer))
}

- override def toString(): String = {
+ override def toString: String = {
val topicMetadataInfo = new StringBuilder
topicMetadataInfo.append("{TopicMetadata for topic %s -> ".format(topic))
Errors.forCode(errorCode) match {
@@ -138,7 +138,7 @@ case class PartitionMetadata(partitionId: Int,
isr.foreach(r => buffer.putInt(r.id))
}

- override def toString(): String = {
+ override def toString: String = {
val partitionMetadataString = new StringBuilder
partitionMetadataString.append("\tpartition " + partitionId)
partitionMetadataString.append("\tleader: " + (if(leader.isDefined) leader.get.toString else "none"))
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -57,7 +57,7 @@ case class TopicMetadataRequest(versionId: Short,
topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
}

- override def toString(): String = {
+ override def toString: String = {
describe(true)
}

4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/client/ClientUtils.scala
@@ -142,11 +142,11 @@ object ClientUtils extends Logging{

var offsetManagerChannelOpt: Option[BlockingChannel] = None

- while (!offsetManagerChannelOpt.isDefined) {
+ while (offsetManagerChannelOpt.isEmpty) {

var coordinatorOpt: Option[BrokerEndPoint] = None

- while (!coordinatorOpt.isDefined) {
+ while (coordinatorOpt.isEmpty) {
try {
if (!queryChannel.isConnected)
queryChannel = channelToAnyBroker(zkUtils)
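Both loops above poll until an Option is filled, and `opt.isEmpty` reads more directly than `!opt.isDefined` as the continue condition. A minimal sketch of the same retry shape, with a random stub standing in for the real channel lookup:

```scala
import scala.util.Random

object RetrySketch extends App {
  var coordinatorOpt: Option[String] = None
  while (coordinatorOpt.isEmpty) {                 // instead of: while (!coordinatorOpt.isDefined)
    coordinatorOpt = if (Random.nextInt(3) == 0) Some("broker-1") else None
    if (coordinatorOpt.isEmpty) Thread.sleep(100)  // back off before the next attempt
  }
  println(s"found coordinator: ${coordinatorOpt.get}")
}
```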
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Cluster.scala
@@ -40,6 +40,6 @@ private[kafka] class Cluster {

def size = brokers.size

- override def toString(): String =
+ override def toString: String =
"Cluster(" + brokers.values.mkString(", ") + ")"
}
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -64,7 +64,7 @@ class Partition(val topic: String,
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)

- private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
+ private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId
val tags = Map("topic" -> topic, "partition" -> partitionId.toString)

newGauge("UnderReplicated",
@@ -158,7 +158,7 @@ class Partition(val topic: String,
}

def getLeaderEpoch(): Int = {
- return this.leaderEpoch
+ this.leaderEpoch
}

/**
@@ -381,9 +381,9 @@ class Partition(val topic: String,
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
- if(outOfSyncReplicas.size > 0) {
+ if(outOfSyncReplicas.nonEmpty) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
- assert(newInSyncReplicas.size > 0)
+ assert(newInSyncReplicas.nonEmpty)
info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
@@ -421,7 +421,7 @@ class Partition(val topic: String,
val candidateReplicas = inSyncReplicas - leaderReplica

val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
- if(laggingReplicas.size > 0)
+ if(laggingReplicas.nonEmpty)
debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))

laggingReplicas
@@ -484,7 +484,7 @@ class Partition(val topic: String,
}

override def equals(that: Any): Boolean = {
- if(!(that.isInstanceOf[Partition]))
+ if(!that.isInstanceOf[Partition])
return false
val other = that.asInstanceOf[Partition]
if(topic.equals(other.topic) && partitionId == other.partitionId)
@@ -496,7 +496,7 @@
31 + topic.hashCode() + 17*partitionId
}

- override def toString(): String = {
+ override def toString: String = {
val partitionString = new StringBuilder
partitionString.append("Topic: " + topic)
partitionString.append("; Partition: " + partitionId)
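The commit only drops the redundant parentheses around the isInstanceOf test; a common further cleanup in Scala (not applied here) replaces isInstanceOf/asInstanceOf with a pattern match. A sketch on a simplified stand-in for Partition:

```scala
class SimplePartition(val topic: String, val partitionId: Int) {
  override def equals(that: Any): Boolean = that match {
    case other: SimplePartition => topic == other.topic && partitionId == other.partitionId
    case _ => false
  }
  // mirrors the hash formula visible in the diff above
  override def hashCode: Int = 31 + topic.hashCode + 17 * partitionId
}
```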
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -98,7 +98,7 @@ class Replica(val brokerId: Int,
}

override def equals(that: Any): Boolean = {
- if(!(that.isInstanceOf[Replica]))
+ if(!that.isInstanceOf[Replica])
return false
val other = that.asInstanceOf[Replica]
if(topic.equals(other.topic) && brokerId == other.brokerId && partition.equals(other.partition))
@@ -111,7 +111,7 @@
}


- override def toString(): String = {
+ override def toString: String = {
val replicaString = new StringBuilder
replicaString.append("ReplicaId: " + brokerId)
replicaString.append("; Topic: " + topic)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/common/AppInfo.scala
@@ -42,7 +42,7 @@ object AppInfo extends KafkaMetricsGroup {
newGauge("CommitID",
new Gauge[String] {
def value = {
- AppInfoParser.getCommitId();
+ AppInfoParser.getCommitId()
}
})

@@ -92,7 +92,9 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification
val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
- data map (notificationHandler.processNotification(_)) getOrElse (logger.warn(s"read null data from $changeZnode when processing notification $notification"))
+ data.map(notificationHandler.processNotification(_)).getOrElse {
+   logger.warn(s"read null data from $changeZnode when processing notification $notification")
+ }
}
lastExecutedChange = changeId
}
@@ -107,6 +109,7 @@

/**
* Purges expired notifications.
+ *
* @param now
* @param notifications
*/
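The reformatted chain still uses getOrElse for a side effect in the empty case; Option.fold expresses the two branches in one call. A hedged sketch with stand-in data and println in place of the real handler and logger:

```scala
val changeZnode = "/kafka-acl-changes/acl_changes_0000000001"  // hypothetical znode path
val data: Option[String] = None                                // result of a readDataMaybeNull-style call

data.fold(println(s"read null data from $changeZnode")) { d =>
  println(s"processing notification payload: $d")              // stands in for processNotification
}
```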
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -45,7 +45,7 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
iter.clearCurrentChunk()
}

- override def toString(): String = {
+ override def toString: String = {
"%s kafka stream".format(clientId)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -76,7 +76,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
val partitionAssignment =
new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))

- if (ctx.consumersForTopic.size > 0) {
+ if (ctx.consumersForTopic.nonEmpty) {
// check conditions (a) and (b)
val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
ctx.consumersForTopic.foreach { case (topic, threadIds) =>
@@ -67,7 +67,7 @@ class PartitionTopicInfo(val topic: String,
}
}

- override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
+ override def toString: String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
": consumed offset = " + consumedOffset.get
}

@@ -257,13 +257,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging

def newBatch() {
// raise error if the previous batch is not empty
- if (leaderAndIsrRequestMap.size > 0)
+ if (leaderAndIsrRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
"a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
- if (stopReplicaRequestMap.size > 0)
+ if (stopReplicaRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
- if (updateMetadataRequestMap.size > 0)
+ if (updateMetadataRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
}
@@ -424,15 +424,15 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
stopReplicaRequestMap.clear()
} catch {
case e : Throwable => {
- if (leaderAndIsrRequestMap.size > 0) {
+ if (leaderAndIsrRequestMap.nonEmpty) {
error("Haven't been able to send leader and isr requests, current state of " +
s"the map is $leaderAndIsrRequestMap. Exception message: $e")
}
- if (updateMetadataRequestMap.size > 0) {
+ if (updateMetadataRequestMap.nonEmpty) {
error("Haven't been able to send metadata update requests, current state of " +
s"the map is $updateMetadataRequestMap. Exception message: $e")
}
- if (stopReplicaRequestMap.size > 0) {
+ if (stopReplicaRequestMap.nonEmpty) {
error("Haven't been able to send stop replica requests, current state of " +
s"the map is $stopReplicaRequestMap. Exception message: $e")
}
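The switch from `size > 0` to `nonEmpty` is not only cosmetic: on collections that do not store their size (List, Iterator, some lazy views), `size` traverses every element, while `nonEmpty` only has to look at the head. A small illustration:

```scala
val pending: List[String] = List.fill(1000000)("request")

pending.nonEmpty    // O(1): only checks whether a head element exists
pending.size > 0    // O(n): counts all one million elements before comparing
```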