Files
penpot/backend/src/app/rpc/rlimit.clj
Andrey Antukh 88fb5e7ab5 ♻️ Update integrant to latest version
This upgrade also includes complete elimination of use spec
from the backend codebase, completing the long running migration
to fully use malli for validation and decoding.
2024-11-13 19:09:19 +01:00

419 lines
14 KiB
Clojure

;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.rpc.rlimit
"Rate limit strategies implementation for RPC services.
It mainly implements two strategies: fixed window and bucket. You
can use one of them or both to create a combination of limits. All
limits are updated on each request and the most restrictive one
blocks the user activity.
On the HTTP layer it translates to the 429 http response.
The limits are defined as a vector of 3 elements:
[<name:keyword> <strategy:keyword> <opts:string>]
The opts format is strategy dependent. With the fixed `:window`
strategy you have the following format:
[:somename :window \"1000/m\"]
Where the first number means the quantity of allowed requests and the
letter indicates the window unit, which can be `w` for weeks, `d` for
days, `h` for hours, `m` for minutes and `s` for seconds.
For the `:bucket` strategy you will have something like this:
[:somename :bucket \"100/10/15s\"]
Where the first number indicates the total token capacity (or
available burst), the second number indicates the refill rate and
the last number suffixed with the unit (`h`, `m` or `s`) indicates
the time window (or interval) of the refill. This means that this
limit configuration allows a burst of 100 elements and will refill 10
tokens each 15s (1 token each 1.5 seconds).
The bucket strategy works well for small intervals and the window
strategy works better for large intervals.
All limits use the profile-id as user identifier. In case the
profile-id is not available, the IP address is used as fallback
value.
"
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.schema :as sm]
[app.common.uri :as uri]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.http :as-alias http]
[app.redis :as rds]
[app.redis.script :as-alias rscript]
[app.rpc :as-alias rpc]
[app.rpc.helpers :as rph]
[app.rpc.rlimit.result :as-alias lresult]
[app.util.inet :as inet]
[app.util.services :as-alias sv]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.edn :as edn]
[cuerdas.core :as str]
[datoteka.fs :as fs]
[integrant.core :as ig]
[promesa.exec :as px]))
;; Timeout value placed into the redis connection options below.
;; NOTE(review): presumably expressed in milliseconds -- confirm against
;; `dt/duration`.
(def ^:private default-timeout
(dt/duration 400))
;; Options handed to `rds/get-or-connect` by `process-request!` when it
;; obtains the connection used for evaluating the rate limit scripts.
(def ^:private default-options
{:codec rds/string-codec
:timeout default-timeout})
;; Base script descriptor for the `:bucket` strategy. The `:bucket`
;; implementation of `process-limit` adds the keys and vals to it and
;; passes the result to `rds/eval`.
(def ^:private bucket-rate-limit-script
{::rscript/name ::bucket-rate-limit
::rscript/path "app/rpc/rlimit/bucket.lua"})
;; Base script descriptor for the `:window` strategy, used the same way
;; by the `:window` implementation of `process-limit`.
(def ^:private window-rate-limit-script
{::rscript/name ::window-rate-limit
::rscript/path "app/rpc/rlimit/window.lua"})
(def enabled
"Runtime switch for rate limiting. While this atom holds a falsy
value, the handlers produced by `wrap` call the wrapped function
directly without processing any limit."
(atom true))
;; Matches the opts string of a `:window` limit: `<nreq>/<unit>` where
;; unit is one of w, d, h, m, s (e.g. "1000/m").
(def ^:private window-opts-re
#"^(\d+)/([wdhms])$")
;; Matches the opts string of a `:bucket` limit:
;; `<capacity>/<rate>/<interval>` where interval is a number suffixed
;; with h, m or s (e.g. "100/10/15s").
(def ^:private bucket-opts-re
#"^(\d+)/(\d+)/(\d+[hms])$")
;; Converts a limit tuple `[name strategy opts]` into a limit map;
;; dispatches on the second element of the tuple (the strategy).
(defmulti parse-limit (fn [[_ strategy _]] strategy))
;; Evaluates a single limit; takes four arguments and dispatches on the
;; `::strategy` of the fourth one (the limit map).
(defmulti process-limit (fn [_ _ _ o] (::strategy o)))
;; Registers the `::rpc/rlimit` schema type: a value is valid when it
;; is a clojure agent (the state holder created in `ig/init-key`).
(sm/register!
{:type ::rpc/rlimit
:pred #(instance? clojure.lang.Agent %)})
;; The supported limit strategies.
(def ^:private schema:strategy
[:enum :window :bucket])
;; Shape of a limit as written in the config: `[name strategy opts]`.
(def ^:private schema:limit-tuple
[:tuple :keyword schema:strategy :string])
;; Shape of a parsed limit (the output of `parse-limit`).
;;
;; Every limit carries the common keys; in addition it must match
;; either the bucket shape or the window shape. The bucket shape uses
;; `::interval`, which is the key emitted by the `:bucket`
;; implementation of `parse-limit` and read by the `:bucket`
;; implementation of `process-limit`.
(def ^:private schema:limit
  [:and
   [:map
    [::name :any]
    [::strategy schema:strategy]
    [::key :string]
    [::opts :string]]
   [:or
    ;; bucket strategy
    [:map
     [::capacity ::sm/int]
     [::rate ::sm/int]
     [::interval ::dt/duration]
     [::params [::sm/vec :any]]]
    ;; window strategy
    [:map
     [::nreq ::sm/int]
     [::unit [:enum :days :hours :minutes :seconds :weeks]]]]])
;; Compiled limits: a map from keyword to a vector of parsed limits.
(def ^:private schema:limits
[:map-of :keyword [::sm/vec schema:limit]])
;; Validator for config limit tuples, used in the `parse-limit` asserts.
(def ^:private valid-limit-tuple?
(sm/lazy-validator schema:limit-tuple))
;; Validator for the rlimit state instance, used in the `wrap` assert.
(def ^:private valid-rlimit-instance?
(sm/lazy-validator ::rpc/rlimit))
;; Parses a `[name :window opts]` tuple into a window limit map.
;; Raises a `:validation` error with code `:invalid-window-limit-opts`
;; when `opts` does not match `window-opts-re`.
(defmethod parse-limit :window
  [[lname strategy opts :as vlimit]]
  (assert (valid-limit-tuple? vlimit) "expected valid limit tuple")
  (let [[_ nreq unit :as matched] (re-find window-opts-re opts)]
    ;; Guard clause: reject malformed opts before building the map.
    (when-not matched
      (ex/raise :type :validation
                :code :invalid-window-limit-opts
                :hint (str/ffmt "looks like '%' does not have a valid format" opts)))
    {::name     lname
     ::strategy strategy
     ::nreq     (parse-long nreq)
     ::unit     (case unit
                  "d" :days
                  "h" :hours
                  "m" :minutes
                  "s" :seconds
                  "w" :weeks)
     ::key      (str "ratelimit.window." (d/name lname))
     ::opts     opts}))
;; Parses a `[name :bucket opts]` tuple into a bucket limit map.
;; Raises a `:validation` error with code `:invalid-bucket-limit-opts`
;; when `opts` does not match `bucket-opts-re`.
(defmethod parse-limit :bucket
  [[lname strategy opts :as vlimit]]
  (assert (valid-limit-tuple? vlimit) "expected valid limit tuple")
  (let [[_ capacity rate interval :as matched] (re-find bucket-opts-re opts)]
    ;; Guard clause: reject malformed opts before parsing the pieces.
    (when-not matched
      (ex/raise :type :validation
                :code :invalid-bucket-limit-opts
                :hint (str/ffmt "looks like '%' does not have a valid format" opts)))
    (let [interval (dt/duration interval)
          rate     (parse-long rate)
          capacity (parse-long capacity)]
      {::name     lname
       ::strategy strategy
       ::capacity capacity
       ::rate     rate
       ::interval interval
       ::opts     opts
       ;; Positional arguments handed to the script by `process-limit`.
       ::params   [(dt/->seconds interval) rate capacity]
       ::key      (str "ratelimit.bucket." (d/name lname))})))
;; Evaluates a bucket limit for `user-id` at instant `now`.
;;
;; Runs the bucket script with the key `<key>.<service>.<user-id>` and
;; the limit params followed by `now` in seconds. The first element of
;; the script result is read as the allowed flag and the second one as
;; the number of remaining tokens.
;;
;; Returns the limit map extended with `::lresult/allowed`,
;; `::lresult/remaining` and `::lresult/reset`.
(defmethod process-limit :bucket
  [redis user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}]
  (let [script    (-> bucket-rate-limit-script
                      (assoc ::rscript/keys [(str key "." service "." user-id)])
                      (assoc ::rscript/vals (conj params (dt/->seconds now))))
        result    (rds/eval redis script)
        allowed?  (boolean (nth result 0))
        remaining (nth result 1)
        ;; Milliseconds needed to refill the consumed tokens. Multiply
        ;; first and use integer division so the offset is always a
        ;; whole number of milliseconds; dividing first produces a
        ;; Ratio whenever `rate` does not divide the interval.
        ;; NOTE(review): presumably `dt/plus` expects an integer number
        ;; of milliseconds here -- confirm.
        reset     (quot (* (inst-ms interval)
                           (- capacity remaining))
                        rate)]
    (l/trace :hint "limit processed"
             :service service
             :limit (name (::name limit))
             :strategy (name (::strategy limit))
             :opts (::opts limit)
             :allowed allowed?
             :remaining remaining)
    (-> limit
        (assoc ::lresult/allowed allowed?)
        (assoc ::lresult/reset (dt/plus now reset))
        (assoc ::lresult/remaining remaining))))
;; Evaluates a fixed window limit for `user-id` at instant `now`.
;;
;; The window starts at `now` truncated to the limit unit and lasts one
;; unit; the script key includes the formatted window start and the
;; script receives the request quota plus the seconds left until the
;; window ends.
;;
;; Returns the limit map extended with `::lresult/allowed`,
;; `::lresult/remaining` and `::lresult/reset` (the window end).
(defmethod process-limit :window
  [redis user-id now {:keys [::nreq ::unit ::key ::service] :as limit}]
  (let [window-start (dt/truncate now unit)
        window-end   (dt/plus window-start {unit 1})
        ttl          (dt/diff now window-end)
        rkey         (str key "." service "." user-id "." (dt/format-instant window-start))
        script       (assoc window-rate-limit-script
                            ::rscript/keys [rkey]
                            ::rscript/vals [nreq (dt/->seconds ttl)])
        [allowed remaining] (let [result (rds/eval redis script)]
                              [(boolean (nth result 0))
                               (nth result 1)])]
    (l/trace :hint "limit processed"
             :service service
             :limit (name (::name limit))
             :strategy (name (::strategy limit))
             :opts (::opts limit)
             :allowed allowed
             :remaining remaining)
    (assoc limit
           ::lresult/allowed allowed
           ::lresult/remaining remaining
           ::lresult/reset window-end)))
(defn- process-limits!
  "Evaluates every limit in `limits` for `user-id` at instant `now` and
  summarizes the outcome.

  Returns a map with `::enabled` (always true), `::allowed` (true when
  no limit rejected the request), the query-string encoded remaining
  and reset values per limit name, and the `::headers` to attach to
  the response. A warning is logged for the first rejected limit."
  [redis user-id limits now]
  (let [results   (mapv #(process-limit redis user-id now %) limits)
        ;; Encodes `{limit-name (value-fn result)}` as a query string.
        encode    (fn [value-fn]
                    (->> results
                         (d/index-by ::name value-fn)
                         (uri/map->query-string)))
        remaining (encode ::lresult/remaining)
        reset     (encode (comp dt/->seconds ::lresult/reset))
        rejected  (d/seek (complement ::lresult/allowed) results)]
    (when (some? rejected)
      (l/warn :hint "rejected rate limit"
              :user-id (str user-id)
              :limit-service (-> rejected ::service name)
              :limit-name (-> rejected ::name name)
              :limit-strategy (-> rejected ::strategy name)))
    ;; NOTE: the `::remaingin` key is misspelled, but `process-request!`
    ;; reads it under exactly this name, so it is kept as is.
    {::enabled   true
     ::allowed   (nil? rejected)
     ::remaingin remaining
     ::reset     reset
     ::headers   {"x-rate-limit-remaining" remaining
                  "x-rate-limit-reset" reset}}))
(defn- get-limits
  "Looks up the limits configured for `skey` in the dereferenced
  `state`, falling back to the `:default` entry. Returns a vector of
  limits, each one tagged with `sname` under `::service`, or nil when
  neither entry exists."
  [state skey sname]
  (let [lookup (fn [k] (get-in @state [::limits k]))]
    (when-let [limits (or (lookup skey)
                          (lookup :default))]
      (mapv (fn [limit] (assoc limit ::service sname)) limits))))
(defn- get-uid
  "Returns the identifier used to account the request: the profile id
  when present, otherwise whatever `inet/parse-request` yields for the
  http request found on the params metadata, otherwise `uuid/zero`."
  [{:keys [::rpc/profile-id] :as params}]
  (or profile-id
      (some-> params meta ::http/request inet/parse-request)
      uuid/zero))
(defn process-request!
  "Processes the rate limits that apply to the service described by
  `cfg` for the request `params`.

  Returns nil when no limits apply. Returns `{::enabled false}` when
  processing the limits fails (the error is logged) or when the
  `:soft-rpc-rlimit` flag is set; otherwise returns the result of
  `process-limits!`."
  [{:keys [::rpc/rlimit ::rds/redis ::skey ::sname] :as cfg} params]
  (when-let [limits (get-limits rlimit skey sname)]
    (let [conn   (rds/get-or-connect redis ::rpc/rlimit default-options)
          uid    (get-uid params)
          ;; FIXME: why not a classic try/catch?
          ;; `result` is either the summary map or the caught exception.
          result (ex/try! (process-limits! conn uid limits (dt/now)))]
      (l/trc :hint "process-limits"
             :service sname
             :remaining (::remaingin result)
             :reset (::reset result))
      (if (ex/exception? result)
        (do
          (l/error :hint "error on processing rate-limit" :cause result)
          {::enabled false})
        (if (contains? cf/flags :soft-rpc-rlimit)
          {::enabled false}
          result)))))
(defn wrap
  "Wraps the rpc handler `f` with rate limiting.

  When `cfg` carries no rlimit instance, `f` is returned untouched.
  Otherwise returns a handler that, while `enabled` is true, processes
  the limits for each request: allowed requests are forwarded to `f`
  and the rate limit headers are merged into the response metadata;
  rejected requests raise a `:rate-limit` error carrying the same
  headers. When limits are disabled or do not apply, `f` is called
  directly."
  [{:keys [::rpc/rlimit ::rds/redis] :as cfg} f mdata]
  (assert (rds/redis? redis) "expected a valid redis instance")
  (assert (or (nil? rlimit) (valid-rlimit-instance? rlimit)) "expected a valid rlimit instance")
  (if-not rlimit
    f
    (let [rtype (::rpc/type cfg)
          sname (name (::sv/spec mdata))
          cfg   (assoc cfg
                       ::skey (keyword rtype sname)
                       ::sname (str rtype "." sname))]
      (fn [hcfg params]
        (let [result (when @enabled
                       (process-request! cfg params))]
          (cond
            ;; Disabled globally, no limits apply, or soft mode.
            (not (::enabled result))
            (f hcfg params)

            (::allowed result)
            (-> (f hcfg params)
                (rph/wrap)
                (vary-meta update ::http/headers merge (::headers result)))

            :else
            (ex/raise :type :rate-limit
                      :code :request-blocked
                      :hint "rate limit reached"
                      ::http/headers (::headers result))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; CONFIG WATCHER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Shape of the config as read from the file: each key is a keyword or
;; a set of keywords and each value is a vector of limit tuples.
(def ^:private schema:config
[:map-of
[:or :keyword [:set :keyword]]
[:vector schema:limit-tuple]])
;; Checker applied to the raw config read in `read-config`.
(def ^:private check-config
(sm/check-fn schema:config))
;; Checker applied to the refresh interval taken from the config metadata.
(def ^:private check-refresh
(sm/check-fn ::dt/duration))
;; Checker applied to the compiled limits produced by `read-config`.
(def ^:private check-limits
(sm/check-fn schema:limits))
(defn read-config
  "Reads the edn config found at `path` and compiles it.

  Returns a map with `::refresh` (taken from the `:refresh` entry of
  the config metadata) and `::limits` (keyword -> vector of parsed
  limits, where every entry is completed with the `:default` limits
  and deduplicated by `::name`). Returns nil when `path` is nil."
  [path]
  (letfn [;; Parses the tuples and expands set keys into one entry per
          ;; member keyword.
          (expand-keys [config]
            (reduce-kv (fn [acc k tuples]
                         (cond
                           (keyword? k)
                           (assoc acc k (mapv parse-limit tuples))

                           (set? k)
                           (let [limits (mapv parse-limit tuples)]
                             (reduce (fn [m skey] (assoc m skey limits)) acc k))

                           :else
                           (throw (ex-info "invalid arguments" {}))))
                       {}
                       config))

          ;; Appends the default limits to every entry; the entry's own
          ;; limits come first, so they win on name collisions.
          (append-defaults [config]
            (let [defaults (:default config)]
              (reduce-kv (fn [acc k limits]
                           (assoc acc k (into []
                                              (d/distinct-xf ::name)
                                              (concat limits defaults))))
                         {}
                         config)))]

    (when-let [config (some->> path slurp edn/read-string check-config)]
      (let [refresh (-> config meta :refresh dt/duration check-refresh)
            limits  (-> config expand-keys append-defaults check-limits)]
        {::refresh refresh
         ::limits limits}))))
(defn- refresh-config
  "Queues two actions on the `state` agent through `executor`: the
  first one reloads the config when the file modification time differs
  from the recorded one (or none is recorded), the second one
  schedules the next call of this function after the `::refresh`
  interval found in the state."
  [{:keys [::state ::path ::wrk/executor] :as cfg}]
  (letfn [(reload-if-changed [{:keys [::updated-at] :as current}]
            (let [mtime    (fs/last-modified-time path)
                  changed? (or (nil? updated-at)
                               (not= (inst-ms mtime)
                                     (inst-ms updated-at)))
                  loaded   (when changed?
                             (let [loaded (read-config path)]
                               (l/info :hint "config refreshed"
                                       :loaded-limits (count (::limits loaded))
                                       ::l/sync? true)
                               loaded))]
              (merge current {::updated-at mtime} loaded)))

          (schedule-next [current]
            (px/schedule! (inst-ms (::refresh current))
                          (partial refresh-config cfg))
            current)]

    ;; Order matters: the reload is queued before the re-schedule.
    (send-via executor state reload-if-changed)
    (send-via executor state schedule-next)))
(defn- on-refresh-error
  "Error handler for the state agent. Ignores
  `RejectedExecutionException`; for any other cause it logs a warning,
  using the validation explanation when the exception data provides
  one."
  [_ cause]
  (when-not (instance? java.util.concurrent.RejectedExecutionException cause)
    (let [explain (-> cause ex-data ex/explain)]
      (if explain
        (l/warn ::l/raw (str "unable to refresh config, invalid format:\n" explain)
                ::l/sync? true)
        (l/warn :hint "unexpected exception on loading config"
                :cause cause
                ::l/sync? true)))))
(defn- get-config-path
  "Returns the value of the `:rpc-rlimit-config` setting when it is set
  and points to an existing regular file; otherwise a falsy value."
  []
  (let [path (cf/get :rpc-rlimit-config)]
    (when path
      (and (fs/exists? path)
           (fs/regular-file? path)
           path))))
;; Component params check: a valid executor must be provided.
(defmethod ig/assert-key :app.rpc/rlimit
  [_ params]
  (let [executor (::wrk/executor params)]
    (assert (sm/valid? ::wrk/executor executor) "expect valid executor")))
;; Component start: when the `:rpc-rlimit` flag is set, creates the
;; state agent and, if a config file is available, seeds it with a
;; refresh interval and triggers the first config load. Returns the
;; agent, or nil when the flag is not set.
(defmethod ig/init-key ::rpc/rlimit
  [_ {:keys [::wrk/executor] :as cfg}]
  (when (contains? cf/flags :rpc-rlimit)
    (let [state (doto (agent {})
                  (set-error-handler! on-refresh-error)
                  (set-error-mode! :continue))]
      (when-let [path (get-config-path)]
        (l/info :hint "initializing rlimit config reader" :path (str path))

        ;; Seed the state with an initial refresh value
        (send-via executor state (constantly {::refresh (dt/duration "5s")}))

        ;; Force the first refresh
        (refresh-config (assoc cfg ::path path ::state state)))

      state)))