From 097897d8da37e1a029d29d85959f82d918a4deba Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 6 Nov 2025 15:54:22 +0100 Subject: [PATCH] :sparkles: Add better sse parser for backend tests --- backend/test/backend_tests/helpers.clj | 49 ++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index d504e05aa8..20f83ce455 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -549,6 +549,44 @@ (io/copy r sw) (.toString sw)))) +(defn parse-sse + [content] + (let [state + (reduce (fn [{:keys [events data event id] :as state} line] + (cond + ;; empty line → dispatch event if we have data + (str/blank? line) + (if (seq data) + (-> state + (update :events conj {:event (or event "message") + :data (-> (str/join "\n" data))}) + (assoc :data [] :event nil)) + state) + + ;; comment line (starts with :) + (str/starts-with? line ":") + state + + :else + (let [[field raw-value] (str/split line #":" 2) + value (some-> raw-value (str/replace #"^ " ""))] + (case field + "data" (update state :data conj (or value "")) + "event" (assoc state :event value) + ;; ignore retry and unknown fields + state)))) + {:events [] :data [] :event nil} + (str/split content #"\r?\n")) + + ;; handle unterminated last event (no trailing blank line) + state (if (seq (:data state)) + (update state :events conj + {:event (or (:event state) "message") + :data (str/join "\n" (:data state))}) + state)] + + (:events state))) + (defn consume-sse [callback] (let [{:keys [::yres/status ::yres/body ::yres/headers] :as response} (callback {}) @@ -558,12 +596,9 @@ (try (px/exec! :virtual #(rcp/write-body-to-stream body nil output)) (into [] - (map (fn [event] - (let [[item1 item2] (re-seq #"(.*): (.*)\n?" event)] - - [(keyword (nth item1 2)) - (tr/decode-str (nth item2 2))]))) - (-> (slurp' input) - (str/split "\n\n"))) + (map (fn [{:keys [event data]}] + [(keyword event) + (tr/decode-str data)])) + (parse-sse (slurp' input))) (finally (.close input)))))