mirror of
https://github.com/penpot/penpot.git
synced 2025-12-11 22:14:05 +01:00
♻️ Refactor redis internal API
The main idea behind this refactor is make the API less especialized for specific use of out internal submidules and make it more general and usable for more general purposes (per example cache)
This commit is contained in:
@@ -181,9 +181,9 @@
|
||||
::mtx/routes
|
||||
{::mtx/metrics (ig/ref ::mtx/metrics)}
|
||||
|
||||
::rds/redis
|
||||
{::rds/uri (cf/get :redis-uri)
|
||||
::mtx/metrics (ig/ref ::mtx/metrics)
|
||||
::rds/client
|
||||
{::rds/uri
|
||||
(cf/get :redis-uri)
|
||||
|
||||
::wrk/netty-executor
|
||||
(ig/ref ::wrk/netty-executor)
|
||||
@@ -191,9 +191,14 @@
|
||||
::wrk/netty-io-executor
|
||||
(ig/ref ::wrk/netty-io-executor)}
|
||||
|
||||
::rds/pool
|
||||
{::rds/client (ig/ref ::rds/client)
|
||||
::mtx/metrics (ig/ref ::mtx/metrics)}
|
||||
|
||||
::mbus/msgbus
|
||||
{::wrk/executor (ig/ref ::wrk/netty-executor)
|
||||
::rds/redis (ig/ref ::rds/redis)}
|
||||
{::wrk/executor (ig/ref ::wrk/netty-executor)
|
||||
::rds/client (ig/ref ::rds/client)
|
||||
::mtx/metrics (ig/ref ::mtx/metrics)}
|
||||
|
||||
:app.storage.tmp/cleaner
|
||||
{::wrk/executor (ig/ref ::wrk/netty-executor)}
|
||||
@@ -315,13 +320,14 @@
|
||||
:app.rpc/methods
|
||||
{::http.client/client (ig/ref ::http.client/client)
|
||||
::db/pool (ig/ref ::db/pool)
|
||||
::rds/pool (ig/ref ::rds/pool)
|
||||
::wrk/executor (ig/ref ::wrk/netty-executor)
|
||||
::session/manager (ig/ref ::session/manager)
|
||||
::ldap/provider (ig/ref ::ldap/provider)
|
||||
::sto/storage (ig/ref ::sto/storage)
|
||||
::mtx/metrics (ig/ref ::mtx/metrics)
|
||||
::mbus/msgbus (ig/ref ::mbus/msgbus)
|
||||
::rds/redis (ig/ref ::rds/redis)
|
||||
::rds/client (ig/ref ::rds/client)
|
||||
|
||||
::rpc/climit (ig/ref ::rpc/climit)
|
||||
::rpc/rlimit (ig/ref ::rpc/rlimit)
|
||||
@@ -515,7 +521,7 @@
|
||||
:task :audit-log-gc})]}
|
||||
|
||||
::wrk/dispatcher
|
||||
{::rds/redis (ig/ref ::rds/redis)
|
||||
{::rds/client (ig/ref ::rds/client)
|
||||
::mtx/metrics (ig/ref ::mtx/metrics)
|
||||
::db/pool (ig/ref ::db/pool)
|
||||
::wrk/tenant (cf/get :tenant)}
|
||||
@@ -524,7 +530,7 @@
|
||||
{::wrk/parallelism (cf/get ::worker-default-parallelism 1)
|
||||
::wrk/queue :default
|
||||
::wrk/tenant (cf/get :tenant)
|
||||
::rds/redis (ig/ref ::rds/redis)
|
||||
::rds/client (ig/ref ::rds/client)
|
||||
::wrk/registry (ig/ref ::wrk/registry)
|
||||
::mtx/metrics (ig/ref ::mtx/metrics)
|
||||
::db/pool (ig/ref ::db/pool)}
|
||||
@@ -533,7 +539,7 @@
|
||||
{::wrk/parallelism (cf/get ::worker-webhook-parallelism 1)
|
||||
::wrk/queue :webhooks
|
||||
::wrk/tenant (cf/get :tenant)
|
||||
::rds/redis (ig/ref ::rds/redis)
|
||||
::rds/client (ig/ref ::rds/client)
|
||||
::wrk/registry (ig/ref ::wrk/registry)
|
||||
::mtx/metrics (ig/ref ::mtx/metrics)
|
||||
::db/pool (ig/ref ::db/pool)}})
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
[app.redis :as rds]
|
||||
[app.worker :as wrk]
|
||||
[integrant.core :as ig]
|
||||
[promesa.core :as p]
|
||||
[promesa.exec :as px]
|
||||
[promesa.exec.csp :as sp]))
|
||||
|
||||
@@ -59,14 +58,16 @@
|
||||
(assoc ::timeout (ct/duration {:seconds 30})))})
|
||||
|
||||
(def ^:private schema:params
|
||||
[:map ::rds/redis ::wrk/executor])
|
||||
[:map
|
||||
::rds/client
|
||||
::wrk/executor])
|
||||
|
||||
(defmethod ig/assert-key ::msgbus
|
||||
[_ params]
|
||||
(assert (sm/check schema:params params)))
|
||||
|
||||
(defmethod ig/init-key ::msgbus
|
||||
[_ {:keys [::buffer-size ::wrk/executor ::timeout ::rds/redis] :as cfg}]
|
||||
[_ {:keys [::buffer-size ::wrk/executor ::timeout] :as cfg}]
|
||||
(l/info :hint "initialize msgbus" :buffer-size buffer-size)
|
||||
(let [cmd-ch (sp/chan :buf buffer-size)
|
||||
rcv-ch (sp/chan :buf (sp/dropping-buffer buffer-size))
|
||||
@@ -74,8 +75,9 @@
|
||||
:xf xform-prefix-topic)
|
||||
state (agent {})
|
||||
|
||||
pconn (rds/connect redis :type :default :timeout timeout)
|
||||
sconn (rds/connect redis :type :pubsub :timeout timeout)
|
||||
;; Open persistent connections to redis
|
||||
pconn (rds/connect cfg :timeout timeout)
|
||||
sconn (rds/connect-pubsub cfg :timeout timeout)
|
||||
|
||||
_ (set-error-handler! state #(l/error :cause % :hint "unexpected error on agent" ::l/sync? true))
|
||||
_ (set-error-mode! state :continue)
|
||||
@@ -189,14 +191,13 @@
|
||||
|
||||
(defn- create-listener
|
||||
[rcv-ch]
|
||||
(rds/pubsub-listener
|
||||
:on-message (fn [_ topic message]
|
||||
{:on-message (fn [_ topic message]
|
||||
;; There are no back pressure, so we use a slidding
|
||||
;; buffer for cases when the pubsub broker sends
|
||||
;; more messages that we can process.
|
||||
(let [val {:topic topic :message (t/decode message)}]
|
||||
(let [val {:topic topic :message (t/decode-str message)}]
|
||||
(when-not (sp/offer! rcv-ch val)
|
||||
(l/warn :msg "dropping message on subscription loop"))))))
|
||||
(l/warn :msg "dropping message on subscription loop"))))})
|
||||
|
||||
(defn- process-input
|
||||
[{:keys [::state ::wrk/executor] :as cfg} topic message]
|
||||
@@ -262,7 +263,7 @@
|
||||
intended to be used in core.async go blocks."
|
||||
[{:keys [::pconn] :as cfg} {:keys [topic message]}]
|
||||
(try
|
||||
(p/await! (rds/publish pconn topic (t/encode message)))
|
||||
(rds/publish pconn topic (t/encode-str message))
|
||||
(catch InterruptedException cause
|
||||
(throw cause))
|
||||
(catch Throwable cause
|
||||
|
||||
@@ -6,22 +6,22 @@
|
||||
|
||||
(ns app.redis
|
||||
"The msgbus abstraction implemented using redis as underlying backend."
|
||||
(:refer-clojure :exclude [eval])
|
||||
(:refer-clojure :exclude [eval get set run!])
|
||||
(:require
|
||||
[app.common.data :as d]
|
||||
[app.common.exceptions :as ex]
|
||||
[app.common.generic-pool :as gpool]
|
||||
[app.common.logging :as l]
|
||||
[app.common.schema :as sm]
|
||||
[app.common.time :as ct]
|
||||
[app.metrics :as mtx]
|
||||
[app.redis.script :as-alias rscript]
|
||||
[app.util.cache :as cache]
|
||||
[app.worker :as-alias wrk]
|
||||
[app.worker :as wrk]
|
||||
[app.worker.executor]
|
||||
[clojure.core :as c]
|
||||
[clojure.java.io :as io]
|
||||
[cuerdas.core :as str]
|
||||
[integrant.core :as ig]
|
||||
[promesa.core :as p])
|
||||
[integrant.core :as ig])
|
||||
(:import
|
||||
clojure.lang.MapEntry
|
||||
io.lettuce.core.KeyValue
|
||||
@@ -31,12 +31,10 @@
|
||||
io.lettuce.core.RedisException
|
||||
io.lettuce.core.RedisURI
|
||||
io.lettuce.core.ScriptOutputType
|
||||
io.lettuce.core.api.StatefulConnection
|
||||
io.lettuce.core.SetArgs
|
||||
io.lettuce.core.api.StatefulRedisConnection
|
||||
io.lettuce.core.api.async.RedisAsyncCommands
|
||||
io.lettuce.core.api.async.RedisScriptingAsyncCommands
|
||||
io.lettuce.core.api.sync.RedisCommands
|
||||
io.lettuce.core.codec.ByteArrayCodec
|
||||
io.lettuce.core.api.sync.RedisScriptingCommands
|
||||
io.lettuce.core.codec.RedisCodec
|
||||
io.lettuce.core.codec.StringCodec
|
||||
io.lettuce.core.pubsub.RedisPubSubListener
|
||||
@@ -53,245 +51,229 @@
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
(declare ^:private initialize-resources)
|
||||
(declare ^:private shutdown-resources)
|
||||
(declare ^:private impl-eval)
|
||||
(def ^:const MAX-EVAL-RETRIES 18)
|
||||
|
||||
(defprotocol IRedis
|
||||
(-connect [_ options])
|
||||
(-get-or-connect [_ key options]))
|
||||
(def default-timeout
|
||||
(ct/duration "10s"))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; IMPL & PRIVATE API
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defprotocol IConnection
|
||||
(publish [_ topic message])
|
||||
(rpush [_ key payload])
|
||||
(blpop [_ timeout keys])
|
||||
(eval [_ script]))
|
||||
(-set-timeout [_ timeout] "set connection timeout")
|
||||
(-get-timeout [_] "get current timeout")
|
||||
(-reset-timeout [_] "reset to default timeout"))
|
||||
|
||||
(defprotocol IDefaultConnection
|
||||
"Public API of default redis connection"
|
||||
(-publish [_ topic message])
|
||||
(-rpush [_ key payload])
|
||||
(-blpop [_ timeout keys])
|
||||
(-eval [_ script])
|
||||
(-get [_ key])
|
||||
(-set [_ key val args])
|
||||
(-del [_ key-or-keys])
|
||||
(-ping [_]))
|
||||
|
||||
(defprotocol IPubSubConnection
|
||||
(add-listener [_ listener])
|
||||
(subscribe [_ topics])
|
||||
(unsubscribe [_ topics]))
|
||||
(-add-listener [_ listener])
|
||||
(-subscribe [_ topics])
|
||||
(-unsubscribe [_ topics]))
|
||||
|
||||
(def default-codec
|
||||
(RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE))
|
||||
|
||||
(def string-codec
|
||||
(def ^:private default-codec
|
||||
(RedisCodec/of StringCodec/UTF8 StringCodec/UTF8))
|
||||
|
||||
(sm/register!
|
||||
{:type ::connection
|
||||
:pred #(satisfies? IConnection %)
|
||||
:type-properties
|
||||
{:title "connection"
|
||||
:description "redis connection instance"}})
|
||||
(defn- impl-eval
|
||||
[cmd cache metrics script]
|
||||
(let [keys (into-array String (map str (::rscript/keys script)))
|
||||
vals (into-array String (map str (::rscript/vals script)))
|
||||
sname (::rscript/name script)
|
||||
|
||||
(sm/register!
|
||||
{:type ::pubsub-connection
|
||||
:pred #(satisfies? IPubSubConnection %)
|
||||
:type-properties
|
||||
{:title "connection"
|
||||
:description "redis connection instance"}})
|
||||
read-script
|
||||
(fn []
|
||||
(-> script ::rscript/path io/resource slurp))
|
||||
|
||||
(defn redis?
|
||||
[o]
|
||||
(satisfies? IRedis o))
|
||||
load-script
|
||||
(fn []
|
||||
(let [id (.scriptLoad ^RedisScriptingCommands cmd
|
||||
^String (read-script))]
|
||||
(swap! cache assoc sname id)
|
||||
(l/trc :hint "load script" :name sname :id id)
|
||||
|
||||
(sm/register!
|
||||
{:type ::redis
|
||||
:pred redis?})
|
||||
id))
|
||||
|
||||
(def ^:private schema:script
|
||||
[:map {:title "script"}
|
||||
[::rscript/name qualified-keyword?]
|
||||
[::rscript/path ::sm/text]
|
||||
[::rscript/keys {:optional true} [:vector :any]]
|
||||
[::rscript/vals {:optional true} [:vector :any]]])
|
||||
eval-script
|
||||
(fn [id]
|
||||
(try
|
||||
(let [tpoint (ct/tpoint)
|
||||
result (.evalsha ^RedisScriptingCommands cmd
|
||||
^String id
|
||||
^ScriptOutputType ScriptOutputType/MULTI
|
||||
^"[Ljava.lang.String;" keys
|
||||
^"[Ljava.lang.String;" vals)
|
||||
elapsed (tpoint)]
|
||||
|
||||
(def valid-script?
|
||||
(sm/lazy-validator schema:script))
|
||||
(mtx/run! metrics {:id :redis-eval-timing
|
||||
:labels [(name sname)]
|
||||
:val (inst-ms elapsed)})
|
||||
|
||||
(defmethod ig/expand-key ::redis
|
||||
[k v]
|
||||
{k (-> (d/without-nils v)
|
||||
(assoc ::timeout (ct/duration "10s")))})
|
||||
(l/trc :hint "eval script"
|
||||
:name (name sname)
|
||||
:id id
|
||||
:params (str/join "," (::rscript/vals script))
|
||||
:elapsed (ct/format-duration elapsed))
|
||||
|
||||
(def ^:private schema:redis-params
|
||||
[:map {:title "redis-params"}
|
||||
::wrk/netty-io-executor
|
||||
::wrk/netty-executor
|
||||
::mtx/metrics
|
||||
[::uri ::sm/uri]
|
||||
[::timeout ::ct/duration]])
|
||||
result)
|
||||
|
||||
(defmethod ig/assert-key ::redis
|
||||
[_ params]
|
||||
(assert (sm/check schema:redis-params params)))
|
||||
(catch io.lettuce.core.RedisNoScriptException _cause
|
||||
::load)
|
||||
|
||||
(defmethod ig/init-key ::redis
|
||||
[_ params]
|
||||
(initialize-resources params))
|
||||
(catch Throwable cause
|
||||
(when-let [on-error (::rscript/on-error script)]
|
||||
(on-error cause))
|
||||
(throw cause))))
|
||||
|
||||
(defmethod ig/halt-key! ::redis
|
||||
[_ instance]
|
||||
(d/close! instance))
|
||||
eval-script'
|
||||
(fn [id]
|
||||
(loop [id id
|
||||
retries 0]
|
||||
(if (> retries MAX-EVAL-RETRIES)
|
||||
(ex/raise :type :internal
|
||||
:code ::max-eval-retries-reached
|
||||
:hint (str "unable to eval redis script " sname))
|
||||
(let [result (eval-script id)]
|
||||
(if (= result ::load)
|
||||
(recur (load-script)
|
||||
(inc retries))
|
||||
result)))))]
|
||||
|
||||
(defn- initialize-resources
|
||||
"Initialize redis connection resources"
|
||||
[{:keys [::uri ::mtx/metrics ::wrk/netty-io-executor ::wrk/netty-executor] :as params}]
|
||||
(if-let [id (c/get @cache sname)]
|
||||
(eval-script' id)
|
||||
(-> (load-script)
|
||||
(eval-script')))))
|
||||
|
||||
(l/inf :hint "initialize redis resources"
|
||||
:uri (str uri))
|
||||
(deftype Connection [^StatefulRedisConnection conn
|
||||
^RedisCommands cmd
|
||||
^Duration timeout
|
||||
cache metrics]
|
||||
AutoCloseable
|
||||
(close [_]
|
||||
(ex/ignoring (.close conn)))
|
||||
|
||||
(let [timer (HashedWheelTimer.)
|
||||
resources (.. (DefaultClientResources/builder)
|
||||
(eventExecutorGroup ^EventExecutorGroup netty-executor)
|
||||
IConnection
|
||||
(-set-timeout [_ timeout]
|
||||
(.setTimeout conn ^Duration timeout))
|
||||
|
||||
;; We provide lettuce with a shared event loop
|
||||
;; group instance instead of letting lettuce to
|
||||
;; create its own
|
||||
(eventLoopGroupProvider
|
||||
(reify io.lettuce.core.resource.EventLoopGroupProvider
|
||||
(allocate [_ _] netty-io-executor)
|
||||
(threadPoolSize [_]
|
||||
(.executorCount ^NioEventLoopGroup netty-io-executor))
|
||||
(release [_ _ _ _ _]
|
||||
;; Do nothing
|
||||
)
|
||||
(shutdown [_ _ _ _]
|
||||
;; Do nothing
|
||||
)))
|
||||
(-reset-timeout [_]
|
||||
(.setTimeout conn timeout))
|
||||
|
||||
(timer ^Timer timer)
|
||||
(build))
|
||||
(-get-timeout [_]
|
||||
(.getTimeout conn))
|
||||
|
||||
redis-uri (RedisURI/create ^String (str uri))
|
||||
IDefaultConnection
|
||||
(-publish [_ topic message]
|
||||
(.publish cmd ^String topic ^String message))
|
||||
|
||||
shutdown (fn [client conn]
|
||||
(ex/ignoring (.close ^StatefulConnection conn))
|
||||
(ex/ignoring (.close ^RedisClient client))
|
||||
(l/trc :hint "disconnect" :hid (hash client)))
|
||||
(-rpush [_ key elements]
|
||||
(try
|
||||
(let [vals (make-array String (count elements))]
|
||||
(loop [i 0 xs (seq elements)]
|
||||
(when xs
|
||||
(aset ^"[[Ljava.lang.String;" vals i ^String (first xs))
|
||||
(recur (inc i) (next xs))))
|
||||
|
||||
on-remove (fn [key val cause]
|
||||
(l/trace :hint "evict connection (cache)" :key key :reason cause)
|
||||
(some-> val d/close!))
|
||||
(.rpush cmd
|
||||
^String key
|
||||
^"[[Ljava.lang.String;" vals))
|
||||
|
||||
cache (cache/create :executor netty-executor
|
||||
:on-remove on-remove
|
||||
:keepalive "5m")]
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(ex/ignoring (cache/invalidate! cache))
|
||||
(ex/ignoring (.shutdown ^ClientResources resources))
|
||||
(ex/ignoring (.stop ^Timer timer)))
|
||||
(catch RedisCommandInterruptedException cause
|
||||
(throw (InterruptedException. (ex-message cause))))))
|
||||
|
||||
IRedis
|
||||
(-get-or-connect [this key options]
|
||||
(let [create (fn [_] (-connect this options))]
|
||||
(cache/get cache key create)))
|
||||
(-blpop [_ keys timeout]
|
||||
(try
|
||||
(let [keys (into-array String keys)]
|
||||
(when-let [res (.blpop cmd
|
||||
^double timeout
|
||||
^"[Ljava.lang.String;" keys)]
|
||||
(MapEntry/create
|
||||
(.getKey ^KeyValue res)
|
||||
(.getValue ^KeyValue res))))
|
||||
(catch RedisCommandInterruptedException cause
|
||||
(throw (InterruptedException. (ex-message cause))))))
|
||||
|
||||
(-connect [_ options]
|
||||
(let [timeout (or (:timeout options) (::timeout params))
|
||||
codec (get options :codec default-codec)
|
||||
type (get options :type :default)
|
||||
client (RedisClient/create ^ClientResources resources
|
||||
^RedisURI redis-uri)]
|
||||
(-get [_ key]
|
||||
(assert (string? key) "key expected to be string")
|
||||
(.get cmd ^String key))
|
||||
|
||||
(l/trc :hint "connect" :hid (hash client))
|
||||
(if (= type :pubsub)
|
||||
(let [conn (.connectPubSub ^RedisClient client
|
||||
^RedisCodec codec)]
|
||||
(.setTimeout ^StatefulConnection conn
|
||||
^Duration timeout)
|
||||
(reify
|
||||
IPubSubConnection
|
||||
(add-listener [_ listener]
|
||||
(assert (instance? RedisPubSubListener listener) "expected listener instance")
|
||||
(.addListener ^StatefulRedisPubSubConnection conn
|
||||
^RedisPubSubListener listener))
|
||||
(-set [_ key val args]
|
||||
(.set cmd
|
||||
^String key
|
||||
^bytes val
|
||||
^SetArgs args))
|
||||
|
||||
(subscribe [_ topics]
|
||||
(try
|
||||
(let [topics (into-array String (map str topics))
|
||||
cmd (.sync ^StatefulRedisPubSubConnection conn)]
|
||||
(.subscribe ^RedisPubSubCommands cmd topics))
|
||||
(catch RedisCommandInterruptedException cause
|
||||
(throw (InterruptedException. (ex-message cause))))))
|
||||
(-del [_ keys]
|
||||
(let [keys (into-array String keys)]
|
||||
(.del cmd ^String/1 keys)))
|
||||
|
||||
(unsubscribe [_ topics]
|
||||
(try
|
||||
(let [topics (into-array String (map str topics))
|
||||
cmd (.sync ^StatefulRedisPubSubConnection conn)]
|
||||
(.unsubscribe ^RedisPubSubCommands cmd topics))
|
||||
(catch RedisCommandInterruptedException cause
|
||||
(throw (InterruptedException. (ex-message cause))))))
|
||||
(-ping [_]
|
||||
(.ping cmd))
|
||||
|
||||
(-eval [_ script]
|
||||
(impl-eval cmd cache metrics script)))
|
||||
|
||||
|
||||
AutoCloseable
|
||||
(close [_] (shutdown client conn))))
|
||||
(deftype SubscriptionConnection [^StatefulRedisPubSubConnection conn
|
||||
^RedisPubSubCommands cmd
|
||||
^Duration timeout]
|
||||
AutoCloseable
|
||||
(close [_]
|
||||
(ex/ignoring (.close conn)))
|
||||
|
||||
(let [conn (.connect ^RedisClient client ^RedisCodec codec)]
|
||||
(.setTimeout ^StatefulConnection conn ^Duration timeout)
|
||||
(reify
|
||||
IConnection
|
||||
(publish [_ topic message]
|
||||
(assert (string? topic) "expected topic to be string")
|
||||
(assert (bytes? message) "expected message to be a byte array")
|
||||
IConnection
|
||||
(-set-timeout [_ timeout]
|
||||
(.setTimeout conn ^Duration timeout))
|
||||
|
||||
(let [pcomm (.async ^StatefulRedisConnection conn)]
|
||||
(.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)))
|
||||
(-reset-timeout [_]
|
||||
(.setTimeout conn timeout))
|
||||
|
||||
(rpush [_ key payload]
|
||||
(assert (or (and (vector? payload)
|
||||
(every? bytes? payload))
|
||||
(bytes? payload)))
|
||||
(try
|
||||
(let [cmd (.sync ^StatefulRedisConnection conn)
|
||||
data (if (vector? payload) payload [payload])
|
||||
vals (make-array (. Class (forName "[B")) (count data))]
|
||||
(-get-timeout [_]
|
||||
(.getTimeout conn))
|
||||
|
||||
(loop [i 0 xs (seq data)]
|
||||
(when xs
|
||||
(aset ^"[[B" vals i ^bytes (first xs))
|
||||
(recur (inc i) (next xs))))
|
||||
IPubSubConnection
|
||||
(-add-listener [_ listener]
|
||||
(.addListener conn ^RedisPubSubListener listener))
|
||||
|
||||
(.rpush ^RedisCommands cmd
|
||||
^String key
|
||||
^"[[B" vals))
|
||||
(-subscribe [_ topics]
|
||||
(try
|
||||
(let [topics (into-array String topics)]
|
||||
(.subscribe cmd topics))
|
||||
(catch RedisCommandInterruptedException cause
|
||||
(throw (InterruptedException. (ex-message cause))))))
|
||||
|
||||
(catch RedisCommandInterruptedException cause
|
||||
(throw (InterruptedException. (ex-message cause))))))
|
||||
(-unsubscribe [_ topics]
|
||||
(try
|
||||
(let [topics (into-array String topics)]
|
||||
(.unsubscribe cmd topics))
|
||||
(catch RedisCommandInterruptedException cause
|
||||
(throw (InterruptedException. (ex-message cause)))))))
|
||||
|
||||
(blpop [_ timeout keys]
|
||||
(try
|
||||
(let [keys (into-array Object (map str keys))
|
||||
cmd (.sync ^StatefulRedisConnection conn)
|
||||
timeout (/ (double (inst-ms timeout)) 1000.0)]
|
||||
(when-let [res (.blpop ^RedisCommands cmd
|
||||
^double timeout
|
||||
^"[Ljava.lang.String;" keys)]
|
||||
(MapEntry/create
|
||||
(.getKey ^KeyValue res)
|
||||
(.getValue ^KeyValue res))))
|
||||
(catch RedisCommandInterruptedException cause
|
||||
(throw (InterruptedException. (ex-message cause))))))
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; PUBLIC API
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(eval [_ script]
|
||||
(assert (valid-script? script) "expected valid script")
|
||||
(impl-eval conn metrics script))
|
||||
|
||||
AutoCloseable
|
||||
(close [_] (shutdown client conn))))))))))
|
||||
|
||||
(defn connect
|
||||
[instance & {:as opts}]
|
||||
(assert (satisfies? IRedis instance) "expected valid redis instance")
|
||||
(-connect instance opts))
|
||||
|
||||
(defn get-or-connect
|
||||
[instance key & {:as opts}]
|
||||
(assert (satisfies? IRedis instance) "expected valid redis instance")
|
||||
(-get-or-connect instance key opts))
|
||||
(defn build-set-args
|
||||
[options]
|
||||
(reduce-kv (fn [^SetArgs args k v]
|
||||
(case k
|
||||
:ex (if (instance? Duration v)
|
||||
(.ex args ^Duration v)
|
||||
(.ex args (long v)))
|
||||
:px (.px args (long v))
|
||||
:nx (if v (.nx args) args)
|
||||
:keep-ttl (if v (.keepttl args) args)))
|
||||
(SetArgs.)
|
||||
options))
|
||||
|
||||
(defn pubsub-listener
|
||||
[& {:keys [on-message on-subscribe on-unsubscribe]}]
|
||||
@@ -320,61 +302,172 @@
|
||||
(when on-unsubscribe
|
||||
(on-unsubscribe nil topic count)))))
|
||||
|
||||
(def ^:private scripts-cache (atom {}))
|
||||
(defn connect
|
||||
[cfg & {:as options}]
|
||||
(assert (contains? cfg ::mtx/metrics) "missing ::mtx/metrics on provided system")
|
||||
(assert (contains? cfg ::client) "missing ::rds/client on provided system")
|
||||
|
||||
(defn- impl-eval
|
||||
[^StatefulRedisConnection connection metrics script]
|
||||
(let [cmd (.async ^StatefulRedisConnection connection)
|
||||
keys (into-array String (map str (::rscript/keys script)))
|
||||
vals (into-array String (map str (::rscript/vals script)))
|
||||
sname (::rscript/name script)]
|
||||
(let [state (::client cfg)
|
||||
|
||||
(letfn [(on-error [cause]
|
||||
(if (instance? io.lettuce.core.RedisNoScriptException cause)
|
||||
(do
|
||||
(l/error :hint "no script found" :name sname :cause cause)
|
||||
(->> (load-script)
|
||||
(p/mcat eval-script)))
|
||||
(if-let [on-error (::rscript/on-error script)]
|
||||
(on-error cause)
|
||||
(p/rejected cause))))
|
||||
cache (::cache state)
|
||||
client (::client state)
|
||||
timeout (or (some-> (:timeout options) ct/duration)
|
||||
(::timeout state))
|
||||
|
||||
(eval-script [sha]
|
||||
(let [tpoint (ct/tpoint)]
|
||||
(->> (.evalsha ^RedisScriptingAsyncCommands cmd
|
||||
^String sha
|
||||
^ScriptOutputType ScriptOutputType/MULTI
|
||||
^"[Ljava.lang.String;" keys
|
||||
^"[Ljava.lang.String;" vals)
|
||||
(p/fmap (fn [result]
|
||||
(let [elapsed (tpoint)]
|
||||
(mtx/run! metrics {:id :redis-eval-timing
|
||||
:labels [(name sname)]
|
||||
:val (inst-ms elapsed)})
|
||||
(l/trace :hint "eval script"
|
||||
:name (name sname)
|
||||
:sha sha
|
||||
:params (str/join "," (::rscript/vals script))
|
||||
:elapsed (ct/format-duration elapsed))
|
||||
result)))
|
||||
(p/merr on-error))))
|
||||
conn (.connect ^RedisClient client
|
||||
^RedisCodec default-codec)
|
||||
cmd (.sync ^StatefulRedisConnection conn)]
|
||||
|
||||
(read-script []
|
||||
(-> script ::rscript/path io/resource slurp))
|
||||
(.setTimeout ^StatefulRedisConnection conn ^Duration timeout)
|
||||
(->Connection conn cmd timeout cache (::mtx/metrics cfg))))
|
||||
|
||||
(load-script []
|
||||
(l/trace :hint "load script" :name sname)
|
||||
(->> (.scriptLoad ^RedisScriptingAsyncCommands cmd
|
||||
^String (read-script))
|
||||
(p/fmap (fn [sha]
|
||||
(swap! scripts-cache assoc sname sha)
|
||||
sha))))]
|
||||
(defn connect-pubsub
|
||||
[cfg & {:as options}]
|
||||
(let [state (::client cfg)
|
||||
client (::client state)
|
||||
|
||||
(p/await!
|
||||
(if-let [sha (get @scripts-cache sname)]
|
||||
(eval-script sha)
|
||||
(->> (load-script)
|
||||
(p/mapcat eval-script)))))))
|
||||
timeout (or (some-> (:timeout options) ct/duration)
|
||||
(::timeout state))
|
||||
conn (.connectPubSub ^RedisClient client
|
||||
^RedisCodec default-codec)
|
||||
cmd (.sync ^StatefulRedisPubSubConnection conn)]
|
||||
|
||||
|
||||
(.setTimeout ^StatefulRedisPubSubConnection conn
|
||||
^Duration timeout)
|
||||
(->SubscriptionConnection conn cmd timeout)))
|
||||
|
||||
(defn get
|
||||
[conn key]
|
||||
(assert (string? key) "key must be string instance")
|
||||
(try
|
||||
(-get conn key)
|
||||
(catch RedisCommandTimeoutException cause
|
||||
(l/err :hint "timeout on get redis key" :key key :cause cause)
|
||||
nil)))
|
||||
|
||||
(defn set
|
||||
([conn key val]
|
||||
(set conn key val nil))
|
||||
([conn key val args]
|
||||
(assert (string? key) "key must be string instance")
|
||||
(assert (string? val) "val must be string instance")
|
||||
(let [args (cond
|
||||
(or (instance? SetArgs args)
|
||||
(nil? args))
|
||||
args
|
||||
|
||||
(map? args)
|
||||
(build-set-args args)
|
||||
|
||||
:else
|
||||
(throw (IllegalArgumentException. "invalid args")))]
|
||||
|
||||
(try
|
||||
(-set conn key val args)
|
||||
(catch RedisCommandTimeoutException cause
|
||||
(l/err :hint "timeout on set redis key" :key key :cause cause)
|
||||
nil)))))
|
||||
|
||||
(defn del
|
||||
[conn key-or-keys]
|
||||
(let [keys (if (vector? key-or-keys) key-or-keys [key-or-keys])]
|
||||
(assert (every? string? keys) "only string keys allowed")
|
||||
(try
|
||||
(-del conn keys)
|
||||
(catch RedisCommandTimeoutException cause
|
||||
(l/err :hint "timeout on del redis key" :key key :cause cause)
|
||||
nil))))
|
||||
|
||||
(defn ping
|
||||
[conn]
|
||||
(-ping conn))
|
||||
|
||||
(defn blpop
|
||||
[conn key-or-keys timeout]
|
||||
(let [keys (if (vector? key-or-keys) key-or-keys [key-or-keys])
|
||||
timeout (cond
|
||||
(ct/duration? timeout)
|
||||
(/ (double (inst-ms timeout)) 1000.0)
|
||||
|
||||
(double? timeout)
|
||||
timeout
|
||||
|
||||
(int? timeout)
|
||||
(/ (double timeout) 1000.0)
|
||||
|
||||
:else
|
||||
0)]
|
||||
|
||||
(assert (every? string? keys) "only string keys allowed")
|
||||
(-blpop conn keys timeout)))
|
||||
|
||||
(defn rpush
|
||||
[conn key elements]
|
||||
(assert (string? key) "key must be string instance")
|
||||
(assert (every? string? elements) "elements should be all strings")
|
||||
(let [elements (vec elements)]
|
||||
(-rpush conn key elements)))
|
||||
|
||||
(defn publish
|
||||
[conn topic payload]
|
||||
(assert (string? topic) "expected topic to be string")
|
||||
(assert (string? payload) "expected message to be a byte array")
|
||||
(-publish conn topic payload))
|
||||
|
||||
(def ^:private schema:script
|
||||
[:map {:title "script"}
|
||||
[::rscript/name qualified-keyword?]
|
||||
[::rscript/path ::sm/text]
|
||||
[::rscript/keys {:optional true} [:vector :any]]
|
||||
[::rscript/vals {:optional true} [:vector :any]]])
|
||||
|
||||
(def ^:private valid-script?
|
||||
(sm/lazy-validator schema:script))
|
||||
|
||||
(defn eval
|
||||
[conn script]
|
||||
(assert (valid-script? script) "expected valid script")
|
||||
(-eval conn script))
|
||||
|
||||
(defn add-listener
|
||||
[conn listener]
|
||||
(let [listener (cond
|
||||
(map? listener)
|
||||
(pubsub-listener listener)
|
||||
|
||||
(instance? RedisPubSubListener listener)
|
||||
listener
|
||||
|
||||
:else
|
||||
(throw (IllegalArgumentException. "invalid listener provided")))]
|
||||
|
||||
(-add-listener conn listener)))
|
||||
|
||||
(defn subscribe
|
||||
[conn topic-or-topics]
|
||||
(let [topics (if (vector? topic-or-topics) topic-or-topics [topic-or-topics])]
|
||||
(assert (every? string? topics))
|
||||
(-subscribe conn topics)))
|
||||
|
||||
(defn unsubscribe
|
||||
[conn topic-or-topics]
|
||||
(let [topics (if (vector? topic-or-topics) topic-or-topics [topic-or-topics])]
|
||||
(assert (every? string? topics))
|
||||
(-unsubscribe conn topics)))
|
||||
|
||||
(defn set-timeout
|
||||
[conn timeout]
|
||||
(let [timeout (ct/duration timeout)]
|
||||
(-set-timeout conn timeout)))
|
||||
|
||||
(defn get-timeout
|
||||
[conn]
|
||||
(-get-timeout conn))
|
||||
|
||||
(defn reset-timeout
|
||||
[conn]
|
||||
(-reset-timeout conn))
|
||||
|
||||
(defn timeout-exception?
|
||||
[cause]
|
||||
@@ -383,3 +476,121 @@
|
||||
(defn exception?
|
||||
[cause]
|
||||
(instance? RedisException cause))
|
||||
|
||||
(defn get-pooled
|
||||
[cfg]
|
||||
(let [pool (::pool cfg)]
|
||||
(gpool/get pool)))
|
||||
|
||||
(defn close
|
||||
[o]
|
||||
(.close ^AutoCloseable o))
|
||||
|
||||
(defn pool
|
||||
[cfg & {:as options}]
|
||||
(gpool/create :create-fn (partial connect cfg options)
|
||||
:destroy-fn close
|
||||
:dispose-fn -reset-timeout))
|
||||
|
||||
(defn run!
|
||||
[cfg f & args]
|
||||
(if (gpool/pool? cfg)
|
||||
(apply f {::pool cfg} f args)
|
||||
(let [pool (::pool cfg)]
|
||||
(with-open [^AutoCloseable conn (gpool/get pool)]
|
||||
(apply f (assoc cfg ::conn @conn) args)))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; INITIALIZATION
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defmethod ig/expand-key ::client
|
||||
[k v]
|
||||
{k (-> (d/without-nils v)
|
||||
(assoc ::timeout (ct/duration "10s")))})
|
||||
|
||||
(def ^:private schema:client
|
||||
[:map {:title "RedisClient"}
|
||||
[::timer [:fn #(instance? HashedWheelTimer %)]]
|
||||
[::cache ::sm/atom]
|
||||
[::timeout ::ct/duration]
|
||||
[::resources [:fn #(instance? DefaultClientResources %)]]])
|
||||
|
||||
(def check-client
|
||||
(sm/check-fn schema:client))
|
||||
|
||||
(sm/register! ::client schema:client)
|
||||
(sm/register!
|
||||
{:type ::pool
|
||||
:pred gpool/pool?})
|
||||
|
||||
(def ^:private schema:client-params
|
||||
[:map {:title "redis-params"}
|
||||
::wrk/netty-io-executor
|
||||
::wrk/netty-executor
|
||||
[::uri ::sm/uri]
|
||||
[::timeout ::ct/duration]])
|
||||
|
||||
(def ^:private check-client-params
|
||||
(sm/check-fn schema:client-params))
|
||||
|
||||
(defmethod ig/assert-key ::client
|
||||
[_ params]
|
||||
(check-client-params params))
|
||||
|
||||
(defmethod ig/init-key ::client
|
||||
[_ {:keys [::uri ::wrk/netty-io-executor ::wrk/netty-executor] :as params}]
|
||||
|
||||
(l/inf :hint "initialize redis client" :uri (str uri))
|
||||
|
||||
(let [timer (HashedWheelTimer.)
|
||||
cache (atom {})
|
||||
|
||||
resources (.. (DefaultClientResources/builder)
|
||||
(eventExecutorGroup ^EventExecutorGroup netty-executor)
|
||||
|
||||
;; We provide lettuce with a shared event loop
|
||||
;; group instance instead of letting lettuce to
|
||||
;; create its own
|
||||
(eventLoopGroupProvider
|
||||
(reify io.lettuce.core.resource.EventLoopGroupProvider
|
||||
(allocate [_ _] netty-io-executor)
|
||||
(threadPoolSize [_]
|
||||
(.executorCount ^NioEventLoopGroup netty-io-executor))
|
||||
(release [_ _ _ _ _]
|
||||
;; Do nothing
|
||||
)
|
||||
(shutdown [_ _ _ _]
|
||||
;; Do nothing
|
||||
)))
|
||||
|
||||
(timer ^Timer timer)
|
||||
(build))
|
||||
|
||||
redis-uri (RedisURI/create ^String (str uri))
|
||||
client (RedisClient/create ^ClientResources resources
|
||||
^RedisURI redis-uri)]
|
||||
|
||||
{::client client
|
||||
::cache cache
|
||||
::timer timer
|
||||
::timeout default-timeout
|
||||
::resources resources}))
|
||||
|
||||
(defmethod ig/halt-key! ::client
|
||||
[_ {:keys [::client ::timer ::resources]}]
|
||||
(ex/ignoring (.shutdown ^RedisClient client))
|
||||
(ex/ignoring (.shutdown ^ClientResources resources))
|
||||
(ex/ignoring (.stop ^Timer timer)))
|
||||
|
||||
(defmethod ig/assert-key ::pool
|
||||
[_ {:keys [::client]}]
|
||||
(check-client client))
|
||||
|
||||
(defmethod ig/init-key ::pool
|
||||
[_ cfg]
|
||||
(pool cfg {:timeout (ct/duration 2000)}))
|
||||
|
||||
(defmethod ig/halt-key! ::pool
|
||||
[_ instance]
|
||||
(.close ^java.lang.AutoCloseable instance))
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
[app.main :as-alias main]
|
||||
[app.metrics :as mtx]
|
||||
[app.msgbus :as-alias mbus]
|
||||
[app.redis :as rds]
|
||||
[app.rpc.climit :as climit]
|
||||
[app.rpc.cond :as cond]
|
||||
[app.rpc.helpers :as rph]
|
||||
@@ -261,6 +262,7 @@
|
||||
::session/manager
|
||||
::http.client/client
|
||||
::db/pool
|
||||
::rds/pool
|
||||
::mbus/msgbus
|
||||
::sto/storage
|
||||
::mtx/metrics
|
||||
|
||||
@@ -66,13 +66,6 @@
|
||||
[integrant.core :as ig]
|
||||
[promesa.exec :as px]))
|
||||
|
||||
(def ^:private default-timeout
|
||||
(ct/duration 400))
|
||||
|
||||
(def ^:private default-options
|
||||
{:codec rds/string-codec
|
||||
:timeout default-timeout})
|
||||
|
||||
(def ^:private bucket-rate-limit-script
|
||||
{::rscript/name ::bucket-rate-limit
|
||||
::rscript/path "app/rpc/rlimit/bucket.lua"})
|
||||
@@ -177,11 +170,11 @@
|
||||
:hint (str/ffmt "looks like '%' does not have a valid format" opts))))
|
||||
|
||||
(defmethod process-limit :bucket
|
||||
[redis user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}]
|
||||
[rconn user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}]
|
||||
(let [script (-> bucket-rate-limit-script
|
||||
(assoc ::rscript/keys [(str key "." service "." user-id)])
|
||||
(assoc ::rscript/vals (conj params (->seconds now))))
|
||||
result (rds/eval redis script)
|
||||
result (rds/eval rconn script)
|
||||
allowed? (boolean (nth result 0))
|
||||
remaining (nth result 1)
|
||||
reset (* (/ (inst-ms interval) rate)
|
||||
@@ -199,13 +192,13 @@
|
||||
(assoc ::lresult/remaining remaining))))
|
||||
|
||||
(defmethod process-limit :window
|
||||
[redis user-id now {:keys [::nreq ::unit ::key ::service] :as limit}]
|
||||
[rconn user-id now {:keys [::nreq ::unit ::key ::service] :as limit}]
|
||||
(let [ts (ct/truncate now unit)
|
||||
ttl (ct/diff now (ct/plus ts {unit 1}))
|
||||
script (-> window-rate-limit-script
|
||||
(assoc ::rscript/keys [(str key "." service "." user-id "." (ct/format-inst ts))])
|
||||
(assoc ::rscript/vals [nreq (->seconds ttl)]))
|
||||
result (rds/eval redis script)
|
||||
result (rds/eval rconn script)
|
||||
allowed? (boolean (nth result 0))
|
||||
remaining (nth result 1)]
|
||||
(l/trace :hint "limit processed"
|
||||
@@ -220,9 +213,9 @@
|
||||
(assoc ::lresult/remaining remaining)
|
||||
(assoc ::lresult/reset (ct/plus ts {unit 1})))))
|
||||
|
||||
(defn- process-limits!
|
||||
[redis user-id limits now]
|
||||
(let [results (into [] (map (partial process-limit redis user-id now)) limits)
|
||||
(defn- process-limits
|
||||
[rconn user-id limits now]
|
||||
(let [results (into [] (map (partial process-limit rconn user-id now)) limits)
|
||||
remaining (->> results
|
||||
(d/index-by ::name ::lresult/remaining)
|
||||
(uri/map->query-string))
|
||||
@@ -259,34 +252,25 @@
|
||||
(some-> request inet/parse-request)
|
||||
uuid/zero)))
|
||||
|
||||
(defn process-request!
|
||||
[{:keys [::rpc/rlimit ::rds/redis ::skey ::sname] :as cfg} params]
|
||||
(when-let [limits (get-limits rlimit skey sname)]
|
||||
(let [redis (rds/get-or-connect redis ::rpc/rlimit default-options)
|
||||
uid (get-uid params)
|
||||
;; FIXME: why not clasic try/catch?
|
||||
result (ex/try! (process-limits! redis uid limits (ct/now)))]
|
||||
|
||||
(l/trc :hint "process-limits"
|
||||
:service sname
|
||||
:remaining (::remaingin result)
|
||||
:reset (::reset result))
|
||||
|
||||
(cond
|
||||
(ex/exception? result)
|
||||
(do
|
||||
(l/error :hint "error on processing rate-limit" :cause result)
|
||||
{::enabled false})
|
||||
|
||||
(contains? cf/flags :soft-rpc-rlimit)
|
||||
(defn- process-request'
|
||||
[{:keys [::rds/conn] :as cfg} limits params]
|
||||
(try
|
||||
(let [uid (get-uid params)
|
||||
result (process-limits conn uid limits (ct/now))]
|
||||
(if (contains? cf/flags :soft-rpc-rlimit)
|
||||
{::enabled false}
|
||||
result))
|
||||
(catch Throwable cause
|
||||
(l/error :hint "error on processing rate-limit" :cause cause)
|
||||
{::enabled false})))
|
||||
|
||||
:else
|
||||
result))))
|
||||
(defn- process-request
|
||||
[{:keys [::rpc/rlimit ::skey ::sname] :as cfg} params]
|
||||
(when-let [limits (get-limits rlimit skey sname)]
|
||||
(rds/run! cfg process-request' limits params)))
|
||||
|
||||
(defn wrap
|
||||
[{:keys [::rpc/rlimit ::rds/redis] :as cfg} f mdata]
|
||||
(assert (rds/redis? redis) "expected a valid redis instance")
|
||||
[{:keys [::rpc/rlimit] :as cfg} f mdata]
|
||||
(assert (or (nil? rlimit) (valid-rlimit-instance? rlimit)) "expected a valid rlimit instance")
|
||||
|
||||
(if rlimit
|
||||
@@ -298,7 +282,7 @@
|
||||
|
||||
(fn [hcfg params]
|
||||
(if @enabled
|
||||
(let [result (process-request! cfg params)]
|
||||
(let [result (process-request cfg params)]
|
||||
(if (::enabled result)
|
||||
(if (::allowed result)
|
||||
(-> (f hcfg params)
|
||||
@@ -399,7 +383,7 @@
|
||||
(when-let [path (cf/get :rpc-rlimit-config)]
|
||||
(and (fs/exists? path) (fs/regular-file? path) path)))
|
||||
|
||||
(defmethod ig/assert-key :app.rpc/rlimit
|
||||
(defmethod ig/assert-key ::rpc/rlimit
|
||||
[_ {:keys [::wrk/executor]}]
|
||||
(assert (sm/valid? ::wrk/executor executor) "expect valid executor"))
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
(ns app.worker.dispatcher
|
||||
(:require
|
||||
[app.common.data :as d]
|
||||
[app.common.data.macros :as dm]
|
||||
[app.common.logging :as l]
|
||||
[app.common.schema :as sm]
|
||||
[app.common.time :as ct]
|
||||
@@ -18,7 +17,9 @@
|
||||
[app.worker :as-alias wrk]
|
||||
[cuerdas.core :as str]
|
||||
[integrant.core :as ig]
|
||||
[promesa.exec :as px]))
|
||||
[promesa.exec :as px])
|
||||
(:import
|
||||
java.lang.AutoCloseable))
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
@@ -27,7 +28,7 @@
|
||||
[::wrk/tenant ::sm/text]
|
||||
::mtx/metrics
|
||||
::db/pool
|
||||
::rds/redis])
|
||||
::rds/client])
|
||||
|
||||
(defmethod ig/expand-key ::wrk/dispatcher
|
||||
[k v]
|
||||
@@ -49,59 +50,74 @@
|
||||
limit ?
|
||||
for update skip locked")
|
||||
|
||||
(defmethod ig/init-key ::wrk/dispatcher
|
||||
[_ {:keys [::db/pool ::rds/redis ::wrk/tenant ::batch-size ::timeout] :as cfg}]
|
||||
(letfn [(get-tasks [conn]
|
||||
(let [prefix (str tenant ":%")]
|
||||
(seq (db/exec! conn [sql:select-next-tasks prefix batch-size]))))
|
||||
(def ^:private sql:mark-task-scheduled
|
||||
"UPDATE task SET status = 'scheduled'
|
||||
WHERE id = ANY(?)")
|
||||
|
||||
(push-tasks! [conn rconn [queue tasks]]
|
||||
(defmethod ig/init-key ::wrk/dispatcher
|
||||
[_ {:keys [::db/pool ::wrk/tenant ::batch-size ::timeout] :as cfg}]
|
||||
(letfn [(get-tasks [{:keys [::db/conn]}]
|
||||
(let [prefix (str tenant ":%")]
|
||||
(not-empty (db/exec! conn [sql:select-next-tasks prefix batch-size]))))
|
||||
|
||||
(mark-as-scheduled [{:keys [::db/conn]} ids]
|
||||
(let [sql [sql:mark-task-scheduled
|
||||
(db/create-array conn "uuid" ids)]]
|
||||
(db/exec-one! conn sql)))
|
||||
|
||||
(push-tasks [{:keys [::rds/conn] :as cfg} [queue tasks]]
|
||||
(let [ids (mapv :id tasks)
|
||||
key (str/ffmt "taskq:%" queue)
|
||||
res (rds/rpush rconn key (mapv t/encode ids))
|
||||
sql [(str "update task set status = 'scheduled'"
|
||||
" where id = ANY(?)")
|
||||
(db/create-array conn "uuid" ids)]]
|
||||
res (rds/rpush conn key (mapv t/encode-str ids))]
|
||||
|
||||
(db/exec-one! conn sql)
|
||||
(mark-as-scheduled cfg ids)
|
||||
(l/trc :hist "enqueue tasks on redis"
|
||||
:queue queue
|
||||
:tasks (count ids)
|
||||
:queued res)))
|
||||
|
||||
(run-batch! [rconn]
|
||||
(try
|
||||
(db/tx-run! cfg (fn [{:keys [::db/conn]}]
|
||||
(if-let [tasks (get-tasks conn)]
|
||||
(->> (group-by :queue tasks)
|
||||
(run! (partial push-tasks! conn rconn)))
|
||||
;; FIXME: this sleep should be outside the transaction
|
||||
(px/sleep (::wait-duration cfg)))))
|
||||
(catch InterruptedException cause
|
||||
(throw cause))
|
||||
(catch Exception cause
|
||||
(cond
|
||||
(rds/exception? cause)
|
||||
(do
|
||||
(l/wrn :hint "redis exception (will retry in an instant)" :cause cause)
|
||||
(px/sleep timeout))
|
||||
(run-batch' [cfg]
|
||||
(if-let [tasks (get-tasks cfg)]
|
||||
(->> (group-by :queue tasks)
|
||||
(run! (partial push-tasks cfg)))
|
||||
::wait))
|
||||
|
||||
(db/sql-exception? cause)
|
||||
(do
|
||||
(l/wrn :hint "database exception (will retry in an instant)" :cause cause)
|
||||
(px/sleep timeout))
|
||||
(run-batch []
|
||||
(let [rconn (rds/connect cfg)]
|
||||
(try
|
||||
(-> cfg
|
||||
(assoc ::rds/conn rconn)
|
||||
(db/tx-run! run-batch'))
|
||||
|
||||
:else
|
||||
(do
|
||||
(l/err :hint "unhandled exception (will retry in an instant)" :cause cause)
|
||||
(px/sleep timeout))))))
|
||||
(catch InterruptedException cause
|
||||
(throw cause))
|
||||
(catch Exception cause
|
||||
(cond
|
||||
(rds/exception? cause)
|
||||
(do
|
||||
(l/wrn :hint "redis exception (will retry in an instant)" :cause cause)
|
||||
(px/sleep timeout))
|
||||
|
||||
(db/sql-exception? cause)
|
||||
(do
|
||||
(l/wrn :hint "database exception (will retry in an instant)" :cause cause)
|
||||
(px/sleep timeout))
|
||||
|
||||
:else
|
||||
(do
|
||||
(l/err :hint "unhandled exception (will retry in an instant)" :cause cause)
|
||||
(px/sleep timeout))))
|
||||
|
||||
(finally
|
||||
(.close ^AutoCloseable rconn)))))
|
||||
|
||||
(dispatcher []
|
||||
(l/inf :hint "started")
|
||||
(try
|
||||
(dm/with-open [rconn (rds/connect redis)]
|
||||
(loop []
|
||||
(run-batch! rconn)
|
||||
(loop []
|
||||
(let [result (run-batch)]
|
||||
(when (= result ::wait)
|
||||
(px/sleep (::wait-duration cfg)))
|
||||
(recur)))
|
||||
(catch InterruptedException _
|
||||
(l/trc :hint "interrupted"))
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
"Async tasks abstraction (impl)."
|
||||
(:require
|
||||
[app.common.data :as d]
|
||||
[app.common.data.macros :as dm]
|
||||
[app.common.exceptions :as ex]
|
||||
[app.common.logging :as l]
|
||||
[app.common.schema :as sm]
|
||||
@@ -20,7 +19,9 @@
|
||||
[app.worker :as wrk]
|
||||
[cuerdas.core :as str]
|
||||
[integrant.core :as ig]
|
||||
[promesa.exec :as px]))
|
||||
[promesa.exec :as px])
|
||||
(:import
|
||||
java.lang.AutoCloseable))
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
@@ -76,6 +77,7 @@
|
||||
:queue queue
|
||||
:runner-id id
|
||||
:retry (:retry-num task))
|
||||
|
||||
(let [tpoint (ct/tpoint)
|
||||
task-fn (wrk/get-task registry (:name task))
|
||||
result (when task-fn (task-fn task))
|
||||
@@ -149,7 +151,7 @@
|
||||
{::task task})))))
|
||||
|
||||
(defn- run-worker-loop!
|
||||
[{:keys [::db/pool ::rds/rconn ::timeout ::queue] :as cfg}]
|
||||
[{:keys [::db/pool ::rds/conn ::timeout ::queue] :as cfg}]
|
||||
(letfn [(handle-task-retry [{:keys [error inc-by delay] :or {inc-by 1 delay 1000} :as result}]
|
||||
(let [explain (if (ex/exception? error)
|
||||
(ex-message error)
|
||||
@@ -187,9 +189,9 @@
|
||||
{:id (:id task)})
|
||||
nil))
|
||||
|
||||
(decode-payload [^bytes payload]
|
||||
(decode-payload [payload]
|
||||
(try
|
||||
(let [task-id (t/decode payload)]
|
||||
(let [task-id (t/decode-str payload)]
|
||||
(if (uuid? task-id)
|
||||
task-id
|
||||
(l/err :hint "received unexpected payload (uuid expected)"
|
||||
@@ -197,7 +199,7 @@
|
||||
(catch Throwable cause
|
||||
(l/err :hint "unable to decode payload"
|
||||
:payload payload
|
||||
:length (alength payload)
|
||||
:length (alength ^String/1 payload)
|
||||
:cause cause))))
|
||||
|
||||
(process-result [{:keys [status] :as result}]
|
||||
@@ -224,8 +226,8 @@
|
||||
:cause cause))))))]
|
||||
|
||||
(try
|
||||
(let [key (str/ffmt "taskq:%" queue)
|
||||
[_ payload] (rds/blpop rconn timeout [key])]
|
||||
(let [key (str/ffmt "taskq:%" queue)
|
||||
[_ payload] (rds/blpop conn [key] timeout)]
|
||||
(some-> payload
|
||||
decode-payload
|
||||
run-task-loop))
|
||||
@@ -244,36 +246,37 @@
|
||||
(l/err :hint "unhandled exception" :cause cause))))))
|
||||
|
||||
(defn- start-thread!
|
||||
[{:keys [::rds/redis ::id ::queue ::wrk/tenant] :as cfg}]
|
||||
[{:keys [::id ::queue ::wrk/tenant] :as cfg}]
|
||||
(px/thread
|
||||
{:name (str "penpot/worker-runner/" id)}
|
||||
{:name (str "penpot/job-runner/" id)}
|
||||
(l/inf :hint "started" :id id :queue queue)
|
||||
(try
|
||||
(dm/with-open [rconn (rds/connect redis)]
|
||||
(let [cfg (-> cfg
|
||||
(assoc ::rds/rconn rconn)
|
||||
(assoc ::queue (str/ffmt "%:%" tenant queue))
|
||||
(assoc ::timeout (ct/duration "5s")))]
|
||||
(loop []
|
||||
(when (px/interrupted?)
|
||||
(throw (InterruptedException. "interrupted")))
|
||||
|
||||
(run-worker-loop! cfg)
|
||||
(recur))))
|
||||
(let [rconn (rds/connect cfg)]
|
||||
(try
|
||||
(loop [cfg (-> cfg
|
||||
(assoc ::rds/conn rconn)
|
||||
(assoc ::queue (str/ffmt "%:%" tenant queue))
|
||||
(assoc ::timeout (ct/duration "5s")))]
|
||||
(when (px/interrupted?)
|
||||
(throw (InterruptedException. "interrupted")))
|
||||
|
||||
(catch InterruptedException _
|
||||
(l/dbg :hint "interrupted"
|
||||
:id id
|
||||
:queue queue))
|
||||
(catch Throwable cause
|
||||
(l/err :hint "unexpected exception"
|
||||
:id id
|
||||
:queue queue
|
||||
:cause cause))
|
||||
(finally
|
||||
(l/inf :hint "terminated"
|
||||
:id id
|
||||
:queue queue)))))
|
||||
(run-worker-loop! cfg)
|
||||
(recur cfg))
|
||||
|
||||
(catch InterruptedException _
|
||||
(l/dbg :hint "interrupted"
|
||||
:id id
|
||||
:queue queue))
|
||||
(catch Throwable cause
|
||||
(l/err :hint "unexpected exception"
|
||||
:id id
|
||||
:queue queue
|
||||
:cause cause))
|
||||
(finally
|
||||
(.close ^AutoCloseable rconn)
|
||||
(l/inf :hint "terminated"
|
||||
:id id
|
||||
:queue queue))))))
|
||||
|
||||
(def ^:private schema:params
|
||||
[:map
|
||||
@@ -283,7 +286,7 @@
|
||||
::wrk/registry
|
||||
::mtx/metrics
|
||||
::db/pool
|
||||
::rds/redis])
|
||||
::rds/client])
|
||||
|
||||
(defmethod ig/assert-key ::wrk/runner
|
||||
[_ params]
|
||||
|
||||
@@ -97,7 +97,7 @@
|
||||
:thumbnail-uri "test"
|
||||
:path (-> "backend_tests/test_files/template.penpot" io/resource fs/path)}]
|
||||
system (-> (merge main/system-config main/worker-config)
|
||||
(assoc-in [:app.redis/redis :app.redis/uri] (:redis-uri config))
|
||||
(assoc-in [:app.redis/client :app.redis/uri] (:redis-uri config))
|
||||
(assoc-in [::db/pool ::db/uri] (:database-uri config))
|
||||
(assoc-in [::db/pool ::db/username] (:database-username config))
|
||||
(assoc-in [::db/pool ::db/password] (:database-password config))
|
||||
|
||||
@@ -6,6 +6,8 @@
|
||||
org.clojure/data.fressian {:mvn/version "1.1.0"}
|
||||
org.clojure/clojurescript {:mvn/version "1.12.42"}
|
||||
|
||||
org.apache.commons/commons-pool2 {:mvn/version "2.12.1"}
|
||||
|
||||
;; Logging
|
||||
org.apache.logging.log4j/log4j-api {:mvn/version "2.25.1"}
|
||||
org.apache.logging.log4j/log4j-core {:mvn/version "2.25.1"}
|
||||
|
||||
58
common/src/app/common/generic_pool.clj
Normal file
58
common/src/app/common/generic_pool.clj
Normal file
@@ -0,0 +1,58 @@
|
||||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; Copyright (c) KALEIDOS INC
|
||||
|
||||
(ns app.common.generic-pool
|
||||
(:refer-clojure :exclude [get])
|
||||
(:import
|
||||
java.lang.AutoCloseable
|
||||
org.apache.commons.pool2.ObjectPool
|
||||
org.apache.commons.pool2.PooledObject
|
||||
org.apache.commons.pool2.PooledObjectFactory
|
||||
org.apache.commons.pool2.impl.DefaultPooledObject
|
||||
org.apache.commons.pool2.impl.SoftReferenceObjectPool))
|
||||
|
||||
(defn pool?
|
||||
[o]
|
||||
(instance? ObjectPool o))
|
||||
|
||||
(defn create
|
||||
[& {:keys [create-fn destroy-fn validate-fn dispose-fn]}]
|
||||
(SoftReferenceObjectPool.
|
||||
(reify PooledObjectFactory
|
||||
(activateObject [_ _])
|
||||
(destroyObject [_ o]
|
||||
(let [object (.getObject ^PooledObject o)]
|
||||
(destroy-fn object)))
|
||||
|
||||
(destroyObject [_ o _]
|
||||
(let [object (.getObject ^PooledObject o)]
|
||||
(destroy-fn object)))
|
||||
|
||||
(passivateObject [_ o]
|
||||
(when (fn? dispose-fn)
|
||||
(let [object (.getObject ^PooledObject o)]
|
||||
(dispose-fn object))))
|
||||
|
||||
(validateObject [_ o]
|
||||
(if (fn? validate-fn)
|
||||
(let [object (.getObject ^PooledObject o)]
|
||||
(validate-fn object))
|
||||
true))
|
||||
|
||||
(makeObject [_]
|
||||
(let [object (create-fn)]
|
||||
(DefaultPooledObject. object))))))
|
||||
|
||||
(defn get
|
||||
[^ObjectPool pool]
|
||||
(let [object (.borrowObject pool)]
|
||||
(reify
|
||||
clojure.lang.IDeref
|
||||
(deref [_] object)
|
||||
|
||||
AutoCloseable
|
||||
(close [_]
|
||||
(.returnObject pool object)))))
|
||||
Reference in New Issue
Block a user