♻️ Refactor file storage

Make it more scallable and make it easily extensible
This commit is contained in:
Andrey Antukh
2025-07-10 17:26:42 +02:00
parent 27bed84543
commit 5717708b56
56 changed files with 2291 additions and 2426 deletions

View File

@@ -15,6 +15,7 @@
[app.config :as cf]
[app.db :as db]
[app.db.sql :as sql]
[app.features.fdata :as fdata]
[app.http :as http]
[app.rpc :as-alias rpc]
[app.rpc.commands.files :as files]
@@ -185,10 +186,10 @@
shape-id (uuid/random)]
;; Preventive file-gc
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file) :revn (:revn file)})))
;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
(t/is (= 2 (count rows))))
;; Add page
@@ -203,22 +204,23 @@
:id page-id}])
;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
(t/is (= 3 (count rows))))
;; The file-gc should mark for remove unused fragments
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; Check the number of fragments
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 5 (count rows))))
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
(t/is (= 5 (count rows)))
(t/is (= 3 (count (filterv :deleted-at rows)))))
;; The objects-gc should remove unused fragments
(let [res (th/run-task! :objects-gc {})]
(t/is (= 3 (:processed res))))
;; Check the number of fragments
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
(t/is (= 2 (count rows))))
;; Add shape to page that should add a new fragment
@@ -242,44 +244,47 @@
:type :rect})}])
;; Check the number of fragments
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
(t/is (= 3 (count rows))))
;; The file-gc should mark for remove unused fragments
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; The objects-gc should remove unused fragments
(let [res (th/run-task! :objects-gc {})]
(t/is (= 3 (:processed res))))
;; Check the number of fragments;
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)
:deleted-at nil})]
(let [rows (th/db-query :file-data {:file-id (:id file)
:type "fragment"
:deleted-at nil})]
(t/is (= 2 (count rows))))
;; Lets proceed to delete all changes
(th/db-delete! :file-change {:file-id (:id file)})
(th/db-delete! :file-data {:file-id (:id file) :type "snapshot"})
(th/db-update! :file
{:has-media-trimmed false}
{:id (:id file)})
;; The file-gc should remove fragments related to changes
;; snapshots previously deleted.
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; Check the number of fragments;
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
;; (pp/pprint rows)
(t/is (= 4 (count rows)))
(t/is (= 2 (count (remove (comp some? :deleted-at) rows)))))
(t/is (= 2 (count (remove :deleted-at rows)))))
(let [res (th/run-task! :objects-gc {})]
(t/is (= 2 (:processed res))))
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
(t/is (= 2 (count rows)))))))
(t/deftest file-gc-task-with-thumbnails
(t/deftest file-gc-with-thumbnails
(letfn [(add-file-media-object [& {:keys [profile-id file-id]}]
(let [mfile {:filename "sample.jpg"
:path (th/tempfile "backend_tests/test_files/sample.jpg")
@@ -347,7 +352,7 @@
:fills [{:fill-opacity 1
:fill-image {:id (:id fmo1) :width 100 :height 100 :mtype "image/jpeg"}}]})}])
;; Check that reference storage objects on filemediaobjects
;; Check that reference storage objects on file_media_objects
;; are the same because of deduplication feature.
(t/is (= (:media-id fmo1) (:media-id fmo2)))
(t/is (= (:thumbnail-id fmo1) (:thumbnail-id fmo2)))
@@ -360,32 +365,33 @@
(t/is (= 2 (:freeze res)))
(t/is (= 0 (:delete res))))
;; run the file-gc task immediately without forced min-age
(t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
;; run the task again
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; retrieve file and check trimmed attribute
(let [row (th/db-get :file {:id (:id file)})]
(t/is (true? (:has-media-trimmed row))))
;; check file media objects
(let [rows (th/db-query :file-media-object {:file-id (:id file)})]
(t/is (= 2 (count rows)))
(t/is (= 1 (count (remove (comp some? :deleted-at) rows)))))
(let [[row1 row2 :as rows]
(th/db-query :file-media-object
{:file-id (:id file)}
{:order-by [:created-at]})]
(t/is (= (:id fmo1) (:id row1)))
(t/is (= (:id fmo2) (:id row2)))
(t/is (ct/inst? (:deleted-at row2))))
(let [res (th/run-task! :objects-gc {})]
;; delete 2 fragments and 1 media object
(t/is (= 3 (:processed res))))
;; check file media objects
(let [rows (th/db-query :file-media-object {:file-id (:id file)})]
(t/is (= 1 (count rows)))
(t/is (= 1 (count (remove (comp some? :deleted-at) rows)))))
(t/is (= 1 (count (remove :deleted-at rows)))))
;; The underlying storage objects are still available.
(t/is (some? (sto/get-object storage (:media-id fmo2))))
(t/is (some? (sto/get-object storage (:thumbnail-id fmo2))))
(t/is (some? (sto/get-object storage (:media-id fmo1))))
(t/is (some? (sto/get-object storage (:thumbnail-id fmo1))))
@@ -402,34 +408,40 @@
;; Now, we have deleted the usage of pointers to the
;; file-media-objects, if we paste file-gc, they should be marked
;; as deleted.
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; This only clears fragments, the file media objects still referenced because
;; snapshots are preserved
(let [res (th/run-task! :objects-gc {})]
(t/is (= 2 (:processed res))))
;; Mark all snapshots to be a non-snapshot file change
(th/db-exec! ["update file_change set data = null where file_id = ?" (:id file)])
;; Delete all snapshots
(th/db-exec! ["update file_data set deleted_at = now() where file_id = ? and type = 'snapshot'" (:id file)])
(th/db-exec! ["update file_change set deleted_at = now() where file_id = ? and label is not null" (:id file)])
(th/db-exec! ["update file set has_media_trimmed = false where id = ?" (:id file)])
(let [res (th/run-task! :objects-gc {:deletion-threshold 0})]
;; this will remove the file change and file data entries for two snapshots
(t/is (= 4 (:processed res))))
;; Rerun the file-gc and objects-gc
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [res (th/run-task! :objects-gc {})]
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
(let [res (th/run-task! :objects-gc {:deletion-threshold 0})]
;; this will remove the file media objects marked as deleted
;; on prev file-gc
(t/is (= 2 (:processed res))))
;; Now that file-gc have deleted the file-media-object usage,
;; lets execute the touched-gc task, we should see that two of
;; them are marked to be deleted.
;; them are marked to be deleted
(let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 0 (:freeze res)))
(t/is (= 2 (:delete res))))
;; Finally, check that some of the objects that are marked as
;; deleted we are unable to retrieve them using standard storage
;; public api.
(t/is (nil? (sto/get-object storage (:media-id fmo2))))
(t/is (nil? (sto/get-object storage (:thumbnail-id fmo2))))
;; public api
(t/is (nil? (sto/get-object storage (:media-id fmo1))))
(t/is (nil? (sto/get-object storage (:thumbnail-id fmo1)))))))
@@ -470,8 +482,9 @@
page-id (first (get-in file [:data :pages]))]
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)
:deleted-at nil})]
(let [rows (th/db-query :file-data {:file-id (:id file)
:type "fragment"
:deleted-at nil})]
(t/is (= (count rows) 1)))
;; Update file inserting a new image object
@@ -536,17 +549,15 @@
:strokes [{:stroke-opacity 1 :stroke-image {:id (:id fmo5) :width 100 :height 100 :mtype "image/jpeg"}}]})}])
;; run the file-gc task immediately without forced min-age
(t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
;; run the task again
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
(let [res (th/run-task! :objects-gc {})]
(t/is (= 2 (:processed res))))
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)
:deleted-at nil})]
(let [rows (th/db-query :file-data {:file-id (:id file)
:type "fragment"
:deleted-at nil})]
(t/is (= (count rows) 1)))
;; retrieve file and check trimmed attribute
@@ -583,7 +594,7 @@
;; Now, we have deleted the usage of pointers to the
;; file-media-objects, if we paste file-gc, they should be marked
;; as deleted.
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; This only removes unused fragments, file media are still
;; referenced on snapshots.
@@ -592,16 +603,18 @@
;; Mark all snapshots to be a non-snapshot file change
(th/db-exec! ["update file set has_media_trimmed = false where id = ?" (:id file)])
(th/db-exec! ["update file_change set data = null where file_id = ?" (:id file)])
(th/db-delete! :file-data {:file-id (:id file)
:type "snapshot"})
;; Rerun file-gc and objects-gc task for the same file once all snapshots are
;; "expired/deleted"
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
(let [res (th/run-task! :objects-gc {})]
(t/is (= 6 (:processed res))))
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)
:deleted-at nil})]
(let [rows (th/db-query :file-data {:file-id (:id file)
:type "fragment"
:deleted-at nil})]
(t/is (= (count rows) 1)))
;; Now that file-gc have deleted the file-media-object usage,
@@ -620,7 +633,7 @@
(t/is (nil? (sto/get-object storage (:media-id fmo2))))
(t/is (nil? (sto/get-object storage (:media-id fmo1)))))))
(t/deftest file-gc-task-with-object-thumbnails
(t/deftest file-gc-with-object-thumbnails
(letfn [(insert-file-object-thumbnail! [& {:keys [profile-id file-id page-id frame-id]}]
(let [object-id (thc/fmt-object-id file-id page-id frame-id "frame")
mfile {:filename "sample.jpg"
@@ -704,11 +717,7 @@
(t/is (= 1 (:freeze res)))
(t/is (= 0 (:delete res))))
;; run the file-gc task immediately without forced min-age
(t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
;; run the task again
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; retrieve file and check trimmed attribute
(let [row (th/db-get :file {:id (:id file)})]
@@ -738,7 +747,7 @@
:page-id page-id
:id frame-id-2}])
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})]
(t/is (= 2 (count rows)))
@@ -772,7 +781,7 @@
:page-id page-id
:id frame-id-1}])
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})]
(t/is (= 1 (count rows)))
@@ -933,6 +942,8 @@
out (th/command! params)]
(t/is (nil? (:error out))))
(th/run-pending-tasks!)
;; query the list of files after soft deletion
(let [data {::th/type :get-project-files
::rpc/profile-id (:id profile1)
@@ -943,11 +954,6 @@
(let [result (:result out)]
(t/is (= 0 (count result)))))
;; run permanent deletion (should be noop)
(let [result (th/run-task! :objects-gc {})]
(t/is (= 0 (:processed result))))
;; query the list of file libraries of a after hard deletion
(let [data {::th/type :get-file-libraries
::rpc/profile-id (:id profile1)
:file-id (:id file)}
@@ -957,9 +963,13 @@
(let [result (:result out)]
(t/is (= 0 (count result)))))
;; run permanent deletion (should be noop)
(let [result (th/run-task! :objects-gc {})]
(t/is (= 0 (:processed result))))
;; run permanent deletion
(let [result (th/run-task! :objects-gc {:deletion-threshold (cf/get-deletion-delay)})]
(t/is (= 1 (:processed result))))
(t/is (= 3 (:processed result))))
;; query the list of file libraries of a after hard deletion
(let [data {::th/type :get-file-libraries
@@ -972,7 +982,6 @@
(t/is (th/ex-info? error))
(t/is (= (:type error-data) :not-found))))))
(t/deftest object-thumbnails-ops
(let [prof (th/create-profile* 1 {:is-active true})
file (th/create-file* 1 {:profile-id (:id prof)
@@ -1282,17 +1291,19 @@
:is-shared false})
page-id (uuid/random)
shape-id (uuid/random)]
shape-id (uuid/random)
sobject (volatile! nil)]
;; Preventive file-gc
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; Preventive file-gc
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; Preventive objects-gc
;; Preventive objects-gc
(let [result (th/run-task! :objects-gc {})]
;; deletes the fragment created by file-gc
(t/is (= 1 (:processed result))))
;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
(t/is (= 1 (count rows)))
(t/is (every? #(some? (:data %)) rows)))
@@ -1301,35 +1312,42 @@
{:has-media-trimmed false}
{:id (:id file)})
;; Run FileGC again, with tiered storage activated
;; Run FileGC again, with tiered storage activated
(with-redefs [app.config/flags (conj app.config/flags :tiered-file-data-storage)]
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)}))))
;; The FileGC task will schedule an inner taskq
(th/run-pending-tasks!))
;; The FileGC task will schedule an inner taskq
(th/run-pending-tasks!)
;; Clean objects after file-gc
(let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 2 (:freeze res)))
(t/is (= 0 (:delete res))))
;; Clean objects after file-gc
(let [result (th/run-task! :objects-gc {})]
(t/is (= 1 (:processed result))))
;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
;; (pp/pprint rows)
(t/is (= 1 (count rows)))
(t/is (every? #(nil? (:data %)) rows))
(t/is (every? #(uuid? (:data-ref-id %)) rows))
(t/is (every? #(= "objects-storage" (:data-backend %)) rows)))
(t/is (every? #(= "storage" (:backend %)) rows)))
(let [file (th/db-get :file {:id (:id file)})
(let [file (-> (th/db-get :file-data {:id (:id file) :type "main"})
(update :metadata fdata/decode-metadata))
storage (sto/resolve th/*system*)]
(t/is (= "objects-storage" (:data-backend file)))
;; (pp/pprint file)
(t/is (= "storage" (:backend file)))
(t/is (nil? (:data file)))
(t/is (uuid? (:data-ref-id file)))
(let [sobj (sto/get-object storage (:data-ref-id file))]
(let [sobj (sto/get-object storage (-> file :metadata :storage-ref-id))]
(vreset! sobject sobj)
;; (pp/pprint (meta sobj))
(t/is (= "file-data" (:bucket (meta sobj))))
(t/is (= (:id file) (:file-id (meta sobj))))))
;; Add shape to page that should load from cold storage again into the hot storage (db)
;; Add shape to page that should load from cold storage again into the hot storage (db)
(update-file!
:file-id (:id file)
:profile-id (:id profile)
@@ -1340,36 +1358,68 @@
:name "test"
:id page-id}])
;; Check the number of fragments
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 2 (count rows))))
;; Check the number of fragments
;; Check the number of fragments
(let [[row1 row2 :as rows]
(th/db-query :file-data-fragment
(th/db-query :file-data
{:file-id (:id file)
:deleted-at nil}
:type "fragment"}
{:order-by [:created-at]})]
;; (pp/pprint rows)
(t/is (= 2 (count rows)))
(t/is (nil? (:data row1)))
(t/is (= "objects-storage" (:data-backend row1)))
(t/is (= "storage" (:backend row1)))
(t/is (bytes? (:data row2)))
(t/is (nil? (:data-backend row2))))
(t/is (= "db" (:backend row2))))
;; The file-gc should mark for remove unused fragments
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; The objects-gc should remove unused fragments
;; The file-gc should mark for remove unused fragments
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; The file-gc task, recreates all fragments, so after it we have
;; now the double of fragments, and the old ones are marked as
;; deleted, and the new ones are on DB
(let [[row1 row2 row3 row4 :as rows]
(th/db-query :file-data
{:file-id (:id file)
:type "fragment"}
{:order-by [:created-at]})]
;; (pp/pprint rows)
(t/is (= 4 (count rows)))
(t/is (nil? (:data row1)))
(t/is (ct/inst? (:deleted-at row1)))
(t/is (= "storage" (:backend row1)))
(t/is (bytes? (:data row2)))
(t/is (= "db" (:backend row2)))
(t/is (ct/inst? (:deleted-at row2)))
(t/is (bytes? (:data row3)))
(t/is (= "db" (:backend row3)))
(t/is (nil? (:deleted-at row3)))
(t/is (bytes? (:data row4)))
(t/is (= "db" (:backend row4)))
(t/is (nil? (:deleted-at row4))))
;; The objects-gc should remove the marked to delete fragments
(let [res (th/run-task! :objects-gc {})]
(t/is (= 2 (:processed res))))
;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(let [rows (th/db-query :file-data {:file-id (:id file) :type "fragment"})]
(t/is (= 2 (count rows)))
(t/is (every? #(bytes? (:data %)) rows))
(t/is (every? #(nil? (:data-ref-id %)) rows))
(t/is (every? #(nil? (:data-backend %)) rows)))))
(t/is (every? #(= "db" (:backend %)) rows)))
;; we ensure that once object-gc is passed and marked two storage
;; objects to delete
(let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 0 (:freeze res)))
(t/is (= 2 (:delete res))))
(let [storage (sto/resolve th/*system*)]
(t/is (uuid? (:id @sobject)))
(t/is (nil? (sto/get-object storage (:id @sobject)))))))
(t/deftest file-gc-with-components-1
(let [storage (:app.storage/storage th/*system*)
@@ -1384,8 +1434,9 @@
page-id (first (get-in file [:data :pages]))]
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)
:deleted-at nil})]
(let [rows (th/db-query :file-data {:file-id (:id file)
:type "fragment"
:deleted-at nil})]
(t/is (= (count rows) 1)))
;; Update file inserting new component
@@ -1437,11 +1488,8 @@
:id c-id
:anotation nil}])
;; Run the file-gc task immediately without forced min-age
(t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
;; Run the task again
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file)})))
;; Retrieve file and check trimmed attribute
(let [row (th/db-get :file {:id (:id file)})]
@@ -1651,8 +1699,7 @@
(t/is (some? (not-empty (:objects component))))))
;; Re-run the file-gc task
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file-1)})))
(t/is (false? (th/run-task! :file-gc {:min-age 0 :file-id (:id file-2)})))
(t/is (true? (th/run-task! :file-gc {:file-id (:id file-1)})))
;; Check that component is still there after file-gc task
(let [data {::th/type :get-file