Skip to content

Commit

Permalink
lib: move chan.cora out from cml.cora
Browse files Browse the repository at this point in the history
CML is a general protocol, while the concept like channel implements
on top of it. Later we can add net / io also on top it. So it's better
to have separate libs
  • Loading branch information
tiancaiamao committed Oct 11, 2024
1 parent ea4e7f1 commit 89bd384
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 62 deletions.
59 changes: 59 additions & 0 deletions lib/chan.cora
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
(@import "cora/lib/cml" cml)
(@import "cora/lib/queue" queue)

;; channel is a sendq and recvq
;; sendq represent the coroutines blocked sending msg to the channel.
;; recvq represent the coroutines blocked recving msg from the channel.
(defun .make ()
[(queue.make) (queue.make)])

(func .recv-try
[sendq recvq] => (if (queue.empty? sendq)
false
(match (queue.dequeue sendq)
[state resume val wrap]
(if (= 'Done (value state))
(.recv-try [sendq recvq]) ;; ignore stale item and continue
(begin
(cml.spawn (lambda () (resume (wrap ()))))
(set state 'Done)
val)))))

(defun .recv-block (recvq k wrap)
(let state (gensym 'state)
(begin
(set state 'Wait)
(queue.enqueue recvq [state k wrap]))))

;; recv on channel is an operation
(defun .recv (ch)
(let try (lambda () (.recv-try ch))
block (lambda (k wrap) (.recv-block (cadr ch) k wrap))
[try block default-wrap]))

(func .send-try
[sendq recvq] val => (if (queue.empty? recvq)
false
(match (queue.dequeue recvq)
[state resume wrap]
(if (= 'Done (value state))
(.send-try [sendq recvq]) ;; ignore stale
(begin
(cml.spawn (lambda ()
(begin
;; (io.display "resume recv op\n")
(resume (wrap val)))))
(set state 'Done)
())))))

(defun .send-block (sendq k val wrap)
(let state (gensym 'state)
(begin
(set state 'Wait)
(queue.enqueue sendq [state k val wrap]))))

;; send on channel is an operation
(defun .send (ch val)
(let try (lambda () (.send-try ch val))
block (lambda (k wrap) (.send-block (car ch) k val wrap))
[try block default-wrap]))
62 changes: 6 additions & 56 deletions lib/cml.cora
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
;; concurrent ml
;; ref https://wingolog.org/archives/2018/05/16/lightweight-concurrency-in-lua
;; https://wingolog.org/archives/2017/06/29/a-new-concurrent-ml
;; "Toward a parallel implementation of Concurrent ML" John Reppy
;; "Parallel Concurrent ML" John Reppy

(@import "cora/lib/queue" queue)
(@import "cora/lib/rand" rand)
;; (@import "cora/lib/io" io)
Expand Down Expand Up @@ -31,65 +37,9 @@
(queue.enqueue .task-queue (lambda () (k ())))
(.main)))))

;; channel is a sendq and recvq
;; sendq represent the coroutines blocked sending msg to the channel.
;; recvq represent the coroutines blocked recving msg from the channel.
(defun .make-chan ()
[(queue.make) (queue.make)])

(defun default-wrap (x) x)

(func .recv-try
[sendq recvq] => (if (queue.empty? sendq)
false
(match (queue.dequeue sendq)
[state resume val wrap]
(if (= 'Done (value state))
(.recv-try [sendq recvq]) ;; ignore stale item and continue
(begin
(queue.enqueue .task-queue (lambda () (resume (wrap ()))))
(set state 'Done)
val)))))

(defun .recv-block (recvq k wrap)
(let state (gensym 'state)
(begin
(set state 'Wait)
(queue.enqueue recvq [state k wrap]))))

;; recv on channel is an operation
(defun .recv (ch)
(let try (lambda () (.recv-try ch))
block (lambda (k wrap) (.recv-block (cadr ch) k wrap))
[try block default-wrap]))

(func .send-try
[sendq recvq] val => (if (queue.empty? recvq)
false
(match (queue.dequeue recvq)
[state resume wrap]
(if (= 'Done (value state))
(.send-try [sendq recvq]) ;; ignore stale
(begin
(queue.enqueue .task-queue (lambda ()
(begin
;; (io.display "resume recv op\n")
(resume (wrap val)))))
(set state 'Done)
())))))

(defun .send-block (sendq k val wrap)
(let state (gensym 'state)
(begin
(set state 'Wait)
(queue.enqueue sendq [state k val wrap]))))

;; send on channel is an operation
(defun .send (ch val)
(let try (lambda () (.send-try ch val))
block (lambda (k wrap) (.send-block (car ch) k val wrap))
[try block default-wrap]))

;; wrap an operation to get another operation.
;; fn receive the value of the operation.
(func .wrap
Expand Down
13 changes: 7 additions & 6 deletions test/benchmark/ping-pong.cora
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
(@import "cora/lib/cml" cml)
(@import "cora/lib/chan" chan)
(@import "cora/lib/io" io)

(let s2c (cml.make-chan)
c2s (cml.make-chan)
(let s2c (chan.make)
c2s (chan.make)

(begin

(defun server ()
(begin
(cml.perform
(choice
(cml.wrap (cml.recv c2s) (lambda (v)
(cml.wrap (chan.recv c2s) (lambda (v)
(begin
(io.display "server recv:")
(io.display v)
(io.display "\n"))))
(cml.wrap (cml.send s2c "pong") (lambda (v)
(cml.wrap (chan.send s2c "pong") (lambda (v)
(io.display "server send pong\n"))))
)
(server)))
Expand All @@ -24,9 +25,9 @@
(begin
(cml.perform
(choice
(cml.wrap (cml.send c2s "ping") (lambda (v)
(cml.wrap (chan.send c2s "ping") (lambda (v)
(io.display "client send ping\n")))
(cml.wrap (cml.recv s2c) (lambda (v) (begin
(cml.wrap (chan.recv s2c) (lambda (v) (begin
(io.display "client recv:")
(io.display v)
(io.display "\n")
Expand Down

0 comments on commit 89bd384

Please sign in to comment.