mirror of
https://github.com/penpot/penpot.git
synced 2025-12-11 22:14:05 +01:00
Merge remote-tracking branch 'origin/staging' into develop
Some checks failed
_DEVELOP / build-bundle (push) Has been cancelled
_DEVELOP / build-docker (push) Has been cancelled
_STAGING / build-bundle (push) Has been cancelled
_STAGING / build-docker (push) Has been cancelled
Commit Message Check / Check Commit Message (push) Has been cancelled
Some checks failed
_DEVELOP / build-bundle (push) Has been cancelled
_DEVELOP / build-docker (push) Has been cancelled
_STAGING / build-bundle (push) Has been cancelled
_STAGING / build-docker (push) Has been cancelled
Commit Message Check / Check Commit Message (push) Has been cancelled
This commit is contained in:
@@ -218,6 +218,9 @@
|
||||
(when (or (nil? revn) (= revn (:revn file)))
|
||||
file)))
|
||||
|
||||
;; FIXME: we should skip files that does not match the revn on the
|
||||
;; props and add proper schema for this task props
|
||||
|
||||
(defn- process-file!
|
||||
[cfg {:keys [file-id] :as props}]
|
||||
(if-let [file (get-file cfg props)]
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
"A maintenance task that is responsible of properly scheduling the
|
||||
file-gc task for all files that matches the eligibility threshold."
|
||||
(:require
|
||||
[app.common.logging :as l]
|
||||
[app.common.time :as ct]
|
||||
[app.config :as cf]
|
||||
[app.db :as db]
|
||||
@@ -21,25 +22,24 @@
|
||||
f.modified_at
|
||||
FROM file AS f
|
||||
WHERE f.has_media_trimmed IS false
|
||||
AND f.modified_at < now() - ?::interval
|
||||
AND f.modified_at < ?
|
||||
AND f.deleted_at IS NULL
|
||||
ORDER BY f.modified_at DESC
|
||||
FOR UPDATE OF f
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- get-candidates
|
||||
[{:keys [::db/conn ::min-age] :as cfg}]
|
||||
(let [min-age (db/interval min-age)]
|
||||
(db/plan conn [sql:get-candidates min-age] {:fetch-size 10})))
|
||||
|
||||
(defn- schedule!
|
||||
[cfg]
|
||||
[{:keys [::db/conn] :as cfg} threshold]
|
||||
(let [total (reduce (fn [total {:keys [id modified-at revn]}]
|
||||
(let [params {:file-id id :modified-at modified-at :revn revn}]
|
||||
(let [params {:file-id id :revn revn}]
|
||||
(l/trc :hint "schedule"
|
||||
:file-id (str id)
|
||||
:revn revn
|
||||
:modified-at (ct/format-inst modified-at))
|
||||
(wrk/submit! (assoc cfg ::wrk/params params))
|
||||
(inc total)))
|
||||
0
|
||||
(get-candidates cfg))]
|
||||
(db/plan conn [sql:get-candidates threshold] {:fetch-size 10}))]
|
||||
{:processed total}))
|
||||
|
||||
(defmethod ig/assert-key ::handler
|
||||
@@ -53,12 +53,12 @@
|
||||
(defmethod ig/init-key ::handler
|
||||
[_ cfg]
|
||||
(fn [{:keys [props] :as task}]
|
||||
(let [min-age (ct/duration (or (:min-age props) (::min-age cfg)))]
|
||||
(let [threshold (-> (ct/duration (or (:min-age props) (::min-age cfg)))
|
||||
(ct/in-past))]
|
||||
(-> cfg
|
||||
(assoc ::db/rollback (:rollback? props))
|
||||
(assoc ::min-age min-age)
|
||||
(assoc ::wrk/task :file-gc)
|
||||
(assoc ::wrk/priority 10)
|
||||
(assoc ::wrk/mark-retries 0)
|
||||
(assoc ::wrk/delay 1000)
|
||||
(db/tx-run! schedule!)))))
|
||||
(assoc ::wrk/delay 10000)
|
||||
(db/tx-run! schedule! threshold)))))
|
||||
|
||||
@@ -77,8 +77,8 @@
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(def ^:private sql:insert-new-task
|
||||
"insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at)
|
||||
values (?, ?, ?, ?, ?, ?, ?, now() + ?)
|
||||
"insert into task (id, name, props, queue, label, priority, max_retries, created_at, modified_at, scheduled_at)
|
||||
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
returning id")
|
||||
|
||||
(def ^:private
|
||||
@@ -88,7 +88,7 @@
|
||||
AND queue=?
|
||||
AND label=?
|
||||
AND status = 'new'
|
||||
AND scheduled_at > now()")
|
||||
AND scheduled_at > ?")
|
||||
|
||||
(def ^:private schema:options
|
||||
[:map {:title "submit-options"}
|
||||
@@ -111,17 +111,19 @@
|
||||
|
||||
(check-options! options)
|
||||
|
||||
(let [duration (ct/duration delay)
|
||||
interval (db/interval duration)
|
||||
props (db/tjson params)
|
||||
id (uuid/next)
|
||||
tenant (cf/get :tenant)
|
||||
task (d/name task)
|
||||
queue (str/ffmt "%:%" tenant (d/name queue))
|
||||
conn (db/get-connectable options)
|
||||
deleted (when dedupe
|
||||
(-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label])
|
||||
:next.jdbc/update-count))]
|
||||
(let [delay (ct/duration delay)
|
||||
now (ct/now)
|
||||
scheduled-at (-> (ct/plus now delay)
|
||||
(ct/truncate :millisecond))
|
||||
props (db/tjson params)
|
||||
id (uuid/next)
|
||||
tenant (cf/get :tenant)
|
||||
task (d/name task)
|
||||
queue (str/ffmt "%:%" tenant (d/name queue))
|
||||
conn (db/get-connectable options)
|
||||
deleted (when dedupe
|
||||
(-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label now])
|
||||
(db/get-update-count)))]
|
||||
|
||||
(l/trc :hint "submit task"
|
||||
:name task
|
||||
@@ -129,11 +131,13 @@
|
||||
:queue queue
|
||||
:label label
|
||||
:dedupe (boolean dedupe)
|
||||
:delay (ct/format-duration duration)
|
||||
:delay (ct/format-duration delay)
|
||||
:replace (or deleted 0))
|
||||
|
||||
(db/exec-one! conn [sql:insert-new-task id task props queue
|
||||
label priority max-retries interval])
|
||||
label priority max-retries
|
||||
now now scheduled-at])
|
||||
|
||||
id))
|
||||
|
||||
(defn invoke!
|
||||
|
||||
@@ -158,7 +158,9 @@
|
||||
(inst-ms (:scheduled-at task)))
|
||||
(l/wrn :hint "skiping task, rescheduled"
|
||||
:task-id task-id
|
||||
:runner-id id)
|
||||
:runner-id id
|
||||
:scheduled-at (ct/format-inst (:scheduled-at task))
|
||||
:expected-scheduled-at (ct/format-inst scheduled-at))
|
||||
|
||||
:else
|
||||
(let [result (run-task cfg task)]
|
||||
@@ -179,7 +181,8 @@
|
||||
{:error explain
|
||||
:status "retry"
|
||||
:modified-at now
|
||||
:scheduled-at (ct/plus now delay)
|
||||
:scheduled-at (-> (ct/plus now delay)
|
||||
(ct/truncate :millisecond))
|
||||
:retry-num nretry}
|
||||
{:id (:id task)})
|
||||
nil))
|
||||
|
||||
@@ -549,6 +549,44 @@
|
||||
(io/copy r sw)
|
||||
(.toString sw))))
|
||||
|
||||
(defn parse-sse
|
||||
[content]
|
||||
(let [state
|
||||
(reduce (fn [{:keys [events data event id] :as state} line]
|
||||
(cond
|
||||
;; empty line → dispatch event if we have data
|
||||
(str/blank? line)
|
||||
(if (seq data)
|
||||
(-> state
|
||||
(update :events conj {:event (or event "message")
|
||||
:data (-> (str/join "\n" data))})
|
||||
(assoc :data [] :event nil))
|
||||
state)
|
||||
|
||||
;; comment line (starts with :)
|
||||
(str/starts-with? line ":")
|
||||
state
|
||||
|
||||
:else
|
||||
(let [[field raw-value] (str/split line #":" 2)
|
||||
value (some-> raw-value (str/replace #"^ " ""))]
|
||||
(case field
|
||||
"data" (update state :data conj (or value ""))
|
||||
"event" (assoc state :event value)
|
||||
;; ignore retry and unknown fields
|
||||
state))))
|
||||
{:events [] :data [] :event nil}
|
||||
(str/split content #"\r?\n"))
|
||||
|
||||
;; handle unterminated last event (no trailing blank line)
|
||||
state (if (seq (:data state))
|
||||
(update state :events conj
|
||||
{:event (or (:event state) "message")
|
||||
:data (str/join "\n" (:data state))})
|
||||
state)]
|
||||
|
||||
(:events state)))
|
||||
|
||||
(defn consume-sse
|
||||
[callback]
|
||||
(let [{:keys [::yres/status ::yres/body ::yres/headers] :as response} (callback {})
|
||||
@@ -558,12 +596,9 @@
|
||||
(try
|
||||
(px/exec! :virtual #(rcp/write-body-to-stream body nil output))
|
||||
(into []
|
||||
(map (fn [event]
|
||||
(let [[item1 item2] (re-seq #"(.*): (.*)\n?" event)]
|
||||
|
||||
[(keyword (nth item1 2))
|
||||
(tr/decode-str (nth item2 2))])))
|
||||
(-> (slurp' input)
|
||||
(str/split "\n\n")))
|
||||
(map (fn [{:keys [event data]}]
|
||||
[(keyword event)
|
||||
(tr/decode-str data)]))
|
||||
(parse-sse (slurp' input)))
|
||||
(finally
|
||||
(.close input)))))
|
||||
|
||||
Reference in New Issue
Block a user