From c1058c7fdbec22f92f07cad9e4e80a230d5ae187 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 6 Aug 2025 15:43:47 +0200 Subject: [PATCH] :recycle: Add minor refactor for internal concurrency model Replace general usage of virtual threads with platform threads and use virtual threads for lightweight procs such that websocket connections. This decision is made mainly because virtual threads does not appear on thread dumps in an easy way so debugging issues becomes very difficult. The threads requirement of penpot for serving http requests is not very big so having so this decision does not really affects the resource usage. --- CHANGES.md | 20 +++ backend/deps.edn | 8 +- backend/src/app/binfile/common.clj | 7 +- backend/src/app/config.clj | 14 +-- backend/src/app/features/fdata.clj | 8 +- backend/src/app/http.clj | 54 ++++---- backend/src/app/http/awsns.clj | 6 +- backend/src/app/http/sse.clj | 2 +- backend/src/app/main.clj | 74 +++++------ backend/src/app/msgbus.clj | 3 +- backend/src/app/redis.clj | 45 ++++--- backend/src/app/rpc/climit.clj | 11 +- backend/src/app/rpc/commands/auth.clj | 7 +- backend/src/app/rpc/commands/binfile.clj | 11 +- backend/src/app/rpc/commands/demo.clj | 4 +- backend/src/app/rpc/commands/files.clj | 14 +-- backend/src/app/rpc/commands/files_update.clj | 20 ++- backend/src/app/rpc/commands/fonts.clj | 8 +- backend/src/app/rpc/commands/management.clj | 32 ++--- backend/src/app/rpc/commands/media.clj | 8 +- backend/src/app/rpc/commands/profile.clj | 21 +--- backend/src/app/srepl/cli.clj | 8 +- backend/src/app/storage/s3.clj | 28 ++--- backend/src/app/storage/tmp.clj | 2 +- backend/src/app/util/events.clj | 4 +- backend/src/app/worker/dispatcher.clj | 2 +- backend/src/app/worker/executor.clj | 118 ++++++++---------- backend/src/app/worker/runner.clj | 4 +- backend/test/backend_tests/helpers.clj | 1 - common/src/app/common/time.cljc | 3 +- 30 files changed, 249 insertions(+), 298 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 315dcb759e..b2a565c81e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,26 @@ ### :boom: Breaking changes & Deprecations +- Remove already deprecated configuration variables with the prefix + `PENPOT_ASSETS_*` (replaced by variables named with + `PENPOT_OBJECTS_STORAGE_*`. + +- Replace the `PENPOT_OBJECTS_STORAGE_S3_IO_THREADS` with a more + general configuration `PENPOT_NETTY_IO_THREADS` used to configure a + shared netty resources across different services which use netty + internally (redis connection, S3 SDK client). This configuration is + not very commonly used so don't expected real impact on any user. + +- Add `PENPOT_NETTY_IO_THREADS` and `PENPOT_EXECUTOR_THREADS` + variables for provide the control over concurrency of the shared + resources used by netty. Penpot uses the netty IO threads for AWS S3 + SDK and Redis/Valkey communication, and the EXEC threads to perform + out of HTTP serving threads tasks such that cache invalidation, S3 + response completion, configuration reloading and many other auxiliar + tasks. By default they use a half number if available cpus with a + minumum of 2 for both executors. You should not touch that variables + unless you are really know what you are doing. + ### :heart: Community contributions (Thank you!) ### :sparkles: New features & Enhancements diff --git a/backend/deps.edn b/backend/deps.edn index 752ed86b2b..a6f6fcfecd 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -28,8 +28,8 @@ com.google.guava/guava {:mvn/version "33.4.8-jre"} funcool/yetti - {:git/tag "v11.4" - :git/sha "ce50d42" + {:git/tag "v11.6" + :git/sha "94dc017" :git/url "https://github.com/funcool/yetti.git" :exclusions [org.slf4j/slf4j-api]} @@ -86,7 +86,9 @@ :test {:main-opts ["-m" "kaocha.runner"] - :jvm-opts ["-Dlog4j2.configurationFile=log4j2-devenv-repl.xml"] + :jvm-opts ["-Dlog4j2.configurationFile=log4j2-devenv-repl.xml" + "--sun-misc-unsafe-memory-access=allow" + "--enable-native-access=ALL-UNNAMED"] :extra-deps {lambdaisland/kaocha {:mvn/version "1.91.1392"}}} :outdated diff --git a/backend/src/app/binfile/common.clj b/backend/src/app/binfile/common.clj index 6299ab9179..33de882825 100644 --- a/backend/src/app/binfile/common.clj +++ b/backend/src/app/binfile/common.clj @@ -34,8 +34,7 @@ [clojure.set :as set] [cuerdas.core :as str] [datoteka.fs :as fs] - [datoteka.io :as io] - [promesa.exec :as px])) + [datoteka.io :as io])) (set! *warn-on-reflection* true) @@ -476,7 +475,7 @@ (vary-meta dissoc ::fmg/migrated)))) (defn encode-file - [{:keys [::wrk/executor] :as cfg} {:keys [id features] :as file}] + [cfg {:keys [id features] :as file}] (let [file (if (and (contains? features "fdata/objects-map") (:data file)) (fdata/enable-objects-map file) @@ -493,7 +492,7 @@ (-> file (d/update-when :features into-array) - (d/update-when :data (fn [data] (px/invoke! executor #(blob/encode data))))))) + (d/update-when :data blob/encode)))) (defn- file->params [file] diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 805608da5f..fe8daaed7f 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -96,7 +96,7 @@ [:http-server-max-body-size {:optional true} ::sm/int] [:http-server-max-multipart-body-size {:optional true} ::sm/int] [:http-server-io-threads {:optional true} ::sm/int] - [:http-server-worker-threads {:optional true} ::sm/int] + [:http-server-max-worker-threads {:optional true} ::sm/int] [:telemetry-uri {:optional true} :string] [:telemetry-with-taiga {:optional true} ::sm/boolean] ;; DELETE @@ -214,20 +214,14 @@ [:media-uri {:optional true} :string] [:assets-path {:optional true} :string] - ;; Legacy, will be removed in 2.5 - [:assets-storage-backend {:optional true} :keyword] - [:storage-assets-fs-directory {:optional true} :string] - [:storage-assets-s3-bucket {:optional true} :string] - [:storage-assets-s3-region {:optional true} :keyword] - [:storage-assets-s3-endpoint {:optional true} ::sm/uri] - [:storage-assets-s3-io-threads {:optional true} ::sm/int] + [:netty-io-threads {:optional true} ::sm/int] + [:executor-threads {:optional true} ::sm/int] [:objects-storage-backend {:optional true} :keyword] [:objects-storage-fs-directory {:optional true} :string] [:objects-storage-s3-bucket {:optional true} :string] [:objects-storage-s3-region {:optional true} :keyword] - [:objects-storage-s3-endpoint {:optional true} ::sm/uri] - [:objects-storage-s3-io-threads {:optional true} ::sm/int]])) + [:objects-storage-s3-endpoint {:optional true} ::sm/uri]])) (defn- parse-flags [config] diff --git a/backend/src/app/features/fdata.clj b/backend/src/app/features/fdata.clj index c59cdd0ca4..7c24a477b9 100644 --- a/backend/src/app/features/fdata.clj +++ b/backend/src/app/features/fdata.clj @@ -18,9 +18,7 @@ [app.storage :as sto] [app.util.blob :as blob] [app.util.objects-map :as omap] - [app.util.pointer-map :as pmap] - [app.worker :as wrk] - [promesa.exec :as px])) + [app.util.pointer-map :as pmap])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; OFFLOAD @@ -84,10 +82,10 @@ (assoc file :data data))) (defn decode-file-data - [{:keys [::wrk/executor]} {:keys [data] :as file}] + [_system {:keys [data] :as file}] (cond-> file (bytes? data) - (assoc :data (px/invoke! executor #(blob/decode data))))) + (assoc :data (blob/decode data)))) (defn load-pointer "A database loader pointer helper" diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 275f081fd0..6be1ad500c 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -26,9 +26,7 @@ [app.rpc :as-alias rpc] [app.rpc.doc :as-alias rpc.doc] [app.setup :as-alias setup] - [app.worker :as wrk] [integrant.core :as ig] - [promesa.exec :as px] [reitit.core :as r] [reitit.middleware :as rr] [yetti.adapter :as yt] @@ -55,6 +53,8 @@ [:map [::port ::sm/int] [::host ::sm/text] + [::io-threads {:optional true} ::sm/int] + [::max-worker-threads {:optional true} ::sm/int] [::max-body-size {:optional true} ::sm/int] [::max-multipart-body-size {:optional true} ::sm/int] [::router {:optional true} [:fn r/router?]] @@ -65,31 +65,41 @@ (assert (sm/check schema:server-params params))) (defmethod ig/init-key ::server - [_ {:keys [::handler ::router ::host ::port ::wrk/executor] :as cfg}] + [_ {:keys [::handler ::router ::host ::port ::mtx/metrics] :as cfg}] (l/info :hint "starting http server" :port port :host host) - (let [options {:http/port port - :http/host host - :http/max-body-size (::max-body-size cfg) - :http/max-multipart-body-size (::max-multipart-body-size cfg) - :xnio/direct-buffers false - :xnio/io-threads (or (::io-threads cfg) - (max 3 (px/get-available-processors))) - :xnio/dispatch executor - :ring/compat :ring2 - :socket/backlog 4069} + (let [on-dispatch + (fn [_ start-at-ns] + (let [timing (- (System/nanoTime) start-at-ns) + timing (int (/ timing 1000000))] + (mtx/run! metrics + :id :http-server-dispatch-timing + :val timing))) - handler (cond - (some? router) - (router-handler router) + options + {:http/port port + :http/host host + :http/max-body-size (::max-body-size cfg) + :http/max-multipart-body-size (::max-multipart-body-size cfg) + :xnio/direct-buffers false + :xnio/io-threads (::io-threads cfg) + :xnio/max-worker-threads (::max-worker-threads cfg) + :ring/compat :ring2 + :events/on-dispatch on-dispatch + :socket/backlog 4069} - (some? handler) - handler + handler + (cond + (some? router) + (router-handler router) - :else - (throw (UnsupportedOperationException. "handler or router are required"))) + (some? handler) + handler - options (d/without-nils options) - server (yt/server handler options)] + :else + (throw (UnsupportedOperationException. "handler or router are required"))) + + server + (yt/server handler (d/without-nils options))] (assoc cfg ::server (yt/start! server)))) diff --git a/backend/src/app/http/awsns.clj b/backend/src/app/http/awsns.clj index 1a937e4444..0d3f1d4066 100644 --- a/backend/src/app/http/awsns.clj +++ b/backend/src/app/http/awsns.clj @@ -17,11 +17,9 @@ [app.main :as-alias main] [app.setup :as-alias setup] [app.tokens :as tokens] - [app.worker :as-alias wrk] [clojure.data.json :as j] [cuerdas.core :as str] [integrant.core :as ig] - [promesa.exec :as px] [yetti.request :as yreq] [yetti.response :as-alias yres])) @@ -40,8 +38,8 @@ [_ cfg] (letfn [(handler [request] (let [data (-> request yreq/body slurp)] - (px/run! :vthread (partial handle-request cfg data))) - {::yres/status 200})] + (handle-request cfg data) + {::yres/status 200}))] ["/sns" {:handler handler :allowed-methods #{:post}}])) diff --git a/backend/src/app/http/sse.clj b/backend/src/app/http/sse.clj index 8d431cc93c..8f09f69d25 100644 --- a/backend/src/app/http/sse.clj +++ b/backend/src/app/http/sse.clj @@ -54,7 +54,7 @@ ::yres/body (yres/stream-body (fn [_ output] (let [channel (sp/chan :buf buf :xf (keep encode)) - listener (events/start-listener + listener (events/spawn-listener channel (partial write! output) (partial pu/close! output))] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 706e273bd6..0f776b6e0d 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -42,6 +42,7 @@ [app.svgo :as-alias svgo] [app.util.cron] [app.worker :as-alias wrk] + [app.worker.executor] [clojure.test :as test] [clojure.tools.namespace.repl :as repl] [cuerdas.core :as str] @@ -148,23 +149,11 @@ ::mdef/labels [] ::mdef/type :histogram} - :executors-active-threads - {::mdef/name "penpot_executors_active_threads" - ::mdef/help "Current number of threads available in the executor service." - ::mdef/labels ["name"] - ::mdef/type :gauge} - - :executors-completed-tasks - {::mdef/name "penpot_executors_completed_tasks_total" - ::mdef/help "Approximate number of completed tasks by the executor." - ::mdef/labels ["name"] - ::mdef/type :counter} - - :executors-running-threads - {::mdef/name "penpot_executors_running_threads" - ::mdef/help "Current number of threads with state RUNNING." - ::mdef/labels ["name"] - ::mdef/type :gauge}}) + :http-server-dispatch-timing + {::mdef/name "penpot_http_server_dispatch_timing" + ::mdef/help "Histogram of dispatch handler" + ::mdef/labels [] + ::mdef/type :histogram}}) (def system-config {::db/pool @@ -176,14 +165,12 @@ ::db/max-size (cf/get :database-max-pool-size 60) ::mtx/metrics (ig/ref ::mtx/metrics)} - ;; Default thread pool for IO operations - ::wrk/executor - {} + ;; Default netty IO pool (shared between several services) + ::wrk/netty-io-executor + {:threads (cf/get :netty-io-threads)} - ::wrk/monitor - {::mtx/metrics (ig/ref ::mtx/metrics) - ::wrk/executor (ig/ref ::wrk/executor) - ::wrk/name "default"} + ::wrk/netty-executor + {:threads (cf/get :executor-threads)} :app.migrations/migrations {::db/pool (ig/ref ::db/pool)} @@ -197,14 +184,19 @@ ::rds/redis {::rds/uri (cf/get :redis-uri) ::mtx/metrics (ig/ref ::mtx/metrics) - ::wrk/executor (ig/ref ::wrk/executor)} + + ::wrk/netty-executor + (ig/ref ::wrk/netty-executor) + + ::wrk/netty-io-executor + (ig/ref ::wrk/netty-io-executor)} ::mbus/msgbus - {::wrk/executor (ig/ref ::wrk/executor) + {::wrk/executor (ig/ref ::wrk/netty-executor) ::rds/redis (ig/ref ::rds/redis)} :app.storage.tmp/cleaner - {::wrk/executor (ig/ref ::wrk/executor)} + {::wrk/executor (ig/ref ::wrk/netty-executor)} ::sto.gc-deleted/handler {::db/pool (ig/ref ::db/pool) @@ -232,9 +224,10 @@ ::http/host (cf/get :http-server-host) ::http/router (ig/ref ::http/router) ::http/io-threads (cf/get :http-server-io-threads) + ::http/max-worker-threads (cf/get :http-server-max-worker-threads) ::http/max-body-size (cf/get :http-server-max-body-size) ::http/max-multipart-body-size (cf/get :http-server-max-multipart-body-size) - ::wrk/executor (ig/ref ::wrk/executor)} + ::mtx/metrics (ig/ref ::mtx/metrics)} ::ldap/provider {:host (cf/get :ldap-host) @@ -312,17 +305,17 @@ ::rpc/climit {::mtx/metrics (ig/ref ::mtx/metrics) - ::wrk/executor (ig/ref ::wrk/executor) + ::wrk/executor (ig/ref ::wrk/netty-executor) ::climit/config (cf/get :rpc-climit-config) ::climit/enabled (contains? cf/flags :rpc-climit)} :app.rpc/rlimit - {::wrk/executor (ig/ref ::wrk/executor)} + {::wrk/executor (ig/ref ::wrk/netty-executor)} :app.rpc/methods {::http.client/client (ig/ref ::http.client/client) ::db/pool (ig/ref ::db/pool) - ::wrk/executor (ig/ref ::wrk/executor) + ::wrk/executor (ig/ref ::wrk/netty-executor) ::session/manager (ig/ref ::session/manager) ::ldap/provider (ig/ref ::ldap/provider) ::sto/storage (ig/ref ::sto/storage) @@ -468,20 +461,15 @@ :assets-fs (ig/ref :app.storage.fs/backend)}} :app.storage.s3/backend - {::sto.s3/region (or (cf/get :storage-assets-s3-region) - (cf/get :objects-storage-s3-region)) - ::sto.s3/endpoint (or (cf/get :storage-assets-s3-endpoint) - (cf/get :objects-storage-s3-endpoint)) - ::sto.s3/bucket (or (cf/get :storage-assets-s3-bucket) - (cf/get :objects-storage-s3-bucket)) - ::sto.s3/io-threads (or (cf/get :storage-assets-s3-io-threads) - (cf/get :objects-storage-s3-io-threads)) - ::wrk/executor (ig/ref ::wrk/executor)} + {::sto.s3/region (cf/get :objects-storage-s3-region) + ::sto.s3/endpoint (cf/get :objects-storage-s3-endpoint) + ::sto.s3/bucket (cf/get :objects-storage-s3-bucket) + + ::wrk/netty-io-executor + (ig/ref ::wrk/netty-io-executor)} :app.storage.fs/backend - {::sto.fs/directory (or (cf/get :storage-assets-fs-directory) - (cf/get :objects-storage-fs-directory))}}) - + {::sto.fs/directory (cf/get :objects-storage-fs-directory)}}) (def worker-config {::wrk/cron diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index b58a51103b..18cf269983 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -216,8 +216,7 @@ (rds/add-listener sconn (create-listener rcv-ch)) (px/thread - {:name "penpot/msgbus/io-loop" - :virtual true} + {:name "penpot/msgbus"} (try (loop [] (let [timeout-ch (sp/timeout-chan 1000) diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj index 510069e47c..5bcd157b3f 100644 --- a/backend/src/app/redis.clj +++ b/backend/src/app/redis.clj @@ -21,8 +21,7 @@ [clojure.java.io :as io] [cuerdas.core :as str] [integrant.core :as ig] - [promesa.core :as p] - [promesa.exec :as px]) + [promesa.core :as p]) (:import clojure.lang.MapEntry io.lettuce.core.KeyValue @@ -45,8 +44,10 @@ io.lettuce.core.pubsub.api.sync.RedisPubSubCommands io.lettuce.core.resource.ClientResources io.lettuce.core.resource.DefaultClientResources + io.netty.channel.nio.NioEventLoopGroup io.netty.util.HashedWheelTimer io.netty.util.Timer + io.netty.util.concurrent.EventExecutorGroup java.lang.AutoCloseable java.time.Duration)) @@ -111,20 +112,15 @@ (defmethod ig/expand-key ::redis [k v] - (let [cpus (px/get-available-processors) - threads (max 1 (int (* cpus 0.2)))] - {k (-> (d/without-nils v) - (assoc ::timeout (ct/duration "10s")) - (assoc ::io-threads (max 3 threads)) - (assoc ::worker-threads (max 3 threads)))})) + {k (-> (d/without-nils v) + (assoc ::timeout (ct/duration "10s")))}) (def ^:private schema:redis-params [:map {:title "redis-params"} - ::wrk/executor + ::wrk/netty-io-executor + ::wrk/netty-executor ::mtx/metrics [::uri ::sm/uri] - [::worker-threads ::sm/int] - [::io-threads ::sm/int] [::timeout ::ct/duration]]) (defmethod ig/assert-key ::redis @@ -141,17 +137,30 @@ (defn- initialize-resources "Initialize redis connection resources" - [{:keys [::uri ::io-threads ::worker-threads ::wrk/executor ::mtx/metrics] :as params}] + [{:keys [::uri ::mtx/metrics ::wrk/netty-io-executor ::wrk/netty-executor] :as params}] (l/inf :hint "initialize redis resources" - :uri (str uri) - :io-threads io-threads - :worker-threads worker-threads) + :uri (str uri)) (let [timer (HashedWheelTimer.) resources (.. (DefaultClientResources/builder) - (ioThreadPoolSize ^long io-threads) - (computationThreadPoolSize ^long worker-threads) + (eventExecutorGroup ^EventExecutorGroup netty-executor) + + ;; We provide lettuce with a shared event loop + ;; group instance instead of letting lettuce to + ;; create its own + (eventLoopGroupProvider + (reify io.lettuce.core.resource.EventLoopGroupProvider + (allocate [_ _] netty-io-executor) + (threadPoolSize [_] + (.executorCount ^NioEventLoopGroup netty-io-executor)) + (release [_ _ _ _ _] + ;; Do nothing + ) + (shutdown [_ _ _ _] + ;; Do nothing + ))) + (timer ^Timer timer) (build)) @@ -166,7 +175,7 @@ (l/trace :hint "evict connection (cache)" :key key :reason cause) (some-> val d/close!)) - cache (cache/create :executor executor + cache (cache/create :executor netty-executor :on-remove on-remove :keepalive "5m")] (reify diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index f03f19e79c..6c773b94d6 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -21,7 +21,6 @@ [clojure.set :as set] [datoteka.fs :as fs] [integrant.core :as ig] - [promesa.exec :as px] [promesa.exec.bulkhead :as pbh]) (:import clojure.lang.ExceptionInfo @@ -289,13 +288,9 @@ (get-limits cfg))) (defn invoke! - "Run a function in context of climit. - Intended to be used in virtual threads." - [{:keys [::executor ::rpc/climit] :as cfg} f params] + "Run a function in context of climit." + [{:keys [::rpc/climit] :as cfg} f params] (let [f (if climit - (let [f (if (some? executor) - (fn [cfg params] (px/await! (px/submit! executor (fn [] (f cfg params))))) - f)] - (build-exec-chain cfg f)) + (build-exec-chain cfg f) f)] (f cfg params))) diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index 20e1a9cdc9..eb8621fd1b 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -6,6 +6,7 @@ (ns app.rpc.commands.auth (:require + [app.auth :as auth] [app.common.data :as d] [app.common.data.macros :as dm] [app.common.exceptions :as ex] @@ -62,7 +63,7 @@ (ex/raise :type :validation :code :account-without-password :hint "the current account does not have password") - (let [result (profile/verify-password cfg password (:password profile))] + (let [result (auth/verify-password password (:password profile))] (when (:update result) (l/trc :hint "updating profile password" :id (str (:id profile)) @@ -156,7 +157,7 @@ (:profile-id tdata))) (update-password [conn profile-id] - (let [pwd (profile/derive-password cfg password)] + (let [pwd (auth/derive-password password)] (db/update! conn :profile {:password pwd :is-active true} {:id profile-id}) nil))] @@ -378,7 +379,7 @@ (not (contains? cf/flags :email-verification))) params (-> params (assoc :is-active is-active) - (update :password #(profile/derive-password cfg %))) + (update :password auth/derive-password)) profile (->> (create-profile! conn params) (create-profile-rels! conn))] (vary-meta profile assoc :created true)))) diff --git a/backend/src/app/rpc/commands/binfile.clj b/backend/src/app/rpc/commands/binfile.clj index c9542d4089..f0fc211e46 100644 --- a/backend/src/app/rpc/commands/binfile.clj +++ b/backend/src/app/rpc/commands/binfile.clj @@ -28,7 +28,6 @@ [app.tasks.file-gc] [app.util.services :as sv] [app.worker :as-alias wrk] - [promesa.exec :as px] [yetti.response :as yres])) (set! *warn-on-reflection* true) @@ -94,7 +93,7 @@ ;; --- Command: import-binfile (defn- import-binfile - [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id project-id version name file]}] + [{:keys [::db/pool] :as cfg} {:keys [profile-id project-id version name file]}] (let [team (teams/get-team pool :profile-id profile-id :project-id project-id) @@ -105,13 +104,9 @@ (assoc ::bfc/name name) (assoc ::bfc/input (:path file))) - ;; NOTE: the importation process performs some operations that are - ;; not very friendly with virtual threads, and for avoid - ;; unexpected blocking of other concurrent operations we dispatch - ;; that operation to a dedicated executor. result (case (int version) - 1 (px/invoke! executor (partial bf.v1/import-files! cfg)) - 3 (px/invoke! executor (partial bf.v3/import-files! cfg)))] + 1 (bf.v1/import-files! cfg) + 3 (bf.v3/import-files! cfg))] (db/update! pool :project {:modified-at (ct/now)} diff --git a/backend/src/app/rpc/commands/demo.clj b/backend/src/app/rpc/commands/demo.clj index 4e64eac7ba..005ae170fa 100644 --- a/backend/src/app/rpc/commands/demo.clj +++ b/backend/src/app/rpc/commands/demo.clj @@ -7,6 +7,7 @@ (ns app.rpc.commands.demo "A demo specific mutations." (:require + [app.auth :refer [derive-password]] [app.common.exceptions :as ex] [app.common.time :as ct] [app.config :as cf] @@ -14,7 +15,6 @@ [app.loggers.audit :as audit] [app.rpc :as-alias rpc] [app.rpc.commands.auth :as auth] - [app.rpc.commands.profile :as profile] [app.rpc.doc :as-alias doc] [app.util.services :as sv] [buddy.core.codecs :as bc] @@ -46,7 +46,7 @@ :fullname fullname :is-active true :deleted-at (ct/in-future (cf/get-deletion-delay)) - :password (profile/derive-password cfg password) + :password (derive-password password) :props {}} profile (db/tx-run! cfg (fn [{:keys [::db/conn]}] (->> (auth/create-profile! conn params) diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index 4b4149603f..ab020338e8 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -39,8 +39,7 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.worker :as wrk] - [cuerdas.core :as str] - [promesa.exec :as px])) + [cuerdas.core :as str])) ;; --- FEATURES @@ -251,7 +250,7 @@ (feat.fmigr/resolve-applied-migrations cfg file)))))) (defn get-file - [{:keys [::db/conn ::wrk/executor] :as cfg} id + [{:keys [::db/conn] :as cfg} id & {:keys [project-id migrate? include-deleted? @@ -273,13 +272,8 @@ ::db/remove-deleted (not include-deleted?) ::sql/for-update lock-for-update?}) (feat.fmigr/resolve-applied-migrations cfg) - (feat.fdata/resolve-file-data cfg)) - - ;; NOTE: we perform the file decoding in a separate thread - ;; because it has heavy and synchronous operations for - ;; decoding file body that are not very friendly with virtual - ;; threads. - file (px/invoke! executor #(decode-row file)) + (feat.fdata/resolve-file-data cfg) + (decode-row)) file (if (and migrate? (fmg/need-migration? file)) (migrate-file cfg file options) diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index e499ea2642..f21860cbea 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -37,9 +37,7 @@ [app.util.blob :as blob] [app.util.pointer-map :as pmap] [app.util.services :as sv] - [app.worker :as wrk] - [clojure.set :as set] - [promesa.exec :as px])) + [clojure.set :as set])) (declare ^:private get-lagged-changes) (declare ^:private send-notifications!) @@ -209,7 +207,7 @@ Follow the inner implementation to `update-file-data!` function. Only intended for internal use on this module." - [{:keys [::db/conn ::wrk/executor ::timestamp] :as cfg} + [{:keys [::db/conn ::timestamp] :as cfg} {:keys [profile-id file team features changes session-id skip-validate] :as params}] (let [;; Retrieve the file data @@ -222,15 +220,11 @@ ;; We create a new lexycal scope for clearly delimit the result of ;; executing this update file operation and all its side effects - (let [file (px/invoke! executor - (fn [] - ;; Process the file data on separated thread for avoid to do - ;; the CPU intensive operation on vthread. - (binding [cfeat/*current* features - cfeat/*previous* (:features file)] - (update-file-data! cfg file - process-changes-and-validate - changes skip-validate))))] + (let [file (binding [cfeat/*current* features + cfeat/*previous* (:features file)] + (update-file-data! cfg file + process-changes-and-validate + changes skip-validate))] (feat.fmigr/upsert-migrations! conn file) (persist-file! cfg file) diff --git a/backend/src/app/rpc/commands/fonts.clj b/backend/src/app/rpc/commands/fonts.clj index c417c28f1a..535d868f95 100644 --- a/backend/src/app/rpc/commands/fonts.clj +++ b/backend/src/app/rpc/commands/fonts.clj @@ -26,9 +26,7 @@ [app.rpc.helpers :as rph] [app.rpc.quotes :as quotes] [app.storage :as sto] - [app.util.services :as sv] - [app.worker :as-alias wrk] - [promesa.exec :as px])) + [app.util.services :as sv])) (def valid-weight #{100 200 300 400 500 600 700 800 900 950}) (def valid-style #{"normal" "italic"}) @@ -105,7 +103,7 @@ (create-font-variant cfg (assoc params :profile-id profile-id))))) (defn create-font-variant - [{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [data] :as params}] + [{:keys [::sto/storage ::db/conn]} {:keys [data] :as params}] (letfn [(generate-missing! [data] (let [data (media/run {:cmd :generate-fonts :input data})] (when (and (not (contains? data "font/otf")) @@ -157,7 +155,7 @@ :otf-file-id (:id otf) :ttf-file-id (:id ttf)}))] - (let [data (px/invoke! executor (partial generate-missing! data)) + (let [data (generate-missing! data) assets (persist-fonts-files! data) result (insert-font-variant! assets)] (vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys)))))) diff --git a/backend/src/app/rpc/commands/management.clj b/backend/src/app/rpc/commands/management.clj index ca46a8f824..fe8391aeb3 100644 --- a/backend/src/app/rpc/commands/management.clj +++ b/backend/src/app/rpc/commands/management.clj @@ -28,9 +28,7 @@ [app.setup :as-alias setup] [app.setup.templates :as tmpl] [app.storage.tmp :as tmp] - [app.util.services :as sv] - [app.worker :as-alias wrk] - [promesa.exec :as px])) + [app.util.services :as sv])) ;; --- COMMAND: Duplicate File @@ -313,15 +311,14 @@ ;; Update the modification date of the all affected projects ;; ensuring that the destination project is the most recent one. - (doseq [project-id (into (list project-id) source)] - - ;; NOTE: as this is executed on virtual thread, sleeping does - ;; not causes major issues, and allows an easy way to set a - ;; trully different modification date to each file. - (px/sleep 10) - (db/update! conn :project - {:modified-at (ct/now)} - {:id project-id})) + (loop [project-ids (into (list project-id) source) + modified-at (ct/now)] + (when-let [project-id (first project-ids)] + (db/update! conn :project + {:modified-at modified-at} + {:id project-id}) + (recur (rest project-ids) + (ct/plus modified-at 10)))) nil)) @@ -396,12 +393,7 @@ ;; --- COMMAND: Clone Template (defn clone-template - [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [project-id profile-id] :as params} template] - - ;; NOTE: the importation process performs some operations - ;; that are not very friendly with virtual threads, and for - ;; avoid unexpected blocking of other concurrent operations - ;; we dispatch that operation to a dedicated executor. + [{:keys [::db/pool] :as cfg} {:keys [project-id profile-id] :as params} template] (let [template (tmp/tempfile-from template :prefix "penpot.template." :suffix "" @@ -419,8 +411,8 @@ (assoc ::bfc/features (cfeat/get-team-enabled-features cf/flags team))) result (if (= format :binfile-v3) - (px/invoke! executor (partial bf.v3/import-files! cfg)) - (px/invoke! executor (partial bf.v1/import-files! cfg)))] + (bf.v3/import-files! cfg) + (bf.v1/import-files! cfg))] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index 5fa70e9837..63793742a0 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -24,10 +24,8 @@ [app.storage :as sto] [app.storage.tmp :as tmp] [app.util.services :as sv] - [app.worker :as-alias wrk] [cuerdas.core :as str] - [datoteka.io :as io] - [promesa.exec :as px])) + [datoteka.io :as io])) (def default-max-file-size (* 1024 1024 10)) ; 10 MiB @@ -153,9 +151,9 @@ (assoc ::image (process-main-image info))))) (defn- create-file-media-object - [{:keys [::sto/storage ::db/conn ::wrk/executor] :as cfg} + [{:keys [::sto/storage ::db/conn] :as cfg} {:keys [id file-id is-local name content]}] - (let [result (px/invoke! executor (partial process-image content)) + (let [result (process-image content) image (sto/put-object! storage (::image result)) thumb (when-let [params (::thumb result)] (sto/put-object! storage params))] diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index 7735c68928..747c1c4f50 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -30,16 +30,13 @@ [app.tokens :as tokens] [app.util.services :as sv] [app.worker :as wrk] - [cuerdas.core :as str] - [promesa.exec :as px])) + [cuerdas.core :as str])) (declare check-profile-existence!) (declare decode-row) -(declare derive-password) (declare filter-props) (declare get-profile) (declare strip-private-attrs) -(declare verify-password) (def schema:props-notifications [:map {:title "props-notifications"} @@ -192,7 +189,7 @@ [{:keys [::db/conn] :as cfg} {:keys [profile-id old-password] :as params}] (let [profile (db/get-by-id conn :profile profile-id ::sql/for-update true)] (when (and (not= (:password profile) "!") - (not (:valid (verify-password cfg old-password (:password profile))))) + (not (:valid (auth/verify-password old-password (:password profile))))) (ex/raise :type :validation :code :old-password-not-match)) profile)) @@ -201,7 +198,7 @@ [{:keys [::db/conn] :as cfg} {:keys [id password] :as profile}] (when-not (db/read-only? conn) (db/update! conn :profile - {:password (derive-password cfg password)} + {:password (auth/derive-password password)} {:id id}) nil)) @@ -303,12 +300,11 @@ :content-type (:mtype thumb)})) (defn upload-photo - [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file] :as params}] + [{:keys [::sto/storage] :as cfg} {:keys [file] :as params}] (let [params (-> cfg (assoc ::climit/id [[:process-image/by-profile (:profile-id params)] [:process-image/global]]) (assoc ::climit/label "upload-photo") - (assoc ::climit/executor executor) (climit/invoke! generate-thumbnail! file))] (sto/put-object! storage params))) @@ -548,15 +544,6 @@ [props] (into {} (filter (fn [[k _]] (simple-ident? k))) props)) -(defn derive-password - [{:keys [::wrk/executor]} password] - (when password - (px/invoke! executor (partial auth/derive-password password)))) - -(defn verify-password - [{:keys [::wrk/executor]} password password-data] - (px/invoke! executor (partial auth/verify-password password password-data))) - (defn decode-row [{:keys [props] :as row}] (cond-> row diff --git a/backend/src/app/srepl/cli.clj b/backend/src/app/srepl/cli.clj index 22139f5631..48876c905e 100644 --- a/backend/src/app/srepl/cli.clj +++ b/backend/src/app/srepl/cli.clj @@ -7,7 +7,7 @@ (ns app.srepl.cli "PREPL API for external usage (CLI or ADMIN)" (:require - [app.auth :as auth] + [app.auth :refer [derive-password]] [app.common.exceptions :as ex] [app.common.schema :as sm] [app.common.schema.generators :as sg] @@ -54,7 +54,7 @@ (some-> (get-current-system) (db/tx-run! (fn [{:keys [::db/conn] :as system}] - (let [password (cmd.profile/derive-password system password) + (let [password (derive-password password) params {:id (uuid/next) :email email :fullname fullname @@ -74,7 +74,7 @@ (assoc :fullname fullname) (some? password) - (assoc :password (auth/derive-password password)) + (assoc :password (derive-password password)) (some? is-active) (assoc :is-active is-active))] @@ -124,7 +124,7 @@ (defmethod exec-command "derive-password" [{:keys [password]}] - (auth/derive-password password)) + (derive-password password)) (defmethod exec-command "authenticate" [{:keys [token]}] diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index 73929c1639..8d28a9a9f3 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -31,13 +31,13 @@ java.time.Duration java.util.Collection java.util.Optional + java.util.concurrent.atomic.AtomicLong org.reactivestreams.Subscriber software.amazon.awssdk.core.ResponseBytes software.amazon.awssdk.core.async.AsyncRequestBody software.amazon.awssdk.core.async.AsyncResponseTransformer software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody software.amazon.awssdk.core.client.config.ClientAsyncConfiguration - software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup software.amazon.awssdk.regions.Region @@ -87,12 +87,11 @@ (def ^:private schema:config [:map {:title "s3-backend-config"} - ::wrk/executor + ::wrk/netty-io-executor [::region {:optional true} :keyword] [::bucket {:optional true} ::sm/text] [::prefix {:optional true} ::sm/text] - [::endpoint {:optional true} ::sm/uri] - [::io-threads {:optional true} ::sm/int]]) + [::endpoint {:optional true} ::sm/uri]]) (defmethod ig/expand-key ::backend [k v] @@ -110,6 +109,7 @@ presigner (build-s3-presigner params)] (assoc params ::sto/type :s3 + ::counter (AtomicLong. 0) ::client @client ::presigner presigner ::close-fn #(.close ^java.lang.AutoCloseable client))))) @@ -121,7 +121,7 @@ (defmethod ig/halt-key! ::backend [_ {:keys [::close-fn]}] (when (fn? close-fn) - (px/run! close-fn))) + (close-fn))) (def ^:private schema:backend [:map {:title "s3-backend"} @@ -198,19 +198,16 @@ (Region/of (name region))) (defn- build-s3-client - [{:keys [::region ::endpoint ::io-threads ::wrk/executor]}] + [{:keys [::region ::endpoint ::wrk/netty-io-executor]}] (let [aconfig (-> (ClientAsyncConfiguration/builder) - (.advancedOption SdkAdvancedAsyncClientOption/FUTURE_COMPLETION_EXECUTOR executor) (.build)) sconfig (-> (S3Configuration/builder) (cond-> (some? endpoint) (.pathStyleAccessEnabled true)) (.build)) - thr-num (or io-threads (min 16 (px/get-available-processors))) hclient (-> (NettyNioAsyncHttpClient/builder) - (.eventLoopGroupBuilder (-> (SdkEventLoopGroup/builder) - (.numberOfThreads (int thr-num)))) + (.eventLoopGroup (SdkEventLoopGroup/create netty-io-executor)) (.connectionAcquisitionTimeout default-timeout) (.connectionTimeout default-timeout) (.readTimeout default-timeout) @@ -262,7 +259,7 @@ (.close ^InputStream input)))) (defn- make-request-body - [executor content] + [counter content] (let [size (impl/get-size content)] (reify AsyncRequestBody @@ -272,16 +269,19 @@ (^void subscribe [_ ^Subscriber subscriber] (let [delegate (AsyncRequestBody/forBlockingInputStream (long size)) input (io/input-stream content)] - (px/run! executor (partial write-input-stream delegate input)) + + (px/thread-call (partial write-input-stream delegate input) + {:name (str "penpot/storage/" (.getAndIncrement ^AtomicLong counter))}) + (.subscribe ^BlockingInputStreamAsyncRequestBody delegate ^Subscriber subscriber)))))) (defn- put-object - [{:keys [::client ::bucket ::prefix ::wrk/executor]} {:keys [id] :as object} content] + [{:keys [::client ::bucket ::prefix ::counter]} {:keys [id] :as object} content] (let [path (dm/str prefix (impl/id->path id)) mdata (meta object) mtype (:content-type mdata "application/octet-stream") - rbody (make-request-body executor content) + rbody (make-request-body counter content) request (.. (PutObjectRequest/builder) (bucket bucket) (contentType mtype) diff --git a/backend/src/app/storage/tmp.clj b/backend/src/app/storage/tmp.clj index 915195fbb2..4bbff4a72d 100644 --- a/backend/src/app/storage/tmp.clj +++ b/backend/src/app/storage/tmp.clj @@ -44,7 +44,7 @@ [_ cfg] (fs/create-dir default-tmp-dir) (px/fn->thread (partial io-loop cfg) - {:name "penpot/storage/tmp-cleaner" :virtual true})) + {:name "penpot/storage/tmp-cleaner"})) (defmethod ig/halt-key! ::cleaner [_ thread] diff --git a/backend/src/app/util/events.clj b/backend/src/app/util/events.clj index b26810cb04..5fd8b6474c 100644 --- a/backend/src/app/util/events.clj +++ b/backend/src/app/util/events.clj @@ -27,7 +27,7 @@ (sp/put! channel [type data]) nil))) -(defn start-listener +(defn spawn-listener [channel on-event on-close] (assert (sp/chan? channel) "expected active events channel") @@ -51,7 +51,7 @@ [f on-event] (binding [*channel* (sp/chan :buf 32)] - (let [listener (start-listener *channel* on-event (constantly nil))] + (let [listener (spawn-listener *channel* on-event (constantly nil))] (try (f) (finally diff --git a/backend/src/app/worker/dispatcher.clj b/backend/src/app/worker/dispatcher.clj index 3d1ed18707..2b49d43d77 100644 --- a/backend/src/app/worker/dispatcher.clj +++ b/backend/src/app/worker/dispatcher.clj @@ -112,7 +112,7 @@ (if (db/read-only? pool) (l/wrn :hint "not started (db is read-only)") - (px/fn->thread dispatcher :name "penpot/worker/dispatcher" :virtual false)))) + (px/fn->thread dispatcher :name "penpot/worker-dispatcher")))) (defmethod ig/halt-key! ::wrk/dispatcher [_ thread] diff --git a/backend/src/app/worker/executor.clj b/backend/src/app/worker/executor.clj index acd6290bba..f34239e7e0 100644 --- a/backend/src/app/worker/executor.clj +++ b/backend/src/app/worker/executor.clj @@ -7,97 +7,79 @@ (ns app.worker.executor "Async tasks abstraction (impl)." (:require - [app.common.data :as d] [app.common.logging :as l] + [app.common.math :as mth] [app.common.schema :as sm] - [app.common.time :as ct] - [app.metrics :as mtx] [app.worker :as-alias wrk] [integrant.core :as ig] [promesa.exec :as px]) (:import - java.util.concurrent.ThreadPoolExecutor)) + io.netty.channel.nio.NioEventLoopGroup + io.netty.util.concurrent.DefaultEventExecutorGroup + java.util.concurrent.ExecutorService + java.util.concurrent.ThreadFactory)) (set! *warn-on-reflection* true) (sm/register! {:type ::wrk/executor - :pred #(instance? ThreadPoolExecutor %) + :pred #(instance? ExecutorService %) :type-properties {:title "executor" - :description "Instance of ThreadPoolExecutor"}}) + :description "Instance of ExecutorService"}}) + +(sm/register! + {:type ::wrk/netty-io-executor + :pred #(instance? NioEventLoopGroup %) + :type-properties + {:title "executor" + :description "Instance of NioEventLoopGroup"}}) + +(sm/register! + {:type ::wrk/netty-executor + :pred #(instance? DefaultEventExecutorGroup %) + :type-properties + {:title "executor" + :description "Instance of DefaultEventExecutorGroup"}}) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; EXECUTOR +;; IO Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defmethod ig/init-key ::wrk/executor - [_ _] - (let [factory (px/thread-factory :prefix "penpot/default/") - executor (px/cached-executor :factory factory :keepalive 60000)] - (l/inf :hint "executor started") - executor)) +(defmethod ig/assert-key ::wrk/netty-io-executor + [_ {:keys [threads]}] + (assert (or (nil? threads) (int? threads)) + "expected valid threads value, revisit PENPOT_NETTY_IO_THREADS environment variable")) -(defmethod ig/halt-key! ::wrk/executor +(defmethod ig/init-key ::wrk/netty-io-executor + [_ {:keys [threads]}] + (let [factory (px/thread-factory :prefix "penpot/netty-io/") + nthreads (or threads (mth/round (/ (px/get-available-processors) 2))) + nthreads (max 2 nthreads)] + (l/inf :hint "start netty io executor" :threads nthreads) + (NioEventLoopGroup. (int nthreads) ^ThreadFactory factory))) + +(defmethod ig/halt-key! ::wrk/netty-io-executor [_ instance] (px/shutdown! instance)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; MONITOR +;; IO Offload Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn- get-stats - [^ThreadPoolExecutor executor] - {:active (.getPoolSize ^ThreadPoolExecutor executor) - :running (.getActiveCount ^ThreadPoolExecutor executor) - :completed (.getCompletedTaskCount ^ThreadPoolExecutor executor)}) +(defmethod ig/assert-key ::wrk/netty-executor + [_ {:keys [threads]}] + (assert (or (nil? threads) (int? threads)) + "expected valid threads value, revisit PENPOT_EXEC_THREADS environment variable")) -(defmethod ig/expand-key ::wrk/monitor - [k v] - {k (-> (d/without-nils v) - (assoc ::interval (ct/duration "2s")))}) +(defmethod ig/init-key ::wrk/netty-executor + [_ {:keys [threads]}] + (let [factory (px/thread-factory :prefix "penpot/exec/") + nthreads (or threads (mth/round (/ (px/get-available-processors) 2))) + nthreads (max 2 nthreads)] + (l/inf :hint "start default executor" :threads nthreads) + (DefaultEventExecutorGroup. (int nthreads) ^ThreadFactory factory))) -(defmethod ig/init-key ::wrk/monitor - [_ {:keys [::wrk/executor ::mtx/metrics ::interval ::wrk/name]}] - (letfn [(monitor! [executor prev-completed] - (let [labels (into-array String [(d/name name)]) - stats (get-stats executor) - - completed (:completed stats) - completed-inc (- completed prev-completed) - completed-inc (if (neg? completed-inc) 0 completed-inc)] - - (mtx/run! metrics - :id :executor-active-threads - :labels labels - :val (:active stats)) - - (mtx/run! metrics - :id :executor-running-threads - :labels labels - :val (:running stats)) - - (mtx/run! metrics - :id :executors-completed-tasks - :labels labels - :inc completed-inc) - - completed-inc))] - - (px/thread - {:name "penpot/executors-monitor" :virtual true} - (l/inf :hint "monitor started" :name name) - (try - (loop [completed 0] - (px/sleep interval) - (recur (long (monitor! executor completed)))) - (catch InterruptedException _cause - (l/trc :hint "monitor: interrupted" :name name)) - (catch Throwable cause - (l/err :hint "monitor: unexpected error" :name name :cause cause)) - (finally - (l/inf :hint "monitor: terminated" :name name)))))) - -(defmethod ig/halt-key! ::wrk/monitor - [_ thread] - (px/interrupt! thread)) +(defmethod ig/halt-key! ::wrk/netty-executor + [_ instance] + (px/shutdown! instance)) diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj index ce9b7ee5a9..c7c1a0e61f 100644 --- a/backend/src/app/worker/runner.clj +++ b/backend/src/app/worker/runner.clj @@ -248,7 +248,7 @@ (defn- start-thread! [{:keys [::rds/redis ::id ::queue ::wrk/tenant] :as cfg}] (px/thread - {:name (format "penpot/worker/runner:%s" id)} + {:name (str "penpot/worker-runner/" id)} (l/inf :hint "started" :id id :queue queue) (try (dm/with-open [rconn (rds/connect redis)] @@ -303,7 +303,7 @@ (l/wrn :hint "not started (db is read-only)" :queue queue :parallelism parallelism) (doall (->> (range parallelism) - (map #(assoc cfg ::id %)) + (map #(assoc cfg ::id (str queue "/" %))) (map start-thread!)))))) (defmethod ig/halt-key! ::wrk/runner diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index a589c6fe7e..6f5e8d07f2 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -113,7 +113,6 @@ :app.auth.oidc.providers/generic :app.setup/templates :app.auth.oidc/routes - :app.worker/monitor :app.http.oauth/handler :app.notifications/handler :app.loggers.mattermost/reporter diff --git a/common/src/app/common/time.cljc b/common/src/app/common/time.cljc index d32ecfbefe..3fe0350bd0 100644 --- a/common/src/app/common/time.cljc +++ b/common/src/app/common/time.cljc @@ -130,7 +130,6 @@ ms-or-obj (integer? ms-or-obj) - (Duration/ofMillis ms-or-obj) :else @@ -433,4 +432,4 @@ #?(:cljs (extend-protocol cljs.core/IEncodeJS js/Date - (-clj->js [x] x))) \ No newline at end of file + (-clj->js [x] x)))