Skip to content

Commit

Permalink
Fix bug causing random partitions with partition key
Browse files Browse the repository at this point in the history
Fixes elodina#22

Array[Byte] does not have the identical hashcode even given identical contents. List[Byte] has the desired hashcode property at the cost of some performance.
  • Loading branch information
deanchen committed Dec 26, 2014
1 parent 3406bfe commit f978371
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/main/scala/KafkaProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,17 @@ case class KafkaProducer(

val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
def kafkaMesssage(message: List[Byte], partition: List[Byte]): KeyedMessage[AnyRef, AnyRef] = {
if (partition == null) {
new KeyedMessage(topic,message)
} else {
new KeyedMessage(topic,partition,message)
}
}

def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))
def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8").toList, if (partition == null) null else partition.getBytes("UTF8").toList)

def send(message: Array[Byte], partition: Array[Byte]): Unit = {
def send(message: List[Byte], partition: List[Byte]): Unit = {
try {
producer.send(kafkaMesssage(message, partition))
} catch {
Expand Down

0 comments on commit f978371

Please sign in to comment.