-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This implements a way to launch MapReduce jobs. The job description is forced to be application/json, and likewise, the response will be in JSON too. A dev-dependency on data.json was added to make the test case nicer, and to showcase how to use the map/reduce functionality. Signed-off-by: Gergely Nagy <[email protected]>
- Loading branch information
Showing
4 changed files
with
115 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
(ns kria.map-reduce | ||
(:require | ||
[kria.conversions :refer [byte-string? | ||
byte-string<-utf8-string]] | ||
[kria.core :refer [call]] | ||
[kria.pb.map-reduce :refer [RbpMapRedReq->bytes bytes->RbpMapRedResp]])) | ||
|
||
(set! *warn-on-reflection* true) | ||
|
||
(defn mapreduce | ||
"Launch a map/reduce job (json request/response)." | ||
[asc request cb stream-cb] | ||
{:pre [(byte-string? request)]} | ||
|
||
(call asc cb :map-red-req :map-red-resp | ||
RbpMapRedReq->bytes bytes->RbpMapRedResp | ||
{:request request | ||
:content-type (byte-string<-utf8-string "application/json")} | ||
true :response :done stream-cb)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
(ns kria.pb.map-reduce | ||
(:require | ||
[kria.conversions | ||
:refer [byte-string<-utf8-string | ||
utf8-string<-byte-string]]) | ||
(:import | ||
[com.basho.riak.protobuf | ||
RiakKvPB$RpbMapRedReq | ||
RiakKvPB$RpbMapRedResp])) | ||
|
||
(set! *warn-on-reflection* true) | ||
|
||
(defrecord RbpMapRedReq | ||
[request ; required bytes | ||
content-type ; required bytes | ||
]) | ||
|
||
(defn ^RiakKvPB$RpbMapRedReq RbpMapRedReq->pb | ||
[m] | ||
(let [b (RiakKvPB$RpbMapRedReq/newBuilder)] | ||
(let [x (:request m)] | ||
(.setRequest b x)) | ||
(let [x (:content-type m)] | ||
(.setContentType b x)) | ||
(.build b))) | ||
|
||
(defn RbpMapRedReq->bytes | ||
[m] | ||
(.toByteArray (RbpMapRedReq->pb m))) | ||
|
||
(defrecord RbpMapRedResp | ||
[phase ; optional uint32 | ||
response ; optional bytes | ||
done ; optional bool | ||
]) | ||
|
||
(defn ^RiakKvPB$RpbMapRedResp pb->RbpMapRedResp | ||
[^RiakKvPB$RpbMapRedResp pb] | ||
(->RbpMapRedResp | ||
(.getPhase pb) | ||
(.getResponse pb) | ||
(.getDone pb))) | ||
|
||
(defn bytes->RbpMapRedResp | ||
[^bytes x] | ||
(pb->RbpMapRedResp (RiakKvPB$RpbMapRedResp/parseFrom x))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
(ns kria.map-reduce-test | ||
(:require [clojure.test :refer :all] | ||
[kria.test-helpers :as h] | ||
[kria.client :as c] | ||
[kria.map-reduce :as mr] | ||
[kria.object :as o] | ||
[kria.conversions :refer [byte-string<-utf8-string | ||
utf8-string<-byte-string]] | ||
[clojure.data.json :as json])) | ||
|
||
(deftest mr-test | ||
(testing "simple bucket summing MapReduce job" | ||
(let [conn (h/connect) | ||
b (h/rand-bucket) | ||
job (json/write-str | ||
{:inputs (utf8-string<-byte-string b) | ||
:query [{:map {:arg nil | ||
:name "Riak.mapValuesJson" | ||
:language "javascript" | ||
:keep false}} | ||
{:reduce {:arg nil | ||
:name "Riak.reduceSum" | ||
:language "javascript" | ||
:keep true}}]}) | ||
result (promise) | ||
stream (atom []) | ||
stream-cb (fn [xs] | ||
(if (and xs | ||
(not (zero? (.size xs)))) | ||
(swap! stream conj xs) | ||
(deliver result @stream))) | ||
result-cb (fn [asc e a] (or a e))] | ||
(doseq [n (range 10)] | ||
(let [p (promise)] | ||
(o/put conn b | ||
(byte-string<-utf8-string (format "K-%d" n)) | ||
{:value (byte-string<-utf8-string (format "%d" n))} | ||
{} | ||
(fn [_ _ a] (deliver p a))) | ||
@p)) | ||
|
||
(mr/mapreduce conn (byte-string<-utf8-string job) | ||
result-cb stream-cb) | ||
(is (= (-> @result | ||
first | ||
utf8-string<-byte-string | ||
json/read-str) | ||
[(apply + (range 10))]))))) |