Make worker subsystem more resilent to redis restarts

This commit is contained in:
Andrey Antukh
2025-10-14 15:48:20 +02:00
parent 85591bd579
commit 5ae173f01c
2 changed files with 107 additions and 38 deletions

View File

@@ -42,45 +42,99 @@
(assert (sm/check schema:dispatcher cfg))) (assert (sm/check schema:dispatcher cfg)))
(def ^:private sql:select-next-tasks (def ^:private sql:select-next-tasks
"select id, queue from task as t "SELECT id, queue, scheduled_at from task AS t
where t.scheduled_at <= now() WHERE t.scheduled_at <= ?::timestamptz
and (t.status = 'new' or t.status = 'retry') AND (t.status = 'new' OR t.status = 'retry')
and queue ~~* ?::text AND queue ~~* ?::text
order by t.priority desc, t.scheduled_at ORDER BY t.priority DESC, t.scheduled_at
limit ? LIMIT ?
for update skip locked") FOR UPDATE
SKIP LOCKED")
(def ^:private sql:mark-task-scheduled (def ^:private sql:mark-task-scheduled
"UPDATE task SET status = 'scheduled' "UPDATE task SET status = 'scheduled'
WHERE id = ANY(?)") WHERE id = ANY(?)")
(def ^:private sql:reschedule-lost
"UPDATE task
SET status='new', scheduled_at=?::timestamptz
FROM (SELECT t.id
FROM task AS t
WHERE status = 'scheduled'
AND (?::timestamptz - t.scheduled_at) > '5 min'::interval) AS subquery
WHERE task.id=subquery.id
RETURNING task.id, task.queue")
(def ^:private sql:clean-orphan
"UPDATE task
SET status='failed', modified_at=?::timestamptz,
error='orphan with running status'
FROM (SELECT t.id
FROM task AS t
WHERE status = 'running'
AND (?::timestamptz - t.modified_at) > '24 hour'::interval) AS subquery
WHERE task.id=subquery.id
RETURNING task.id, task.queue")
(defmethod ig/init-key ::wrk/dispatcher (defmethod ig/init-key ::wrk/dispatcher
[_ {:keys [::db/pool ::wrk/tenant ::batch-size ::timeout] :as cfg}] [_ {:keys [::db/pool ::wrk/tenant ::batch-size ::timeout] :as cfg}]
(letfn [(get-tasks [{:keys [::db/conn]}] (letfn [(reschedule-lost-tasks [{:keys [::db/conn ::timestamp]}]
(let [prefix (str tenant ":%")] (doseq [{:keys [id queue]} (db/exec! conn [sql:reschedule-lost timestamp timestamp]
(not-empty (db/exec! conn [sql:select-next-tasks prefix batch-size])))) {:return-keys true})]
(l/wrn :hint "reschedule"
:id (str id)
:queue queue)))
(mark-as-scheduled [{:keys [::db/conn]} ids] (clean-orphan [{:keys [::db/conn ::timestamp]}]
(let [sql [sql:mark-task-scheduled (doseq [{:keys [id queue]} (db/exec! conn [sql:clean-orphan timestamp timestamp]
{:return-keys true})]
(l/wrn :hint "mark as orphan failed"
:id (str id)
:queue queue)))
(get-tasks [{:keys [::db/conn ::timestamp] :as cfg}]
(let [prefix (str tenant ":%")
result (db/exec! conn [sql:select-next-tasks timestamp prefix batch-size])]
(not-empty result)))
(mark-as-scheduled [{:keys [::db/conn]} items]
(let [ids (map :id items)
sql [sql:mark-task-scheduled
(db/create-array conn "uuid" ids)]] (db/create-array conn "uuid" ids)]]
(db/exec-one! conn sql))) (db/exec-one! conn sql)))
(push-tasks [{:keys [::rds/conn] :as cfg} [queue tasks]] (push-tasks [{:keys [::rds/conn] :as cfg} [queue tasks]]
(let [ids (mapv :id tasks) (let [items (mapv (juxt :id :scheduled-at) tasks)
key (str/ffmt "taskq:%" queue) key (str/ffmt "penpot.worker.queue:%" queue)]
res (rds/rpush conn key (mapv t/encode-str ids))]
(mark-as-scheduled cfg ids) (rds/rpush conn key (mapv t/encode-str items))
(l/trc :hist "enqueue tasks on redis" (mark-as-scheduled cfg tasks)
:queue queue
:tasks (count ids) (doseq [{:keys [id queue]} tasks]
:queued res))) (l/trc :hist "schedule"
:id (str id)
:queue queue))))
(run-batch' [cfg] (run-batch' [cfg]
(if-let [tasks (get-tasks cfg)] (let [cfg (assoc cfg ::timestamp (ct/now))]
(->> (group-by :queue tasks) ;; Reschedule lost in transit tasks (can happen when
(run! (partial push-tasks cfg))) ;; redis server is restarted just after task is pushed)
::wait)) (reschedule-lost-tasks cfg)
;; Mark as failed all tasks that are still marked as
;; running but it's been more than 24 hours since its
;; last modification
(clean-orphan cfg)
;; Then, schedule the next tasks in queue
(if-let [tasks (get-tasks cfg)]
(->> (group-by :queue tasks)
(run! (partial push-tasks cfg)))
;; If no tasks found on this batch run, we signal the
;; run-loop to wait for some time before start running
;; the next batch interation
::wait)))
(run-batch [] (run-batch []
(let [rconn (rds/connect cfg)] (let [rconn (rds/connect cfg)]

View File

@@ -38,7 +38,7 @@
[:max-retries :int] [:max-retries :int]
[:retry-num :int] [:retry-num :int]
[:priority :int] [:priority :int]
[:status [:enum "scheduled" "completed" "new" "retry" "failed"]] [:status [:enum "scheduled" "running" "completed" "new" "retry" "failed"]]
[:label {:optional true} :string] [:label {:optional true} :string]
[:props :map]]) [:props :map]])
@@ -69,7 +69,7 @@
(decode-task-row)))) (decode-task-row))))
(defn- run-task (defn- run-task
[{:keys [::wrk/registry ::id ::queue] :as cfg} task] [{:keys [::db/pool ::wrk/registry ::id ::queue] :as cfg} task]
(try (try
(l/dbg :hint "start" (l/dbg :hint "start"
:name (:name task) :name (:name task)
@@ -78,6 +78,13 @@
:runner-id id :runner-id id
:retry (:retry-num task)) :retry (:retry-num task))
;; Mark task as running
(db/update! pool :task
{:status "running"
:modified-at (ct/now)}
{:id (:id task)}
{::db/return-keys false})
(let [tpoint (ct/tpoint) (let [tpoint (ct/tpoint)
task-fn (wrk/get-task registry (:name task)) task-fn (wrk/get-task registry (:name task))
result (when task-fn (task-fn task)) result (when task-fn (task-fn task))
@@ -121,7 +128,7 @@
{:status "retry" :error cause}))))))) {:status "retry" :error cause})))))))
(defn- run-task! (defn- run-task!
[{:keys [::id ::timeout] :as cfg} task-id] [{:keys [::id ::timeout] :as cfg} task-id scheduled-at]
(loop [task (get-task cfg task-id)] (loop [task (get-task cfg task-id)]
(cond (cond
(ex/exception? task) (ex/exception? task)
@@ -129,20 +136,26 @@
(db/serialization-error? task)) (db/serialization-error? task))
(do (do
(l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)"
:id id :runner-id id
:cause task) :cause task)
(px/sleep timeout) (px/sleep timeout)
(recur (get-task cfg task-id))) (recur (get-task cfg task-id)))
(do (do
(l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)"
:id id :runner-id id
:cause task) :cause task)
(px/sleep timeout) (px/sleep timeout)
(recur (get-task cfg task-id)))) (recur (get-task cfg task-id))))
(not= (inst-ms scheduled-at)
(inst-ms (:scheduled-at task)))
(l/wrn :hint "skiping task, rescheduled"
:task-id task-id
:runner-id id)
(nil? task) (nil? task)
(l/wrn :hint "no task found on the database" (l/wrn :hint "no task found on the database"
:id id :runner-id id
:task-id task-id) :task-id task-id)
:else :else
@@ -185,17 +198,19 @@
(db/update! pool :task (db/update! pool :task
{:completed-at now {:completed-at now
:modified-at now :modified-at now
:error nil
:status "completed"} :status "completed"}
{:id (:id task)}) {:id (:id task)})
nil)) nil))
(decode-payload [payload] (decode-payload [payload]
(try (try
(let [task-id (t/decode-str payload)] (let [[task-id scheduled-at :as payload] (t/decode-str payload)]
(if (uuid? task-id) (if (and (uuid? task-id)
task-id (ct/inst? scheduled-at))
(l/err :hint "received unexpected payload (uuid expected)" payload
:payload task-id))) (l/err :hint "received unexpected payload"
:payload payload)))
(catch Throwable cause (catch Throwable cause
(l/err :hint "unable to decode payload" (l/err :hint "unable to decode payload"
:payload payload :payload payload
@@ -211,8 +226,8 @@
(throw (IllegalArgumentException. (throw (IllegalArgumentException.
(str "invalid status received: " status)))))) (str "invalid status received: " status))))))
(run-task-loop [task-id] (run-task-loop [[task-id scheduled-at]]
(loop [result (run-task! cfg task-id)] (loop [result (run-task! cfg task-id scheduled-at)]
(when-let [cause (process-result result)] (when-let [cause (process-result result)]
(if (or (db/connection-error? cause) (if (or (db/connection-error? cause)
(db/serialization-error? cause)) (db/serialization-error? cause))
@@ -226,7 +241,7 @@
:cause cause))))))] :cause cause))))))]
(try (try
(let [key (str/ffmt "taskq:%" queue) (let [key (str/ffmt "penpot.worker.queue:%" queue)
[_ payload] (rds/blpop conn [key] timeout)] [_ payload] (rds/blpop conn [key] timeout)]
(some-> payload (some-> payload
decode-payload decode-payload