From 5ae173f01cbd05d0ed28da6054f557c0f9ccb2cc Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 14 Oct 2025 15:48:20 +0200 Subject: [PATCH] :sparkles: Make worker subsystem more resilient to redis restarts --- backend/src/app/worker/dispatcher.clj | 102 ++++++++++++++++++++------ backend/src/app/worker/runner.clj | 43 +++++++---- 2 files changed, 107 insertions(+), 38 deletions(-) diff --git a/backend/src/app/worker/dispatcher.clj b/backend/src/app/worker/dispatcher.clj index 62b03b80fa..f95ade8e1c 100644 --- a/backend/src/app/worker/dispatcher.clj +++ b/backend/src/app/worker/dispatcher.clj @@ -42,45 +42,99 @@ (assert (sm/check schema:dispatcher cfg))) (def ^:private sql:select-next-tasks - "select id, queue from task as t - where t.scheduled_at <= now() - and (t.status = 'new' or t.status = 'retry') - and queue ~~* ?::text - order by t.priority desc, t.scheduled_at - limit ? - for update skip locked") + "SELECT id, queue, scheduled_at from task AS t + WHERE t.scheduled_at <= ?::timestamptz + AND (t.status = 'new' OR t.status = 'retry') + AND queue ~~* ?::text + ORDER BY t.priority DESC, t.scheduled_at + LIMIT ? 
+ FOR UPDATE + SKIP LOCKED") (def ^:private sql:mark-task-scheduled "UPDATE task SET status = 'scheduled' WHERE id = ANY(?)") +(def ^:private sql:reschedule-lost + "UPDATE task + SET status='new', scheduled_at=?::timestamptz + FROM (SELECT t.id + FROM task AS t + WHERE status = 'scheduled' + AND (?::timestamptz - t.scheduled_at) > '5 min'::interval) AS subquery + WHERE task.id=subquery.id +RETURNING task.id, task.queue") + +(def ^:private sql:clean-orphan + "UPDATE task + SET status='failed', modified_at=?::timestamptz, + error='orphan with running status' + FROM (SELECT t.id + FROM task AS t + WHERE status = 'running' + AND (?::timestamptz - t.modified_at) > '24 hour'::interval) AS subquery + WHERE task.id=subquery.id +RETURNING task.id, task.queue") + (defmethod ig/init-key ::wrk/dispatcher [_ {:keys [::db/pool ::wrk/tenant ::batch-size ::timeout] :as cfg}] - (letfn [(get-tasks [{:keys [::db/conn]}] - (let [prefix (str tenant ":%")] - (not-empty (db/exec! conn [sql:select-next-tasks prefix batch-size])))) + (letfn [(reschedule-lost-tasks [{:keys [::db/conn ::timestamp]}] + (doseq [{:keys [id queue]} (db/exec! conn [sql:reschedule-lost timestamp timestamp] + {:return-keys true})] + (l/wrn :hint "reschedule" + :id (str id) + :queue queue))) - (mark-as-scheduled [{:keys [::db/conn]} ids] - (let [sql [sql:mark-task-scheduled + (clean-orphan [{:keys [::db/conn ::timestamp]}] + (doseq [{:keys [id queue]} (db/exec! conn [sql:clean-orphan timestamp timestamp] + {:return-keys true})] + (l/wrn :hint "mark as orphan failed" + :id (str id) + :queue queue))) + + (get-tasks [{:keys [::db/conn ::timestamp] :as cfg}] + (let [prefix (str tenant ":%") + result (db/exec! conn [sql:select-next-tasks timestamp prefix batch-size])] + (not-empty result))) + + (mark-as-scheduled [{:keys [::db/conn]} items] + (let [ids (map :id items) + sql [sql:mark-task-scheduled (db/create-array conn "uuid" ids)]] (db/exec-one! 
conn sql))) (push-tasks [{:keys [::rds/conn] :as cfg} [queue tasks]] - (let [ids (mapv :id tasks) - key (str/ffmt "taskq:%" queue) - res (rds/rpush conn key (mapv t/encode-str ids))] + (let [items (mapv (juxt :id :scheduled-at) tasks) + key (str/ffmt "penpot.worker.queue:%" queue)] - (mark-as-scheduled cfg ids) - (l/trc :hist "enqueue tasks on redis" - :queue queue - :tasks (count ids) - :queued res))) + (rds/rpush conn key (mapv t/encode-str items)) + (mark-as-scheduled cfg tasks) + + (doseq [{:keys [id queue]} tasks] + (l/trc :hist "schedule" + :id (str id) + :queue queue)))) (run-batch' [cfg] - (if-let [tasks (get-tasks cfg)] - (->> (group-by :queue tasks) - (run! (partial push-tasks cfg))) - ::wait)) + (let [cfg (assoc cfg ::timestamp (ct/now))] + ;; Reschedule lost in transit tasks (can happen when + ;; redis server is restarted just after task is pushed) + (reschedule-lost-tasks cfg) + + ;; Mark as failed all tasks that are still marked as + ;; running but it's been more than 24 hours since its + ;; last modification + (clean-orphan cfg) + + ;; Then, schedule the next tasks in queue + (if-let [tasks (get-tasks cfg)] + (->> (group-by :queue tasks) + (run! 
(partial push-tasks cfg))) + + ;; If no tasks found on this batch run, we signal the + ;; run-loop to wait for some time before running + ;; the next batch iteration + ::wait))) (run-batch [] (let [rconn (rds/connect cfg)] diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj index 0f245da19b..ea8cd08cf2 100644 --- a/backend/src/app/worker/runner.clj +++ b/backend/src/app/worker/runner.clj @@ -38,7 +38,7 @@ [:max-retries :int] [:retry-num :int] [:priority :int] - [:status [:enum "scheduled" "completed" "new" "retry" "failed"]] + [:status [:enum "scheduled" "running" "completed" "new" "retry" "failed"]] [:label {:optional true} :string] [:props :map]]) @@ -69,7 +69,7 @@ (decode-task-row)))) (defn- run-task - [{:keys [::wrk/registry ::id ::queue] :as cfg} task] + [{:keys [::db/pool ::wrk/registry ::id ::queue] :as cfg} task] (try (l/dbg :hint "start" :name (:name task) @@ -78,6 +78,13 @@ :runner-id id :retry (:retry-num task)) + ;; Mark task as running + (db/update! pool :task + {:status "running" + :modified-at (ct/now)} + {:id (:id task)} + {::db/return-keys false}) + (let [tpoint (ct/tpoint) task-fn (wrk/get-task registry (:name task)) result (when task-fn (task-fn task)) @@ -121,7 +128,7 @@ {:status "retry" :error cause}))))))) (defn- run-task! - [{:keys [::id ::timeout] :as cfg} task-id] + [{:keys [::id ::timeout] :as cfg} task-id scheduled-at] (loop [task (get-task cfg task-id)] (cond (ex/exception? task) @@ -129,20 +136,26 @@ (db/serialization-error? 
task)) (do (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" - :id id + :runner-id id :cause task) (px/sleep timeout) (recur (get-task cfg task-id))) (do (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" - :id id + :runner-id id :cause task) (px/sleep timeout) (recur (get-task cfg task-id)))) + (not= (inst-ms scheduled-at) + (inst-ms (:scheduled-at task))) + (l/wrn :hint "skiping task, rescheduled" + :task-id task-id + :runner-id id) + (nil? task) (l/wrn :hint "no task found on the database" - :id id + :runner-id id :task-id task-id) :else @@ -185,17 +198,19 @@ (db/update! pool :task {:completed-at now :modified-at now + :error nil :status "completed"} {:id (:id task)}) nil)) (decode-payload [payload] (try - (let [task-id (t/decode-str payload)] - (if (uuid? task-id) - task-id - (l/err :hint "received unexpected payload (uuid expected)" - :payload task-id))) + (let [[task-id scheduled-at :as payload] (t/decode-str payload)] + (if (and (uuid? task-id) + (ct/inst? scheduled-at)) + payload + (l/err :hint "received unexpected payload" + :payload payload))) (catch Throwable cause (l/err :hint "unable to decode payload" :payload payload @@ -211,8 +226,8 @@ (throw (IllegalArgumentException. (str "invalid status received: " status)))))) - (run-task-loop [task-id] - (loop [result (run-task! cfg task-id)] + (run-task-loop [[task-id scheduled-at]] + (loop [result (run-task! cfg task-id scheduled-at)] (when-let [cause (process-result result)] (if (or (db/connection-error? cause) (db/serialization-error? cause)) @@ -226,7 +241,7 @@ :cause cause))))))] (try - (let [key (str/ffmt "taskq:%" queue) + (let [key (str/ffmt "penpot.worker.queue:%" queue) [_ payload] (rds/blpop conn [key] timeout)] (some-> payload decode-payload