Add live SSE auto-refresh to paste app

- CAN service: public SSE endpoint at /api/v1/can/0/events broadcasts
  new_asset events on ingest and sync push (no auth required)
- Paste backend: SSE proxy at /paste/events streams from CAN service,
  with auto-reconnect on connection loss
- Paste frontend: EventSource subscribes to /paste/events and calls
  loadItems() on new_asset events for instant UI refresh
- When assets arrive via P2P sync, paste updates automatically

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jason Tudisco 2026-03-13 15:01:45 -06:00
parent 69e4f13c22
commit c77e6d4105
6 changed files with 164 additions and 1 deletions

View File

@ -241,6 +241,23 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-io"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.32" version = "0.3.32"
@ -260,7 +277,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task", "futures-task",
"memchr",
"pin-project-lite", "pin-project-lite",
"slab", "slab",
] ]
@ -850,11 +871,13 @@ name = "paste"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"axum", "axum",
"futures-util",
"open", "open",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
] ]
@ -991,12 +1014,14 @@ dependencies = [
"sync_wrapper", "sync_wrapper",
"tokio", "tokio",
"tokio-native-tls", "tokio-native-tls",
"tokio-util",
"tower", "tower",
"tower-http", "tower-http",
"tower-service", "tower-service",
"url", "url",
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures", "wasm-bindgen-futures",
"wasm-streams",
"web-sys", "web-sys",
] ]
@ -1379,6 +1404,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.18" version = "0.7.18"
@ -1680,6 +1716,19 @@ dependencies = [
"wasmparser", "wasmparser",
] ]
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]] [[package]]
name = "wasmparser" name = "wasmparser"
version = "0.244.0" version = "0.244.0"

View File

@ -12,9 +12,11 @@ path = "src/main.rs"
[dependencies] [dependencies]
axum = { version = "0.8", features = ["multipart"] } axum = { version = "0.8", features = ["multipart"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.12", features = ["multipart", "json"] } reqwest = { version = "0.12", features = ["multipart", "json", "stream"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
open = "5" open = "5"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio-stream = "0.1"
futures-util = "0.3"

View File

@ -376,6 +376,14 @@ fileInput.addEventListener('change', () => {
// Initial load // Initial load
loadItems(); loadItems();
// Live updates via SSE — auto-refresh when assets arrive from sync or other sources
const evtSource = new EventSource('/paste/events');
evtSource.addEventListener('new_asset', () => loadItems());
evtSource.onerror = () => {
// EventSource auto-reconnects; just log for debugging
console.debug('SSE connection lost, reconnecting...');
};
</script> </script>
</body> </body>

View File

@ -2,10 +2,12 @@ mod html;
use axum::extract::{DefaultBodyLimit, Multipart, Path, State}; use axum::extract::{DefaultBodyLimit, Multipart, Path, State};
use axum::http::{header, StatusCode}; use axum::http::{header, StatusCode};
use axum::response::sse::{Event, Sse};
use axum::response::{Html, IntoResponse, Response}; use axum::response::{Html, IntoResponse, Response};
use axum::routing::{get, post}; use axum::routing::{get, post};
use axum::{Json, Router}; use axum::{Json, Router};
use serde::Deserialize; use serde::Deserialize;
use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
const CAN_API: &str = "http://127.0.0.1:3210/api/v1/can/0"; const CAN_API: &str = "http://127.0.0.1:3210/api/v1/can/0";
@ -225,6 +227,66 @@ async fn proxy_thumb(
forward(resp).await forward(resp).await
} }
/// Proxy SSE events from CAN service so the frontend gets live updates.
async fn paste_events(
State(state): State<AppState>,
) -> Sse<impl futures_util::Stream<Item = Result<Event, Infallible>>> {
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(32);
let client = state.client.clone();
tokio::spawn(async move {
loop {
match client.get(format!("{CAN_API}/events")).send().await {
Ok(resp) => {
use futures_util::StreamExt;
let mut stream = resp.bytes_stream();
let mut buf = String::new();
while let Some(chunk) = stream.next().await {
let Ok(bytes) = chunk else { break };
buf.push_str(&String::from_utf8_lossy(&bytes));
// Parse SSE frames (double-newline delimited)
while let Some(pos) = buf.find("\n\n") {
let frame = buf[..pos].to_string();
buf = buf[pos + 2..].to_string();
let mut event_type = None;
let mut data = None;
for line in frame.lines() {
if let Some(v) = line.strip_prefix("event: ") {
event_type = Some(v.to_string());
} else if let Some(v) = line.strip_prefix("data: ") {
data = Some(v.to_string());
}
// lines starting with ':' are SSE comments (keepalive) — skip
}
if let Some(d) = data {
let mut evt = Event::default().data(d);
if let Some(t) = event_type {
evt = evt.event(t);
}
if tx.send(Ok(evt)).await.is_err() {
return; // client disconnected
}
}
}
}
}
Err(e) => {
tracing::warn!("SSE proxy connect failed: {e}");
}
}
// Reconnect after a short delay
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
});
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(std::time::Duration::from_secs(15))
.text("ping"),
)
}
// ── Main ───────────────────────────────────────────────────────────────── // ── Main ─────────────────────────────────────────────────────────────────
#[tokio::main] #[tokio::main]
@ -247,6 +309,7 @@ async fn main() {
.route("/paste/list", get(paste_list)) .route("/paste/list", get(paste_list))
.route("/paste/asset/{hash}", get(proxy_asset)) .route("/paste/asset/{hash}", get(proxy_asset))
.route("/paste/thumb/{hash}", get(proxy_thumb)) .route("/paste/thumb/{hash}", get(proxy_thumb))
.route("/paste/events", get(paste_events))
.layer(DefaultBodyLimit::max(100 * 1024 * 1024)) // 100 MB .layer(DefaultBodyLimit::max(100 * 1024 * 1024)) // 100 MB
.with_state(state); .with_state(state);

39
src/routes/events.rs Normal file
View File

@ -0,0 +1,39 @@
//! Public SSE endpoint for real-time asset notifications.
//!
//! `GET /api/v1/can/0/events` — no authentication required.
//! Streams `new_asset` events whenever an asset is ingested or synced.
//! Used by frontends (e.g. Paste) to auto-refresh when content arrives.
use std::convert::Infallible;
use axum::extract::State;
use axum::response::sse::{Event, Sse};
use axum::routing::get;
use axum::Router;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use crate::AppState;
pub fn router() -> Router<AppState> {
Router::new().route("/api/v1/can/0/events", get(asset_events))
}
/// Public SSE stream of new asset events.
///
/// Each event is `event: new_asset` with `data: {"hash":"...","timestamp":...}`.
async fn asset_events(
State(state): State<AppState>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>> {
let rx = state.sync_events.subscribe();
let stream = BroadcastStream::new(rx).filter_map(|result| match result {
Ok(data) => Some(Ok(Event::default().event("new_asset").data(data))),
Err(_) => None, // lagged — skip, client will see the data on next loadItems()
});
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(std::time::Duration::from_secs(15))
.text("ping"),
)
}

View File

@ -5,6 +5,7 @@ pub mod list;
pub mod search; pub mod search;
pub mod thumb; pub mod thumb;
pub mod sync; pub mod sync;
pub mod events;
use axum::Router; use axum::Router;
use crate::AppState; use crate::AppState;
@ -18,4 +19,5 @@ pub fn router() -> Router<AppState> {
.merge(search::router()) .merge(search::router())
.merge(thumb::router()) .merge(thumb::router())
.merge(sync::router()) .merge(sync::router())
.merge(events::router())
} }