Avro encoder/decoder for use as serializer.class in Apache Kafka 0.8
By default Avro uses CharSequence for fields defined as string in an Avro schema. Oftentimes this is not what you want -- see the discussions at AVRO-803 and Apache Avro: map uses CharSequence as key for details.
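For example, with Avro's default settings the generated getter for a string field returns CharSequence (at runtime typically Avro's Utf8), so code like the following does not compile (a sketch, assuming a generated class with a string field username as in twitter.avsc):

// given a deserialized record `tweet` of the generated class
val username: String = tweet.getUsername // type mismatch: the getter returns CharSequence, not String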
For the unit tests included in this project we explicitly configure Avro to use java.lang.String instead of CharSequence (or Avro's own Utf8 class), which provides the behavior you would expect when working with string-like objects in Java/Scala.
How you configure Avro's "string behavior" depends on your project setup. In our case we are using sbt and sbt-avro to compile Avro schemas into Java code. Here, build.sbt is the place to configure Avro through sbt-avro's stringType setting.
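For illustration, a build.sbt fragment along these lines tells the plugin to generate java.lang.String (a sketch; the exact setting key and its scoping depend on your sbt-avro version):

seq(sbtavro.SbtAvro.avroSettings : _*)

// Generate java.lang.String instead of CharSequence for Avro string fields
(stringType in avroConfig) := "String"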
If you are not using sbt but, say, Maven, then you would need to add <stringType>String</stringType> to the configuration of avro-maven-plugin in your pom.xml.
The codec assumes you are using Avro with code generation, i.e. you create an Avro schema (see twitter.avsc) and then compile the schema into Java code. "With code generation" is the scenario where you would use SpecificDatumWriter and SpecificDatumReader in Avro (which is what this codec does) instead of GenericDatumWriter and GenericDatumReader. See the Avro documentation for details.
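To make the distinction concrete, here is a sketch of "with code generation" serialization, assuming the generated Tweet class from twitter.avsc with username, tweet, and timestamp fields -- this is essentially what the encoder below does under the hood:

import java.io.ByteArrayOutputStream
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import com.miguno.avro.Tweet // generated from twitter.avsc

// Build a record via the generated builder API
val tweet = Tweet.newBuilder()
  .setUsername("miguno")
  .setTweet("Rock: Nerf paper, scissors is fine.")
  .setTimestamp(1366150681L)
  .build()

// Serialize the record to Avro's binary encoding
val writer = new SpecificDatumWriter[Tweet](Tweet.getClassSchema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(tweet, encoder)
encoder.flush()
val bytes: Array[Byte] = out.toByteArray // ready to be sent to a Kafka topic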
Note: The build artifacts published on Clojars.org are built against Java 6.
When using sbt, add the following lines to build.sbt:
resolvers ++= Seq(
"clojars-repository" at "https://clojars.org/repo"
)
libraryDependencies ++= Seq(
"com.miguno" %% "kafka-avro-codec" % "0.1.1"
)
When using Gradle, add the following lines to build.gradle:
repositories {
mavenCentral()
maven { url 'https://clojars.org/repo' }
}
dependencies {
compile 'com.miguno:kafka-avro-codec_2.10:0.1.1'
}
Let us assume you have defined an Avro schema for Twitter tweets, and you want to use this schema for the messages you send to a Kafka topic (see twitter.avsc). To implement an accompanying Avro encoder/decoder pair for Kafka you need to extend two base classes, AvroEncoder and AvroDecoder.
Example:
package your.app

import kafka.utils.VerifiableProperties
import com.miguno.kafka.avro.{AvroDecoder, AvroEncoder}
import com.miguno.avro.Tweet // <-- This is your generated Avro record, see twitter.avsc

class TweetAvroEncoder(props: VerifiableProperties = null) extends AvroEncoder[Tweet](props, Tweet.getClassSchema)

class TweetAvroDecoder(props: VerifiableProperties = null) extends AvroDecoder[Tweet](props, Tweet.getClassSchema)
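With these classes on the classpath you can point a Kafka 0.8 producer at the encoder via its serializer.class property. A minimal sketch (the broker list and topic name are placeholders, and the Tweet field values assume the twitter.avsc schema):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import com.miguno.avro.Tweet

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092") // placeholder broker list
props.put("serializer.class", "your.app.TweetAvroEncoder")
val producer = new Producer[Tweet, Tweet](new ProducerConfig(props))

val tweet = new Tweet("miguno", "Rock: Nerf paper, scissors is fine.", 1366150681L)
producer.send(new KeyedMessage[Tweet, Tweet]("zerg.hydra", tweet))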
TODO. In the meantime, the following snippet shows how to wire the decoder into Kafka's consumer API:
val topic = "zerg.hydra" // name of the Kafka topic you are reading from
val numThreads = 3 // number of threads for reading from that topic (the topic should have >= 3 partitions in this example)
val topicCountMap = Map(topic -> numThreads)
val valueDecoder = new TweetAvroDecoder()
val keyDecoder = valueDecoder // or use `null` if you explicitly do not want to use message keys
                              // (note that in Kafka 0.8 this means some topic partitions may never see data)
// `consumerConnector` is a kafka.consumer.ConsumerConnector, e.g. created via Consumer.create(new ConsumerConfig(...))
val consumerMap = consumerConnector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder)
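From here, a sketch of how you might consume the decoded tweets -- in practice you would hand each of the numThreads streams to its own thread:

// createMessageStreams returns a Map[String, List[KafkaStream[Tweet, Tweet]]]
val streams = consumerMap(topic)
for (stream <- streams) {
  // a KafkaStream iterates over MessageAndMetadata objects, blocking until messages arrive
  for (messageAndMetadata <- stream) {
    val tweet: Tweet = messageAndMetadata.message()
    // process the tweet, e.g. println(tweet.getUsername)
  }
}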
Requirements:
- Scala 2.10.5
- sbt 0.13.8
- Java JDK 6 or 7
- Avro 1.7.6
To compile, test, and package the project:
$ ./sbt clean compile
$ ./sbt clean test
$ ./sbt clean package
Create a file ~/.clojars-credentials.txt with the following contents:
realm=clojars
host=clojars.org
user=YOUR_CLOJARS_USERNAME
password=YOUR_CLOJARS_PASSWORD
Security-wise it is also recommended to restrict access to the credentials file via chmod 600 ~/.clojars-credentials.txt.
TODO: Explain versioning etc.
Publish the build artifacts via:
$ ./sbt clean test publish
Code contributions, bug reports, feature requests etc. are all welcome.
If you are new to GitHub, please read Contributing to a project to learn how to send patches and pull requests to kafka-avro-codec.
Copyright © 2014 Michael G. Noll
See LICENSE for licensing information.