Skip to content

Commit

Permalink
add lifelines to orphaned worker processes
Browse files Browse the repository at this point in the history
  • Loading branch information
stylewarning committed Feb 26, 2024
1 parent 1e01515 commit c19ef52
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 20 deletions.
44 changes: 43 additions & 1 deletion src-manager/main.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,42 @@
(warn "Unknown worker identified as #~D" id)
nil)))))

;;; Lifelines

(defun revive-lifelines (lifelines)
  "Try to re-adopt orphaned workers through their lifeline sockets.

LIFELINES is a list of designators for local (Unix-domain) socket nodes.
For every node that exists, connect to it, send the form

    (:revive :id <fresh id> :socket <**socket-node**>)

and, once the message has been flushed, register a WORKER-STATUS for the
fresh id in **WORKERS**.  Nodes that do not exist, or whose connection or
write fails, are skipped with a warning.  Returns NIL."
  (dolist (lifeline lifelines)
    (if (not (probe-file lifeline))
        (warn "Invalid lifeline: ~A" lifeline)
        (handler-case
            (let ((lifeline-socket (make-instance 'sb-bsd-sockets:local-socket
                                                  :type :stream)))
              (unwind-protect
                   (progn
                     (sb-bsd-sockets:socket-connect lifeline-socket lifeline)
                     (let ((stream (sb-bsd-sockets:socket-make-stream
                                    lifeline-socket
                                    :element-type 'character
                                    :input t
                                    :output t
                                    :buffering ':line))
                           (new-id (make-id)))
                       (format t "Reviving: lifeline ~A -> ~S~%" lifeline new-id)
                       (write-form stream `(:revive :id ,new-id :socket ,**socket-node**))
                       ;; Flush explicitly, as the other WRITE-FORM senders
                       ;; do, so the message is out before the socket closes.
                       (finish-output stream)
                       ;; Register the worker only after the message was
                       ;; sent; a failed send must not leave a phantom entry
                       ;; in **WORKERS**.
                       (push
                        (make-instance 'worker-status
                                       :id new-id
                                       :last-heartbeat (get-internal-real-time))
                        **workers**)
                       ;; TODO: bookkeeping on current computational progress
                       ))
                (sb-bsd-sockets:socket-close lifeline-socket)))
          ;; Failures while writing to the socket stream surface as
          ;; STREAM-ERRORs rather than SOCKET-ERRORs; treat both as
          ;; "could not talk to this lifeline".
          ((or sb-bsd-sockets:socket-error stream-error) ()
            (warn "Error communicating with lifeline socket ~A... skipping" lifeline))))))

;;; Heartbeat

(defun make-heartbeat-checker (&optional (timeout 10))
(lambda ()
(loop
Expand Down Expand Up @@ -147,7 +183,12 @@
:required t
:description "maximum number of workers"
:long-name "max-workers"
:key :max-workers)))
:key :max-workers)
(clingon:make-option
:list
:description "lifelines to restart work with"
:long-name "lifeline"
:key :lifelines)))

(defun cli-command ()
(clingon:make-command
Expand All @@ -166,6 +207,7 @@
(sb-bsd-sockets:socket-bind **socket** (namestring **socket-node**))
(sb-bsd-sockets:socket-listen **socket** 8)
(start-heartbeat-checker-thread)
(revive-lifelines (clingon:getopt cmd ':lifelines))
(start-socket-thread)
(format t "Started socket on: ~A~%" **socket-node**)
(format t "Waiting for socket thread to end.~%")
Expand Down
81 changes: 62 additions & 19 deletions src-worker/main.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,61 @@
:options (cli-options)
:handler #'cli-handler))

;;; Intended to record the most recent socket error seen while talking to
;;; the manager.  Initially NIL.
(sb-ext:defglobal **communication-error-occured** nil)

(defun await-manager-reconnection ()
  "Block until a manager revives this worker through a lifeline socket.

Binds a local stream socket at /tmp/worker-<**id**>-lifeline, prints the
node name, and then accepts connections one at a time, reading one form
from each.  When a form of the shape (:revive &key id socket) arrives,
**ID** and **MANAGER-ADDRESS** are updated from it and the function
returns.  Any other message is printed and ignored.

The listening socket and every accepted client socket are closed even on a
non-local exit, and the socket node created by this call is unlinked
afterwards so that no stale lifeline file is left behind."
  (format t "Awaiting reconnection to a manager...")
  (let* ((lifeline (make-instance 'sb-bsd-sockets:local-socket :type :stream))
         (lifeline-node (merge-pathnames
                         (format nil "worker-~A-lifeline" **id**)
                         "/tmp/"))
         ;; True only once SOCKET-BIND succeeded, i.e. once the node on disk
         ;; is known to be ours to remove.
         (bound nil))
    (unwind-protect
         (progn
           (sb-bsd-sockets:socket-bind lifeline (namestring lifeline-node))
           (setf bound t)
           (sb-bsd-sockets:socket-listen lifeline 8)
           (format t "~2%!!! LIFELINE !!! Provide manager node to negotiate with to: ~S~2%" (namestring lifeline-node))
           (finish-output)
           (loop :named :REVIVED :do
             (let ((client (sb-bsd-sockets:socket-accept lifeline)))
               (unwind-protect
                    (let* ((stream (sb-bsd-sockets:socket-make-stream
                                    client
                                    :input t
                                    :output t
                                    :element-type 'character
                                    :buffering :line))
                           (message (read-form stream)))
                      (format t "Lifeline message received: ~S~%" message)
                      (when (typep message '(cons (member :revive)))
                        (alexandria:destructuring-case message
                          ((:revive &key id socket)
                           (setf **id** id
                                 **manager-address** (namestring socket))
                           ;; TODO: We need to make sure the manager knows
                           ;; what state we are in before we carry on.
                           (return-from :REVIVED)))))
                 (sb-bsd-sockets:socket-close client)))))
      (sb-bsd-sockets:socket-close lifeline)
      ;; Closing a bound local socket does not unlink its node.
      (when (and bound (probe-file lifeline-node))
        (delete-file lifeline-node)))))

(defmacro with-manager-io ((stream) &body body)
  "Run BODY with STREAM bound to a fresh connection to the manager.

While holding **MANAGER-LOCK**, a new local stream socket is connected to
**MANAGER-ADDRESS**, STREAM is bound to a line-buffered bidirectional
character stream on it, and BODY is evaluated.  The values of BODY are
returned and the socket is always closed afterwards.

If a SB-BSD-SOCKETS:SOCKET-ERROR is signalled during the connect or BODY,
the condition is stored in **COMMUNICATION-ERROR-OCCURED**,
AWAIT-MANAGER-RECONNECTION is called (still holding the lock), and the
whole attempt, BODY included, is retried on a brand-new socket."
  (alexandria:with-gensyms (manager done retry condition)
    `(bt:with-lock-held (**manager-lock**)
       ;; Gensym'd block and tag names keep BODY from capturing them and
       ;; let the macro return BODY's values (a bare TAGBODY yields NIL).
       (block ,done
         (tagbody
            ,retry
            (handler-case
                ;; A new socket per attempt: the state of a socket after a
                ;; failed connect() is unspecified, and re-connecting an
                ;; already connected socket fails.
                (let ((,manager (make-instance 'sb-bsd-sockets:local-socket
                                               :type :stream)))
                  (unwind-protect
                       (progn
                         (sb-bsd-sockets:socket-connect ,manager **manager-address**)
                         (return-from ,done
                           (let ((,stream (sb-bsd-sockets:socket-make-stream
                                           ,manager
                                           :element-type 'character
                                           :input t
                                           :output t
                                           :buffering ':line)))
                             ,@body)))
                    (when (sb-bsd-sockets:socket-open-p ,manager)
                      (sb-bsd-sockets:socket-close ,manager))))
              (sb-bsd-sockets:socket-error (,condition)
                ;; HANDLER-CASE has already unwound here, so the failed
                ;; socket is closed before we block on the lifeline.
                (setf **communication-error-occured** ,condition)
                (await-manager-reconnection)
                (go ,retry))))))))

Expand All @@ -50,18 +90,17 @@

;;; Heartbeat

(defun make-heartbeat (&optional (period 5))
  "Return a function of no arguments that sends heartbeats forever.

Each iteration opens a manager connection with WITH-MANAGER-IO, writes the
form (<**id**> :heartbeat), flushes it, and then sleeps PERIOD seconds
\(default 5).  **ID** is read on every beat rather than captured once, so
the heartbeat follows the id when it is reassigned."
  (lambda ()
    (loop
      (with-manager-io (stream)
        (write-form stream `(,**id** :heartbeat))
        (finish-output stream))
      (sleep period))))

(sb-ext:defglobal **heartbeat-thread** nil)
(defun start-heartbeat-thread ()
  "Start the heartbeat loop in a new thread named \"Heartbeat Thread\".

The thread object is stored in **HEARTBEAT-THREAD** and returned."
  ;; MAKE-HEARTBEAT takes only the optional PERIOD, so call it with no
  ;; arguments; passing **ID** would be taken as the sleep period.
  (setf **heartbeat-thread** (bt:make-thread (make-heartbeat)
                                             :name "Heartbeat Thread")))

;;; Request ID
Expand Down Expand Up @@ -91,11 +130,15 @@
(let ((id? (request-id stream)))
(cond
(id?
(format t "Received ID: ~D~%" id?)
(setf **id** id?))
(t
(uiop:quit 1)))))

;; Start the heartbeat so we don't get kicked off.
(start-heartbeat-thread)

;; Request and dispatch work.
(with-manager-io (stream)
(format t "#~D connected to socket.~%" **id**)
(write-form stream `(,**id** :ping))
Expand Down

0 comments on commit c19ef52

Please sign in to comment.