Skip to content

Commit

Permalink
Merge pull request metabase#4988 from metabase/streaming-response
Browse files Browse the repository at this point in the history
Streaming response
  • Loading branch information
thearthur authored May 11, 2017
2 parents 97246a9 + f21449a commit 2222f41
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 10 deletions.
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@
[postgresql "9.3-1102.jdbc41"] ; Postgres driver
[io.crate/crate-jdbc "2.1.6"] ; Crate JDBC driver
[prismatic/schema "1.1.5"] ; Data schema declaration and validation library
[ring/ring-jetty-adapter "1.5.1"] ; Ring adapter using Jetty webserver (used to run a Ring server for unit tests)
[ring/ring-core "1.6.0"]
[ring/ring-jetty-adapter "1.6.0"] ; Ring adapter using Jetty webserver (used to run a Ring server for unit tests)
[ring/ring-json "0.4.0"] ; Ring middleware for reading/writing JSON automatically
[stencil "0.5.0"] ; Mustache templates for Clojure
[toucan "1.0.3" ; Model layer, hydration, and DB utilities
Expand Down
6 changes: 4 additions & 2 deletions src/metabase/api/card.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
[compojure.core :refer [DELETE GET POST PUT]]
[metabase
[events :as events]
[middleware :as middleware]
[public-settings :as public-settings]
[query-processor :as qp]
[util :as u]]
[metabase.api
[common :as api]
[dataset :as dataset-api]
[label :as label-api]]
[metabase.api.common.internal :refer [route-fn-name]]
[metabase.models
[card :as card :refer [Card]]
[card-favorite :refer [CardFavorite]]
Expand Down Expand Up @@ -467,5 +469,5 @@
(api/check-embedding-enabled)
(db/select [Card :name :id], :enable_embedding true, :archived false))


(api/define-routes)
(api/define-routes
(middleware/streaming-json-response (route-fn-name 'POST "/:card-id/query")))
2 changes: 1 addition & 1 deletion src/metabase/api/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@
(s/replace #"^metabase\." "")
(s/replace #"\." "/"))
(u/pprint-to-str (concat api-routes additional-routes))))
~@api-routes ~@additional-routes)))
~@additional-routes ~@api-routes)))


;;; ------------------------------------------------------------ PERMISSIONS CHECKING HELPER FNS ------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions src/metabase/api/dataset.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
[compojure.core :refer [POST]]
[dk.ative.docjure.spreadsheet :as spreadsheet]
[metabase
[middleware :as middleware]
[query-processor :as qp]
[util :as u]]
[metabase.api.common :as api]
[metabase.api.common.internal :refer [route-fn-name]]
[metabase.models
[database :refer [Database]]
[query :as query]]
Expand Down Expand Up @@ -124,5 +126,5 @@
(qp/dataset-query (dissoc query :constraints)
{:executed-by api/*current-user-id*, :context (export-format->context export-format)}))))


(api/define-routes)
(api/define-routes
(middleware/streaming-json-response (route-fn-name 'POST "/")))
2 changes: 1 addition & 1 deletion src/metabase/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

(def ^:private app
"The primary entry point to the Ring HTTP server."
(-> routes/routes
(-> #'routes/routes ; the #' is to allow tests to redefine endpoints
mb-middleware/log-api-call
mb-middleware/add-security-headers ; Add HTTP headers to API responses to prevent them from being cached
(wrap-json-body ; extracts json POST body and makes it avaliable on request
Expand Down
83 changes: 81 additions & 2 deletions src/metabase/middleware.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
(ns metabase.middleware
"Metabase-specific middleware functions & configuration."
(:require [cheshire.generate :refer [add-encoder encode-nil encode-str]]
(:require [cheshire
[core :as json]
[generate :refer [add-encoder encode-nil encode-str]]]
[clojure.core.async :as async]
[clojure.java.io :as io]
[clojure.tools.logging :as log]
[metabase
[config :as config]
Expand All @@ -15,10 +19,13 @@
[setting :refer [defsetting]]
[user :as user :refer [User]]]
monger.json
[ring.core.protocols :as protocols]
[ring.util.response :as response]
[toucan
[db :as db]
[models :as models]])
(:import com.fasterxml.jackson.core.JsonGenerator))
(:import com.fasterxml.jackson.core.JsonGenerator
java.io.OutputStream))

;;; # ------------------------------------------------------------ UTIL FNS ------------------------------------------------------------

Expand Down Expand Up @@ -354,3 +361,75 @@
(handler request))
(catch Throwable e
{:status 400, :body (.getMessage e)}))))

;;; ------------------------------------------------------------ EXCEPTION HANDLING ------------------------------------------------------------

(def ^:private ^:const streaming-response-keep-alive-interval-ms
"Interval between sending newline characters to keep Heroku from terminating
requests like queries that take a long time to complete."
(* 1 1000))

;; Handle ring response maps that contain a core.async chan in the :body key:
;;
;; {:status 200
;; :body (async/chan)}
;;
;; and send each string sent to that queue back to the browser as it arrives
;; this avoids output buffering in the default stream handling which was not sending
;; any responses until ~5k characters where in the queue.
(extend-protocol protocols/StreamableResponseBody
clojure.core.async.impl.channels.ManyToManyChannel
(write-body-to-stream [output-queue _ ^OutputStream output-stream]
(log/debug (u/format-color 'green "starting streaming request"))
(with-open [out (io/writer output-stream)]
(loop [chunk (async/<!! output-queue)]
(when-not (= chunk ::EOF)
(.write out (str chunk))
(try
(.flush out)
(catch org.eclipse.jetty.io.EofException e
(log/info (u/format-color 'yellow "connection closed, canceling request %s" (type e)))
(async/close! output-queue)
(throw e)))
(recur (async/<!! output-queue)))))))

(defn streaming-json-response
"This midelware assumes handlers fail early or return success
Run the handler in a future and send newlines to keep the connection open
and help detect when the browser is no longer listening for the response.
Waits for one second to see if the handler responds immediately, If it does
then there is no need to stream the response and it is sent back directly.
In cases where it takes longer than a second, assume the eventual result will
be a success and start sending newlines to keep the connection open."
[handler]
(fn [request]
(let [response (future (handler request))
optimistic-response (deref response streaming-response-keep-alive-interval-ms ::no-immediate-response)]
(if (= optimistic-response ::no-immediate-response)
;; if we didn't get a normal response in the first poling interval assume it's going to be slow
;; and start sending keepalive packets.
(let [output (async/chan 1)]
;; the output channel will be closed by the adapter when the incoming connection is closed.
(future
(loop []
(Thread/sleep streaming-response-keep-alive-interval-ms)
(when-not (realized? response)
(log/debug (u/format-color 'blue "Response not ready, writing one byte & sleeping..."))
;; a newline padding character is used because it forces output flushing in jetty.
;; if sending this character fails because the connection is closed, the chan will then close.
;; Newlines are no-ops when reading JSON which this depends upon.
(when-not (async/>!! output "\n")
(log/info (u/format-color 'yellow "canceled request %s" (future-cancel response)))
(future-cancel response)) ;; try our best to kill the thread running the query.
(recur))))
(future
(try
;; This is the part where we make this assume it's a JSON response we are sending.
(async/>!! output (json/encode (:body @response)))
(finally
(async/>!! output ::EOF)
(async/close! response))))
;; here we assume a successful response will be written to the output channel.
(assoc (response/response output)
:content-type "applicaton/json"))
optimistic-response))))
101 changes: 100 additions & 1 deletion test/metabase/middleware_test.clj
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
(ns metabase.middleware-test
(:require [cheshire.core :as json]
[clojure.core.async :as async]
[clojure.java.io :as io]
[clojure.tools.logging :as log]
[compojure.core :refer [GET]]
[expectations :refer :all]
[metabase
[middleware :refer :all]
[config :as config]
[middleware :as middleware :refer :all]
[routes :as routes]
[util :as u]]
[metabase.api.common :refer [*current-user* *current-user-id*]]
[metabase.models.session :refer [Session]]
[metabase.test.data.users :refer :all]
[ring.mock.request :as mock]
[ring.util.response :as resp]
[toucan.db :as db]))

;; =========================== TEST wrap-session-id middleware ===========================
Expand Down Expand Up @@ -176,3 +183,95 @@
(expect "{\"my-bytes\":\"0xC42360D7\"}"
(json/generate-string {:my-bytes (byte-array [196 35 96 215 8 106 108 248 183 215 244 143 17 160 53 186
213 30 116 25 87 31 123 172 207 108 47 107 191 215 76 92])}))
;;; stuff here

(defn- streaming-fast-success [_]
(resp/response {:success true}))

(defn- streaming-fast-failure [_]
(throw (Exception. "immediate failure")))

(defn- streaming-slow-success [_]
(Thread/sleep 7000)
(resp/response {:success true}))

(defn- streaming-slow-failure [_]
(Thread/sleep 7000)
(throw (Exception. "delayed failure")))

(defn- test-streaming-endpoint [handler]
(let [path (str handler)]
(with-redefs [metabase.routes/routes (compojure.core/routes
(GET (str "/" path) [] (middleware/streaming-json-response
handler)))]
(let [connection (async/chan 1000)
reader (io/input-stream (str "http://localhost:" (config/config-int :mb-jetty-port) "/" path))]
(async/go-loop [next-char (.read reader)]
(if (pos? next-char)
(do
(async/>! connection (char next-char))
(recur (.read reader)))
(async/close! connection)))
(let [_ (Thread/sleep 1500)
first-second (async/poll! connection)
_ (Thread/sleep 1000)
second-second (async/poll! connection)
eventually (apply str (async/<!! (async/into [] connection)))]
[first-second second-second eventually])))))


;;slow success
(expect
[\newline \newline "\n\n\n{\"success\":true}"]
(test-streaming-endpoint streaming-slow-success))

;; immediate success should have no padding
(expect
[\{ \" "success\":true}"]
(test-streaming-endpoint streaming-fast-success))

;; we know delayed failures (exception thrown) will just drop the connection
(expect
[\newline \newline "\n\n\n"]
(test-streaming-endpoint streaming-slow-failure))

;; immediate failures (where an exception is thown will return a 500
(expect
#"Server returned HTTP response code: 500 for URL:.*"
(try
(test-streaming-endpoint streaming-fast-failure)
(catch java.io.IOException e
(.getMessage e))))

;; test that handler is killed when connection closes
(def test-slow-handler-state (atom :unset))

(defn- test-slow-handler [_]
(log/debug (u/format-color 'yellow "starting test-slow-handler"))
(Thread/sleep 7000) ;; this is somewhat long to make sure the keepalive polling has time to kill it.
(reset! test-slow-handler-state :ran-to-compleation)
(log/debug (u/format-color 'yellow "finished test-slow-handler"))
(resp/response {:success true}))

(defn- start-and-maybe-kill-test-request [kill?]
(reset! test-slow-handler-state :initial-state)
(let [path "test-slow-handler"]
(with-redefs [metabase.routes/routes (compojure.core/routes
(GET (str "/" path) [] (middleware/streaming-json-response
test-slow-handler)))]
(let [reader (io/input-stream (str "http://localhost:" (config/config-int :mb-jetty-port) "/" path))]
(Thread/sleep 1500)
(when kill?
(.close reader))
(Thread/sleep 10000)))) ;; this is long enough to ensure that the handler has run to completion if it was not killed.
@test-slow-handler-state)

;; In this first test we will close the connection before the test handler gets to change the state
(expect
:initial-state
(start-and-maybe-kill-test-request true))

;; and to make sure this test actually works, run the same test again and let it change the state.
(expect
:ran-to-compleation
(start-and-maybe-kill-test-request false))

0 comments on commit 2222f41

Please sign in to comment.