Skip to content

Commit

Permalink
✨ Move worker runner to a separated namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
niwinz authored and hirunatan committed Apr 3, 2024
1 parent 4ccea6b commit 3a67e51
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 253 deletions.
2 changes: 1 addition & 1 deletion backend/resources/log4j2-devenv.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

<Logger name="app" level="all" additivity="false">
<AppenderRef ref="main" level="trace" />
<AppenderRef ref="console" level="info" />
<AppenderRef ref="console" level="debug" />
</Logger>

<Logger name="user" level="trace" additivity="false">
Expand Down
4 changes: 2 additions & 2 deletions backend/src/app/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,15 @@
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}

[::default ::wrk/worker]
[::default ::wrk/runner]
{::wrk/parallelism (cf/get ::worker-default-parallelism 1)
::wrk/queue :default
::rds/redis (ig/ref ::rds/redis)
::wrk/registry (ig/ref ::wrk/registry)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}

[::webhook ::wrk/worker]
[::webhook ::wrk/runner]
{::wrk/parallelism (cf/get ::worker-webhook-parallelism 1)
::wrk/queue :webhooks
::rds/redis (ig/ref ::rds/redis)
Expand Down
246 changes: 2 additions & 244 deletions backend/src/app/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,16 @@
"Async tasks abstraction (impl)."
(:require
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.spec :as us]
[app.common.transit :as t]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.metrics :as mtx]
[app.redis :as rds]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[promesa.exec :as px]))
[integrant.core :as ig]))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -59,244 +54,6 @@
{}
tasks))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; WORKER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defn- decode-task-row
[{:keys [props] :as row}]
(cond-> row
(db/pgobject? props)
(assoc :props (db/decode-transit-pgobject props))))

(declare ^:private run-worker-loop!)
(declare ^:private start-worker!)
(declare ^:private get-error-context)

(defmethod ig/pre-init-spec ::worker [_]
(s/keys :req [::parallelism
::mtx/metrics
::db/pool
::rds/redis
::queue
::registry]))

(defmethod ig/prep-key ::worker
[_ cfg]
(merge {::parallelism 1}
(d/without-nils cfg)))

(defmethod ig/init-key ::worker
[_ {:keys [::db/pool ::queue ::parallelism] :as cfg}]
(let [queue (d/name queue)
cfg (assoc cfg ::queue queue)]
(if (db/read-only? pool)
(l/wrn :hint "worker: not started (db is read-only)" :queue queue :parallelism parallelism)
(doall
(->> (range parallelism)
(map #(assoc cfg ::worker-id %))
(map start-worker!))))))

(defmethod ig/halt-key! ::worker
[_ threads]
(run! px/interrupt! threads))

(defn- start-worker!
[{:keys [::rds/redis ::worker-id ::queue] :as cfg}]
(px/thread
{:name (format "penpot/worker/runner:%s" worker-id)}
(l/inf :hint "worker: started" :worker-id worker-id :queue queue)
(try
(dm/with-open [rconn (rds/connect redis)]
(let [tenant (cf/get :tenant "main")
cfg (-> cfg
(assoc ::queue (str/ffmt "taskq:%:%" tenant queue))
(assoc ::rds/rconn rconn)
(assoc ::timeout (dt/duration "5s")))]
(loop []
(when (px/interrupted?)
(throw (InterruptedException. "interrupted")))

(run-worker-loop! cfg)
(recur))))

(catch InterruptedException _
(l/debug :hint "worker: interrupted"
:worker-id worker-id
:queue queue))
(catch Throwable cause
(l/err :hint "worker: unexpected exception"
:worker-id worker-id
:queue queue
:cause cause))
(finally
(l/inf :hint "worker: terminated"
:worker-id worker-id
:queue queue)))))

(defn- run-worker-loop!
[{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}]
(letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}]
(let [explain (ex-message error)
nretry (+ (:retry-num task) inc-by)
now (dt/now)
delay (->> (iterate #(* % 2) delay) (take nretry) (last))]
(db/update! pool :task
{:error explain
:status "retry"
:modified-at now
:scheduled-at (dt/plus now delay)
:retry-num nretry}
{:id (:id task)})
nil))

(handle-task-failure [{:keys [task error]}]
(let [explain (ex-message error)]
(db/update! pool :task
{:error explain
:modified-at (dt/now)
:status "failed"}
{:id (:id task)})
nil))

(handle-task-completion [{:keys [task]}]
(let [now (dt/now)]
(db/update! pool :task
{:completed-at now
:modified-at now
:status "completed"}
{:id (:id task)})
nil))

(decode-payload [^bytes payload]
(try
(let [task-id (t/decode payload)]
(if (uuid? task-id)
task-id
(l/err :hint "worker: received unexpected payload (uuid expected)"
:payload task-id)))
(catch Throwable cause
(l/err :hint "worker: unable to decode payload"
:payload payload
:length (alength payload)
:cause cause))))

(handle-task [{:keys [name] :as task}]
(let [task-fn (get registry name)]
(if task-fn
(task-fn task)
(l/wrn :hint "no task handler found" :name name))
{:status :completed :task task}))

(handle-task-exception [cause task]
(let [edata (ex-data cause)]
(if (and (< (:retry-num task)
(:max-retries task))
(= ::retry (:type edata)))
(cond-> {:status :retry :task task :error cause}
(dt/duration? (:delay edata))
(assoc :delay (:delay edata))

(= ::noop (:strategy edata))
(assoc :inc-by 0))
(do
(l/err :hint "worker: unhandled exception on task"
::l/context (get-error-context cause task)
:cause cause)
(if (>= (:retry-num task) (:max-retries task))
{:status :failed :task task :error cause}
{:status :retry :task task :error cause})))))

(get-task [task-id]
(ex/try!
(some-> (db/get* pool :task {:id task-id})
(decode-task-row))))

(run-task [task-id]
(loop [task (get-task task-id)]
(cond
(ex/exception? task)
(if (or (db/connection-error? task)
(db/serialization-error? task))
(do
(l/wrn :hint "worker: connection error on retrieving task from database (retrying in some instants)"
:worker-id worker-id
:cause task)
(px/sleep (::rds/timeout rconn))
(recur (get-task task-id)))
(do
(l/err :hint "worker: unhandled exception on retrieving task from database (retrying in some instants)"
:worker-id worker-id
:cause task)
(px/sleep (::rds/timeout rconn))
(recur (get-task task-id))))

(nil? task)
(l/wrn :hint "worker: no task found on the database"
:worker-id worker-id
:task-id task-id)

:else
(try
(l/trc :hint "executing task"
:name (:name task)
:id (str (:id task))
:queue queue
:worker-id worker-id
:retry (:retry-num task))
(handle-task task)
(catch InterruptedException cause
(throw cause))
(catch Throwable cause
(handle-task-exception cause task))))))

(process-result [{:keys [status] :as result}]
(ex/try!
(case status
:retry (handle-task-retry result)
:failed (handle-task-failure result)
:completed (handle-task-completion result)
nil)))

(run-task-loop [task-id]
(loop [result (run-task task-id)]
(when-let [cause (process-result result)]
(if (or (db/connection-error? cause)
(db/serialization-error? cause))
(do
(l/wrn :hint "worker: database exeption on processing task result (retrying in some instants)"
:cause cause)
(px/sleep (::rds/timeout rconn))
(recur result))
(do
(l/err :hint "worker: unhandled exception on processing task result (retrying in some instants)"
:cause cause)
(px/sleep (::rds/timeout rconn))
(recur result))))))]

(try
(let [[_ payload] (rds/blpop! rconn timeout queue)]
(some-> payload
decode-payload
run-task-loop))

(catch InterruptedException cause
(throw cause))

(catch Exception cause
(if (rds/timeout-exception? cause)
(do
(l/err :hint "worker: redis pop operation timeout, consider increasing redis timeout (will retry in some instants)"
:timeout timeout
:cause cause)
(px/sleep timeout))

(l/err :hint "worker: unhandled exception" :cause cause))))))

(defn get-error-context
[_ item]
{:params item})

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SUBMIT API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand Down Expand Up @@ -348,6 +105,7 @@
[& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label]
:or {delay 0 queue :default priority 100 max-retries 3 label ""}
:as options}]

(us/verify! ::submit-options options)
(let [duration (dt/duration delay)
interval (db/interval duration)
Expand Down
9 changes: 5 additions & 4 deletions backend/src/app/worker/cron.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
[app.common.logging :as l]
[app.db :as db]
[app.util.time :as dt]
[app.worker :as wrk]
[app.worker :as-alias wrk]
[app.worker.runner :refer [get-error-context]]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
Expand Down Expand Up @@ -64,7 +65,7 @@

(catch Throwable cause
(let [elapsed (dt/format-duration (tpoint))]
(binding [l/*context* (wrk/get-error-context cause task)]
(binding [l/*context* (get-error-context cause task)]
(l/err :hint "unhandled exception on running task"
:task-id id
:elapsed elapsed
Expand Down Expand Up @@ -98,11 +99,11 @@
(s/def ::props (s/nilable map?))
(s/def ::task keyword?)

(s/def ::wrk/task
(s/def ::task-item
(s/keys :req-un [::cron ::task]
:opt-un [::props ::id]))

(s/def ::wrk/entries (s/coll-of (s/nilable ::wrk/task)))
(s/def ::wrk/entries (s/coll-of (s/nilable ::task-item)))

(defmethod ig/pre-init-spec ::wrk/cron [_]
(s/keys :req [::db/pool ::wrk/entries ::wrk/registry]))
Expand Down
Loading

0 comments on commit 3a67e51

Please sign in to comment.