🐛 Truncate worker scheduled-at to milliseconds

The nanosecond precision has the problem with transit serialization
roundtrip used for pass data on the worker scheduler throught redis
and generates unnecesary rescheduling.
This commit is contained in:
Andrey Antukh
2025-11-05 10:15:21 +01:00
parent 9e7ec594ca
commit cd53d3659c
2 changed files with 25 additions and 18 deletions

View File

@@ -77,8 +77,8 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:private sql:insert-new-task (def ^:private sql:insert-new-task
"insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at) "insert into task (id, name, props, queue, label, priority, max_retries, created_at, modified_at, scheduled_at)
values (?, ?, ?, ?, ?, ?, ?, now() + ?) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
returning id") returning id")
(def ^:private (def ^:private
@@ -88,7 +88,7 @@
AND queue=? AND queue=?
AND label=? AND label=?
AND status = 'new' AND status = 'new'
AND scheduled_at > now()") AND scheduled_at > ?")
(def ^:private schema:options (def ^:private schema:options
[:map {:title "submit-options"} [:map {:title "submit-options"}
@@ -111,8 +111,10 @@
(check-options! options) (check-options! options)
(let [duration (ct/duration delay) (let [delay (ct/duration delay)
interval (db/interval duration) now (ct/now)
scheduled-at (-> (ct/plus now delay)
(ct/truncate :millisecond))
props (db/tjson params) props (db/tjson params)
id (uuid/next) id (uuid/next)
tenant (cf/get :tenant) tenant (cf/get :tenant)
@@ -120,8 +122,8 @@
queue (str/ffmt "%:%" tenant (d/name queue)) queue (str/ffmt "%:%" tenant (d/name queue))
conn (db/get-connectable options) conn (db/get-connectable options)
deleted (when dedupe deleted (when dedupe
(-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label]) (-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label now])
:next.jdbc/update-count))] (db/get-update-count)))]
(l/trc :hint "submit task" (l/trc :hint "submit task"
:name task :name task
@@ -129,11 +131,13 @@
:queue queue :queue queue
:label label :label label
:dedupe (boolean dedupe) :dedupe (boolean dedupe)
:delay (ct/format-duration duration) :delay (ct/format-duration delay)
:replace (or deleted 0)) :replace (or deleted 0))
(db/exec-one! conn [sql:insert-new-task id task props queue (db/exec-one! conn [sql:insert-new-task id task props queue
label priority max-retries interval]) label priority max-retries
now now scheduled-at])
id)) id))
(defn invoke! (defn invoke!

View File

@@ -158,7 +158,9 @@
(inst-ms (:scheduled-at task))) (inst-ms (:scheduled-at task)))
(l/wrn :hint "skiping task, rescheduled" (l/wrn :hint "skiping task, rescheduled"
:task-id task-id :task-id task-id
:runner-id id) :runner-id id
:scheduled-at (ct/format-inst (:scheduled-at task))
:expected-scheduled-at (ct/format-inst scheduled-at))
:else :else
(let [result (run-task cfg task)] (let [result (run-task cfg task)]
@@ -179,7 +181,8 @@
{:error explain {:error explain
:status "retry" :status "retry"
:modified-at now :modified-at now
:scheduled-at (ct/plus now delay) :scheduled-at (-> (ct/plus now delay)
(ct/truncate :millisecond))
:retry-num nretry} :retry-num nretry}
{:id (:id task)}) {:id (:id task)})
nil)) nil))