diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index d6f4cfc4f5..57aebd30df 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -181,9 +181,9 @@ ::mtx/routes {::mtx/metrics (ig/ref ::mtx/metrics)} - ::rds/redis - {::rds/uri (cf/get :redis-uri) - ::mtx/metrics (ig/ref ::mtx/metrics) + ::rds/client + {::rds/uri + (cf/get :redis-uri) ::wrk/netty-executor (ig/ref ::wrk/netty-executor) @@ -191,9 +191,14 @@ ::wrk/netty-io-executor (ig/ref ::wrk/netty-io-executor)} + ::rds/pool + {::rds/client (ig/ref ::rds/client) + ::mtx/metrics (ig/ref ::mtx/metrics)} + ::mbus/msgbus - {::wrk/executor (ig/ref ::wrk/netty-executor) - ::rds/redis (ig/ref ::rds/redis)} + {::wrk/executor (ig/ref ::wrk/netty-executor) + ::rds/client (ig/ref ::rds/client) + ::mtx/metrics (ig/ref ::mtx/metrics)} :app.storage.tmp/cleaner {::wrk/executor (ig/ref ::wrk/netty-executor)} @@ -315,13 +320,14 @@ :app.rpc/methods {::http.client/client (ig/ref ::http.client/client) ::db/pool (ig/ref ::db/pool) + ::rds/pool (ig/ref ::rds/pool) ::wrk/executor (ig/ref ::wrk/netty-executor) ::session/manager (ig/ref ::session/manager) ::ldap/provider (ig/ref ::ldap/provider) ::sto/storage (ig/ref ::sto/storage) ::mtx/metrics (ig/ref ::mtx/metrics) ::mbus/msgbus (ig/ref ::mbus/msgbus) - ::rds/redis (ig/ref ::rds/redis) + ::rds/client (ig/ref ::rds/client) ::rpc/climit (ig/ref ::rpc/climit) ::rpc/rlimit (ig/ref ::rpc/rlimit) @@ -515,7 +521,7 @@ :task :audit-log-gc})]} ::wrk/dispatcher - {::rds/redis (ig/ref ::rds/redis) + {::rds/client (ig/ref ::rds/client) ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool) ::wrk/tenant (cf/get :tenant)} @@ -524,7 +530,7 @@ {::wrk/parallelism (cf/get ::worker-default-parallelism 1) ::wrk/queue :default ::wrk/tenant (cf/get :tenant) - ::rds/redis (ig/ref ::rds/redis) + ::rds/client (ig/ref ::rds/client) ::wrk/registry (ig/ref ::wrk/registry) ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool)} @@ -533,7 +539,7 @@ 
{::wrk/parallelism (cf/get ::worker-webhook-parallelism 1) ::wrk/queue :webhooks ::wrk/tenant (cf/get :tenant) - ::rds/redis (ig/ref ::rds/redis) + ::rds/client (ig/ref ::rds/client) ::wrk/registry (ig/ref ::wrk/registry) ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool)}}) diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 18cf269983..eda7431e2e 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -16,7 +16,6 @@ [app.redis :as rds] [app.worker :as wrk] [integrant.core :as ig] - [promesa.core :as p] [promesa.exec :as px] [promesa.exec.csp :as sp])) @@ -59,14 +58,16 @@ (assoc ::timeout (ct/duration {:seconds 30})))}) (def ^:private schema:params - [:map ::rds/redis ::wrk/executor]) + [:map + ::rds/client + ::wrk/executor]) (defmethod ig/assert-key ::msgbus [_ params] (assert (sm/check schema:params params))) (defmethod ig/init-key ::msgbus - [_ {:keys [::buffer-size ::wrk/executor ::timeout ::rds/redis] :as cfg}] + [_ {:keys [::buffer-size ::wrk/executor ::timeout] :as cfg}] (l/info :hint "initialize msgbus" :buffer-size buffer-size) (let [cmd-ch (sp/chan :buf buffer-size) rcv-ch (sp/chan :buf (sp/dropping-buffer buffer-size)) @@ -74,8 +75,9 @@ :xf xform-prefix-topic) state (agent {}) - pconn (rds/connect redis :type :default :timeout timeout) - sconn (rds/connect redis :type :pubsub :timeout timeout) + ;; Open persistent connections to redis + pconn (rds/connect cfg :timeout timeout) + sconn (rds/connect-pubsub cfg :timeout timeout) _ (set-error-handler! state #(l/error :cause % :hint "unexpected error on agent" ::l/sync? true)) _ (set-error-mode! state :continue) @@ -189,14 +191,13 @@ (defn- create-listener [rcv-ch] - (rds/pubsub-listener - :on-message (fn [_ topic message] + {:on-message (fn [_ topic message] ;; There are no back pressure, so we use a slidding ;; buffer for cases when the pubsub broker sends ;; more messages that we can process. 
- (let [val {:topic topic :message (t/decode message)}] + (let [val {:topic topic :message (t/decode-str message)}] (when-not (sp/offer! rcv-ch val) - (l/warn :msg "dropping message on subscription loop")))))) + (l/warn :msg "dropping message on subscription loop"))))}) (defn- process-input [{:keys [::state ::wrk/executor] :as cfg} topic message] @@ -262,7 +263,7 @@ intended to be used in core.async go blocks." [{:keys [::pconn] :as cfg} {:keys [topic message]}] (try - (p/await! (rds/publish pconn topic (t/encode message))) + (rds/publish pconn topic (t/encode-str message)) (catch InterruptedException cause (throw cause)) (catch Throwable cause diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj index 5bcd157b3f..96e6b07be5 100644 --- a/backend/src/app/redis.clj +++ b/backend/src/app/redis.clj @@ -6,22 +6,22 @@ (ns app.redis "The msgbus abstraction implemented using redis as underlying backend." - (:refer-clojure :exclude [eval]) + (:refer-clojure :exclude [eval get set run!]) (:require [app.common.data :as d] [app.common.exceptions :as ex] + [app.common.generic-pool :as gpool] [app.common.logging :as l] [app.common.schema :as sm] [app.common.time :as ct] [app.metrics :as mtx] [app.redis.script :as-alias rscript] - [app.util.cache :as cache] - [app.worker :as-alias wrk] + [app.worker :as wrk] + [app.worker.executor] [clojure.core :as c] [clojure.java.io :as io] [cuerdas.core :as str] - [integrant.core :as ig] - [promesa.core :as p]) + [integrant.core :as ig]) (:import clojure.lang.MapEntry io.lettuce.core.KeyValue @@ -31,12 +31,10 @@ io.lettuce.core.RedisException io.lettuce.core.RedisURI io.lettuce.core.ScriptOutputType - io.lettuce.core.api.StatefulConnection + io.lettuce.core.SetArgs io.lettuce.core.api.StatefulRedisConnection - io.lettuce.core.api.async.RedisAsyncCommands - io.lettuce.core.api.async.RedisScriptingAsyncCommands io.lettuce.core.api.sync.RedisCommands - io.lettuce.core.codec.ByteArrayCodec + 
io.lettuce.core.api.sync.RedisScriptingCommands io.lettuce.core.codec.RedisCodec io.lettuce.core.codec.StringCodec io.lettuce.core.pubsub.RedisPubSubListener @@ -53,245 +51,229 @@ (set! *warn-on-reflection* true) -(declare ^:private initialize-resources) -(declare ^:private shutdown-resources) -(declare ^:private impl-eval) +(def ^:const MAX-EVAL-RETRIES 18) -(defprotocol IRedis - (-connect [_ options]) - (-get-or-connect [_ key options])) +(def default-timeout + (ct/duration "10s")) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; IMPL & PRIVATE API +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defprotocol IConnection - (publish [_ topic message]) - (rpush [_ key payload]) - (blpop [_ timeout keys]) - (eval [_ script])) + (-set-timeout [_ timeout] "set connection timeout") + (-get-timeout [_] "get current timeout") + (-reset-timeout [_] "reset to default timeout")) + +(defprotocol IDefaultConnection + "Public API of default redis connection" + (-publish [_ topic message]) + (-rpush [_ key payload]) + (-blpop [_ timeout keys]) + (-eval [_ script]) + (-get [_ key]) + (-set [_ key val args]) + (-del [_ key-or-keys]) + (-ping [_])) (defprotocol IPubSubConnection - (add-listener [_ listener]) - (subscribe [_ topics]) - (unsubscribe [_ topics])) + (-add-listener [_ listener]) + (-subscribe [_ topics]) + (-unsubscribe [_ topics])) -(def default-codec - (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE)) - -(def string-codec +(def ^:private default-codec (RedisCodec/of StringCodec/UTF8 StringCodec/UTF8)) -(sm/register! - {:type ::connection - :pred #(satisfies? IConnection %) - :type-properties - {:title "connection" - :description "redis connection instance"}}) +(defn- impl-eval + [cmd cache metrics script] + (let [keys (into-array String (map str (::rscript/keys script))) + vals (into-array String (map str (::rscript/vals script))) + sname (::rscript/name script) -(sm/register! 
- {:type ::pubsub-connection - :pred #(satisfies? IPubSubConnection %) - :type-properties - {:title "connection" - :description "redis connection instance"}}) + read-script + (fn [] + (-> script ::rscript/path io/resource slurp)) -(defn redis? - [o] - (satisfies? IRedis o)) + load-script + (fn [] + (let [id (.scriptLoad ^RedisScriptingCommands cmd + ^String (read-script))] + (swap! cache assoc sname id) + (l/trc :hint "load script" :name sname :id id) -(sm/register! - {:type ::redis - :pred redis?}) + id)) -(def ^:private schema:script - [:map {:title "script"} - [::rscript/name qualified-keyword?] - [::rscript/path ::sm/text] - [::rscript/keys {:optional true} [:vector :any]] - [::rscript/vals {:optional true} [:vector :any]]]) + eval-script + (fn [id] + (try + (let [tpoint (ct/tpoint) + result (.evalsha ^RedisScriptingCommands cmd + ^String id + ^ScriptOutputType ScriptOutputType/MULTI + ^"[Ljava.lang.String;" keys + ^"[Ljava.lang.String;" vals) + elapsed (tpoint)] -(def valid-script? - (sm/lazy-validator schema:script)) + (mtx/run! metrics {:id :redis-eval-timing + :labels [(name sname)] + :val (inst-ms elapsed)}) -(defmethod ig/expand-key ::redis - [k v] - {k (-> (d/without-nils v) - (assoc ::timeout (ct/duration "10s")))}) + (l/trc :hint "eval script" + :name (name sname) + :id id + :params (str/join "," (::rscript/vals script)) + :elapsed (ct/format-duration elapsed)) -(def ^:private schema:redis-params - [:map {:title "redis-params"} - ::wrk/netty-io-executor - ::wrk/netty-executor - ::mtx/metrics - [::uri ::sm/uri] - [::timeout ::ct/duration]]) + result) -(defmethod ig/assert-key ::redis - [_ params] - (assert (sm/check schema:redis-params params))) + (catch io.lettuce.core.RedisNoScriptException _cause + ::load) -(defmethod ig/init-key ::redis - [_ params] - (initialize-resources params)) + (catch Throwable cause + (when-let [on-error (::rscript/on-error script)] + (on-error cause)) + (throw cause)))) -(defmethod ig/halt-key! 
::redis - [_ instance] - (d/close! instance)) + eval-script' + (fn [id] + (loop [id id + retries 0] + (if (> retries MAX-EVAL-RETRIES) + (ex/raise :type :internal + :code ::max-eval-retries-reached + :hint (str "unable to eval redis script " sname)) + (let [result (eval-script id)] + (if (= result ::load) + (recur (load-script) + (inc retries)) + result)))))] -(defn- initialize-resources - "Initialize redis connection resources" - [{:keys [::uri ::mtx/metrics ::wrk/netty-io-executor ::wrk/netty-executor] :as params}] + (if-let [id (c/get @cache sname)] + (eval-script' id) + (-> (load-script) + (eval-script'))))) - (l/inf :hint "initialize redis resources" - :uri (str uri)) +(deftype Connection [^StatefulRedisConnection conn + ^RedisCommands cmd + ^Duration timeout + cache metrics] + AutoCloseable + (close [_] + (ex/ignoring (.close conn))) - (let [timer (HashedWheelTimer.) - resources (.. (DefaultClientResources/builder) - (eventExecutorGroup ^EventExecutorGroup netty-executor) + IConnection + (-set-timeout [_ timeout] + (.setTimeout conn ^Duration timeout)) - ;; We provide lettuce with a shared event loop - ;; group instance instead of letting lettuce to - ;; create its own - (eventLoopGroupProvider - (reify io.lettuce.core.resource.EventLoopGroupProvider - (allocate [_ _] netty-io-executor) - (threadPoolSize [_] - (.executorCount ^NioEventLoopGroup netty-io-executor)) - (release [_ _ _ _ _] - ;; Do nothing - ) - (shutdown [_ _ _ _] - ;; Do nothing - ))) + (-reset-timeout [_] + (.setTimeout conn timeout)) - (timer ^Timer timer) - (build)) + (-get-timeout [_] + (.getTimeout conn)) - redis-uri (RedisURI/create ^String (str uri)) + IDefaultConnection + (-publish [_ topic message] + (.publish cmd ^String topic ^String message)) - shutdown (fn [client conn] - (ex/ignoring (.close ^StatefulConnection conn)) - (ex/ignoring (.close ^RedisClient client)) - (l/trc :hint "disconnect" :hid (hash client))) + (-rpush [_ key elements] + (try + (let [vals (make-array String 
(count elements))] + (loop [i 0 xs (seq elements)] + (when xs + (aset ^"[[Ljava.lang.String;" vals i ^String (first xs)) + (recur (inc i) (next xs)))) - on-remove (fn [key val cause] - (l/trace :hint "evict connection (cache)" :key key :reason cause) - (some-> val d/close!)) + (.rpush cmd + ^String key + ^"[[Ljava.lang.String;" vals)) - cache (cache/create :executor netty-executor - :on-remove on-remove - :keepalive "5m")] - (reify - java.lang.AutoCloseable - (close [_] - (ex/ignoring (cache/invalidate! cache)) - (ex/ignoring (.shutdown ^ClientResources resources)) - (ex/ignoring (.stop ^Timer timer))) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) - IRedis - (-get-or-connect [this key options] - (let [create (fn [_] (-connect this options))] - (cache/get cache key create))) + (-blpop [_ keys timeout] + (try + (let [keys (into-array String keys)] + (when-let [res (.blpop cmd + ^double timeout + ^"[Ljava.lang.String;" keys)] + (MapEntry/create + (.getKey ^KeyValue res) + (.getValue ^KeyValue res)))) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) - (-connect [_ options] - (let [timeout (or (:timeout options) (::timeout params)) - codec (get options :codec default-codec) - type (get options :type :default) - client (RedisClient/create ^ClientResources resources - ^RedisURI redis-uri)] + (-get [_ key] + (assert (string? key) "key expected to be string") + (.get cmd ^String key)) - (l/trc :hint "connect" :hid (hash client)) - (if (= type :pubsub) - (let [conn (.connectPubSub ^RedisClient client - ^RedisCodec codec)] - (.setTimeout ^StatefulConnection conn - ^Duration timeout) - (reify - IPubSubConnection - (add-listener [_ listener] - (assert (instance? 
RedisPubSubListener listener) "expected listener instance") - (.addListener ^StatefulRedisPubSubConnection conn - ^RedisPubSubListener listener)) + (-set [_ key val args] + (.set cmd + ^String key + ^bytes val + ^SetArgs args)) - (subscribe [_ topics] - (try - (let [topics (into-array String (map str topics)) - cmd (.sync ^StatefulRedisPubSubConnection conn)] - (.subscribe ^RedisPubSubCommands cmd topics)) - (catch RedisCommandInterruptedException cause - (throw (InterruptedException. (ex-message cause)))))) + (-del [_ keys] + (let [keys (into-array String keys)] + (.del cmd ^String/1 keys))) - (unsubscribe [_ topics] - (try - (let [topics (into-array String (map str topics)) - cmd (.sync ^StatefulRedisPubSubConnection conn)] - (.unsubscribe ^RedisPubSubCommands cmd topics)) - (catch RedisCommandInterruptedException cause - (throw (InterruptedException. (ex-message cause)))))) + (-ping [_] + (.ping cmd)) + + (-eval [_ script] + (impl-eval cmd cache metrics script))) - AutoCloseable - (close [_] (shutdown client conn)))) +(deftype SubscriptionConnection [^StatefulRedisPubSubConnection conn + ^RedisPubSubCommands cmd + ^Duration timeout] + AutoCloseable + (close [_] + (ex/ignoring (.close conn))) - (let [conn (.connect ^RedisClient client ^RedisCodec codec)] - (.setTimeout ^StatefulConnection conn ^Duration timeout) - (reify - IConnection - (publish [_ topic message] - (assert (string? topic) "expected topic to be string") - (assert (bytes? message) "expected message to be a byte array") + IConnection + (-set-timeout [_ timeout] + (.setTimeout conn ^Duration timeout)) - (let [pcomm (.async ^StatefulRedisConnection conn)] - (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message))) + (-reset-timeout [_] + (.setTimeout conn timeout)) - (rpush [_ key payload] - (assert (or (and (vector? payload) - (every? bytes? payload)) - (bytes? payload))) - (try - (let [cmd (.sync ^StatefulRedisConnection conn) - data (if (vector? 
payload) payload [payload]) - vals (make-array (. Class (forName "[B")) (count data))] + (-get-timeout [_] + (.getTimeout conn)) - (loop [i 0 xs (seq data)] - (when xs - (aset ^"[[B" vals i ^bytes (first xs)) - (recur (inc i) (next xs)))) + IPubSubConnection + (-add-listener [_ listener] + (.addListener conn ^RedisPubSubListener listener)) - (.rpush ^RedisCommands cmd - ^String key - ^"[[B" vals)) + (-subscribe [_ topics] + (try + (let [topics (into-array String topics)] + (.subscribe cmd topics)) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) - (catch RedisCommandInterruptedException cause - (throw (InterruptedException. (ex-message cause)))))) + (-unsubscribe [_ topics] + (try + (let [topics (into-array String topics)] + (.unsubscribe cmd topics)) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause))))))) - (blpop [_ timeout keys] - (try - (let [keys (into-array Object (map str keys)) - cmd (.sync ^StatefulRedisConnection conn) - timeout (/ (double (inst-ms timeout)) 1000.0)] - (when-let [res (.blpop ^RedisCommands cmd - ^double timeout - ^"[Ljava.lang.String;" keys)] - (MapEntry/create - (.getKey ^KeyValue res) - (.getValue ^KeyValue res)))) - (catch RedisCommandInterruptedException cause - (throw (InterruptedException. (ex-message cause)))))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; PUBLIC API +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (eval [_ script] - (assert (valid-script? script) "expected valid script") - (impl-eval conn metrics script)) - - AutoCloseable - (close [_] (shutdown client conn)))))))))) - -(defn connect - [instance & {:as opts}] - (assert (satisfies? IRedis instance) "expected valid redis instance") - (-connect instance opts)) - -(defn get-or-connect - [instance key & {:as opts}] - (assert (satisfies? 
IRedis instance) "expected valid redis instance") - (-get-or-connect instance key opts)) +(defn build-set-args + [options] + (reduce-kv (fn [^SetArgs args k v] + (case k + :ex (if (instance? Duration v) + (.ex args ^Duration v) + (.ex args (long v))) + :px (.px args (long v)) + :nx (if v (.nx args) args) + :keep-ttl (if v (.keepttl args) args))) + (SetArgs.) + options)) (defn pubsub-listener [& {:keys [on-message on-subscribe on-unsubscribe]}] @@ -320,61 +302,172 @@ (when on-unsubscribe (on-unsubscribe nil topic count))))) -(def ^:private scripts-cache (atom {})) +(defn connect + [cfg & {:as options}] + (assert (contains? cfg ::mtx/metrics) "missing ::mtx/metrics on provided system") + (assert (contains? cfg ::client) "missing ::rds/client on provided system") -(defn- impl-eval - [^StatefulRedisConnection connection metrics script] - (let [cmd (.async ^StatefulRedisConnection connection) - keys (into-array String (map str (::rscript/keys script))) - vals (into-array String (map str (::rscript/vals script))) - sname (::rscript/name script)] + (let [state (::client cfg) - (letfn [(on-error [cause] - (if (instance? io.lettuce.core.RedisNoScriptException cause) - (do - (l/error :hint "no script found" :name sname :cause cause) - (->> (load-script) - (p/mcat eval-script))) - (if-let [on-error (::rscript/on-error script)] - (on-error cause) - (p/rejected cause)))) + cache (::cache state) + client (::client state) + timeout (or (some-> (:timeout options) ct/duration) + (::timeout state)) - (eval-script [sha] - (let [tpoint (ct/tpoint)] - (->> (.evalsha ^RedisScriptingAsyncCommands cmd - ^String sha - ^ScriptOutputType ScriptOutputType/MULTI - ^"[Ljava.lang.String;" keys - ^"[Ljava.lang.String;" vals) - (p/fmap (fn [result] - (let [elapsed (tpoint)] - (mtx/run! 
metrics {:id :redis-eval-timing - :labels [(name sname)] - :val (inst-ms elapsed)}) - (l/trace :hint "eval script" - :name (name sname) - :sha sha - :params (str/join "," (::rscript/vals script)) - :elapsed (ct/format-duration elapsed)) - result))) - (p/merr on-error)))) + conn (.connect ^RedisClient client + ^RedisCodec default-codec) + cmd (.sync ^StatefulRedisConnection conn)] - (read-script [] - (-> script ::rscript/path io/resource slurp)) + (.setTimeout ^StatefulRedisConnection conn ^Duration timeout) + (->Connection conn cmd timeout cache (::mtx/metrics cfg)))) - (load-script [] - (l/trace :hint "load script" :name sname) - (->> (.scriptLoad ^RedisScriptingAsyncCommands cmd - ^String (read-script)) - (p/fmap (fn [sha] - (swap! scripts-cache assoc sname sha) - sha))))] +(defn connect-pubsub + [cfg & {:as options}] + (let [state (::client cfg) + client (::client state) - (p/await! - (if-let [sha (get @scripts-cache sname)] - (eval-script sha) - (->> (load-script) - (p/mapcat eval-script))))))) + timeout (or (some-> (:timeout options) ct/duration) + (::timeout state)) + conn (.connectPubSub ^RedisClient client + ^RedisCodec default-codec) + cmd (.sync ^StatefulRedisPubSubConnection conn)] + + + (.setTimeout ^StatefulRedisPubSubConnection conn + ^Duration timeout) + (->SubscriptionConnection conn cmd timeout))) + +(defn get + [conn key] + (assert (string? key) "key must be string instance") + (try + (-get conn key) + (catch RedisCommandTimeoutException cause + (l/err :hint "timeout on get redis key" :key key :cause cause) + nil))) + +(defn set + ([conn key val] + (set conn key val nil)) + ([conn key val args] + (assert (string? key) "key must be string instance") + (assert (string? val) "val must be string instance") + (let [args (cond + (or (instance? SetArgs args) + (nil? args)) + args + + (map? args) + (build-set-args args) + + :else + (throw (IllegalArgumentException. 
"invalid args")))] + + (try + (-set conn key val args) + (catch RedisCommandTimeoutException cause + (l/err :hint "timeout on set redis key" :key key :cause cause) + nil))))) + +(defn del + [conn key-or-keys] + (let [keys (if (vector? key-or-keys) key-or-keys [key-or-keys])] + (assert (every? string? keys) "only string keys allowed") + (try + (-del conn keys) + (catch RedisCommandTimeoutException cause + (l/err :hint "timeout on del redis key" :keys keys :cause cause) + nil)))) + +(defn ping + [conn] + (-ping conn)) + +(defn blpop + [conn key-or-keys timeout] + (let [keys (if (vector? key-or-keys) key-or-keys [key-or-keys]) + timeout (cond + (ct/duration? timeout) + (/ (double (inst-ms timeout)) 1000.0) + + (double? timeout) + timeout + + (int? timeout) + (/ (double timeout) 1000.0) + + :else + 0)] + + (assert (every? string? keys) "only string keys allowed") + (-blpop conn keys timeout))) + +(defn rpush + [conn key elements] + (assert (string? key) "key must be string instance") + (assert (every? string? elements) "elements should be all strings") + (let [elements (vec elements)] + (-rpush conn key elements))) + +(defn publish + [conn topic payload] + (assert (string? topic) "expected topic to be string") + (assert (string? payload) "expected message to be string") + (-publish conn topic payload)) + +(def ^:private schema:script + [:map {:title "script"} + [::rscript/name qualified-keyword?] + [::rscript/path ::sm/text] + [::rscript/keys {:optional true} [:vector :any]] + [::rscript/vals {:optional true} [:vector :any]]]) + +(def ^:private valid-script? + (sm/lazy-validator schema:script)) + +(defn eval + [conn script] + (assert (valid-script? script) "expected valid script") + (-eval conn script)) + +(defn add-listener + [conn listener] + (let [listener (cond + (map? listener) + (pubsub-listener listener) + + (instance? RedisPubSubListener listener) + listener + + :else + (throw (IllegalArgumentException. 
"invalid listener provided")))] + + (-add-listener conn listener))) + +(defn subscribe + [conn topic-or-topics] + (let [topics (if (vector? topic-or-topics) topic-or-topics [topic-or-topics])] + (assert (every? string? topics)) + (-subscribe conn topics))) + +(defn unsubscribe + [conn topic-or-topics] + (let [topics (if (vector? topic-or-topics) topic-or-topics [topic-or-topics])] + (assert (every? string? topics)) + (-unsubscribe conn topics))) + +(defn set-timeout + [conn timeout] + (let [timeout (ct/duration timeout)] + (-set-timeout conn timeout))) + +(defn get-timeout + [conn] + (-get-timeout conn)) + +(defn reset-timeout + [conn] + (-reset-timeout conn)) (defn timeout-exception? [cause] @@ -383,3 +476,121 @@ (defn exception? [cause] (instance? RedisException cause)) + +(defn get-pooled + [cfg] + (let [pool (::pool cfg)] + (gpool/get pool))) + +(defn close + [o] + (.close ^AutoCloseable o)) + +(defn pool + [cfg & {:as options}] + (gpool/create :create-fn (partial connect cfg options) + :destroy-fn close + :dispose-fn -reset-timeout)) + +(defn run! + [cfg f & args] + (if (gpool/pool? cfg) + (apply run! {::pool cfg} f args) + (let [pool (::pool cfg)] + (with-open [^AutoCloseable conn (gpool/get pool)] + (apply f (assoc cfg ::conn @conn) args))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; INITIALIZATION +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defmethod ig/expand-key ::client + [k v] + {k (-> (d/without-nils v) + (assoc ::timeout (ct/duration "10s")))}) + +(def ^:private schema:client + [:map {:title "RedisClient"} + [::timer [:fn #(instance? HashedWheelTimer %)]] + [::cache ::sm/atom] + [::timeout ::ct/duration] + [::resources [:fn #(instance? DefaultClientResources %)]]]) + +(def check-client + (sm/check-fn schema:client)) + +(sm/register! ::client schema:client) +(sm/register! 
+ {:type ::pool + :pred gpool/pool?}) + +(def ^:private schema:client-params + [:map {:title "redis-params"} + ::wrk/netty-io-executor + ::wrk/netty-executor + [::uri ::sm/uri] + [::timeout ::ct/duration]]) + +(def ^:private check-client-params + (sm/check-fn schema:client-params)) + +(defmethod ig/assert-key ::client + [_ params] + (check-client-params params)) + +(defmethod ig/init-key ::client + [_ {:keys [::uri ::wrk/netty-io-executor ::wrk/netty-executor] :as params}] + + (l/inf :hint "initialize redis client" :uri (str uri)) + + (let [timer (HashedWheelTimer.) + cache (atom {}) + + resources (.. (DefaultClientResources/builder) + (eventExecutorGroup ^EventExecutorGroup netty-executor) + + ;; We provide lettuce with a shared event loop + ;; group instance instead of letting lettuce to + ;; create its own + (eventLoopGroupProvider + (reify io.lettuce.core.resource.EventLoopGroupProvider + (allocate [_ _] netty-io-executor) + (threadPoolSize [_] + (.executorCount ^NioEventLoopGroup netty-io-executor)) + (release [_ _ _ _ _] + ;; Do nothing + ) + (shutdown [_ _ _ _] + ;; Do nothing + ))) + + (timer ^Timer timer) + (build)) + + redis-uri (RedisURI/create ^String (str uri)) + client (RedisClient/create ^ClientResources resources + ^RedisURI redis-uri)] + + {::client client + ::cache cache + ::timer timer + ::timeout default-timeout + ::resources resources})) + +(defmethod ig/halt-key! ::client + [_ {:keys [::client ::timer ::resources]}] + (ex/ignoring (.shutdown ^RedisClient client)) + (ex/ignoring (.shutdown ^ClientResources resources)) + (ex/ignoring (.stop ^Timer timer))) + +(defmethod ig/assert-key ::pool + [_ {:keys [::client]}] + (check-client client)) + +(defmethod ig/init-key ::pool + [_ cfg] + (pool cfg {:timeout (ct/duration 2000)})) + +(defmethod ig/halt-key! 
::pool + [_ instance] + (.close ^java.lang.AutoCloseable instance)) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 101ed659a1..63d778a89c 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -23,6 +23,7 @@ [app.main :as-alias main] [app.metrics :as mtx] [app.msgbus :as-alias mbus] + [app.redis :as rds] [app.rpc.climit :as climit] [app.rpc.cond :as cond] [app.rpc.helpers :as rph] @@ -261,6 +262,7 @@ ::session/manager ::http.client/client ::db/pool + ::rds/pool ::mbus/msgbus ::sto/storage ::mtx/metrics diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj index bb2ccededa..d181b843fe 100644 --- a/backend/src/app/rpc/rlimit.clj +++ b/backend/src/app/rpc/rlimit.clj @@ -66,13 +66,6 @@ [integrant.core :as ig] [promesa.exec :as px])) -(def ^:private default-timeout - (ct/duration 400)) - -(def ^:private default-options - {:codec rds/string-codec - :timeout default-timeout}) - (def ^:private bucket-rate-limit-script {::rscript/name ::bucket-rate-limit ::rscript/path "app/rpc/rlimit/bucket.lua"}) @@ -177,11 +170,11 @@ :hint (str/ffmt "looks like '%' does not have a valid format" opts)))) (defmethod process-limit :bucket - [redis user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}] + [rconn user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}] (let [script (-> bucket-rate-limit-script (assoc ::rscript/keys [(str key "." service "." user-id)]) (assoc ::rscript/vals (conj params (->seconds now)))) - result (rds/eval redis script) + result (rds/eval rconn script) allowed? 
(boolean (nth result 0)) remaining (nth result 1) reset (* (/ (inst-ms interval) rate) @@ -199,13 +192,13 @@ (assoc ::lresult/remaining remaining)))) (defmethod process-limit :window - [redis user-id now {:keys [::nreq ::unit ::key ::service] :as limit}] + [rconn user-id now {:keys [::nreq ::unit ::key ::service] :as limit}] (let [ts (ct/truncate now unit) ttl (ct/diff now (ct/plus ts {unit 1})) script (-> window-rate-limit-script (assoc ::rscript/keys [(str key "." service "." user-id "." (ct/format-inst ts))]) (assoc ::rscript/vals [nreq (->seconds ttl)])) - result (rds/eval redis script) + result (rds/eval rconn script) allowed? (boolean (nth result 0)) remaining (nth result 1)] (l/trace :hint "limit processed" @@ -220,9 +213,9 @@ (assoc ::lresult/remaining remaining) (assoc ::lresult/reset (ct/plus ts {unit 1}))))) -(defn- process-limits! - [redis user-id limits now] - (let [results (into [] (map (partial process-limit redis user-id now)) limits) +(defn- process-limits + [rconn user-id limits now] + (let [results (into [] (map (partial process-limit rconn user-id now)) limits) remaining (->> results (d/index-by ::name ::lresult/remaining) (uri/map->query-string)) @@ -259,34 +252,25 @@ (some-> request inet/parse-request) uuid/zero))) -(defn process-request! - [{:keys [::rpc/rlimit ::rds/redis ::skey ::sname] :as cfg} params] - (when-let [limits (get-limits rlimit skey sname)] - (let [redis (rds/get-or-connect redis ::rpc/rlimit default-options) - uid (get-uid params) - ;; FIXME: why not clasic try/catch? - result (ex/try! (process-limits! redis uid limits (ct/now)))] - - (l/trc :hint "process-limits" - :service sname - :remaining (::remaingin result) - :reset (::reset result)) - - (cond - (ex/exception? result) - (do - (l/error :hint "error on processing rate-limit" :cause result) - {::enabled false}) - - (contains? 
cf/flags :soft-rpc-rlimit) +(defn- process-request' + [{:keys [::rds/conn] :as cfg} limits params] + (try + (let [uid (get-uid params) + result (process-limits conn uid limits (ct/now))] + (if (contains? cf/flags :soft-rpc-rlimit) {::enabled false} + result)) + (catch Throwable cause + (l/error :hint "error on processing rate-limit" :cause cause) + {::enabled false}))) - :else - result)))) +(defn- process-request + [{:keys [::rpc/rlimit ::skey ::sname] :as cfg} params] + (when-let [limits (get-limits rlimit skey sname)] + (rds/run! cfg process-request' limits params))) (defn wrap - [{:keys [::rpc/rlimit ::rds/redis] :as cfg} f mdata] - (assert (rds/redis? redis) "expected a valid redis instance") + [{:keys [::rpc/rlimit] :as cfg} f mdata] (assert (or (nil? rlimit) (valid-rlimit-instance? rlimit)) "expected a valid rlimit instance") (if rlimit @@ -298,7 +282,7 @@ (fn [hcfg params] (if @enabled - (let [result (process-request! cfg params)] + (let [result (process-request cfg params)] (if (::enabled result) (if (::allowed result) (-> (f hcfg params) @@ -399,7 +383,7 @@ (when-let [path (cf/get :rpc-rlimit-config)] (and (fs/exists? path) (fs/regular-file? path) path))) -(defmethod ig/assert-key :app.rpc/rlimit +(defmethod ig/assert-key ::rpc/rlimit [_ {:keys [::wrk/executor]}] (assert (sm/valid? ::wrk/executor executor) "expect valid executor")) diff --git a/backend/src/app/worker/dispatcher.clj b/backend/src/app/worker/dispatcher.clj index 2b49d43d77..62b03b80fa 100644 --- a/backend/src/app/worker/dispatcher.clj +++ b/backend/src/app/worker/dispatcher.clj @@ -7,7 +7,6 @@ (ns app.worker.dispatcher (:require [app.common.data :as d] - [app.common.data.macros :as dm] [app.common.logging :as l] [app.common.schema :as sm] [app.common.time :as ct] @@ -18,7 +17,9 @@ [app.worker :as-alias wrk] [cuerdas.core :as str] [integrant.core :as ig] - [promesa.exec :as px])) + [promesa.exec :as px]) + (:import + java.lang.AutoCloseable)) (set! 
*warn-on-reflection* true) @@ -27,7 +28,7 @@ [::wrk/tenant ::sm/text] ::mtx/metrics ::db/pool - ::rds/redis]) + ::rds/client]) (defmethod ig/expand-key ::wrk/dispatcher [k v] @@ -49,59 +50,74 @@ limit ? for update skip locked") -(defmethod ig/init-key ::wrk/dispatcher - [_ {:keys [::db/pool ::rds/redis ::wrk/tenant ::batch-size ::timeout] :as cfg}] - (letfn [(get-tasks [conn] - (let [prefix (str tenant ":%")] - (seq (db/exec! conn [sql:select-next-tasks prefix batch-size])))) +(def ^:private sql:mark-task-scheduled + "UPDATE task SET status = 'scheduled' + WHERE id = ANY(?)") - (push-tasks! [conn rconn [queue tasks]] +(defmethod ig/init-key ::wrk/dispatcher + [_ {:keys [::db/pool ::wrk/tenant ::batch-size ::timeout] :as cfg}] + (letfn [(get-tasks [{:keys [::db/conn]}] + (let [prefix (str tenant ":%")] + (not-empty (db/exec! conn [sql:select-next-tasks prefix batch-size])))) + + (mark-as-scheduled [{:keys [::db/conn]} ids] + (let [sql [sql:mark-task-scheduled + (db/create-array conn "uuid" ids)]] + (db/exec-one! conn sql))) + + (push-tasks [{:keys [::rds/conn] :as cfg} [queue tasks]] (let [ids (mapv :id tasks) key (str/ffmt "taskq:%" queue) - res (rds/rpush rconn key (mapv t/encode ids)) - sql [(str "update task set status = 'scheduled'" - " where id = ANY(?)") - (db/create-array conn "uuid" ids)]] + res (rds/rpush conn key (mapv t/encode-str ids))] - (db/exec-one! conn sql) + (mark-as-scheduled cfg ids) (l/trc :hist "enqueue tasks on redis" :queue queue :tasks (count ids) :queued res))) - (run-batch! [rconn] - (try - (db/tx-run! cfg (fn [{:keys [::db/conn]}] - (if-let [tasks (get-tasks conn)] - (->> (group-by :queue tasks) - (run! (partial push-tasks! conn rconn))) - ;; FIXME: this sleep should be outside the transaction - (px/sleep (::wait-duration cfg))))) - (catch InterruptedException cause - (throw cause)) - (catch Exception cause - (cond - (rds/exception? 
cause) - (do - (l/wrn :hint "redis exception (will retry in an instant)" :cause cause) - (px/sleep timeout)) + (run-batch' [cfg] + (if-let [tasks (get-tasks cfg)] + (->> (group-by :queue tasks) + (run! (partial push-tasks cfg))) + ::wait)) - (db/sql-exception? cause) - (do - (l/wrn :hint "database exception (will retry in an instant)" :cause cause) - (px/sleep timeout)) + (run-batch [] + (let [rconn (rds/connect cfg)] + (try + (-> cfg + (assoc ::rds/conn rconn) + (db/tx-run! run-batch')) - :else - (do - (l/err :hint "unhandled exception (will retry in an instant)" :cause cause) - (px/sleep timeout)))))) + (catch InterruptedException cause + (throw cause)) + (catch Exception cause + (cond + (rds/exception? cause) + (do + (l/wrn :hint "redis exception (will retry in an instant)" :cause cause) + (px/sleep timeout)) + + (db/sql-exception? cause) + (do + (l/wrn :hint "database exception (will retry in an instant)" :cause cause) + (px/sleep timeout)) + + :else + (do + (l/err :hint "unhandled exception (will retry in an instant)" :cause cause) + (px/sleep timeout)))) + + (finally + (.close ^AutoCloseable rconn))))) (dispatcher [] (l/inf :hint "started") (try - (dm/with-open [rconn (rds/connect redis)] - (loop [] - (run-batch! rconn) + (loop [] + (let [result (run-batch)] + (when (= result ::wait) + (px/sleep (::wait-duration cfg))) (recur))) (catch InterruptedException _ (l/trc :hint "interrupted")) diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj index c47d4d7759..0f245da19b 100644 --- a/backend/src/app/worker/runner.clj +++ b/backend/src/app/worker/runner.clj @@ -8,7 +8,6 @@ "Async tasks abstraction (impl)." (:require [app.common.data :as d] - [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.schema :as sm] @@ -20,7 +19,9 @@ [app.worker :as wrk] [cuerdas.core :as str] [integrant.core :as ig] - [promesa.exec :as px])) + [promesa.exec :as px]) + (:import + java.lang.AutoCloseable)) (set! 
*warn-on-reflection* true) @@ -76,6 +77,7 @@ :queue queue :runner-id id :retry (:retry-num task)) + (let [tpoint (ct/tpoint) task-fn (wrk/get-task registry (:name task)) result (when task-fn (task-fn task)) @@ -149,7 +151,7 @@ {::task task}))))) (defn- run-worker-loop! - [{:keys [::db/pool ::rds/rconn ::timeout ::queue] :as cfg}] + [{:keys [::db/pool ::rds/conn ::timeout ::queue] :as cfg}] (letfn [(handle-task-retry [{:keys [error inc-by delay] :or {inc-by 1 delay 1000} :as result}] (let [explain (if (ex/exception? error) (ex-message error) @@ -187,9 +189,9 @@ {:id (:id task)}) nil)) - (decode-payload [^bytes payload] + (decode-payload [payload] (try - (let [task-id (t/decode payload)] + (let [task-id (t/decode-str payload)] (if (uuid? task-id) task-id (l/err :hint "received unexpected payload (uuid expected)" @@ -197,7 +199,7 @@ (catch Throwable cause (l/err :hint "unable to decode payload" :payload payload - :length (alength payload) + :length (alength ^String/1 payload) :cause cause)))) (process-result [{:keys [status] :as result}] @@ -224,8 +226,8 @@ :cause cause))))))] (try - (let [key (str/ffmt "taskq:%" queue) - [_ payload] (rds/blpop rconn timeout [key])] + (let [key (str/ffmt "taskq:%" queue) + [_ payload] (rds/blpop conn [key] timeout)] (some-> payload decode-payload run-task-loop)) @@ -244,36 +246,37 @@ (l/err :hint "unhandled exception" :cause cause)))))) (defn- start-thread! - [{:keys [::rds/redis ::id ::queue ::wrk/tenant] :as cfg}] + [{:keys [::id ::queue ::wrk/tenant] :as cfg}] (px/thread - {:name (str "penpot/worker-runner/" id)} + {:name (str "penpot/job-runner/" id)} (l/inf :hint "started" :id id :queue queue) - (try - (dm/with-open [rconn (rds/connect redis)] - (let [cfg (-> cfg - (assoc ::rds/rconn rconn) - (assoc ::queue (str/ffmt "%:%" tenant queue)) - (assoc ::timeout (ct/duration "5s")))] - (loop [] - (when (px/interrupted?) - (throw (InterruptedException. "interrupted"))) - (run-worker-loop! 
cfg) - (recur)))) + (let [rconn (rds/connect cfg)] + (try + (loop [cfg (-> cfg + (assoc ::rds/conn rconn) + (assoc ::queue (str/ffmt "%:%" tenant queue)) + (assoc ::timeout (ct/duration "5s")))] + (when (px/interrupted?) + (throw (InterruptedException. "interrupted"))) - (catch InterruptedException _ - (l/dbg :hint "interrupted" - :id id - :queue queue)) - (catch Throwable cause - (l/err :hint "unexpected exception" - :id id - :queue queue - :cause cause)) - (finally - (l/inf :hint "terminated" - :id id - :queue queue))))) + (run-worker-loop! cfg) + (recur cfg)) + + (catch InterruptedException _ + (l/dbg :hint "interrupted" + :id id + :queue queue)) + (catch Throwable cause + (l/err :hint "unexpected exception" + :id id + :queue queue + :cause cause)) + (finally + (.close ^AutoCloseable rconn) + (l/inf :hint "terminated" + :id id + :queue queue)))))) (def ^:private schema:params [:map @@ -283,7 +286,7 @@ ::wrk/registry ::mtx/metrics ::db/pool - ::rds/redis]) + ::rds/client]) (defmethod ig/assert-key ::wrk/runner [_ params] diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index bd7b616a55..d504e05aa8 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -97,7 +97,7 @@ :thumbnail-uri "test" :path (-> "backend_tests/test_files/template.penpot" io/resource fs/path)}] system (-> (merge main/system-config main/worker-config) - (assoc-in [:app.redis/redis :app.redis/uri] (:redis-uri config)) + (assoc-in [:app.redis/client :app.redis/uri] (:redis-uri config)) (assoc-in [::db/pool ::db/uri] (:database-uri config)) (assoc-in [::db/pool ::db/username] (:database-username config)) (assoc-in [::db/pool ::db/password] (:database-password config)) diff --git a/common/deps.edn b/common/deps.edn index 13a272380e..dcdf4fe0a8 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -6,6 +6,8 @@ org.clojure/data.fressian {:mvn/version "1.1.0"} org.clojure/clojurescript {:mvn/version "1.12.42"} + 
org.apache.commons/commons-pool2 {:mvn/version "2.12.1"} + ;; Logging org.apache.logging.log4j/log4j-api {:mvn/version "2.25.1"} org.apache.logging.log4j/log4j-core {:mvn/version "2.25.1"} diff --git a/common/src/app/common/generic_pool.clj b/common/src/app/common/generic_pool.clj new file mode 100644 index 0000000000..bccf0b06ec --- /dev/null +++ b/common/src/app/common/generic_pool.clj @@ -0,0 +1,58 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.common.generic-pool + (:refer-clojure :exclude [get]) + (:import + java.lang.AutoCloseable + org.apache.commons.pool2.ObjectPool + org.apache.commons.pool2.PooledObject + org.apache.commons.pool2.PooledObjectFactory + org.apache.commons.pool2.impl.DefaultPooledObject + org.apache.commons.pool2.impl.SoftReferenceObjectPool)) + +(defn pool? + [o] + (instance? ObjectPool o)) + +(defn create + [& {:keys [create-fn destroy-fn validate-fn dispose-fn]}] + (SoftReferenceObjectPool. + (reify PooledObjectFactory + (activateObject [_ _]) + (destroyObject [_ o] + (let [object (.getObject ^PooledObject o)] + (destroy-fn object))) + + (destroyObject [_ o _] + (let [object (.getObject ^PooledObject o)] + (destroy-fn object))) + + (passivateObject [_ o] + (when (fn? dispose-fn) + (let [object (.getObject ^PooledObject o)] + (dispose-fn object)))) + + (validateObject [_ o] + (if (fn? validate-fn) + (let [object (.getObject ^PooledObject o)] + (validate-fn object)) + true)) + + (makeObject [_] + (let [object (create-fn)] + (DefaultPooledObject. object)))))) + +(defn get + [^ObjectPool pool] + (let [object (.borrowObject pool)] + (reify + clojure.lang.IDeref + (deref [_] object) + + AutoCloseable + (close [_] + (.returnObject pool object)))))