mirror of
https://github.com/penpot/penpot.git
synced 2025-12-11 22:14:05 +01:00
♻️ Make storage and other objects deletion task vclock aware
This simplifes the mental model on how it works and simplifies testing of the related code. This also normalizes storage object deletion in the same way as the rest of objects in penpot (now future deletion date on storage object also means storage object to be deleted).
This commit is contained in:
@@ -163,9 +163,6 @@
|
||||
backend
|
||||
(:metadata result))))
|
||||
|
||||
(def ^:private sql:retrieve-storage-object
|
||||
"select * from storage_object where id = ? and (deleted_at is null or deleted_at > now())")
|
||||
|
||||
(defn row->storage-object [res]
|
||||
(let [mdata (or (some-> (:metadata res) (db/decode-transit-pgobject)) {})]
|
||||
(impl/storage-object
|
||||
@@ -177,9 +174,15 @@
|
||||
(keyword (:backend res))
|
||||
mdata)))
|
||||
|
||||
(defn- retrieve-database-object
|
||||
(def ^:private sql:get-storage-object
|
||||
"SELECT *
|
||||
FROM storage_object
|
||||
WHERE id = ?
|
||||
AND (deleted_at IS NULL)")
|
||||
|
||||
(defn- get-database-object
|
||||
[conn id]
|
||||
(some-> (db/exec-one! conn [sql:retrieve-storage-object id])
|
||||
(some-> (db/exec-one! conn [sql:get-storage-object id])
|
||||
(row->storage-object)))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
@@ -202,7 +205,7 @@
|
||||
(defn get-object
|
||||
[{:keys [::db/connectable] :as storage} id]
|
||||
(assert (valid-storage? storage))
|
||||
(retrieve-database-object connectable id))
|
||||
(get-database-object connectable id))
|
||||
|
||||
(defn put-object!
|
||||
"Creates a new object with the provided content."
|
||||
|
||||
@@ -37,7 +37,6 @@
|
||||
(into #{} (map :id))
|
||||
(not-empty))))
|
||||
|
||||
|
||||
(def ^:private sql:delete-sobjects
|
||||
"DELETE FROM storage_object
|
||||
WHERE id = ANY(?::uuid[])")
|
||||
@@ -77,47 +76,37 @@
|
||||
(d/group-by (comp keyword :backend) :id #{} items))
|
||||
|
||||
(def ^:private sql:get-deleted-sobjects
|
||||
"SELECT s.* FROM storage_object AS s
|
||||
"SELECT s.*
|
||||
FROM storage_object AS s
|
||||
WHERE s.deleted_at IS NOT NULL
|
||||
AND s.deleted_at < now() - ?::interval
|
||||
AND s.deleted_at <= ?
|
||||
ORDER BY s.deleted_at ASC")
|
||||
|
||||
(defn- get-buckets
|
||||
[conn min-age]
|
||||
(let [age (db/interval min-age)]
|
||||
[conn]
|
||||
(let [now (ct/now)]
|
||||
(sequence
|
||||
(comp (partition-all 25)
|
||||
(mapcat group-by-backend))
|
||||
(db/cursor conn [sql:get-deleted-sobjects age]))))
|
||||
|
||||
(db/cursor conn [sql:get-deleted-sobjects now]))))
|
||||
|
||||
(defn- clean-deleted!
|
||||
[{:keys [::db/conn ::min-age] :as cfg}]
|
||||
[{:keys [::db/conn] :as cfg}]
|
||||
(reduce (fn [total [backend-id ids]]
|
||||
(let [deleted (delete-in-bulk! cfg backend-id ids)]
|
||||
(+ total (or deleted 0))))
|
||||
0
|
||||
(get-buckets conn min-age)))
|
||||
(get-buckets conn)))
|
||||
|
||||
(defmethod ig/assert-key ::handler
|
||||
[_ params]
|
||||
(assert (sto/valid-storage? (::sto/storage params)) "expect valid storage")
|
||||
(assert (db/pool? (::db/pool params)) "expect valid storage"))
|
||||
|
||||
(defmethod ig/expand-key ::handler
|
||||
[k v]
|
||||
{k (assoc v ::min-age (ct/duration {:hours 2}))})
|
||||
|
||||
(defmethod ig/init-key ::handler
|
||||
[_ {:keys [::min-age] :as cfg}]
|
||||
(fn [{:keys [props] :as task}]
|
||||
(let [min-age (ct/duration (or (:min-age props) min-age))]
|
||||
(db/tx-run! cfg (fn [cfg]
|
||||
(let [cfg (assoc cfg ::min-age min-age)
|
||||
total (clean-deleted! cfg)]
|
||||
|
||||
(l/inf :hint "task finished"
|
||||
:min-age (ct/format-duration min-age)
|
||||
:total total)
|
||||
|
||||
{:deleted total}))))))
|
||||
[_ cfg]
|
||||
(fn [_]
|
||||
(db/tx-run! cfg (fn [cfg]
|
||||
(let [total (clean-deleted! cfg)]
|
||||
(l/inf :hint "task finished" :total total)
|
||||
{:deleted total})))))
|
||||
|
||||
@@ -22,6 +22,8 @@
|
||||
[app.common.data.macros :as dm]
|
||||
[app.common.exceptions :as ex]
|
||||
[app.common.logging :as l]
|
||||
[app.common.time :as ct]
|
||||
[app.config :as cf]
|
||||
[app.db :as db]
|
||||
[app.storage :as-alias sto]
|
||||
[app.storage.impl :as impl]
|
||||
@@ -101,14 +103,15 @@
|
||||
|
||||
(def ^:private sql:mark-delete-in-bulk
|
||||
"UPDATE storage_object
|
||||
SET deleted_at = now(),
|
||||
SET deleted_at = ?,
|
||||
touched_at = NULL
|
||||
WHERE id = ANY(?::uuid[])")
|
||||
|
||||
(defn- mark-delete-in-bulk!
|
||||
[conn ids]
|
||||
(let [ids (db/create-array conn "uuid" ids)]
|
||||
(db/exec-one! conn [sql:mark-delete-in-bulk ids])))
|
||||
[conn deletion-delay ids]
|
||||
(let [ids (db/create-array conn "uuid" ids)
|
||||
now (ct/plus (ct/now) deletion-delay)]
|
||||
(db/exec-one! conn [sql:mark-delete-in-bulk now ids])))
|
||||
|
||||
;; NOTE: A getter that retrieves the key which will be used for group
|
||||
;; ids; previously we have no value, then we introduced the
|
||||
@@ -137,18 +140,20 @@
|
||||
(if-let [{:keys [id] :as object} (first objects)]
|
||||
(if (has-refs? conn object)
|
||||
(do
|
||||
(l/debug :id (str id)
|
||||
:status "freeze"
|
||||
:bucket bucket)
|
||||
(l/dbg :id (str id)
|
||||
:status "freeze"
|
||||
:bucket bucket)
|
||||
(recur (conj to-freeze id) to-delete (rest objects)))
|
||||
(do
|
||||
(l/debug :id (str id)
|
||||
:status "delete"
|
||||
:bucket bucket)
|
||||
(l/dbg :id (str id)
|
||||
:status "delete"
|
||||
:bucket bucket)
|
||||
(recur to-freeze (conj to-delete id) (rest objects))))
|
||||
(do
|
||||
(let [deletion-delay (if (= bucket "tempfile")
|
||||
(ct/duration {:hours 2})
|
||||
(cf/get-deletion-delay))]
|
||||
(some->> (seq to-freeze) (mark-freeze-in-bulk! conn))
|
||||
(some->> (seq to-delete) (mark-delete-in-bulk! conn))
|
||||
(some->> (seq to-delete) (mark-delete-in-bulk! conn deletion-delay))
|
||||
[(count to-freeze) (count to-delete)]))))
|
||||
|
||||
(defn- process-bucket!
|
||||
@@ -173,27 +178,27 @@
|
||||
[0 0]
|
||||
(d/group-by lookup-bucket identity #{} chunk)))
|
||||
|
||||
(def ^:private
|
||||
sql:get-touched-storage-objects
|
||||
(def ^:private sql:get-touched-storage-objects
|
||||
"SELECT so.*
|
||||
FROM storage_object AS so
|
||||
WHERE so.touched_at IS NOT NULL
|
||||
AND so.touched_at <= ?
|
||||
ORDER BY touched_at ASC
|
||||
FOR UPDATE
|
||||
SKIP LOCKED
|
||||
LIMIT 10")
|
||||
|
||||
(defn get-chunk
|
||||
[conn]
|
||||
(->> (db/exec! conn [sql:get-touched-storage-objects])
|
||||
[conn timestamp]
|
||||
(->> (db/exec! conn [sql:get-touched-storage-objects timestamp])
|
||||
(map impl/decode-row)
|
||||
(not-empty)))
|
||||
|
||||
(defn- process-touched!
|
||||
[{:keys [::db/pool] :as cfg}]
|
||||
[{:keys [::db/pool ::timestamp] :as cfg}]
|
||||
(loop [freezed 0
|
||||
deleted 0]
|
||||
(if-let [chunk (get-chunk pool)]
|
||||
(if-let [chunk (get-chunk pool timestamp)]
|
||||
(let [[nfo ndo] (db/tx-run! cfg process-chunk! chunk)]
|
||||
(recur (long (+ freezed nfo))
|
||||
(long (+ deleted ndo))))
|
||||
@@ -209,5 +214,6 @@
|
||||
|
||||
(defmethod ig/init-key ::handler
|
||||
[_ cfg]
|
||||
(fn [_] (process-touched! cfg)))
|
||||
(fn [_]
|
||||
(process-touched! (assoc cfg ::timestamp (ct/now)))))
|
||||
|
||||
|
||||
@@ -18,15 +18,15 @@
|
||||
(def ^:private sql:get-profiles
|
||||
"SELECT id, photo_id FROM profile
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-profiles!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-profiles deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-profiles timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [id photo-id]}]
|
||||
(l/trc :obj "profile" :id (str id))
|
||||
|
||||
@@ -41,15 +41,15 @@
|
||||
(def ^:private sql:get-teams
|
||||
"SELECT deleted_at, id, photo_id FROM team
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-teams!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-teams deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-teams timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [id photo-id deleted-at]}]
|
||||
(l/trc :obj "team"
|
||||
:id (str id)
|
||||
@@ -68,15 +68,15 @@
|
||||
"SELECT id, team_id, deleted_at, woff1_file_id, woff2_file_id, otf_file_id, ttf_file_id
|
||||
FROM team_font_variant
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-fonts!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-fonts deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-fonts timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [id team-id deleted-at] :as font}]
|
||||
(l/trc :obj "font-variant"
|
||||
:id (str id)
|
||||
@@ -98,15 +98,15 @@
|
||||
"SELECT id, deleted_at, team_id
|
||||
FROM project
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-projects!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-projects deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-projects timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [id team-id deleted-at]}]
|
||||
(l/trc :obj "project"
|
||||
:id (str id)
|
||||
@@ -124,15 +124,15 @@
|
||||
f.project_id
|
||||
FROM file AS f
|
||||
WHERE f.deleted_at IS NOT NULL
|
||||
AND f.deleted_at < now() + ?::interval
|
||||
AND f.deleted_at <= ?
|
||||
ORDER BY f.deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-files!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-files deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-files timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [id deleted-at project-id] :as file}]
|
||||
(l/trc :obj "file"
|
||||
:id (str id)
|
||||
@@ -148,15 +148,15 @@
|
||||
"SELECT file_id, revn, media_id, deleted_at
|
||||
FROM file_thumbnail
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn delete-file-thumbnails!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-thumbnails deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-thumbnails timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [file-id revn media-id deleted-at]}]
|
||||
(l/trc :obj "file-thumbnail"
|
||||
:file-id (str file-id)
|
||||
@@ -175,15 +175,15 @@
|
||||
"SELECT file_id, object_id, media_id, deleted_at
|
||||
FROM file_tagged_object_thumbnail
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn delete-file-object-thumbnails!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-object-thumbnails deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-object-thumbnails timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [file-id object-id media-id deleted-at]}]
|
||||
(l/trc :obj "file-object-thumbnail"
|
||||
:file-id (str file-id)
|
||||
@@ -203,15 +203,15 @@
|
||||
"SELECT id, file_id, media_id, thumbnail_id, deleted_at
|
||||
FROM file_media_object
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-file-media-objects!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-media-objects deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-media-objects timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [id file-id deleted-at] :as fmo}]
|
||||
(l/trc :obj "file-media-object"
|
||||
:id (str id)
|
||||
@@ -231,16 +231,15 @@
|
||||
"SELECT file_id, id, type, deleted_at, metadata, backend
|
||||
FROM file_data
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-file-data!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size] :as cfg}]
|
||||
|
||||
(->> (db/plan conn [sql:get-file-data deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-data timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [file-id id type deleted-at metadata backend]}]
|
||||
|
||||
(some->> metadata
|
||||
@@ -266,15 +265,15 @@
|
||||
"SELECT id, file_id, deleted_at
|
||||
FROM file_change
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() + ?::interval
|
||||
AND deleted_at <= ?
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-file-changes!
|
||||
[{:keys [::db/conn ::deletion-threshold ::chunk-size] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-change deletion-threshold chunk-size] {:fetch-size 5})
|
||||
[{:keys [::db/conn ::timestamp ::chunk-size] :as cfg}]
|
||||
(->> (db/plan conn [sql:get-file-change timestamp chunk-size] {:fetch-size 5})
|
||||
(reduce (fn [total {:keys [id file-id deleted-at] :as xlog}]
|
||||
(l/trc :obj "file-change"
|
||||
:id (str id)
|
||||
@@ -322,9 +321,8 @@
|
||||
|
||||
(defmethod ig/init-key ::handler
|
||||
[_ cfg]
|
||||
(fn [{:keys [props] :as task}]
|
||||
(let [threshold (ct/duration (get props :deletion-threshold 0))
|
||||
cfg (assoc cfg ::deletion-threshold (db/interval threshold))]
|
||||
(fn [_]
|
||||
(let [cfg (assoc cfg ::timestamp (ct/now))]
|
||||
(loop [procs (map deref deletion-proc-vars)
|
||||
total 0]
|
||||
(if-let [proc-fn (first procs)]
|
||||
|
||||
Reference in New Issue
Block a user