Skip to content

Commit

Permalink
Merge pull request metabase#6190 from metabase/xray-async
Browse files Browse the repository at this point in the history
Xray async caching
  • Loading branch information
salsakran authored Oct 17, 2017
2 parents a350a5e + 77ac46b commit c73949c
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 32 deletions.
13 changes: 13 additions & 0 deletions resources/migrations/000_migrations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3927,3 +3927,16 @@ databaseChangeLog:
type: text
constraints:
nullable: false
- changeSet:
- id: 68
- author: sbelak
- changes:
- addColumn:
tableName: computation_job
columns:
- column:
name: context
type: text
- column:
name: ended_at
type: DATETIME
21 changes: 15 additions & 6 deletions src/metabase/api/x_ray.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,28 @@
(defn- x-ray
[max-cost model]
(api/check-403 (costs/enable-xrays))
{:job-id (async/compute
#(fe/x-ray (fe/extract-features {:max-cost max-cost} model)))})
{:job-id (async/with-async
[model model
opts {:max-cost max-cost}]
(fe/x-ray (fe/extract-features opts model)))})

(defn- compare
[max-cost model1 model2]
(api/check-403 (costs/enable-xrays))
{:job-id (async/compute
#(fe/x-ray (fe/compare-features {:max-cost max-cost} model1 model2)))})
{:job-id (async/with-async
[model1 model1
model2 model2
opts {:max-cost max-cost}]
(fe/x-ray (fe/compare-features opts model1 model2)))})

(defn- compare-filtered-field
[max-cost model1 model2 field]
{:job-id (async/compute
#(let [{:keys [comparison constituents]} (compare max-cost model1 model2)]
{:job-id (async/with-async
[model1 model1
model2 model2
field field
opts {:max-cost max-cost}]
(let [{:keys [comparison constituents]} (compare opts model1 model2)]
{:comparison (-> comparison (get field))
:top-contributors (-> comparison (get field) :top-contributors)
:constituents constituents}))})
Expand Down
88 changes: 69 additions & 19 deletions src/metabase/feature_extraction/async.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
(ns metabase.feature-extraction.async
(:require [clojure.tools.logging :as log]
(:require [cheshire.core :as json]
[clojure.tools.logging :as log]
[metabase.api.common :as api]
[metabase.public-settings :as public-settings]
[metabase.models
[computation-job :refer [ComputationJob]]
[computation-job-result :refer [ComputationJobResult]]]
[metabase.util :as u]
[toucan.db :as db]))

(defonce ^:private running-jobs (atom {}))
Expand All @@ -23,7 +26,9 @@
:job_id id
:permanence :temporary
:payload payload)
(db/update! ComputationJob id :status :done))
(db/update! ComputationJob id
:status :done
:ended_at (u/new-sql-timestamp)))
(swap! running-jobs dissoc id)
(log/info (format "Async job %s done." id)))

Expand All @@ -36,7 +41,9 @@
:job_id id
:permanence :temporary
:payload error)
(db/update! ComputationJob id :status :error)))
(db/update! ComputationJob id
:status :error
:ended_at (u/new-sql-timestamp))))
(swap! running-jobs dissoc id))

(defn cancel
Expand All @@ -45,24 +52,67 @@
(when (running? job)
(future-cancel (@running-jobs id))
(swap! running-jobs dissoc id)
(db/update! ComputationJob id :status :canceled)))
(db/update! ComputationJob id :status :canceled)
(log/info (format "Async job %s canceled." id))))

(defn- time-delta-seconds
[^java.util.Date a ^java.util.Date b]
(Math/round (/ (- (.getTime b) (.getTime a)) 1000.0)))

(defn- fresh?
"Is the cached job still fresh?
Uses the same logic as `metabase.api.card`."
[{:keys [created_at ended_at]}]
(let [duration (time-delta-seconds created_at ended_at)
ttl (* duration (public-settings/query-caching-ttl-ratio))
age (time-delta-seconds ended_at (java.util.Date.))]
(<= age ttl)))

(defn- cached-job
[ctx]
(when (public-settings/enable-query-caching)
(let [job (db/select-one ComputationJob
:context (json/encode ctx)
:status [:not= "error"]
{:order-by [[:ended_at :desc]]})]
(when (some-> job fresh?)
job))))

(defn compute
"Compute closure `f` asynchronously. Returns id of the associated computation
job."
[f]
(let [job (db/insert! ComputationJob
:creator_id api/*current-user-id*
:status :running
:type :simple-job)
id (:id job)]
(log/info (format "Async job %s started." id))
(swap! running-jobs assoc id (future
(try
(save-result job (f))
(catch Throwable e
(save-error job e)))))
id))
"Compute closure `f` in context `ctx` asynchronously. Returns id of the
associated computation job.
Will return cached result if query caching is enabled and a job with identical
context has successfully run within TTL."
[ctx f]
(or (-> ctx cached-job :id)
(let [{:keys [id] :as job} (db/insert! ComputationJob
:creator_id api/*current-user-id*
:status :running
:type :simple-job
:context ctx)]
(log/info (format "Async job %s started." id))
(swap! running-jobs assoc id (future
(try
(save-result job (f))
(catch Throwable e
(save-error job e)))))
id)))

(defmacro with-async
"Asynchronously evaluate expressions in lexial contexet of `bindings`.
Note: when caching is enabled `bindings` (both their shape and values) are
used to determine cache hits and should be used for all parameters that
disambiguate the call."
[bindings & body]
(let [binding-vars (vec (take-nth 2 bindings))]
`(let ~bindings
(compute {:source (quote ~body)
:bindings (quote ~bindings)
:closure (zipmap (quote ~binding-vars) ~binding-vars)}
(fn [] ~@body)))))

(defn result
"Get result of an asynchronous computation job."
Expand Down
7 changes: 4 additions & 3 deletions src/metabase/models/computation_job.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
(u/strict-extend (class ComputationJob)
models/IModel
(merge models/IModelDefaults
{:types (constantly {:status :keyword
:type :keyword})
{:types (constantly {:status :keyword
:type :keyword
:context :json})
:properties (constantly {:timestamped? true})})
i/IObjectPermissions
(merge i/IObjectPermissionsDefaults
{:can-read? creator?
{:can-read? (constantly true)
:can-write? creator?}))
8 changes: 4 additions & 4 deletions test/metabase/feature_extraction/async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

(expect
true
(let [job-id (compute (constantly 1))]
(let [job-id (compute (gensym) (constantly 1))]
(Thread/sleep 100)
(done? (ComputationJob job-id))))

(expect
[true false false]
(let [job-id (compute #(loop [] (Thread/sleep 100) (recur)))]
(let [job-id (compute (gensym) #(loop [] (Thread/sleep 100) (recur)))]
(Thread/sleep 100)
(let [r? (running? (ComputationJob job-id))]
(cancel (ComputationJob job-id))
Expand All @@ -20,14 +20,14 @@
(expect
{:status :done
:result 1}
(let [job-id (compute (constantly 1))]
(let [job-id (compute (gensym) (constantly 1))]
(Thread/sleep 100)
(select-keys (result (ComputationJob job-id)) [:status :result])))

(expect
[:error
"foo"]
(let [job-id (compute #(throw (Throwable. "foo")))]
(let [job-id (compute (gensym) #(throw (Throwable. "foo")))]
(Thread/sleep 100)
(let [job (ComputationJob job-id)]
[(:status job)
Expand Down

0 comments on commit c73949c

Please sign in to comment.