Skip to content

Commit

Permalink
Re-model LSP JSON-RPC with core.async channels (clojure-lsp#1110)
Browse files Browse the repository at this point in the history
This gives clients a uniform interface to conform to. They model
messages as Clojure hashmaps, and put those messages to and take them
from a pair of channels.

This cleans up the tests slightly, but is actually prep for
clojure-lsp/lsp4clj#8. We'll be able to use
this code for both client and server communication. With tools to
convert stdio to channels, we get a few benefits.

1. It will be easier to implement socket communication
   clojure-lsp/lsp4clj#1 as a complement to
   stdio communication. We'll need to write the code that converts
   socket i/o to channels, but the client (or actually server) won't
   change.
2. It will be easier to write mocks for tests. The tests (even unit
   tests) can read from and write to a server's channels to make
   assertions about how it responds without having to understand the
   wire level of the LSP JSON-RPC protocol.
3. There will be a natural mechanism for concurrency. Servers can choose
   to read several messages from a channel, distributing each to a pool
   of workers.
  • Loading branch information
mainej authored Jul 10, 2022
1 parent 59212fa commit b46dcda
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 86 deletions.
126 changes: 41 additions & 85 deletions cli/integration-test/integration/client.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
(ns integration.client
(:require
[cheshire.core :as json]
[clojure.java.io :as io]
[clojure.string :as string]
[integration.helper :as h])
[clojure.core.async :as async]
[integration.helper :as h]
[integration.lsp-json-rpc :as lsp-json-rpc])
(:import
[java.time LocalDateTime]
[java.time.format DateTimeFormatter]))
Expand All @@ -26,9 +25,6 @@
(defn ^:private colored [color string]
(str (get colors color) string (:reset colors)))

(defn ^:private content-length [json]
(+ 1 (.length json)))

(def ^:private ld-formatter DateTimeFormatter/ISO_LOCAL_DATE_TIME)
(defn ^:private local-datetime-str [] (.format ld-formatter (LocalDateTime/now)))

Expand All @@ -49,93 +45,54 @@
([{:keys [client-id]} color msg params]
(println (local-datetime-str)
(colored color (str "Client " client-id " " msg))
(colored :yellow params))))

(def ^:private wire-lock (Object.))

(defn wire-send [server-in params]
(let [content (json/generate-string params)]
(binding [*out* server-in]
(locking wire-lock
(println (str "Content-Length: " (content-length content)))
(println "")
(println content)
(flush)))))

(defn ^:private lsp-rpc [{:keys [request-id]} method params]
{:jsonrpc "2.0"
:method method
:params params
:id (swap! request-id inc)})

(defn read-n-chars [^java.io.Reader input content-length]
(let [cs (char-array content-length)]
(loop [total-read 0]
(when (< total-read content-length)
(let [new-read (.read input cs total-read (- content-length total-read))]
(when (< new-read 0)
(throw (ex-info "no content" {})))
(recur (+ total-read new-read)))))
(String. cs)))

(defn read-content-length [input]
(binding [*in* input]
(when-let [line (read-line)] ;; returns nil when input is closed, i.e. server output has closed
(let [[h v] (string/split line #":")]
(when-not (= h "Content-Length")
(throw (ex-info "unexpected header" {:line line})))
(parse-long (string/trim v))))))

(defn ^:private listen! [server-out client]
(try
(loop []
;; Block, waiting for next Content-Length line, then parse the number of
;; characters specified as JSON-RPC. If the server output stream is
;; closed, also close the client by exiting this loop.
(if-let [content-length (read-content-length server-out)]
;; NOTE: this doesn't attempt to handle Content-Type header
(let [content-length (+ 2 content-length) ;; include \r\n before message
content (read-n-chars server-out content-length)
{:keys [id method] :as json} (cheshire.core/parse-string content true)]
(colored :yellow params))
(flush)))

(defn ^:private listen!
"Read JSON-RPC messages (Clojure hashmaps) off the message channel, parse them
as requests, responses or notifcations, and send them to the client. Returns a
channel which will close when the messages channel closes."
[messages client]
(async/go-loop []
(if-let [{:keys [id method] :as json} (async/<! messages)]
(do
(try
(cond
(and id method) (receive-request client json)
id (receive-response client json)
:else (receive-notification client json))
(recur))
(do
(log client :white "listener closed:" "server closed")
(flush))))
(catch Throwable e
(log client :red "listener closed:" "exception")
(println e)
(throw e))))

(defrecord Client [client-id
server-in server-out
listener
request-id sent-requests
received-requests received-notifications
mock-responses]
(catch Throwable e
(log client :red "listener closed:" "exception receiving")
(println e)
(throw e)))
(recur))
(log client :white "listener closed:" "server closed"))))

(defrecord TestClient [client-id
sender receiver
request-id sent-requests
received-requests received-notifications
mock-responses]
IClient
(start [this]
(reset! listener (future (listen! server-out this))))
(swap! receiver listen! this))
(shutdown [_this] ;; simulate client closing
(.close server-in))
(exit [_this] ;; wait for shutdown of server to propagate to listener
@@listener)
(async/close! sender))
(exit [_this] ;; wait for shutdown of server to propagate to receiver
(async/<!! @receiver))
(send-request [this method body]
(let [req (lsp-rpc this method body)
(let [req (lsp-json-rpc/json-rpc-message (swap! request-id inc) method body)
p (promise)]
(log this :cyan "sending request:" req)
;; Important: record request before sending it, so it is sure to be
;; available during receive-response.
(swap! sent-requests assoc (:id req) p)
(wire-send server-in req)
(async/>!! sender req)
p))
(send-notification [this method body]
(let [notif (lsp-rpc this method body)]
(let [notif (lsp-json-rpc/json-rpc-message method body)]
(log this :blue "sending notification:" notif)
(wire-send server-in notif)))
(async/>!! sender notif)))
(receive-response [this {:keys [id] :as resp}]
(if-let [request (get @sent-requests id)]
(do (log this :green "received reponse:" resp)
Expand All @@ -155,7 +112,7 @@
(let [resp {:id id
:result mock-resp}]
(log this :magenta "sending mock response:" resp)
(wire-send server-in resp))))
(async/>!! sender resp))))
(receive-notification [this notif]
(log this :blue "received notification:" notif)
(swap! received-notifications conj notif))
Expand All @@ -165,17 +122,16 @@

(defonce client-id (atom 0))

(defn client [server-in server-out]
(map->Client
(defn stdio-client [server-in server-out]
(map->TestClient
{:client-id (swap! client-id inc)
:server-in server-in
:server-out server-out
:sender (lsp-json-rpc/buffered-writer->sender-chan server-in)
:receiver (atom (lsp-json-rpc/buffered-reader->receiver-chan server-out))
:request-id (atom 0)
:sent-requests (atom {})
:received-requests (atom [])
:received-notifications (atom [])
:mock-responses (atom {})
:listener (atom nil)}))
:mock-responses (atom {})}))

(defn ^:private keyname [key] (str (namespace key) "/" (name key)))

Expand Down
2 changes: 1 addition & 1 deletion cli/integration-test/integration/lsp.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

(defn start-process! []
(let [server (start-server (first *command-line-args*))
client (client/client (io/writer (:in server)) (io/reader (:out server)))]
client (client/stdio-client (io/writer (:in server)) (io/reader (:out server)))]
(client/start client)
(alter-var-root #'*clojure-lsp-process* (constantly server))
(alter-var-root #'*mock-client* (constantly client))))
Expand Down
97 changes: 97 additions & 0 deletions cli/integration-test/integration/lsp_json_rpc.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
(ns integration.lsp-json-rpc
"Models LSP JSON-RPC as core.async channels of messages (Clojure hashmaps).
https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#baseProtocol"
(:require
[cheshire.core :as json]
[clojure.core.async :as async]
[clojure.string :as string]))

(set! *warn-on-reflection* true)

(defn ^:private read-n-chars [^java.io.Reader reader content-length]
(let [cs (char-array content-length)]
(loop [total-read 0]
(when (< total-read content-length)
;; FIXME: this is buggy. It reads `content-length` chars, but
;; `content-length` is specified in bytes. It works as long as the
;; integration tests are all in ASCII, but would break if they weren't.
;; See https://github.com/mainej/lsp4clj/blob/lsp2clj/server/src/lsp4clj/json_rpc.clj
;; for a correct implementation. The best way to fix this is probably to
;; wait for that to be merged and then use those helpers.
(let [new-read (.read reader cs total-read (- content-length total-read))]
(when (< new-read 0)
;; TODO: return nil instead?
(throw (java.io.EOFException.)))
(recur (+ total-read new-read)))))
(String. cs)))

(defn ^:private parse-header [line headers]
(let [[h v] (string/split line #":\s*" 2)]
(when-not (contains? #{"Content-Length" "Content-Type"} h)
(throw (ex-info "unexpected header" {:line line})))
(assoc headers h v)))

(defn ^:private read-message [reader headers]
(let [content-length (parse-long (get headers "Content-Length"))
;; TODO: handle content-type
content (read-n-chars reader content-length)]
(json/parse-string content true)))

(defn ^:private write-message [msg]
(let [content (json/generate-string msg)]
;; FIXME: this is buggy. It sets `Content-Length` to the number of chars in
;; the content, but `Content-Length` should be the number of bytes (encoded
;; as UTF-8). The fix is the same as for `read-n-chars`: we should use the
;; helpers provided by lsp4clj.
(print (str "Content-Length: " (.length content) "\r\n"
"\r\n"
content))
(flush)))

(defn ^:private read-line-async
"Reads a line of input asynchronously. Returns a channel which will yield the
line when it is ready, or nil if the input has closed. Returns immediately.
Avoids blocking by reading in a separate thread."
[^java.io.BufferedReader input]
;; we are agnostic about \r\n or \n because readLine is too
(async/thread (.readLine input)))

(defn buffered-reader->receiver-chan
"Returns a channel which will yield parsed messages that have been read off
the reader. When the reader is closed, closes the channel."
[^java.io.BufferedReader reader]
(let [msgs (async/chan 1)]
(async/go-loop [headers {}]
(if-let [line (async/<! (read-line-async reader))]
(if (string/blank? line) ;; a blank line after the headers indicate start of message
(do (async/>! msgs (read-message reader headers))
(recur {}))
(recur (parse-header line headers)))
;; input closed; also close channel
(async/close! msgs)))
msgs))

(defn buffered-writer->sender-chan
"Returns a channel which expects to have messages put on it. nil values are
not allowed. Serializes and writes the messages to the writer. When the
channel is closed, closes the writer."
[^java.io.BufferedWriter writer]
(let [messages (async/chan 1)]
(binding [*out* writer]
(async/go-loop []
(if-let [msg (async/<! messages)]
(do
(write-message msg)
(recur))
;; channel closed; also close writer
(.close writer))))
messages))

(defn json-rpc-message
([method params] ;; notification
{:jsonrpc "2.0"
:method method
:params params})
([id method params] ;; request
(assoc (json-rpc-message method params) :id id)))

0 comments on commit b46dcda

Please sign in to comment.