From c77e6d410584b429df582ab0fb84104ee1e266ca Mon Sep 17 00:00:00 2001 From: Jason Tudisco Date: Fri, 13 Mar 2026 15:01:45 -0600 Subject: [PATCH] Add live SSE auto-refresh to paste app - CAN service: public SSE endpoint at /api/v1/can/0/events broadcasts new_asset events on ingest and sync push (no auth required) - Paste backend: SSE proxy at /paste/events streams from CAN service, with auto-reconnect on connection loss - Paste frontend: EventSource subscribes to /paste/events and calls loadItems() on new_asset events for instant UI refresh - When assets arrive via P2P sync, paste updates automatically Co-Authored-By: Claude Opus 4.6 --- examples/paste/Cargo.lock | 49 +++++++++++++++++++++++++++++ examples/paste/Cargo.toml | 4 ++- examples/paste/src/html.rs | 8 +++++ examples/paste/src/main.rs | 63 ++++++++++++++++++++++++++++++++++++++ src/routes/events.rs | 39 +++++++++++++++++++++++ src/routes/mod.rs | 2 ++ 6 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 src/routes/events.rs diff --git a/examples/paste/Cargo.lock b/examples/paste/Cargo.lock index 4834dad..174121e 100644 --- a/examples/paste/Cargo.lock +++ b/examples/paste/Cargo.lock @@ -241,6 +241,23 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -260,7 +277,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -850,11 +871,13 @@ name = "paste" version = "0.1.0" dependencies = [ "axum", + "futures-util", "open", "reqwest", "serde", "serde_json", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", ] @@ -991,12 +1014,14 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", + "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", ] @@ -1379,6 +1404,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -1680,6 +1716,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" diff --git a/examples/paste/Cargo.toml b/examples/paste/Cargo.toml index d019dbd..2a6d1a7 100644 --- a/examples/paste/Cargo.toml +++ b/examples/paste/Cargo.toml @@ -12,9 +12,11 @@ path = "src/main.rs" [dependencies] axum = { version = "0.8", features = ["multipart"] } tokio = { version = "1", features = ["full"] } -reqwest = { version = "0.12", features = ["multipart", "json"] } +reqwest = { version = "0.12", features = ["multipart", "json", "stream"] } serde = { version = "1", features = ["derive"] } serde_json = "1" open = "5" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tokio-stream = "0.1" +futures-util = "0.3" diff --git a/examples/paste/src/html.rs b/examples/paste/src/html.rs index c641bd1..85db561 100644 --- a/examples/paste/src/html.rs +++ b/examples/paste/src/html.rs @@ -376,6 +376,14 @@ fileInput.addEventListener('change', () => { // Initial load loadItems(); + +// Live updates via SSE — auto-refresh when assets arrive from sync or other sources +const evtSource = new EventSource('/paste/events'); +evtSource.addEventListener('new_asset', () => loadItems()); +evtSource.onerror = () => { + // EventSource auto-reconnects; just log for debugging + console.debug('SSE connection lost, reconnecting...'); +}; diff --git a/examples/paste/src/main.rs b/examples/paste/src/main.rs index 0c54e9b..ace4f44 100644 --- a/examples/paste/src/main.rs +++ b/examples/paste/src/main.rs @@ -2,10 +2,12 @@ mod html; use axum::extract::{DefaultBodyLimit, Multipart, Path, State}; use axum::http::{header, StatusCode}; +use axum::response::sse::{Event, Sse}; use axum::response::{Html, IntoResponse, Response}; use axum::routing::{get, post}; use axum::{Json, Router}; use serde::Deserialize; +use std::convert::Infallible; use std::net::SocketAddr; const CAN_API: &str = "http://127.0.0.1:3210/api/v1/can/0"; @@ -225,6 +227,66 @@ async fn proxy_thumb( forward(resp).await } +/// Proxy SSE events from CAN service so the frontend gets live updates. +async fn paste_events( + State(state): State, +) -> Sse>> { + let (tx, rx) = tokio::sync::mpsc::channel::>(32); + + let client = state.client.clone(); + tokio::spawn(async move { + loop { + match client.get(format!("{CAN_API}/events")).send().await { + Ok(resp) => { + use futures_util::StreamExt; + let mut stream = resp.bytes_stream(); + let mut buf = String::new(); + while let Some(chunk) = stream.next().await { + let Ok(bytes) = chunk else { break }; + buf.push_str(&String::from_utf8_lossy(&bytes)); + // Parse SSE frames (double-newline delimited) + while let Some(pos) = buf.find("\n\n") { + let frame = buf[..pos].to_string(); + buf = buf[pos + 2..].to_string(); + let mut event_type = None; + let mut data = None; + for line in frame.lines() { + if let Some(v) = line.strip_prefix("event: ") { + event_type = Some(v.to_string()); + } else if let Some(v) = line.strip_prefix("data: ") { + data = Some(v.to_string()); + } + // lines starting with ':' are SSE comments (keepalive) — skip + } + if let Some(d) = data { + let mut evt = Event::default().data(d); + if let Some(t) = event_type { + evt = evt.event(t); + } + if tx.send(Ok(evt)).await.is_err() { + return; // client disconnected + } + } + } + } + } + Err(e) => { + tracing::warn!("SSE proxy connect failed: {e}"); + } + } + // Reconnect after a short delay + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Sse::new(stream).keep_alive( + axum::response::sse::KeepAlive::new() + .interval(std::time::Duration::from_secs(15)) + .text("ping"), + ) +} + // ── Main ───────────────────────────────────────────────────────────────── #[tokio::main] @@ -247,6 +309,7 @@ async fn main() { .route("/paste/list", get(paste_list)) .route("/paste/asset/{hash}", get(proxy_asset)) .route("/paste/thumb/{hash}", get(proxy_thumb)) + .route("/paste/events", get(paste_events)) .layer(DefaultBodyLimit::max(100 * 1024 * 1024)) // 100 MB .with_state(state); diff --git a/src/routes/events.rs b/src/routes/events.rs new file mode 100644 index 0000000..5589edb --- /dev/null +++ b/src/routes/events.rs @@ -0,0 +1,39 @@ +//! Public SSE endpoint for real-time asset notifications. +//! +//! `GET /api/v1/can/0/events` — no authentication required. +//! Streams `new_asset` events whenever an asset is ingested or synced. +//! Used by frontends (e.g. Paste) to auto-refresh when content arrives. + +use std::convert::Infallible; + +use axum::extract::State; +use axum::response::sse::{Event, Sse}; +use axum::routing::get; +use axum::Router; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::StreamExt; + +use crate::AppState; + +pub fn router() -> Router { + Router::new().route("/api/v1/can/0/events", get(asset_events)) +} + +/// Public SSE stream of new asset events. +/// +/// Each event is `event: new_asset` with `data: {"hash":"...","timestamp":...}`. +async fn asset_events( + State(state): State, +) -> Sse>> { + let rx = state.sync_events.subscribe(); + let stream = BroadcastStream::new(rx).filter_map(|result| match result { + Ok(data) => Some(Ok(Event::default().event("new_asset").data(data))), + Err(_) => None, // lagged — skip, client will see the data on next loadItems() + }); + + Sse::new(stream).keep_alive( + axum::response::sse::KeepAlive::new() + .interval(std::time::Duration::from_secs(15)) + .text("ping"), + ) +} diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 8d3ebd4..752e9f2 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -5,6 +5,7 @@ pub mod list; pub mod search; pub mod thumb; pub mod sync; +pub mod events; use axum::Router; use crate::AppState; @@ -18,4 +19,5 @@ pub fn router() -> Router { .merge(search::router()) .merge(thumb::router()) .merge(sync::router()) + .merge(events::router()) }