Skip to content

Commit

Permalink
WIP: streams/part-time-simple.
Browse files Browse the repository at this point in the history
Intended for wall-clock timed reducers which should *not* use
periodically-until-expired. No tests yet; I need to write tests and have
it take over for throttle, rollup, and friends.
  • Loading branch information
aphyr committed Apr 2, 2013
1 parent 65f9654 commit 7a297aa
Showing 1 changed file with 47 additions and 0 deletions.
47 changes: 47 additions & 0 deletions src/riemann/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,53 @@ OA
(setup)
(recur event))))))

(defn part-time-simple
"Divides wall clock time into discrete windows. Returns a stream. Whenever
events arrive in a given window, calls (create) to generate some initial
state for that window, and uses (add state event) to combine events with that
state. At the end of the window, calls (finish state).
Does not treat expired events differently; ticks will still be scheduled even
if an expired event arrives. When no events arrive in a given window, does
nothing. (create) may be invoked partway through a window, and should be
idempotent, as it will be run inside of (swap!).
Concurrency guarantees:
(create) may be called multiple times for a given time slice.
(add) when called, will receive exactly one distinct bucket in each time
slice.
(finish) will be called *exactly once* for each time slice."
[dt create add finish]
(let [anchor (unix-time)
; Whether or not the next tick has been scheduled, and the current
; window.
state (atom [nil nil])

; Called every dt seconds to flush the window.
tick (fn tick []
(let [last-window (atom nil)]
; Reset the state to nil.
(swap! state (fn [[_ window]]
(reset! last-window window)
[nil nil]))

; And finalize the last window
(finish @last-window)))]

(fn stream [event]
(let [[scheduled _] (swap! state (fn [[scheduled window]]
(if (nil? scheduled)
; We're the first ones here.
[:first (add (create) event)]

; Some other thread has already
; scheduled
[:done (add window event)])))]
(when (= :first scheduled)
; We were the first thread to update this window.
(once! (next-tick anchor dt) tick))))))

(defn fold-interval
"Applies the folder function to all event-key values of events during
interval seconds."
Expand Down

0 comments on commit 7a297aa

Please sign in to comment.