Skip to content

Commit

Permalink
initial socket servers
Browse files Browse the repository at this point in the history
  • Loading branch information
stylewarning committed Feb 26, 2024
1 parent ab7012c commit 70ae9ab
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
*.fasl
.DS_Store
hypergeometrica-worker
hypergeometrica-manager
16 changes: 16 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
.PHONY: all
all: hypergeometrica-manager hypergeometrica-worker

# FIXME: this seems to depend on quicklisp being loaded, probably so
# ASDF can know about the paths.
hypergeometrica-manager:
sbcl --non-interactive --eval '(asdf:make "hypergeometrica-manager")'
mv src-manager/hypergeometrica-manager .

hypergeometrica-worker:
sbcl --non-interactive --eval '(asdf:make "hypergeometrica-worker")'
mv src-worker/hypergeometrica-worker .

.PHONY: clean
clean:
rm -f hypergeometrica-manager hypergeometrica-worker
34 changes: 34 additions & 0 deletions hypergeometrica-manager.asd
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
;;;; hypergeometrica-manager.asd
;;;;
;;;; Copyright (c) 2024 Robert Smith

(asdf:defsystem #:hypergeometrica-manager
:description "Manager process for running large Hypergeometrica calculations."
:author "Robert Smith <[email protected]>"
:license "BSD 3-clause (See LICENSE.txt)"
:depends-on (#:clingon #:uiop #:bordeaux-threads #:sb-bsd-sockets)
; :in-order-to ((asdf:test-op (asdf:test-op #:hypergeometrica-manager/tests)))
:around-compile (lambda (compile)
(let (#+sbcl (sb-ext:*derive-function-types* t))
(funcall compile)))
:pathname "src-manager/"
:serial t
:components ((:file "package")
(:file "main"))
:build-operation "program-op"
:build-pathname "hypergeometrica-manager"
:entry-point "hypergeometrica-manager:main")

(asdf:defsystem #:hypergeometrica-manager/tests
:description "Tests for HYPERGEOMETRICA-MANAGER."
:author "Robert Smith <[email protected]>"
:license "BSD 3-clause (See LICENSE.txt)"
:defsystem-depends-on (#:uiop)
:depends-on (#:hypergeometrica-manager
#:fiasco)
:perform (asdf:test-op (o s)
#+ignore
(uiop:symbol-call '#:hypergeometrica-tests
'#:test-hypergeometrica))
:serial t
:components ())
34 changes: 34 additions & 0 deletions hypergeometrica-worker.asd
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
;;;; hypergeometrica-worker.asd
;;;;
;;;; Copyright (c) 2024 Robert Smith

(asdf:defsystem #:hypergeometrica-worker
:description "Worker process for running large Hypergeometrica calculations."
:author "Robert Smith <[email protected]>"
:license "BSD 3-clause (See LICENSE.txt)"
:depends-on (#:clingon #:uiop #:bordeaux-threads #:sb-bsd-sockets)
; :in-order-to ((asdf:test-op (asdf:test-op #:hypergeometrica-worker/tests)))
:around-compile (lambda (compile)
(let (#+sbcl (sb-ext:*derive-function-types* t))
(funcall compile)))
:pathname "src-worker/"
:serial t
:components ((:file "package")
(:file "main"))
:build-operation "program-op"
:build-pathname "hypergeometrica-worker"
:entry-point "hypergeometrica-worker:main")

(asdf:defsystem #:hypergeometrica-worker/tests
:description "Tests for HYPERGEOMETRICA-WORKER."
:author "Robert Smith <[email protected]>"
:license "BSD 3-clause (See LICENSE.txt)"
:defsystem-depends-on (#:uiop)
:depends-on (#:hypergeometrica-worker
#:fiasco)
:perform (asdf:test-op (o s)
#+ignore
(uiop:symbol-call '#:hypergeometrica-tests
'#:test-hypergeometrica))
:serial t
:components ())
2 changes: 1 addition & 1 deletion hypergeometrica.asd
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
;;;; hypergeometrica.asd
;;;;
;;;; Copyright (c) 2019-2023 Robert Smith
;;;; Copyright (c) 2019-2024 Robert Smith

(asdf:defsystem #:hypergeometrica
:description "Calculate lots of digits of things."
Expand Down
178 changes: 178 additions & 0 deletions src-manager/main.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
;;;; Copyright (c) 2024 Robert Smith

(in-package #:hypergeometrica-manager)

(sb-ext:defglobal **socket** nil)
(sb-ext:defglobal **socket-node** nil)
(sb-ext:defglobal **socket-thread** nil)

;;; Socket

(defun write-form (stream form)
(prin1 form stream)
(terpri stream)
(finish-output stream))

(defun make-socket-listener ()
(let* ((server **socket**))
(lambda ()
(unwind-protect
(loop
(let* ((client (sb-bsd-sockets:socket-accept server))
(stream (sb-bsd-sockets:socket-make-stream
client
:input t
:output t
:element-type 'character
:buffering :line
:timeout 5))
(message (read stream nil '(:eof))))
(handle-worker-message message stream)
(sb-bsd-sockets:socket-close client)))
(sb-bsd-sockets:socket-close **socket**)
(delete-file **socket-node**)
(setf **socket** nil
**socket-node** nil
**socket-thread** nil)))))

(defun start-socket-thread ()
(when **socket-thread**
(warn "Socket thread already started.")
(bt:destroy-thread **socket-thread**))
(setf **socket-thread** (bt:make-thread
(make-socket-listener)
:name "Hypergeometrica Socket Server")))

;;; Worker Tracking

(defclass worker-status ()
((id :accessor worker-status-id
:initarg :id)
(last-heartbeat :accessor last-heartbeat
:initarg :last-heartbeat)))

(sb-ext:defglobal **max-workers** 1)
(sb-ext:defglobal **workers-lock** (bt:make-lock "**workers**"))
(sb-ext:defglobal **workers** nil)

(defun make-id ()
(sleep 1.5)
(get-universal-time))

(defun check-worker (id)
(bt:with-lock-held (**workers-lock**)
(let ((status (find id **workers** :key #'worker-status-id)))
(cond
(status
(setf (last-heartbeat status) (get-internal-real-time))
id)
(t
(warn "Unknown worker identified as #~D" id)
nil)))))

(defun make-heartbeat-checker (&optional (timeout 10))
(lambda ()
(loop
(sleep timeout)
(bt:with-lock-held (**workers-lock**)
(loop :for status :in **workers**
:if (< timeout (/ (- (get-internal-real-time)
(last-heartbeat status))
internal-time-units-per-second))
:collect status :into evict
:else
:collect status :into renew
:finally (progn
(setf **workers** renew)
(dolist (status evict)
(warn "Evicting ~A due to timeout." (worker-status-id status)))))))))

(sb-ext:defglobal **heartbeat-checker-thread** nil)
(defun start-heartbeat-checker-thread ()
(setf **heartbeat-checker-thread**
(bt:make-thread (make-heartbeat-checker) :name "Heartbeat Checker")))

;;; Worker Message Handling

(defun handle-unknown-message (message)
(warn "Unknown message received: ~A" (prin1-to-string message))
nil)

(defun handle-worker-message (message stream)
(typecase message
(atom
(handle-unknown-message message))
((cons keyword)
(alexandria:destructuring-case message
((:eof)
(warn "Received EOF from client."))
((:join)
(bt:with-lock-held (**workers-lock**)
(cond
((> **max-workers** (length **workers**))
(let ((new-id (make-id)))
(push
(make-instance 'worker-status
:id new-id
:last-heartbeat (get-internal-real-time))
**workers**)
(write-form stream `(:welcome :id ,new-id))))
(t
(write-form stream '(:no-vacancy))))))
((:status)
nil)
((t &rest rest)
(declare (ignore rest))
(handle-unknown-message message))))
(t
(let ((from (car message)))
(when (check-worker from)
(alexandria:destructuring-case (cdr message)
((:ping)
(format t "Ping from client ~D~%" from)
(write-form stream '(:pong))
(finish-output stream))
((:heartbeat)
(format t "Heartbeat from worker #~D~%" from))
((t &rest rest)
(declare (ignore rest))
(handle-unknown-message message))))))))

;;; CLI

(defun cli-options ()
(list
(clingon:make-option
:integer
:required t
:description "maximum number of workers"
:long-name "max-workers"
:key :max-workers)))

(defun cli-command ()
(clingon:make-command
:name "hypergeometrica-manager"
:options (cli-options)
:handler #'cli-handler))

(defun cli-handler (cmd)
(let ((pid (sb-posix:getpid)))
(setf **max-workers** (clingon:getopt cmd ':max-workers)
**socket** (make-instance 'sb-bsd-sockets:local-socket
:type :stream)
**socket-node** (merge-pathnames
(format nil "manager-~D" pid)
"/tmp/"))
(sb-bsd-sockets:socket-bind **socket** (namestring **socket-node**))
(sb-bsd-sockets:socket-listen **socket** 8)
(start-heartbeat-checker-thread)
(start-socket-thread)
(format t "Started socket on: ~A~%" **socket-node**)
(format t "Waiting for socket thread to end.~%")
(finish-output)
(bt:join-thread **socket-thread**)))

(defun main ()
(sb-ext:disable-debugger)
(let ((app (cli-command)))
(clingon:run app)))
5 changes: 5 additions & 0 deletions src-manager/package.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
;;;; Copyright (c) 2024 Robert Smith

(defpackage #:hypergeometrica-manager
(:use #:cl)
(:export #:main))
108 changes: 108 additions & 0 deletions src-worker/main.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
;;;; Copyright (c) 2024 Robert Smith

(in-package #:hypergeometrica-worker)

(sb-ext:defglobal **id** nil)
(sb-ext:defglobal **manager-address** nil)
(sb-ext:defglobal **manager-lock** (bt:make-lock))

(defun cli-options ()
(list
(clingon:make-option
:string
:description "Address of open socket."
:short-name #\s
:long-name "socket-address"
:key :socket-address)
))

(defun cli-command ()
(clingon:make-command
:name "hypergeometrica-worker"
:options (cli-options)
:handler #'cli-handler))

(defmacro with-manager-io ((stream) &body body)
(alexandria:with-gensyms (manager)
`(bt:with-lock-held (**manager-lock**)
(let ((,manager (make-instance 'sb-bsd-sockets:local-socket
:type :stream)))
(unwind-protect
(progn
(sb-bsd-sockets:socket-connect ,manager **manager-address**)
(let ((,stream (sb-bsd-sockets:socket-make-stream
,manager
:element-type 'character
:input t
:output t
:buffering ':line)))
,@body))
(when (sb-bsd-sockets:socket-open-p ,manager)
(sb-bsd-sockets:socket-close ,manager)))))))

(defun write-form (stream form)
(prin1 form stream)
(terpri stream)
(finish-output stream))

(defun read-form (stream)
(read stream nil '(:eof)))

;;; Heartbeat

(defun make-heartbeat (id &optional (period 5))
(let ((heartbeat `(,id :heartbeat)))
(lambda ()
(loop
(with-manager-io (stream)
(write-form stream heartbeat)
(finish-output stream))
(sleep period)))))

(sb-ext:defglobal **heartbeat-thread** nil)
(defun start-heartbeat-thread ()
(setf **heartbeat-thread** (bt:make-thread (make-heartbeat **id**)
:name "Heartbeat Thread")))

;;; Request ID

(defun request-id (stream)
(write-form stream '(:join))
(alexandria:destructuring-ecase (read-form stream)
((:welcome &key id)
(format t "Got an ID: ~A~%" id)
id)
((:no-vacancy)
(format t "No vacancy.~%")
nil)))

;;; Main

(defun cli-handler (cmd)
(let ((manager-address (clingon:getopt cmd ':socket-address)))
;; Set the manager address.
(unless (probe-file manager-address)
(error "Manager address ~A not found." manager-address))
(setf **manager-address** manager-address)

;; Get an ID.
(with-manager-io (stream)
(format t "Requesting ID.~%")
(let ((id? (request-id stream)))
(cond
(id?
(setf **id** id?))
(t
(uiop:quit 1)))))

(start-heartbeat-thread)
(with-manager-io (stream)
(format t "#~D connected to socket.~%" **id**)
(write-form stream `(,**id** :ping))
(format t "#~D: received ~S~%" **id** (read-form stream))
(finish-output))
(bt:join-thread **heartbeat-thread**)))

(defun main ()
(let ((app (cli-command)))
(clingon:run app)))
5 changes: 5 additions & 0 deletions src-worker/package.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
;;;; Copyright (c) 2024 Robert Smith

(defpackage #:hypergeometrica-worker
(:use #:cl)
(:export #:main))

0 comments on commit 70ae9ab

Please sign in to comment.