diff --git a/resources/migrations/000_migrations.yaml b/resources/migrations/000_migrations.yaml index 8de30336e5264..0335e6267f758 100644 --- a/resources/migrations/000_migrations.yaml +++ b/resources/migrations/000_migrations.yaml @@ -3927,3 +3927,16 @@ databaseChangeLog: type: text constraints: nullable: false + - changeSet: + - id: 68 + - author: sbelak + - changes: + - addColumn: + tableName: computation_job + columns: + - column: + name: context + type: text + - column: + name: ended_at + type: DATETIME \ No newline at end of file diff --git a/src/metabase/api/x_ray.clj b/src/metabase/api/x_ray.clj index 2b3e2329c06c6..c4f3e5983c273 100644 --- a/src/metabase/api/x_ray.clj +++ b/src/metabase/api/x_ray.clj @@ -37,19 +37,28 @@ (defn- x-ray [max-cost model] (api/check-403 (costs/enable-xrays)) - {:job-id (async/compute - #(fe/x-ray (fe/extract-features {:max-cost max-cost} model)))}) + {:job-id (async/with-async + [model model + opts {:max-cost max-cost}] + (fe/x-ray (fe/extract-features opts model)))}) (defn- compare [max-cost model1 model2] (api/check-403 (costs/enable-xrays)) - {:job-id (async/compute - #(fe/x-ray (fe/compare-features {:max-cost max-cost} model1 model2)))}) + {:job-id (async/with-async + [model1 model1 + model2 model2 + opts {:max-cost max-cost}] + (fe/x-ray (fe/compare-features opts model1 model2)))}) (defn- compare-filtered-field [max-cost model1 model2 field] - {:job-id (async/compute - #(let [{:keys [comparison constituents]} (compare max-cost model1 model2)] + {:job-id (async/with-async + [model1 model1 + model2 model2 + field field + opts {:max-cost max-cost}] + (let [{:keys [comparison constituents]} (compare opts model1 model2)] {:comparison (-> comparison (get field)) :top-contributors (-> comparison (get field) :top-contributors) :constituents constituents}))}) diff --git a/src/metabase/feature_extraction/async.clj b/src/metabase/feature_extraction/async.clj index c83ccf6b01084..46d9185a1385b 100644 --- a/src/metabase/feature_extraction/async.clj +++ b/src/metabase/feature_extraction/async.clj @@ -1,9 +1,12 @@ (ns metabase.feature-extraction.async - (:require [clojure.tools.logging :as log] + (:require [cheshire.core :as json] + [clojure.tools.logging :as log] [metabase.api.common :as api] + [metabase.public-settings :as public-settings] [metabase.models [computation-job :refer [ComputationJob]] [computation-job-result :refer [ComputationJobResult]]] + [metabase.util :as u] [toucan.db :as db])) (defonce ^:private running-jobs (atom {})) @@ -23,7 +26,9 @@ :job_id id :permanence :temporary :payload payload) - (db/update! ComputationJob id :status :done)) + (db/update! ComputationJob id + :status :done + :ended_at (u/new-sql-timestamp))) (swap! running-jobs dissoc id) (log/info (format "Async job %s done." id))) @@ -36,7 +41,9 @@ :job_id id :permanence :temporary :payload error) - (db/update! ComputationJob id :status :error))) + (db/update! ComputationJob id + :status :error + :ended_at (u/new-sql-timestamp)))) (swap! running-jobs dissoc id)) (defn cancel @@ -45,24 +52,67 @@ (when (running? job) (future-cancel (@running-jobs id)) (swap! running-jobs dissoc id) - (db/update! ComputationJob id :status :canceled))) + (db/update! ComputationJob id :status :canceled) + (log/info (format "Async job %s canceled." id)))) + +(defn- time-delta-seconds + [^java.util.Date a ^java.util.Date b] + (Math/round (/ (- (.getTime b) (.getTime a)) 1000.0))) + +(defn- fresh? + "Is the cached job still fresh? + + Uses the same logic as `metabase.api.card`." + [{:keys [created_at ended_at]}] + (let [duration (time-delta-seconds created_at ended_at) + ttl (* duration (public-settings/query-caching-ttl-ratio)) + age (time-delta-seconds ended_at (java.util.Date.))] + (<= age ttl))) + +(defn- cached-job + [ctx] + (when (public-settings/enable-query-caching) + (let [job (db/select-one ComputationJob + :context (json/encode ctx) + :status [:not= "error"] + {:order-by [[:ended_at :desc]]})] + (when (some-> job fresh?) + job)))) (defn compute - "Compute closure `f` asynchronously. Returns id of the associated computation - job." - [f] - (let [job (db/insert! ComputationJob - :creator_id api/*current-user-id* - :status :running - :type :simple-job) - id (:id job)] - (log/info (format "Async job %s started." id)) - (swap! running-jobs assoc id (future - (try - (save-result job (f)) - (catch Throwable e - (save-error job e))))) - id)) + "Compute closure `f` in context `ctx` asynchronously. Returns id of the + associated computation job. + + Will return cached result if query caching is enabled and a job with identical + context has successfully run within TTL." + [ctx f] + (or (-> ctx cached-job :id) + (let [{:keys [id] :as job} (db/insert! ComputationJob + :creator_id api/*current-user-id* + :status :running + :type :simple-job + :context ctx)] + (log/info (format "Async job %s started." id)) + (swap! running-jobs assoc id (future + (try + (save-result job (f)) + (catch Throwable e + (save-error job e))))) + id))) + +(defmacro with-async + "Asynchronously evaluate expressions in lexial contexet of `bindings`. + + Note: when caching is enabled `bindings` (both their shape and values) are + used to determine cache hits and should be used for all parameters that + disambiguate the call." + [bindings & body] + (let [binding-vars (vec (take-nth 2 bindings))] + `(let ~bindings + (compute {:source (quote ~body) + :bindings (quote ~bindings) + :closure (zipmap (quote ~binding-vars) ~binding-vars)} + (fn [] ~@body))))) (defn result "Get result of an asynchronous computation job." diff --git a/src/metabase/models/computation_job.clj b/src/metabase/models/computation_job.clj index c3728ff9ef902..7c065ca3c616d 100644 --- a/src/metabase/models/computation_job.clj +++ b/src/metabase/models/computation_job.clj @@ -13,10 +13,11 @@ (u/strict-extend (class ComputationJob) models/IModel (merge models/IModelDefaults - {:types (constantly {:status :keyword - :type :keyword}) + {:types (constantly {:status :keyword + :type :keyword + :context :json}) :properties (constantly {:timestamped? true})}) i/IObjectPermissions (merge i/IObjectPermissionsDefaults - {:can-read? creator? + {:can-read? (constantly true) :can-write? creator?})) diff --git a/test/metabase/feature_extraction/async_test.clj b/test/metabase/feature_extraction/async_test.clj index 35d41054a8a0f..6d2969b8ad6f1 100644 --- a/test/metabase/feature_extraction/async_test.clj +++ b/test/metabase/feature_extraction/async_test.clj @@ -5,13 +5,13 @@ (expect true - (let [job-id (compute (constantly 1))] + (let [job-id (compute (gensym) (constantly 1))] (Thread/sleep 100) (done? (ComputationJob job-id)))) (expect [true false false] - (let [job-id (compute #(loop [] (Thread/sleep 100) (recur)))] + (let [job-id (compute (gensym) #(loop [] (Thread/sleep 100) (recur)))] (Thread/sleep 100) (let [r? (running? (ComputationJob job-id))] (cancel (ComputationJob job-id)) @@ -20,14 +20,14 @@ (expect {:status :done :result 1} - (let [job-id (compute (constantly 1))] + (let [job-id (compute (gensym) (constantly 1))] (Thread/sleep 100) (select-keys (result (ComputationJob job-id)) [:status :result]))) (expect [:error "foo"] - (let [job-id (compute #(throw (Throwable. "foo")))] + (let [job-id (compute (gensym) #(throw (Throwable. "foo")))] (Thread/sleep 100) (let [job (ComputationJob job-id)] [(:status job)