Skip to content

Commit

Permalink
Lose the *registry* dynamic var
Browse files Browse the repository at this point in the history
  • Loading branch information
Andy Chambers committed Mar 13, 2016
1 parent 68b04fb commit f49bdcb
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 23 deletions.
34 changes: 16 additions & 18 deletions src/samza_config/serde.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,17 @@
(def registry-cache-size 100)
(def schema-registry-url (System/getenv "SCHEMA_REGISTRY_URL"))

(def ^:dynamic *registry*)

(defn registry [config]
(defn registry-client [config]
(let [registry-url (-> (.get config "confluent.schema.registry.url"))
registry-capacity (or (-> (.get config "confluent.schema.registry.capacity")
edn/read-string)
100)]
(CachedSchemaRegistryClient. registry-url registry-capacity)))

(defn local-registry []
(defn local-registry-client [_config]
(LocalSchemaRegistryClient.))

(defmacro with-schema-registry [registry & body]
`(binding [*registry* ~registry]
~@body))

(defn avro-encoder [config]
(defn avro-encoder [registry config]
(let [schema (let [find-schema (-> (get config "confluent.schema.resolver")
(read-string)
(eval))]
Expand All @@ -47,7 +41,7 @@
(-> (reify Encoder
(toBytes [this object]
(when object
(let [version (.register *registry* (topic schema) schema)
(let [version (.register registry (topic schema) schema)
out (ByteArrayOutputStream.)]
(doto (DataOutputStream. out)
(.writeByte magic)
Expand All @@ -56,13 +50,13 @@
(a/encode schema out object)
(.toByteArray out))))))))

(defn avro-decoder [config]
(defn avro-decoder [registry config]
(reify Decoder
(fromBytes [this bs]
(when-let [buffer (and bs (java.nio.ByteBuffer/wrap bs))]
(if-not (= (.get buffer) magic)
(throw (ex-info "Unknown magic byte" {:bytes bs}))
(let [schema (.getByID *registry* (.getInt buffer))
(let [schema (.getByID @registry (.getInt buffer))
len (- (.limit buffer) 1 id-size)
start (+ (.position buffer)
(.arrayOffset buffer))
Expand All @@ -84,12 +78,16 @@
(defrecord AvroSerdeFactory []
SerdeFactory
(getSerde [this serde-name config]
(reify Serde
(fromBytes [this bytes]
(.fromBytes (avro-decoder config) bytes))

(toBytes [this msg]
(.toBytes (avro-encoder config) msg)))))
(let [registry (let [build-registry (-> (get config "confluent.schema.registry.factory")
(read-string)
(eval))]
(build-registry config))]
(reify Serde
(fromBytes [this bytes]
(.fromBytes (avro-decoder registry config) bytes))

(toBytes [this msg]
(.toBytes (avro-encoder registry config) msg))))))

(defrecord UUIDSerdeFactory []
SerdeFactory
Expand Down
6 changes: 1 addition & 5 deletions src/samza_config/task.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
(ns samza-config.task
(:require
[clojure.string :as str]
[samza-config.serde :refer [with-schema-registry registry]])
(:import [org.apache.samza.task WindowableTask ClosableTask StreamTask])
(:gen-class
:name samza-config.task.Task
Expand Down Expand Up @@ -40,8 +37,7 @@
(try
(let [task (get-task this)]
(when (instance? StreamTask task)
(with-schema-registry (registry (get-config task))
(.process task envelope collector coordinator))))
(.process task envelope collector coordinator)))
(catch Exception e
(handle-exception e (.state this)))))

Expand Down

0 comments on commit f49bdcb

Please sign in to comment.