From 8546dc9f235033544d41eaa3cc2a01afa0955af5 Mon Sep 17 00:00:00 2001 From: asmyczek Date: Thu, 24 Feb 2011 22:05:41 -0800 Subject: [PATCH 01/12] Adding Clojure client, initial commit. --- clients/clojure/.gitignore | 2 + clients/clojure/README.md | 51 ++++ clients/clojure/leiningen/run_example.clj | 7 + clients/clojure/log4j.properties | 5 + clients/clojure/project.clj | 13 + clients/clojure/src/kafka/buffer.clj | 153 ++++++++++ clients/clojure/src/kafka/example.clj | 36 +++ clients/clojure/src/kafka/kafka.clj | 262 ++++++++++++++++++ clients/clojure/src/kafka/print.clj | 22 ++ clients/clojure/src/kafka/serializable.clj | 22 ++ clients/clojure/src/kafka/types.clj | 28 ++ clients/clojure/test/kafka/buffer_test.clj | 47 ++++ clients/clojure/test/kafka/print_test.clj | 12 + .../clojure/test/kafka/serializable_test.clj | 14 + 14 files changed, 674 insertions(+) create mode 100644 clients/clojure/.gitignore create mode 100644 clients/clojure/README.md create mode 100644 clients/clojure/leiningen/run_example.clj create mode 100644 clients/clojure/log4j.properties create mode 100644 clients/clojure/project.clj create mode 100644 clients/clojure/src/kafka/buffer.clj create mode 100644 clients/clojure/src/kafka/example.clj create mode 100644 clients/clojure/src/kafka/kafka.clj create mode 100644 clients/clojure/src/kafka/print.clj create mode 100644 clients/clojure/src/kafka/serializable.clj create mode 100644 clients/clojure/src/kafka/types.clj create mode 100644 clients/clojure/test/kafka/buffer_test.clj create mode 100644 clients/clojure/test/kafka/print_test.clj create mode 100644 clients/clojure/test/kafka/serializable_test.clj diff --git a/clients/clojure/.gitignore b/clients/clojure/.gitignore new file mode 100644 index 0000000..07a2153 --- /dev/null +++ b/clients/clojure/.gitignore @@ -0,0 +1,2 @@ +lib +classes diff --git a/clients/clojure/README.md b/clients/clojure/README.md new file mode 100644 index 0000000..4472637 --- /dev/null +++ b/clients/clojure/README.md @@ -0,0 +1,51 @@ +# kafka-clj +kafka-clj provides a producer and consumer client that supports a basic fetch API as well as a managed sequence interface for lazy message fetching. Multifetch is not supported yet. + +## Quick Start + +Download and start [Kafka](http://sna-projects.com/kafka/quickstart.php). + +Pull dependencies with [Leiningen](https://github.com/technomancy/leiningen): + + $ lein deps + +And run the example: + + $ lein run-example + +## Usage + +### Sending messages + + (with-open [p (producer "localhost" 9092)] + (produce p "test" 0 "Message 1") + (produce p "test" 0 ["Message 2" "Message 3"])) + +### Simple consumer + + (with-open [c (consumer "localhost" 9092)] + (let [offs (offsets c "test" 0 -1 10)] + (consume c "test" 0 (last offs) 1000000))) + +### Consumer sequence + + (with-open [c (consumer "localhost" 9092)] + (doseq [m (consume-seq c "test" 0 {:blocking true})] + (println m))) + +Following options are supported: + +* :blocking _boolean_ default false, seq returns nil as soon all messages are consumed. If set to true, the sequence tries to fetch new messages :repeat-count times every :repeat-timeout milliseconds. +* :repeat-count _int_ number of attempts to fetch new messages before terminating, default 10. +* :repeat-timeout _int_ wait time in milliseconds between fetch attempts, default 1000. +* :offset _long_ initialized to highest offset if not provided. +* :max-size _int_ max result message size, default 1000000. + +### Serialization + +Load _kafka.print_ for basic print_dup or _kafka.serializeable_ for Java object serialization. +For custom serialization implement Pack and Unpack protocols. + + +Questions? Email adam.smyczek \_at\_ gmail.com. + diff --git a/clients/clojure/leiningen/run_example.clj b/clients/clojure/leiningen/run_example.clj new file mode 100644 index 0000000..5a499da --- /dev/null +++ b/clients/clojure/leiningen/run_example.clj @@ -0,0 +1,7 @@ +(ns leiningen.run-example + (:use kafka.example)) + +(defn run-example + [project & args] + (run)) + diff --git a/clients/clojure/log4j.properties b/clients/clojure/log4j.properties new file mode 100644 index 0000000..eeaef64 --- /dev/null +++ b/clients/clojure/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=DEBUG, A1 + +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern= %-5p %c - %m%n diff --git a/clients/clojure/project.clj b/clients/clojure/project.clj new file mode 100644 index 0000000..214e868 --- /dev/null +++ b/clients/clojure/project.clj @@ -0,0 +1,13 @@ +(defproject kafka-clj "0.1-SNAPSHOT" + :description "Kafka client for Clojure." + :url "http://sna-projects.com/kafka/" + :dependencies [[org.clojure/clojure "1.2.0"] + [org.clojure/clojure-contrib "1.2.0"] + [log4j "1.2.15" :exclusions [javax.mail/mail + javax.jms/jms + com.sun.jdmk/jmxtools + com.sun.jmx/jmxri]]] + :disable-deps-clean false + :warn-on-reflection true + :source-path "src" + :test-path "test") diff --git a/clients/clojure/src/kafka/buffer.clj b/clients/clojure/src/kafka/buffer.clj new file mode 100644 index 0000000..05e6717 --- /dev/null +++ b/clients/clojure/src/kafka/buffer.clj @@ -0,0 +1,153 @@ +(ns #^{:doc "Wrapper around ByteBuffer, + provides a DSL to model byte messages."} + kafka.buffer + (:import (java.nio ByteBuffer) + (java.nio.channels SocketChannel))) + +(def #^{:doc "Buffer stack bind in with-buffer."} + *buf* []) + +(defn ^ByteBuffer top + "Retrieve top buffer from *buf* stack." + [] + (peek *buf*)) + +; +; Writing to buffer +; + +(defprotocol Put + "Put protocol defines a generic buffer put method." + (put [this])) + +; Put implementations + +(extend-type Byte + Put + (put [this] (.put (top) this))) + +(extend-type Integer + Put + (put [this] (.putInt (top) this))) + +(extend-type Short + Put + (put [this] (.putShort (top) this))) + +(extend-type Long + Put + (put [this] (.putLong (top) this))) + +(extend-type String + Put + (put [this] (.put (top) (.getBytes this "UTF-8")))) + +(extend-type (class (byte-array 0)) + Put + (put [this] (.put (top) ^bytes this))) + +(extend-type clojure.lang.IPersistentCollection + Put + (put [this] (doseq [e this] (put e)))) + +(defmacro length-encoded + [type & body] + `(with-buffer (.slice (top)) + (put (~type 0)) + (let [^ByteBuffer this# (top) + ^ByteBuffer parent# (peek (pop *buf*)) + type-size# (.position this#)] + ~@body + (let [size# (.position this#)] + (.rewind this#) + (put (~type (- size# type-size#))) + (.position parent# (+ (.position parent#) size#)))))) + +(defmacro with-put + [size f & body] + `(with-buffer (.slice (top)) + (put (byte-array ~size)) + ~@body + (let [^ByteBuffer this# (top) + ^ByteBuffer parent# (peek (pop *buf*)) + pos# (.position this#) + ba# (byte-array (- pos# ~size))] + (doto this# (.rewind) (.get (byte-array ~size)) (.get ba#)) + (.rewind this#) + (put (~f ba#)) + (.position parent# (+ (.position parent#) pos#))))) + +; +; Read from buffer +; + +(defn get-byte + [] + (.get (top))) + +(defn get-short + [] + (.getShort (top))) + +(defn get-int + [] + (.getInt (top))) + +(defn get-long + [] + (.getLong (top))) + +(defn get-array + "Reads byte array of argument length from buffer." + [^int length] + (let [ba (byte-array length)] + (.get (top) ba) + ba)) + +(defn get-string + "Reads string of argument length from buffer." + [^int length] + (let [ba (byte-array length)] + (.get (top) ba) + (String. ba "UTF-8"))) + +; +; Util functions and macros +; + +(defmacro with-buffer + "Evaluates body in the context of the buffer." + [buffer & body] + `(binding [*buf* (conj *buf* ~buffer)] + ~@body)) + +(defn flip + [] + (.flip (top))) + +(defn rewind + [] + (.rewind (top))) + +(defn clear + [] + (.clear (top))) + +(defn has-remaining + [] + (.hasRemaining (top))) + +(defn read-from + "Reads from channel to the underlying top buffer. + Throws ConnectException if channel is closed." + [^SocketChannel channel] + (let [size (.read channel (top))] + (if (< size 0) + (throw (java.net.ConnectException. "Channel closed?")) + size))) + +(defn write-to + "Writes underlying top buffer to channel." + [^SocketChannel channel] + (.write channel (top))) + diff --git a/clients/clojure/src/kafka/example.clj b/clients/clojure/src/kafka/example.clj new file mode 100644 index 0000000..4123242 --- /dev/null +++ b/clients/clojure/src/kafka/example.clj @@ -0,0 +1,36 @@ +(ns #^{:doc "Producer/Consumer example."} + kafka.example + (:use (clojure.contrib logging) + (kafka types kafka print))) + +(defmacro thread + "Executes body in a thread, logs exceptions." + [ & body] + `(future + (try + ~@body + (catch Exception e# + (error "Consumer exception." e#))))) + +(defn start-consumer + [] + (thread + (with-open [c (consumer "localhost" 9092)] + (doseq [m (consume-seq c "test" 0 {:blocking true})] + (println "Consumed <-- " m))))) + +(defn start-producer + [] + (thread + (with-open [p (producer "localhost" 9092)] + (doseq [i (range 1 30)] + (let [m (str "Message " i)] + (produce p "test" 0 m) + (println "Produced --> " m) + (Thread/sleep 1000)))))) + +(defn run + [] + (start-consumer) + (start-producer)) + diff --git a/clients/clojure/src/kafka/kafka.clj b/clients/clojure/src/kafka/kafka.clj new file mode 100644 index 0000000..a10d1b7 --- /dev/null +++ b/clients/clojure/src/kafka/kafka.clj @@ -0,0 +1,262 @@ +(ns #^{:doc "Core kafka-clj module, + provides producer and consumer factories."} + kafka.kafka + (:use (kafka types buffer) + (clojure.contrib logging)) + (:import (kafka.types Message) + (java.nio ByteBuffer) + (java.nio.channels SocketChannel) + (java.net InetSocketAddress) + (java.util.zip CRC32))) + +; +; Utils +; + +(defn- crc32-int + "CRC for byte array." + [^bytes ba] + (let [crc (doto (CRC32.) (.update ba)) + lv (.getValue crc)] + (.intValue (bit-and lv 0xffffffff)))) + +(defn- new-channel + "Create and setup a new channel for a host and port. + Supported options: + :buffer-size - socket buffer size for send and receive buffer. + :so-timeout - socket timeout." + [host port opts] + (let [buffer-size (or (:buffer-size opts) 65536) + so-timeout (or (:so-timeout opts) 50000) + ch (SocketChannel/open)] + (doto (.socket ch) + (.setReceiveBufferSize buffer-size) + (.setSendBufferSize buffer-size) + (.setSoTimeout so-timeout)) + (doto ch + (.configureBlocking true) + (.connect (InetSocketAddress. host port))))) + +(defn- close-channel + "Close the channel." + [channel] + (.close channel) + (.close (.socket channel))) + +(defn- response-size + "Read first four bytes from channel as an integer." + [channel] + (with-buffer (ByteBuffer/allocate 4) + (read-from channel) + (flip) + (get-int))) + +(defmacro with-error-code + "Convenience response error code check." + [ & body] + `(let [error-code# (get-short)] ; error code + (if (not= error-code# 0) + (error (str "Error code: " error-code#)) + ~@body))) + +; +; Producer +; + +(defn- send-message + "Send messages." + [channel topic partition messages opts] + (let [size (or (:buffer-size opts) 65536)] + (with-buffer (ByteBuffer/allocate size) + (length-encoded int ; request size + (put (short 0)) ; request type + (length-encoded short ; topic size + (put topic)) ; topic + (put (int partition)) ; partition + (length-encoded int ; messages size + (doseq [m messages] + (length-encoded int ; message size + (put (byte 0)) ; magic + (with-put 4 crc32-int ; crc + (put (.message (pack m)))))))) ; message + (flip) + (write-to channel)))) + +(defn producer + "Producer factory." + [host port & [opts]] + (let [channel (new-channel host port opts)] + (reify Producer + (produce [this topic partition messages] + (let [msg (if (sequential? messages) messages [messages])] + (send-message channel topic partition msg opts))) + (close [this] + (close-channel channel))))) + +; +; Consumer +; + +; Offset + +(defn- offset-fetch-request + "Fetch offsets request." + [channel topic partition time max-offsets] + (let [size (+ 4 2 2 (count topic) 4 8 4)] + (with-buffer (ByteBuffer/allocate size) + (length-encoded int ; request size + (put (short 4)) ; request type + (length-encoded short ; topic size + (put topic)) ; topic + (put (int partition)) ; partition + (put (long time)) ; time + (put (int max-offsets))) ; max-offsets + (flip) + (write-to channel)))) + +(defn- fetch-offsets + "Fetch offsets as an integer sequence." + [channel topic partition time max-offsets] + (offset-fetch-request channel topic partition time max-offsets) + (let [rsp-size (response-size channel)] + (with-buffer (ByteBuffer/allocate rsp-size) + (read-from channel) + (flip) + (with-error-code + (repeat (get-int) (get-long)))))) + +; Messages + +(defn- message-fetch-request + "Fetch messages request." + [channel topic partition offset max-size] + (let [size (+ 4 2 2 (count topic) 4 8 4)] + (with-buffer (ByteBuffer/allocate size) + (length-encoded int ; request size + (put (short 1)) ; request type + (length-encoded short ; topic size + (put topic)) ; topic + (put (int partition)) ; partition + (put (long offset)) ; offset + (put (int max-size))) ; max size + (flip) + (write-to channel)))) + +(defn- read-response + "Read response from buffer. Returns a pair [new offset, messages sequence]." + [offset] + (with-error-code + (loop [off offset msg (clojure.lang.PersistentQueue/EMPTY)] + (if (has-remaining) + (let [size (get-int) ; message size + magic (get-byte) ; magic + crc (get-int) ; crc + message (get-array (- size 5))] + (recur (+ off size 4) (conj msg (unpack (Message. message))))) + [off msg])))) + +(defn- fetch-messages + "Message fetch, returns a pair [new offset, messages sequence]." + [channel topic partition offset max-size] + (message-fetch-request channel topic partition offset max-size) + (let [rsp-size (response-size channel)] + (with-buffer (ByteBuffer/allocate rsp-size) + (read-from channel) + (flip) + (read-response offset)))) + +; Consumer sequence + +(defn- seq-fetch + "Non-blocking fetch function used by consumer sequence." + [channel topic partition opts] + (let [max-size (or (:max-size opts) 1000000)] + (fn [offset] + (fetch-messages channel topic partition offset max-size)))) + +(defn- blocking-seq-fetch + "Blocking fetch function used by consumer sequence." + [channel topic partition opts] + (let [repeat-count (or (:repeat-count opts) 10) + repeat-timeout (or (:repeat-timeout opts) 1000) + max-size (or (:max-size opts) 1000000)] + (fn [offset] + (loop [c repeat-count] + (if (> c 0) + (let [rs (fetch-messages channel topic partition offset max-size)] + (if (or (nil? rs) (= offset (first rs))) + (do + (Thread/sleep repeat-timeout) + (recur (dec c))) + rs)) + (debug "Stopping blocking seq fetch.")))))) + +(defn- fetch-queue + [offset queue fetch-fn] + (if (empty? @queue) + (if-let [[new-offset msg] (fetch-fn @offset)] + (do + (debug (str "Fetched " (count msg) " messages.")) + (debug (str "New offset " new-offset ".")) + (swap! queue #(reduce conj % (reverse msg))) + (reset! offset new-offset))))) + +(defn- consumer-seq + "Sequence constructor." + [offset fetch-fn] + (let [offset (atom offset) + queue (atom (clojure.lang.PersistentQueue/EMPTY))] + (reify + clojure.lang.IPersistentCollection + (seq [this] this) + (cons [this _] (throw (Exception. "cons not supported for consumer sequence."))) + (empty [this] nil) + (equiv [this o] + (fatal "Implement equiv for consumer seq!") + false) + clojure.lang.ISeq + (first [this] + (fetch-queue offset queue fetch-fn) + (first @queue)) + (next [this] + (swap! queue rest) + (fetch-queue offset queue fetch-fn) + (if (not (empty? @queue)) this)) + (more [this] + (swap! queue rest) + (fetch-queue offset queue fetch-fn) + (if (empty? @queue) (empty) this)) + Object + (toString [this] + (str "ConsumerQueue"))))) + +; Consumer factory + +(defn consumer + "Consumer factory." + [host port & [opts]] + (let [channel (new-channel host port opts)] + (reify Consumer + (consume [this topic partition offset max-size] + (fetch-messages channel topic partition offset max-size)) + + (offsets [this topic partition time max-offsets] + (fetch-offsets channel topic partition time max-offsets)) + + (consume-seq [this topic partition] + (let [[offset] (fetch-offsets channel topic partition -1 1)] + (debug (str "Initializing last offset to " offset ".")) + (consumer-seq offset (seq-fetch channel topic partition opts)))) + + (consume-seq [this topic partition opts] + (let [[offset] (or (:offset opts) + (fetch-offsets channel topic partition -1 1)) + fetch-fn (if (:blocking opts) + (blocking-seq-fetch channel topic partition opts) + (seq-fetch channel topic partition opts))] + (debug (str "Initializing last offset to " offset ".")) + (consumer-seq offset fetch-fn))) + + (close [this] + (close-channel channel))))) + diff --git a/clients/clojure/src/kafka/print.clj b/clients/clojure/src/kafka/print.clj new file mode 100644 index 0000000..51c5b64 --- /dev/null +++ b/clients/clojure/src/kafka/print.clj @@ -0,0 +1,22 @@ +(ns #^{:doc "Basic Clojure print-dup -> read-string message serialization."} + kafka.print + (:use kafka.types) + (:import (kafka.types Message))) + +(extend-type Object + Pack + (pack [this] + (let [^String st (with-out-str (print-dup this *out*))] + (kafka.types.Message. (.getBytes st "UTF-8"))))) + +(extend-type Message + Unpack + (unpack [this] + (let [^bytes ba (.message this) + msg (String. ba "UTF-8")] + (if (not (empty? msg)) + (try + (read-string msg) + (catch Exception e + (println "Invalid expression " msg))))))) + diff --git a/clients/clojure/src/kafka/serializable.clj b/clients/clojure/src/kafka/serializable.clj new file mode 100644 index 0000000..a4fe0aa --- /dev/null +++ b/clients/clojure/src/kafka/serializable.clj @@ -0,0 +1,22 @@ +(ns #^{:doc "Serialization for all Java Serializable objects."} + kafka.serializable + (:use kafka.types) + (:import (kafka.types Message) + (java.io Serializable + ObjectOutputStream ByteArrayOutputStream + ObjectInputStream ByteArrayInputStream))) + +(extend-type Serializable + Pack + (pack [this] + (let [bas (ByteArrayOutputStream.)] + (with-open [oos (ObjectOutputStream. bas)] + (.writeObject oos this)) + (kafka.types.Message. (.toByteArray bas))))) + +(extend-type Message + Unpack + (unpack [this] + (with-open [ois (ObjectInputStream. (ByteArrayInputStream. (.message this)))] + (.readObject ois)))) + diff --git a/clients/clojure/src/kafka/types.clj b/clients/clojure/src/kafka/types.clj new file mode 100644 index 0000000..98e8143 --- /dev/null +++ b/clients/clojure/src/kafka/types.clj @@ -0,0 +1,28 @@ +(ns #^{:doc "Base kafka-clj types."} + kafka.types) + +(deftype #^{:doc "Message type, a wrapper around a byte array."} + Message [^bytes message]) + +(defprotocol Pack + "Pack protocol converts an object into a Message." + (pack [this] "Convert object to a message.")) + +(defprotocol Unpack + "Unpack protocol, reads an object from a Message." + (unpack [^Message this] "Read an object from the message.")) + +(defprotocol Producer + "Producer protocol." + (produce [this topic partition messages] "Send message[s] for a topic to a partition.") + (close [this] "Closes the producer, socket and channel.")) + +(defprotocol Consumer + "Consumer protocol." + (consume [this topic partition offset max-size] "Fetch messages. Returns a pair [last-offset, message sequence]") + (offsets [this topic partition time max-offsets] "Query offsets. Returns offsets seq.") + + (consume-seq [this topic partition] + [this topic partition opts] "Creates a lazy sequence over the consumer.") + (close [this] "Close the consumer, socket and channel.")) + diff --git a/clients/clojure/test/kafka/buffer_test.clj b/clients/clojure/test/kafka/buffer_test.clj new file mode 100644 index 0000000..29b5ac6 --- /dev/null +++ b/clients/clojure/test/kafka/buffer_test.clj @@ -0,0 +1,47 @@ +(ns kafka.buffer-test + (:use (kafka buffer) + clojure.test) + (:import (java.nio ByteBuffer))) + +(deftest test-put-get + (with-buffer (ByteBuffer/allocate 64) + (put (byte 5)) + (put (short 10)) + (put (int 20)) + (put (long 40)) + (put "test") + (put (byte-array 3 [(byte 1) (byte 2) (byte 3)])) + (flip) + + (is (= (get-byte) (byte 5))) + (is (= (get-short) (short 10))) + (is (= (get-int) (int 20))) + (is (= (get-long) (long 40))) + (is (= (get-string 4) "test")) + (let [ba (get-array 3)] + (is (= (nth ba 0) (byte 1))) + (is (= (nth ba 1) (byte 2))) + (is (= (nth ba 2) (byte 3)))))) + +(deftest test-with-put + (with-buffer (ByteBuffer/allocate 64) + (with-put 4 count + (put "test 1")) + (flip) + + (is (= (get-int) (int 6))) + (is (= (get-string 6) "test 1")))) + +(deftest test-length-encoded + (with-buffer (ByteBuffer/allocate 64) + (length-encoded short + (put "test 1")) + (length-encoded int + (put "test 2")) + (flip) + + (is (= (get-short) (short 6))) + (is (= (get-string 6) "test 1")) + (is (= (get-int) (int 6))) + (is (= (get-string 6) "test 2")))) + diff --git a/clients/clojure/test/kafka/print_test.clj b/clients/clojure/test/kafka/print_test.clj new file mode 100644 index 0000000..c326f3f --- /dev/null +++ b/clients/clojure/test/kafka/print_test.clj @@ -0,0 +1,12 @@ +(ns kafka.print-test + (:use (kafka types print) + clojure.test)) + +(deftest test-pack-unpack + (is (= "test" (unpack (pack "test")))) + (is (= 123 (unpack (pack 123)))) + (is (= true (unpack (pack true)))) + (is (= [1 2 3] (unpack (pack [1 2 3])))) + (is (= {:a 1} (unpack (pack {:a 1})))) + (is (= '(+ 1 2 3) (unpack (pack '(+ 1 2 3)))))) + diff --git a/clients/clojure/test/kafka/serializable_test.clj b/clients/clojure/test/kafka/serializable_test.clj new file mode 100644 index 0000000..de364b1 --- /dev/null +++ b/clients/clojure/test/kafka/serializable_test.clj @@ -0,0 +1,14 @@ +(ns kafka.serializable-test + (:use (kafka types serializable) + clojure.test)) + +(deftest test-pack-unpack + (is (= "test" (unpack (pack "test")))) + (is (= 123 (unpack (pack 123)))) + (is (= true (unpack (pack true)))) + (is (= [1 2 3] (unpack (pack [1 2 3])))) + (is (= {:a 1} (unpack (pack {:a 1})))) + (is (= '(+ 1 2 3) (unpack (pack '(+ 1 2 3))))) + (let [now (java.util.Date.)] + (is (= now (unpack (pack now)))))) + From 8b8b5025137e0ca779cadd73910fd4717b7bd60f Mon Sep 17 00:00:00 2001 From: asmyczek Date: Thu, 24 Feb 2011 22:39:05 -0800 Subject: [PATCH 02/12] Offsets fetch fix. --- clients/clojure/src/kafka/kafka.clj | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/clients/clojure/src/kafka/kafka.clj b/clients/clojure/src/kafka/kafka.clj index a10d1b7..3978f9f 100644 --- a/clients/clojure/src/kafka/kafka.clj +++ b/clients/clojure/src/kafka/kafka.clj @@ -123,7 +123,10 @@ (read-from channel) (flip) (with-error-code - (repeat (get-int) (get-long)))))) + (loop [c (get-int) res []] + (if (> c 0) + (recur (dec c) (conj res (get-long))) + (if (empty? res) [0] res))))))) ; Messages @@ -146,7 +149,7 @@ "Read response from buffer. Returns a pair [new offset, messages sequence]." [offset] (with-error-code - (loop [off offset msg (clojure.lang.PersistentQueue/EMPTY)] + (loop [off offset msg []] (if (has-remaining) (let [size (get-int) ; message size magic (get-byte) ; magic From 3fed84c3368d4f9ed12924dbbf02383c8614e225 Mon Sep 17 00:00:00 2001 From: asmyczek Date: Fri, 25 Feb 2011 22:08:53 -0800 Subject: [PATCH 03/12] Adding complete buffer read from channel, enforcing eager buffer reads. --- clients/clojure/src/kafka/buffer.clj | 15 +++++++++++++ clients/clojure/src/kafka/kafka.clj | 32 ++++++++++++++-------------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/clients/clojure/src/kafka/buffer.clj b/clients/clojure/src/kafka/buffer.clj index 05e6717..6b025b7 100644 --- a/clients/clojure/src/kafka/buffer.clj +++ b/clients/clojure/src/kafka/buffer.clj @@ -7,6 +7,9 @@ (def #^{:doc "Buffer stack bind in with-buffer."} *buf* []) +(def #^{:doc "Number of attempts to read a complete buffer from channel."} + *channel-read-count* 3) + (defn ^ByteBuffer top "Retrieve top buffer from *buf* stack." [] @@ -146,6 +149,18 @@ (throw (java.net.ConnectException. "Channel closed?")) size))) +(defn read-completely-from + "Read the complete top buffer from the channel." + [^SocketChannel channel] + (loop [t *channel-read-count* size 0] + (let [s (read-from channel)] + (cond + (< t 0) + (throw (Exception. "Unable to read complete buffer from channel.")) + (has-remaining) + (recur (dec t) (+ size s)) + :else size)))) + (defn write-to "Writes underlying top buffer to channel." [^SocketChannel channel] diff --git a/clients/clojure/src/kafka/kafka.clj b/clients/clojure/src/kafka/kafka.clj index 3978f9f..3bdde4a 100644 --- a/clients/clojure/src/kafka/kafka.clj +++ b/clients/clojure/src/kafka/kafka.clj @@ -27,7 +27,7 @@ :so-timeout - socket timeout." [host port opts] (let [buffer-size (or (:buffer-size opts) 65536) - so-timeout (or (:so-timeout opts) 50000) + so-timeout (or (:so-timeout opts) 60000) ch (SocketChannel/open)] (doto (.socket ch) (.setReceiveBufferSize buffer-size) @@ -47,16 +47,16 @@ "Read first four bytes from channel as an integer." [channel] (with-buffer (ByteBuffer/allocate 4) - (read-from channel) + (read-completely-from channel) (flip) (get-int))) (defmacro with-error-code "Convenience response error code check." - [ & body] + [request & body] `(let [error-code# (get-short)] ; error code (if (not= error-code# 0) - (error (str "Error code: " error-code#)) + (error (str "Request " ~request " returned with error code: " error-code# ".")) ~@body))) ; @@ -122,11 +122,11 @@ (with-buffer (ByteBuffer/allocate rsp-size) (read-from channel) (flip) - (with-error-code + (with-error-code "Fetch-Offsets" (loop [c (get-int) res []] (if (> c 0) (recur (dec c) (conj res (get-long))) - (if (empty? res) [0] res))))))) + (doall res))))))) ; Messages @@ -148,7 +148,7 @@ (defn- read-response "Read response from buffer. Returns a pair [new offset, messages sequence]." [offset] - (with-error-code + (with-error-code "Fetch-Messages" (loop [off offset msg []] (if (has-remaining) (let [size (get-int) ; message size @@ -156,7 +156,7 @@ crc (get-int) ; crc message (get-array (- size 5))] (recur (+ off size 4) (conj msg (unpack (Message. message))))) - [off msg])))) + [off (doall msg)])))) (defn- fetch-messages "Message fetch, returns a pair [new offset, messages sequence]." @@ -164,7 +164,7 @@ (message-fetch-request channel topic partition offset max-size) (let [rsp-size (response-size channel)] (with-buffer (ByteBuffer/allocate rsp-size) - (read-from channel) + (read-completely-from channel) (flip) (read-response offset)))) @@ -191,15 +191,15 @@ (do (Thread/sleep repeat-timeout) (recur (dec c))) - rs)) + (doall rs))) (debug "Stopping blocking seq fetch.")))))) (defn- fetch-queue [offset queue fetch-fn] (if (empty? @queue) - (if-let [[new-offset msg] (fetch-fn @offset)] - (do - (debug (str "Fetched " (count msg) " messages.")) + (let [[new-offset msg] (fetch-fn @offset)] + (when new-offset + (debug (str "Fetched " (count msg) " messages:")) (debug (str "New offset " new-offset ".")) (swap! queue #(reduce conj % (reverse msg))) (reset! offset new-offset))))) @@ -208,7 +208,7 @@ "Sequence constructor." [offset fetch-fn] (let [offset (atom offset) - queue (atom (clojure.lang.PersistentQueue/EMPTY))] + queue (atom [])] (reify clojure.lang.IPersistentCollection (seq [this] this) @@ -249,7 +249,7 @@ (consume-seq [this topic partition] (let [[offset] (fetch-offsets channel topic partition -1 1)] (debug (str "Initializing last offset to " offset ".")) - (consumer-seq offset (seq-fetch channel topic partition opts)))) + (consumer-seq (or offset 0) (seq-fetch channel topic partition opts)))) (consume-seq [this topic partition opts] (let [[offset] (or (:offset opts) @@ -258,7 +258,7 @@ (blocking-seq-fetch channel topic partition opts) (seq-fetch channel topic partition opts))] (debug (str "Initializing last offset to " offset ".")) - (consumer-seq offset fetch-fn))) + (consumer-seq (or offset 0) fetch-fn))) (close [this] (close-channel channel))))) From f5f3f8c80c057bf73937e95a3db96a55c0b6053a Mon Sep 17 00:00:00 2001 From: asmyczek Date: Fri, 25 Feb 2011 23:30:08 -0800 Subject: [PATCH 04/12] Maintaining recieved message order in consumer-seq. --- clients/clojure/src/kafka/example.clj | 8 +++++--- clients/clojure/src/kafka/kafka.clj | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/clients/clojure/src/kafka/example.clj b/clients/clojure/src/kafka/example.clj index 4123242..acc81ad 100644 --- a/clients/clojure/src/kafka/example.clj +++ b/clients/clojure/src/kafka/example.clj @@ -10,14 +10,15 @@ (try ~@body (catch Exception e# - (error "Consumer exception." e#))))) + (error "Exception." e#))))) (defn start-consumer [] (thread (with-open [c (consumer "localhost" 9092)] (doseq [m (consume-seq c "test" 0 {:blocking true})] - (println "Consumed <-- " m))))) + (println "Consumed <-- " m))) + (println "Finished consuming."))) (defn start-producer [] @@ -27,7 +28,8 @@ (let [m (str "Message " i)] (produce p "test" 0 m) (println "Produced --> " m) - (Thread/sleep 1000)))))) + (Thread/sleep 1000)))) + (println "Finished producing."))) (defn run [] diff --git a/clients/clojure/src/kafka/kafka.clj b/clients/clojure/src/kafka/kafka.clj index 3bdde4a..1a1dbef 100644 --- a/clients/clojure/src/kafka/kafka.clj +++ b/clients/clojure/src/kafka/kafka.clj @@ -56,7 +56,7 @@ [request & body] `(let [error-code# (get-short)] ; error code (if (not= error-code# 0) - (error (str "Request " ~request " returned with error code: " error-code# ".")) + (error (str "Request " ~request " returned error code: " error-code# ".")) ~@body))) ; @@ -208,7 +208,7 @@ "Sequence constructor." [offset fetch-fn] (let [offset (atom offset) - queue (atom [])] + queue (atom (seq []))] (reify clojure.lang.IPersistentCollection (seq [this] this) From e540f7a7c3f105711376cfa4fcf73bdd12722dfe Mon Sep 17 00:00:00 2001 From: asmyczek Date: Sat, 26 Feb 2011 20:59:09 -0800 Subject: [PATCH 05/12] Updating run-example task, moving log4j.properties to resources path in classpath. --- clients/clojure/leiningen/run_example.clj | 7 +++++-- clients/clojure/{ => resources}/log4j.properties | 0 2 files changed, 5 insertions(+), 2 deletions(-) rename clients/clojure/{ => resources}/log4j.properties (100%) diff --git a/clients/clojure/leiningen/run_example.clj b/clients/clojure/leiningen/run_example.clj index 5a499da..7e3fd0e 100644 --- a/clients/clojure/leiningen/run_example.clj +++ b/clients/clojure/leiningen/run_example.clj @@ -1,7 +1,10 @@ (ns leiningen.run-example - (:use kafka.example)) + (:use [leiningen.compile :only (eval-in-project)])) (defn run-example [project & args] - (run)) + (eval-in-project project + `(do + (require 'kafka.example) + (kafka.example/run)))) diff --git a/clients/clojure/log4j.properties b/clients/clojure/resources/log4j.properties similarity index 100% rename from clients/clojure/log4j.properties rename to clients/clojure/resources/log4j.properties From e0634a19bd0f28c80df7efc99f9856b4b0ccf227 Mon Sep 17 00:00:00 2001 From: asmyczek Date: Sat, 26 Feb 2011 22:08:34 -0800 Subject: [PATCH 06/12] Minor cleanup. --- clients/clojure/README.md | 7 +-- clients/clojure/resources/log4j.properties | 2 +- clients/clojure/src/kafka/buffer.clj | 49 ++++++++++------- clients/clojure/src/kafka/example.clj | 2 +- clients/clojure/src/kafka/kafka.clj | 64 +++++++++++----------- clients/clojure/src/kafka/types.clj | 6 +- clients/clojure/test/kafka/buffer_test.clj | 9 ++- 7 files changed, 73 insertions(+), 66 deletions(-) diff --git a/clients/clojure/README.md b/clients/clojure/README.md index 4472637..2a65f88 100644 --- a/clients/clojure/README.md +++ b/clients/clojure/README.md @@ -1,5 +1,5 @@ # kafka-clj -kafka-clj provides a producer and consumer client that supports a basic fetch API as well as a managed sequence interface for lazy message fetching. Multifetch is not supported yet. +kafka-clj provides a producer and consumer that supports a basic fetch API as well as a managed sequence interface. Multifetch is not supported yet. ## Quick Start @@ -35,7 +35,7 @@ And run the example: Following options are supported: -* :blocking _boolean_ default false, seq returns nil as soon all messages are consumed. If set to true, the sequence tries to fetch new messages :repeat-count times every :repeat-timeout milliseconds. +* :blocking _boolean_ default false, sequence returns nil the first time fetch does not return new messages. If set to true, the sequence tries to fetch new messages :repeat-count times every :repeat-timeout milliseconds. * :repeat-count _int_ number of attempts to fetch new messages before terminating, default 10. * :repeat-timeout _int_ wait time in milliseconds between fetch attempts, default 1000. * :offset _long_ initialized to highest offset if not provided. @@ -43,8 +43,7 @@ Following options are supported: ### Serialization -Load _kafka.print_ for basic print_dup or _kafka.serializeable_ for Java object serialization. -For custom serialization implement Pack and Unpack protocols. +Load namespace _kafka.print_ for basic print_dup/read-string serialization or _kafka.serializeable_ for Java object serialization. For custom serialization implement Pack and Unpack protocols. Questions? Email adam.smyczek \_at\_ gmail.com. diff --git a/clients/clojure/resources/log4j.properties b/clients/clojure/resources/log4j.properties index eeaef64..51597ef 100644 --- a/clients/clojure/resources/log4j.properties +++ b/clients/clojure/resources/log4j.properties @@ -1,4 +1,4 @@ -log4j.rootLogger=DEBUG, A1 +log4j.rootLogger=INFO, A1 log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1.layout=org.apache.log4j.PatternLayout diff --git a/clients/clojure/src/kafka/buffer.clj b/clients/clojure/src/kafka/buffer.clj index 6b025b7..b427968 100644 --- a/clients/clojure/src/kafka/buffer.clj +++ b/clients/clojure/src/kafka/buffer.clj @@ -8,23 +8,46 @@ *buf* []) (def #^{:doc "Number of attempts to read a complete buffer from channel."} - *channel-read-count* 3) + *channel-read-count* 5) + +; +; Main buffer functions +; + +(defn buffer + "Creates a new ByteBuffer of argument size." + [^int size] + (ByteBuffer/allocate size)) (defn ^ByteBuffer top - "Retrieve top buffer from *buf* stack." + "Returns top buffer from *buf* stack." [] (peek *buf*)) +(defn flip + [] + (.flip (top))) + +(defn rewind + [] + (.rewind (top))) + +(defn clear + [] + (.clear (top))) + +(defn has-remaining + [] + (.hasRemaining (top))) + ; -; Writing to buffer +; Write to buffer ; (defprotocol Put "Put protocol defines a generic buffer put method." (put [this])) -; Put implementations - (extend-type Byte Put (put [this] (.put (top) this))) @@ -124,22 +147,6 @@ `(binding [*buf* (conj *buf* ~buffer)] ~@body)) -(defn flip - [] - (.flip (top))) - -(defn rewind - [] - (.rewind (top))) - -(defn clear - [] - (.clear (top))) - -(defn has-remaining - [] - (.hasRemaining (top))) - (defn read-from "Reads from channel to the underlying top buffer. Throws ConnectException if channel is closed." diff --git a/clients/clojure/src/kafka/example.clj b/clients/clojure/src/kafka/example.clj index acc81ad..9356e42 100644 --- a/clients/clojure/src/kafka/example.clj +++ b/clients/clojure/src/kafka/example.clj @@ -24,7 +24,7 @@ [] (thread (with-open [p (producer "localhost" 9092)] - (doseq [i (range 1 30)] + (doseq [i (range 1 20)] (let [m (str "Message " i)] (produce p "test" 0 m) (println "Produced --> " m) diff --git a/clients/clojure/src/kafka/kafka.clj b/clients/clojure/src/kafka/kafka.clj index 1a1dbef..95f671c 100644 --- a/clients/clojure/src/kafka/kafka.clj +++ b/clients/clojure/src/kafka/kafka.clj @@ -4,9 +4,8 @@ (:use (kafka types buffer) (clojure.contrib logging)) (:import (kafka.types Message) - (java.nio ByteBuffer) (java.nio.channels SocketChannel) - (java.net InetSocketAddress) + (java.net Socket InetSocketAddress) (java.util.zip CRC32))) ; @@ -21,17 +20,19 @@ (.intValue (bit-and lv 0xffffffff)))) (defn- new-channel - "Create and setup a new channel for a host and port. + "Create and setup a new channel for a host name, port and options. Supported options: - :buffer-size - socket buffer size for send and receive buffer. - :so-timeout - socket timeout." - [host port opts] - (let [buffer-size (or (:buffer-size opts) 65536) - so-timeout (or (:so-timeout opts) 60000) + :receive-buffer-size - receive socket buffer size, default 65536. + :send-buffer-size - send socket buffer size, default 65536. + :socket-timeout - socket timeout." + [^String host ^Integer port opts] + (let [receive-buf-size (or (:receive-buffer-size opts) 65536) + send-buf-size (or (:send-buffer-size opts) 65536) + so-timeout (or (:socket-timeout opts) 60000) ch (SocketChannel/open)] (doto (.socket ch) - (.setReceiveBufferSize buffer-size) - (.setSendBufferSize buffer-size) + (.setReceiveBufferSize receive-buf-size) + (.setSendBufferSize send-buf-size) (.setSoTimeout so-timeout)) (doto ch (.configureBlocking true) @@ -39,14 +40,14 @@ (defn- close-channel "Close the channel." - [channel] + [^SocketChannel channel] (.close channel) (.close (.socket channel))) (defn- response-size "Read first four bytes from channel as an integer." [channel] - (with-buffer (ByteBuffer/allocate 4) + (with-buffer (buffer 4) (read-completely-from channel) (flip) (get-int))) @@ -66,24 +67,25 @@ (defn- send-message "Send messages." [channel topic partition messages opts] - (let [size (or (:buffer-size opts) 65536)] - (with-buffer (ByteBuffer/allocate size) - (length-encoded int ; request size - (put (short 0)) ; request type - (length-encoded short ; topic size - (put topic)) ; topic - (put (int partition)) ; partition - (length-encoded int ; messages size + (let [size (or (:send-buffer-size opts) 65536)] + (with-buffer (buffer size) + (length-encoded int ; request size + (put (short 0)) ; request type + (length-encoded short ; topic size + (put topic)) ; topic + (put (int partition)) ; partition + (length-encoded int ; messages size (doseq [m messages] - (length-encoded int ; message size - (put (byte 0)) ; magic - (with-put 4 crc32-int ; crc - (put (.message (pack m)))))))) ; message + (let [^Message pm (pack m)] + (length-encoded int ; message size + (put (byte 0)) ; magic + (with-put 4 crc32-int ; crc + (put (.message pm)))))))) ; message (flip) (write-to channel)))) (defn producer - "Producer factory." + "Producer factory. See new-channel for list of supported options." [host port & [opts]] (let [channel (new-channel host port opts)] (reify Producer @@ -103,7 +105,7 @@ "Fetch offsets request." [channel topic partition time max-offsets] (let [size (+ 4 2 2 (count topic) 4 8 4)] - (with-buffer (ByteBuffer/allocate size) + (with-buffer (buffer size) (length-encoded int ; request size (put (short 4)) ; request type (length-encoded short ; topic size @@ -119,8 +121,8 @@ [channel topic partition time max-offsets] (offset-fetch-request channel topic partition time max-offsets) (let [rsp-size (response-size channel)] - (with-buffer (ByteBuffer/allocate rsp-size) - (read-from channel) + (with-buffer (buffer rsp-size) + (read-completely-from channel) (flip) (with-error-code "Fetch-Offsets" (loop [c (get-int) res []] @@ -134,7 +136,7 @@ "Fetch messages request." [channel topic partition offset max-size] (let [size (+ 4 2 2 (count topic) 4 8 4)] - (with-buffer (ByteBuffer/allocate size) + (with-buffer (buffer size) (length-encoded int ; request size (put (short 1)) ; request type (length-encoded short ; topic size @@ -163,7 +165,7 @@ [channel topic partition offset max-size] (message-fetch-request channel topic partition offset max-size) (let [rsp-size (response-size channel)] - (with-buffer (ByteBuffer/allocate rsp-size) + (with-buffer (buffer rsp-size) (read-completely-from channel) (flip) (read-response offset)))) @@ -236,7 +238,7 @@ ; Consumer factory (defn consumer - "Consumer factory." + "Consumer factory. See new-channel for list of supported options." [host port & [opts]] (let [channel (new-channel host port opts)] (reify Consumer diff --git a/clients/clojure/src/kafka/types.clj b/clients/clojure/src/kafka/types.clj index 98e8143..9155d62 100644 --- a/clients/clojure/src/kafka/types.clj +++ b/clients/clojure/src/kafka/types.clj @@ -5,8 +5,8 @@ Message [^bytes message]) (defprotocol Pack - "Pack protocol converts an object into a Message." - (pack [this] "Convert object to a message.")) + "Pack protocol converts an object to a Message." + (pack [this] "Convert object to a Message.")) (defprotocol Unpack "Unpack protocol, reads an object from a Message." @@ -23,6 +23,6 @@ (offsets [this topic partition time max-offsets] "Query offsets. Returns offsets seq.") (consume-seq [this topic partition] - [this topic partition opts] "Creates a lazy sequence over the consumer.") + [this topic partition opts] "Creates a sequence over the consumer.") (close [this] "Close the consumer, socket and channel.")) diff --git a/clients/clojure/test/kafka/buffer_test.clj b/clients/clojure/test/kafka/buffer_test.clj index 29b5ac6..87889f0 100644 --- a/clients/clojure/test/kafka/buffer_test.clj +++ b/clients/clojure/test/kafka/buffer_test.clj @@ -1,10 +1,9 @@ (ns kafka.buffer-test (:use (kafka buffer) - clojure.test) - (:import (java.nio ByteBuffer))) + clojure.test)) (deftest test-put-get - (with-buffer (ByteBuffer/allocate 64) + (with-buffer (buffer 64) (put (byte 5)) (put (short 10)) (put (int 20)) @@ -24,7 +23,7 @@ (is (= (nth ba 2) (byte 3)))))) (deftest test-with-put - (with-buffer (ByteBuffer/allocate 64) + (with-buffer (buffer 64) (with-put 4 count (put "test 1")) (flip) @@ -33,7 +32,7 @@ (is (= (get-string 6) "test 1")))) (deftest test-length-encoded - (with-buffer (ByteBuffer/allocate 64) + (with-buffer (buffer 64) (length-encoded short (put "test 1")) (length-encoded int From 429dc3110cd655e4cd2a0fc12b3a24fe58a2898e Mon Sep 17 00:00:00 2001 From: asmyczek Date: Mon, 28 Feb 2011 16:21:50 -0800 Subject: [PATCH 07/12] Adding license file. --- clients/clojure/LICENSE | 202 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 clients/clojure/LICENSE diff --git a/clients/clojure/LICENSE b/clients/clojure/LICENSE new file mode 100644 index 0000000..7817b22 --- /dev/null +++ b/clients/clojure/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright 2011 LinkedIn + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. From 77f92c80861240930394640454eaef7d183e16e7 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 8 Mar 2011 16:02:17 -0800 Subject: [PATCH 08/12] make restart faster on a clean shutdown --- bin/kafka-run-class.sh | 7 ++ .../kafka/consumer/FetcherRunnable.scala | 4 +- src/main/scala/kafka/log/Log.scala | 8 +- src/main/scala/kafka/log/LogManager.scala | 7 +- .../scala/kafka/message/FileMessageSet.scala | 41 +++---- .../kafka/server/KafkaRequestHandlers.scala | 2 +- src/main/scala/kafka/server/KafkaServer.scala | 43 ++++--- .../other/kafka/TestLatestLogOffset.scala | 2 +- .../other/kafka/TestLogPerformance.scala | 2 +- .../scala/unit/kafka/log/LogManagerTest.scala | 6 +- src/test/scala/unit/kafka/log/LogTest.scala | 14 +-- .../kafka/server/ServerShutdownTest.scala | 107 ++++++++++++++++++ 12 files changed, 185 insertions(+), 58 deletions(-) create mode 100644 src/test/scala/unit/kafka/server/ServerShutdownTest.scala diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 5d4cda8..8fa2144 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -20,6 +20,13 @@ do fi done +for file in $base_dir/lib_managed/scala_2.8.0/compile/*.jar; +do + if [ ${file##*/} != "sbt-launch.jar" ]; then + CLASSPATH=$CLASSPATH:$file + fi +done + if [ -z "$KAFKA_OPTS" ]; then KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/src/log4j.properties " fi diff --git a/src/main/scala/kafka/consumer/FetcherRunnable.scala b/src/main/scala/kafka/consumer/FetcherRunnable.scala index 3e538a2..8322ba1 100644 --- a/src/main/scala/kafka/consumer/FetcherRunnable.scala +++ b/src/main/scala/kafka/consumer/FetcherRunnable.scala @@ -93,8 +93,10 @@ class FetcherRunnable(val name: String, } if (logger.isTraceEnabled) logger.trace("fetched bytes: " + read) - if(read == 0) + if(read == 0) { + logger.debug("backing off " + config.backoffIncrementMs + " ms") Thread.sleep(config.backoffIncrementMs) + } } } catch { diff --git a/src/main/scala/kafka/log/Log.scala b/src/main/scala/kafka/log/Log.scala index a0ea40b..c3e2317 100644 --- a/src/main/scala/kafka/log/Log.scala +++ b/src/main/scala/kafka/log/Log.scala @@ -95,7 +95,7 @@ class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long * An append-only log for storing messages. */ @threadsafe -class Log(val dir: File, val maxSize: Long, val flushInterval: Int) { +class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) { private val logger = Logger.getLogger(classOf[Log]) @@ -147,11 +147,11 @@ class Log(val dir: File, val maxSize: Long, val flushInterval: Int) { }) validateSegments(accum) - // run recovery on the final section and make it mutable + //make the final section mutable and run recovery on it if necessary val last = accum.remove(accum.size - 1) last.messageSet.close() - logger.info("Loading the last segment in mutable mode and running recover on " + last.file.getAbsolutePath()) - val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(true)), last.start) + logger.info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery) + val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start) accum.add(mutable) } new SegmentList(accum.toArray(new Array[LogSegment](accum.size))) diff --git a/src/main/scala/kafka/log/LogManager.scala b/src/main/scala/kafka/log/LogManager.scala index 5bcc564..06b39bb 100644 --- a/src/main/scala/kafka/log/LogManager.scala +++ b/src/main/scala/kafka/log/LogManager.scala @@ -33,7 +33,8 @@ class LogManager(val config: KafkaConfig, private val scheduler: KafkaScheduler, private val time: Time, val logCleanupIntervalMs: Long, - val logCleanupMinAgeMs: Long) { + val logCleanupMinAgeMs: Long, + needRecovery: Boolean) { val logDir: File = new File(config.logDir) private val numPartitions = config.numPartitions @@ -64,7 +65,7 @@ class LogManager(val config: KafkaConfig, logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") } else { logger.info("Loading log '" + dir.getName() + "'") - val log = new Log(dir, maxSize, flushInterval) + val log = new Log(dir, maxSize, flushInterval, needRecovery) val topicPartion = Utils.getTopicPartition(dir.getName) logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]()) val parts = logs.get(topicPartion._1) @@ -142,7 +143,7 @@ class LogManager(val config: KafkaConfig, logCreationLock synchronized { val d = new File(logDir, topic + "-" + partition) d.mkdirs() - new Log(d, maxSize, flushInterval) + new Log(d, maxSize, flushInterval, false) } } diff --git a/src/main/scala/kafka/message/FileMessageSet.scala b/src/main/scala/kafka/message/FileMessageSet.scala index 565793b..312c237 100644 --- a/src/main/scala/kafka/message/FileMessageSet.scala +++ b/src/main/scala/kafka/message/FileMessageSet.scala @@ -34,10 +34,10 @@ import kafka.utils._ */ @nonthreadsafe class FileMessageSet private[message](private[message] val channel: FileChannel, - private[message] val offset: Long, - private[message] val limit: Long, - val mutable: Boolean, - val needRecover: AtomicBoolean) extends MessageSet { + private[message] val offset: Long, + private[message] val limit: Long, + val mutable: Boolean, + val needRecover: AtomicBoolean) extends MessageSet { private val setSize = new AtomicLong() private val setHighWaterMark = new AtomicLong() @@ -46,11 +46,19 @@ class FileMessageSet private[message](private[message] val channel: FileChannel, if(mutable) { if(limit < Long.MaxValue || offset > 0) throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.") - // set the file position to the end of the file for appending messages - val startMs = System.currentTimeMillis - val truncated = recover() - logger.info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 + - " seconds. " + truncated + " bytes truncated.") + + if (needRecover.get) { + // set the file position to the end of the file for appending messages + val startMs = System.currentTimeMillis + val truncated = recover() + logger.info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 + + " seconds. " + truncated + " bytes truncated.") + } + else { + setSize.set(channel.size()) + setHighWaterMark.set(sizeInBytes) + channel.position(channel.size) + } } else { setSize.set(scala.math.min(channel.size(), limit) - offset) setHighWaterMark.set(sizeInBytes) @@ -129,21 +137,6 @@ class FileMessageSet private[message](private[message] val channel: FileChannel, } } - /** - * The last valid message that stored in Kafka, this method should only be called over recover - */ - def lastMessage(): Message = { - if(!needRecover.get()) { - var message: Message = null - val iter = iterator - while (iter.hasNext) - message = iter.next() - message - } - else - throw new IllegalStateException("recover needs to be performed on the fileset") - } - /** * The number of bytes taken up by this file set */ diff --git a/src/main/scala/kafka/server/KafkaRequestHandlers.scala b/src/main/scala/kafka/server/KafkaRequestHandlers.scala index ae149bb..6955c3b 100644 --- a/src/main/scala/kafka/server/KafkaRequestHandlers.scala +++ b/src/main/scala/kafka/server/KafkaRequestHandlers.scala @@ -60,7 +60,7 @@ class KafkaRequestHandlers(val logManager: LogManager) { } catch { case e => - logger.error("erorr processing ProduceRequst on " + request.topic + ":" + partition + " " + e + Utils.stackTrace(e)) + logger.error("error processing ProduceRequst on " + request.topic + ":" + partition + " " + e + Utils.stackTrace(e)) e match { case _: IOException => logger.error("force shutdown due to " + e) diff --git a/src/main/scala/kafka/server/KafkaServer.scala b/src/main/scala/kafka/server/KafkaServer.scala index c12d99e..ae09f04 100644 --- a/src/main/scala/kafka/server/KafkaServer.scala +++ b/src/main/scala/kafka/server/KafkaServer.scala @@ -23,31 +23,38 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean import kafka.utils.{Utils, SystemTime, KafkaScheduler} import kafka.network.{SocketServerStats, SocketServer} +import java.io.File class KafkaServer(val config: KafkaConfig) { - + val CLEAN_SHUTDOWN_FILE = ".cleanshutdown" val isStarted = new AtomicBoolean(false) private val logger = Logger.getLogger(classOf[KafkaServer]) private val shutdownLatch = new CountDownLatch(1) private val statsMBeanName = "kafka:type=kafka.SocketServerStats" - @BeanProperty var socketServer: SocketServer = null - @BeanProperty val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false) - private val logManager: LogManager = new LogManager(config, - scheduler, - SystemTime, - 1000 * 60 * config.logCleanupIntervalMinutes, - 1000 * 60 * 60 * config.logRetentionHours) + private var logManager: LogManager = null def startup() { try { logger.info("Starting Kafka server...") - + var needRecovery = true + val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) + if (cleanShutDownFile.exists) { + needRecovery = false + cleanShutDownFile.delete + } + logManager = new LogManager(config, + scheduler, + SystemTime, + 1000 * 60 * config.logCleanupIntervalMinutes, + 1000 * 60 * 60 * config.logRetentionHours, + needRecovery) + val handlers = new KafkaRequestHandlers(logManager) socketServer = new SocketServer(config.port, config.numThreads, @@ -72,10 +79,20 @@ class KafkaServer(val config: KafkaConfig) { def shutdown() { logger.info("Shutting down...") - scheduler.shutdown - socketServer.shutdown() - Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName)) - logManager.close() + try { + scheduler.shutdown + socketServer.shutdown() + Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName)) + logManager.close() + + val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) + cleanShutDownFile.createNewFile + } + catch { + case e => + logger.fatal(e) + logger.fatal(Utils.stackTrace(e)) + } shutdownLatch.countDown() logger.info("shut down completed") } diff --git a/src/test/scala/other/kafka/TestLatestLogOffset.scala b/src/test/scala/other/kafka/TestLatestLogOffset.scala index 74ec7df..34224ba 100644 --- a/src/test/scala/other/kafka/TestLatestLogOffset.scala +++ b/src/test/scala/other/kafka/TestLatestLogOffset.scala @@ -35,7 +35,7 @@ object TestLatestLogOffset { val partition = args(2).toInt val topicDir = logDirName + "/" + topic + "-" + partition - val log = new Log(new File(topicDir), 50*1024*1024, 5000000) + val log = new Log(new File(topicDir), 50*1024*1024, 5000000, false) val offsets = log.getOffsetsBefore(new OffsetRequest(topic, partition, OffsetRequest.LATEST_TIME, 1)) println("# of latest offsets returned " + offsets.length) diff --git a/src/test/scala/other/kafka/TestLogPerformance.scala b/src/test/scala/other/kafka/TestLogPerformance.scala index 3538afa..af6bff6 100644 --- a/src/test/scala/other/kafka/TestLogPerformance.scala +++ b/src/test/scala/other/kafka/TestLogPerformance.scala @@ -29,7 +29,7 @@ object TestLogPerformance { val messageSize = args(1).toInt val batchSize = args(2).toInt val dir = TestUtils.tempDir() - val log = new Log(dir, 50*1024*1024, 5000000) + val log = new Log(dir, 50*1024*1024, 5000000, false) val bytes = new Array[Byte](messageSize) new java.util.Random().nextBytes(bytes) val message = new Message(bytes) diff --git a/src/test/scala/unit/kafka/log/LogManagerTest.scala b/src/test/scala/unit/kafka/log/LogManagerTest.scala index e469504..8b129a6 100644 --- a/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -41,7 +41,7 @@ class LogManagerTest extends JUnitSuite { override val logFileSize = 1024 override val enableZookeeper = false } - logManager = new LogManager(config, null, time, -1, maxLogAge) + logManager = new LogManager(config, null, time, -1, maxLogAge, false) logManager.startup logDir = logManager.logDir } @@ -95,7 +95,7 @@ class LogManagerTest extends JUnitSuite { override val flushInterval = Int.MaxValue override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100") } - logManager = new LogManager(config, null, time, -1, maxLogAge) + logManager = new LogManager(config, null, time, -1, maxLogAge, false) logManager.startup val log = logManager.getOrCreateLog("timebasedflush", 0) for(i <- 0 until 200) { @@ -118,7 +118,7 @@ class LogManagerTest extends JUnitSuite { override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2") } - logManager = new LogManager(config, null, time, -1, maxLogAge) + logManager = new LogManager(config, null, time, -1, maxLogAge, false) logManager.startup for(i <- 0 until 2) { diff --git a/src/test/scala/unit/kafka/log/LogTest.scala b/src/test/scala/unit/kafka/log/LogTest.scala index 33b8707..2c0ac24 100644 --- a/src/test/scala/unit/kafka/log/LogTest.scala +++ b/src/test/scala/unit/kafka/log/LogTest.scala @@ -49,14 +49,14 @@ class LogTest extends JUnitSuite { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - new Log(logDir, 1024, 1000) + new Log(logDir, 1024, 1000, false) } @Test def testLoadInvalidLogsFails() { createEmptyLogs(logDir, 0, 15) try { - new Log(logDir, 1024, 1000) + new Log(logDir, 1024, 1000, false) fail("Allowed load of corrupt logs without complaint.") } catch { case e: IllegalStateException => "This is good" @@ -65,7 +65,7 @@ class LogTest extends JUnitSuite { @Test def testAppendAndRead() { - val log = new Log(logDir, 1024, 1000) + val log = new Log(logDir, 1024, 1000, false) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 10) log.append(new ByteBufferMessageSet(message)) @@ -82,7 +82,7 @@ class LogTest extends JUnitSuite { @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) - val log = new Log(logDir, 1024, 1000) + val log = new Log(logDir, 1024, 1000, false) assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes) try { log.read(0, 1024) @@ -102,7 +102,7 @@ class LogTest extends JUnitSuite { @Test def testLogRolls() { /* create a multipart log with 100 messages */ - val log = new Log(logDir, 100, 1000) + val log = new Log(logDir, 100, 1000, false) val numMessages = 100 for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes())) @@ -158,7 +158,7 @@ class LogTest extends JUnitSuite { def testEdgeLogRolls() { { // first test a log segment starting at 0 - val log = new Log(logDir, 100, 1000) + val log = new Log(logDir, 100, 1000, false) val curOffset = log.nextAppendOffset assertEquals(curOffset, 0) @@ -171,7 +171,7 @@ class LogTest extends JUnitSuite { { // second test an empty log segment starting at none-zero - val log = new Log(logDir, 100, 1000) + val log = new Log(logDir, 100, 1000, false) val numMessages = 1 for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes())) diff --git a/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/src/test/scala/unit/kafka/server/ServerShutdownTest.scala new file mode 100644 index 0000000..ed1b33c --- /dev/null +++ b/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -0,0 +1,107 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import junit.framework.Assert._ +import kafka.TestUtils +import java.io.File +import kafka.utils.Utils +import kafka.message.{Message, ByteBufferMessageSet} +import kafka.api.FetchRequest +import kafka.integration.ProducerConsumerTestHarness +import kafka.producer.SimpleProducer +import kafka.consumer.SimpleConsumer + +class ServerShutdownTest extends JUnitSuite { + val port = 9999 + + @Test + def testCleanShutdown() { + val props = TestUtils.createBrokerConfig(0, port) + val config = new KafkaConfig(props) { + override val enableZookeeper = false + } + val host = "localhost" + val topic = "test" + val sent1 = new ByteBufferMessageSet(new Message("hello".getBytes()), new Message("there".getBytes())) + val sent2 = new ByteBufferMessageSet(new Message("more".getBytes()), new Message("messages".getBytes())) + + { + val producer = new SimpleProducer(host, + port, + 64*1024, + 100000, + 10000) + val consumer = new SimpleConsumer(host, + port, + 1000000, + 64*1024) + + val server = new KafkaServer(config) + server.startup() + + // send some messages + producer.send(topic, sent1) + sent1.buffer.rewind + + // do a clean shutdown + server.shutdown() + val cleanShutDownFile = new File(new File(config.logDir), server.CLEAN_SHUTDOWN_FILE) + assertTrue(cleanShutDownFile.exists) + } + + + { + val producer = new SimpleProducer(host, + port, + 64*1024, + 100000, + 10000) + val consumer = new SimpleConsumer(host, + port, + 1000000, + 64*1024) + + val server = new KafkaServer(config) + server.startup() + + // bring the server back again and read the messages + var fetched: ByteBufferMessageSet = null + while(fetched == null || fetched.validBytes == 0) + fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) + TestUtils.checkEquals(sent1.iterator, fetched.iterator) + val newOffset = fetched.validBytes + + // send some more messages + producer.send(topic, sent2) + sent2.buffer.rewind + + Thread.sleep(200) + + fetched = null + while(fetched == null || fetched.validBytes == 0) + fetched = consumer.fetch(new FetchRequest(topic, 0, newOffset, 10000)) + TestUtils.checkEquals(sent2.iterator, fetched.iterator) + + server.shutdown() + Utils.rm(server.config.logDir) + } + + } + +} \ No newline at end of file From 5f9df4820d91427693290e4f061d2e0cdbb4c171 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 8 Mar 2011 16:19:33 -0800 Subject: [PATCH 09/12] overflow of long values in LogManager; via Nagesh Susarla --- src/main/scala/kafka/server/KafkaServer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/kafka/server/KafkaServer.scala b/src/main/scala/kafka/server/KafkaServer.scala index ae09f04..68a8b9e 100644 --- a/src/main/scala/kafka/server/KafkaServer.scala +++ b/src/main/scala/kafka/server/KafkaServer.scala @@ -51,8 +51,8 @@ class KafkaServer(val config: KafkaConfig) { logManager = new LogManager(config, scheduler, SystemTime, - 1000 * 60 * config.logCleanupIntervalMinutes, - 1000 * 60 * 60 * config.logRetentionHours, + 1000L * 60 * config.logCleanupIntervalMinutes, + 1000L * 60 * 60 * config.logRetentionHours, needRecovery) val handlers = new KafkaRequestHandlers(logManager) From bcc1cb737524bb8c862b71e6daed525ba42d31aa Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 8 Mar 2011 16:29:51 -0800 Subject: [PATCH 10/12] ProducerPerformance set wrong flag for variable record size --- src/main/scala/kafka/tools/ProducerPerformance.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/kafka/tools/ProducerPerformance.scala b/src/main/scala/kafka/tools/ProducerPerformance.scala index ad2b1cb..dc20a14 100644 --- a/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -84,7 +84,7 @@ object ProducerPerformance { val url = new URI(options.valueOf(urlOpt)) val numMessages = options.valueOf(numMessagesOpt).intValue val messageSize = options.valueOf(messageSizeOpt).intValue - var isFixSize = options.has(varyMessageSizeOpt) + var isFixSize = !options.has(varyMessageSizeOpt) val batchSize = options.valueOf(batchSizeOpt).intValue val numThreads = options.valueOf(numThreadsOpt).intValue val topic = options.valueOf(topicOpt) From 3e5b22b4466a6241b11b79d1e1d6d0a135f4e47c Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 8 Mar 2011 16:37:37 -0800 Subject: [PATCH 11/12] better message for InvalidMessageSizeException --- src/main/scala/kafka/message/ByteBufferMessageSet.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 22f0b86..a7466f3 100644 --- a/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -84,7 +84,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer, val errorCOde: Int) extends M if(iter.remaining < size) { validByteCount = currValidBytes if (currValidBytes == 0) - throw new InvalidMessageSizeException("invalid message size:" + size + " only received bytes:" + iter.remaining) + throw new InvalidMessageSizeException("invalid message size:" + size + " only received bytes:" + iter.remaining + + " possible causes (1) a single message larger than the fetch size; (2) log corruption") return allDone() } currValidBytes += 4 + size From 53cab083e7780dc3842eeff3b843041b761ead88 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 8 Mar 2011 16:48:08 -0800 Subject: [PATCH 12/12] change clean shutdown file name --- src/main/scala/kafka/server/KafkaServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/kafka/server/KafkaServer.scala b/src/main/scala/kafka/server/KafkaServer.scala index 68a8b9e..7cecaea 100644 --- a/src/main/scala/kafka/server/KafkaServer.scala +++ b/src/main/scala/kafka/server/KafkaServer.scala @@ -26,7 +26,7 @@ import kafka.network.{SocketServerStats, SocketServer} import java.io.File class KafkaServer(val config: KafkaConfig) { - val CLEAN_SHUTDOWN_FILE = ".cleanshutdown" + val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown" val isStarted = new AtomicBoolean(false) private val logger = Logger.getLogger(classOf[KafkaServer])