diff --git a/Cargo.lock b/Cargo.lock index aa4e667..4614ab8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,6 +182,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "tokio-test", "tokio-util", "tower-http", @@ -1914,6 +1915,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index dd8194f..d09daa8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Protobuf (sync API) prost = "0.13" +# Stream utilities (SSE for sync events) +tokio-stream = { version = "0.1", features = ["sync"] } + # Utilities chrono = { version = "0.4", features = ["serde"] } anyhow = "1" diff --git a/examples/can-sync/Cargo.lock b/examples/can-sync/Cargo.lock index 512b845..36a79b2 100644 --- a/examples/can-sync/Cargo.lock +++ b/examples/can-sync/Cargo.lock @@ -229,6 +229,7 @@ dependencies = [ "anyhow", "blake3", "bytes", + "futures-util", "hex", "iroh", "iroh-gossip", @@ -241,6 +242,7 @@ dependencies = [ "serde_yaml", "tempfile", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", ] diff --git a/examples/can-sync/Cargo.toml b/examples/can-sync/Cargo.toml index 78842a6..700948f 100644 --- a/examples/can-sync/Cargo.toml +++ b/examples/can-sync/Cargo.toml @@ -40,6 +40,10 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Stream utilities (needed for gossip event stream) n0-future = "0.1" +# SSE client (for real-time events from CAN service) +tokio-stream = "0.1" +futures-util = "0.3" + # Utilities anyhow = "1" bytes = "1" diff --git a/examples/can-sync/src/can_client.rs b/examples/can-sync/src/can_client.rs index 8ab0bf8..6f71ac9 100644 --- a/examples/can-sync/src/can_client.rs +++ b/examples/can-sync/src/can_client.rs @@ -1,10 +1,23 @@ //! HTTP client for CAN service's private sync API (protobuf-encoded). +//! +//! Includes SSE subscription for real-time ingest notifications and +//! incremental hash queries via `?since=` parameter. use anyhow::{Context, Result}; +use futures_util::StreamExt; use prost::Message; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; use crate::protocol::*; +/// Event received from the CAN service SSE stream. +#[derive(Debug, Clone)] +pub struct SyncEvent { + pub hash: String, + pub timestamp: i64, +} + /// Client for CAN service's /sync/* endpoints. #[derive(Clone)] pub struct CanSyncClient { @@ -44,7 +57,7 @@ impl CanSyncClient { resp.bytes().await.with_context(|| format!("reading body from {}", path)) } - /// Get all asset digests for reconciliation. + /// Get all asset digests (full list — use for initial reconciliation only). pub async fn get_hashes(&self) -> Result { let req = HashListRequest {}; let mut buf = Vec::with_capacity(req.encoded_len()); @@ -54,6 +67,33 @@ impl CanSyncClient { HashListResponse::decode(resp_bytes).context("decode HashListResponse") } + /// Get only asset digests newer than `since` timestamp (incremental query). + pub async fn get_hashes_since(&self, since: i64) -> Result { + let req = HashListRequest {}; + let mut buf = Vec::with_capacity(req.encoded_len()); + req.encode(&mut buf)?; + + let url = format!("{}/sync/hashes?since={}", self.base_url, since); + let resp = self + .http + .post(&url) + .header("X-Sync-Key", &self.sync_key) + .header("Content-Type", "application/x-protobuf") + .body(buf) + .send() + .await + .with_context(|| format!("POST {}", url))?; + + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + anyhow::bail!("/sync/hashes?since={} returned {}: {}", since, status, text); + } + + let resp_bytes = resp.bytes().await?; + HashListResponse::decode(resp_bytes).context("decode HashListResponse") + } + /// Pull full assets by hash. pub async fn pull(&self, hashes: Vec) -> Result { let req = PullRequest { hashes }; @@ -101,4 +141,102 @@ impl CanSyncClient { pub async fn health_check(&self) -> bool { self.get_hashes().await.is_ok() } + + /// Subscribe to SSE events from CAN service. Sends `SyncEvent` on the + /// returned channel whenever the CAN service ingests a new asset. + /// + /// Automatically reconnects on disconnect (with incremental catch-up). + /// Returns a channel receiver that yields events. + pub fn subscribe_events(&self) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded_channel(); + let url = format!( + "{}/sync/events?key={}", + self.base_url, self.sync_key + ); + let http = self.http.clone(); + + tokio::spawn(async move { + loop { + info!("Connecting to SSE stream: {}", url.split('?').next().unwrap_or(&url)); + match Self::run_sse_stream(&http, &url, &tx).await { + Ok(()) => { + info!("SSE stream ended cleanly"); + } + Err(e) => { + warn!("SSE stream error: {:#}", e); + } + } + + // If the receiver is dropped, stop reconnecting + if tx.is_closed() { + debug!("SSE subscriber dropped, stopping reconnect loop"); + break; + } + + // Reconnect after a short delay + info!("Reconnecting SSE in 2s..."); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + }); + + rx + } + + /// Internal: connect to SSE and forward events until the stream ends or errors. + async fn run_sse_stream( + http: &reqwest::Client, + url: &str, + tx: &mpsc::UnboundedSender, + ) -> Result<()> { + let resp = http + .get(url) + .header("Accept", "text/event-stream") + .send() + .await + .context("SSE connect")?; + + if !resp.status().is_success() { + anyhow::bail!("SSE returned status {}", resp.status()); + } + + let mut stream = resp.bytes_stream(); + let mut buffer = String::new(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk.context("reading SSE chunk")?; + buffer.push_str(&String::from_utf8_lossy(&chunk)); + + // Process complete SSE messages (separated by double newlines) + while let Some(pos) = buffer.find("\n\n") { + let message = buffer[..pos].to_string(); + buffer = buffer[pos + 2..].to_string(); + + // Parse SSE message: look for "data: {...}" lines + for line in message.lines() { + if let Some(data) = line.strip_prefix("data:") { + let data = data.trim(); + if data == "ping" || data.is_empty() { + continue; + } + + // Parse JSON: {"hash":"...","timestamp":...} + if let Ok(value) = serde_json::from_str::(data) { + if let (Some(hash), Some(ts)) = ( + value["hash"].as_str(), + value["timestamp"].as_i64(), + ) { + debug!("SSE event: new_asset hash={}", &hash[..hash.len().min(12)]); + let _ = tx.send(SyncEvent { + hash: hash.to_string(), + timestamp: ts, + }); + } + } + } + } + } + } + + Ok(()) + } } diff --git a/examples/can-sync/src/main.rs b/examples/can-sync/src/main.rs index 71f0681..b5a4605 100644 --- a/examples/can-sync/src/main.rs +++ b/examples/can-sync/src/main.rs @@ -107,7 +107,6 @@ async fn main() -> Result<()> { let ticket_path = ticket_path.clone(); let endpoint_direct = endpoint.clone(); let can_direct = can.clone(); - let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs); tokio::spawn(async move { info!("Waiting for peer addr file: {}", ticket_path); @@ -146,8 +145,8 @@ async fn main() -> Result<()> { info!("Initial sync with {} complete, starting live sync", short); - // Live sync: push new local assets + accept incoming streams - peer::run_live_sync(conn, can_direct, poll_interval).await; + // Live sync: SSE-driven push + accept incoming streams + peer::run_live_sync(conn, can_direct).await; } Err(e) => { error!("Failed to connect to {}: {:#}", short, e); @@ -159,18 +158,16 @@ async fn main() -> Result<()> { // Spawn incoming connection handler let endpoint_accept = endpoint.clone(); let can_accept = can.clone(); - let accept_poll_interval = std::time::Duration::from_secs(config.poll_interval_secs); tokio::spawn(async move { loop { match endpoint_accept.accept().await { Some(incoming) => { let can_clone = can_accept.clone(); - let poll_dur = accept_poll_interval; tokio::spawn(async move { match incoming.await { Ok(conn) => { info!("Accepted incoming connection from {}", conn.remote_id().fmt_short()); - peer::handle_incoming(conn, can_clone, poll_dur).await; + peer::handle_incoming(conn, can_clone, std::time::Duration::from_secs(0)).await; } Err(e) => { warn!("Failed to accept connection: {:#}", e); @@ -187,8 +184,6 @@ async fn main() -> Result<()> { }); // Main loop: connect to discovered peers (from gossip) and sync - let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs); - info!("Waiting for peers..."); while let Some(peer_id) = peer_rx.recv().await { @@ -197,7 +192,6 @@ async fn main() -> Result<()> { let endpoint_clone = endpoint.clone(); let can_clone = can.clone(); - let poll_dur = poll_interval; tokio::spawn(async move { let conn = match endpoint_clone.connect(peer_id, SYNC_ALPN).await { @@ -213,7 +207,7 @@ async fn main() -> Result<()> { return; } - peer::run_live_sync(conn, can_clone, poll_dur).await; + peer::run_live_sync(conn, can_clone).await; }); } diff --git a/examples/can-sync/src/peer.rs b/examples/can-sync/src/peer.rs index 1b0f926..9663c35 100644 --- a/examples/can-sync/src/peer.rs +++ b/examples/can-sync/src/peer.rs @@ -4,11 +4,15 @@ //! 1. Exchange hash lists (from their local CAN services) //! 2. Compute the diff (what each side is missing) //! 3. Send/receive missing assets concurrently (avoids deadlock) -//! 4. Continue polling for new assets and pushing them +//! 4. Subscribe to SSE events from local CAN for instant push on new assets //! -//! The live sync uses an unbounded channel to share received hashes -//! from the receive loop to the push loop, preventing "echo" where -//! an asset received from a peer gets pushed right back to them. +//! The live sync uses: +//! - **SSE events** from local CAN service to detect new assets instantly +//! (replaces the old polling loop — no more wasted hash-list queries) +//! - An unbounded channel to share received hashes from the receive loop +//! to the push loop, preventing "echo" where an asset received from a +//! peer gets pushed right back to them. +//! - A fallback incremental poll on timeout for catch-up if SSE was briefly down. use std::collections::{HashMap, HashSet}; @@ -19,7 +23,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; -use crate::can_client::CanSyncClient; +use crate::can_client::{CanSyncClient, SyncEvent}; use crate::protocol::*; // Message type tags for QUIC stream framing @@ -140,10 +144,6 @@ pub async fn run_sync_session( ); // Step 5+6: Send and receive assets CONCURRENTLY to avoid deadlock. - // - // If both sides have many large assets, doing send-then-receive sequentially - // can deadlock: QUIC flow control stalls the writer because the reader hasn't - // drained its buffer (it's busy writing). Running both in parallel avoids this. let send_fut = async { if !they_need.is_empty() { send_assets(&can, &mut send, &they_need, &short_id).await?; @@ -171,7 +171,6 @@ async fn send_assets( hashes: &[String], peer_short: &str, ) -> Result<()> { - // Pull assets in batches to avoid huge single requests for chunk in hashes.chunks(10) { let pull_resp = can .pull(chunk.to_vec()) @@ -263,7 +262,7 @@ async fn receive_assets( pub async fn handle_incoming( conn: Connection, can: CanSyncClient, - poll_interval: std::time::Duration, + _poll_interval: std::time::Duration, ) { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); @@ -275,29 +274,28 @@ pub async fn handle_incoming( } info!("Initial sync with {} complete, starting live sync", short_id); - - // Run both live loops concurrently - run_live_sync(conn, can, poll_interval).await; + run_live_sync(conn, can).await; } /// Run both live sync loops (push + receive) concurrently. /// -/// Uses an unbounded channel to prevent the "echo" problem: when we receive -/// an asset from the peer, we tell the push loop about it so it doesn't -/// push the same asset right back. +/// Uses SSE events from CAN service for instant push (no polling). +/// Uses an unbounded channel to prevent the "echo" problem. pub async fn run_live_sync( conn: Connection, can: CanSyncClient, - poll_interval: std::time::Duration, ) { let short_id = conn.remote_id().fmt_short().to_string(); // Channel for receive loop to notify push loop about received hashes let (received_tx, received_rx) = mpsc::unbounded_channel::(); + // Subscribe to SSE events from local CAN service + let sse_rx = can.subscribe_events(); + // Run push loop and receive loop concurrently — when either ends, we're done tokio::select! { - result = live_push_loop(conn.clone(), can.clone(), poll_interval, received_rx) => { + result = live_push_loop(conn.clone(), can.clone(), received_rx, sse_rx) => { if let Err(e) = result { warn!("Live push loop with {} ended: {:#}", short_id, e); } @@ -310,81 +308,124 @@ pub async fn run_live_sync( } } -/// Poll for new local assets and push them to the peer via new QUIC streams. +/// Wait for SSE events from local CAN service and push new assets to the peer. /// -/// Drains the `received_rx` channel each tick to learn about hashes that arrived -/// from the peer, so we don't echo them back. +/// Drains the `received_rx` channel to learn about hashes that arrived from +/// the peer, so we don't echo them back. +/// +/// Falls back to incremental poll if no SSE events arrive within 30s. async fn live_push_loop( conn: Connection, can: CanSyncClient, - poll_interval: std::time::Duration, mut received_rx: mpsc::UnboundedReceiver, + mut sse_rx: mpsc::UnboundedReceiver, ) -> Result<()> { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); - info!("Starting live push loop with {}", short_id); + info!("Starting live push loop with {} (SSE-driven)", short_id); // Track what we've already synced (local + received from peer) - let mut known_hashes: HashSet = { - let resp = can.get_hashes().await?; - resp.assets.into_iter().map(|a| a.hash).collect() - }; + let resp = can.get_hashes().await?; + let mut max_timestamp: i64 = resp.assets.iter().map(|a| a.timestamp).max().unwrap_or(0); + let mut known_hashes: HashSet = resp.assets.into_iter().map(|a| a.hash).collect(); - let mut interval = tokio::time::interval(poll_interval); + // Fallback: if no SSE event in 30s, do an incremental poll to catch gaps + let fallback_interval = std::time::Duration::from_secs(30); loop { - interval.tick().await; + // Wait for SSE event, or fallback timeout + let new_hashes: Vec = tokio::select! { + event = sse_rx.recv() => { + match event { + Some(evt) => { + // Drain any additional events that arrived at the same time + let mut batch = vec![evt]; + while let Ok(more) = sse_rx.try_recv() { + batch.push(more); + } - // Drain any hashes that the receive loop got from the peer. - // This prevents us from pushing them right back (echo prevention). - while let Ok(hash) = received_rx.try_recv() { - known_hashes.insert(hash); - } + // Drain received-from-peer hashes (echo prevention) + while let Ok(hash) = received_rx.try_recv() { + known_hashes.insert(hash); + } - // Poll for new assets - let resp = match can.get_hashes().await { - Ok(r) => r, - Err(e) => { - warn!("Failed to poll CAN service: {:#}", e); - continue; + // Filter to only truly new hashes + batch + .into_iter() + .filter(|e| { + if e.timestamp > max_timestamp { + max_timestamp = e.timestamp; + } + !known_hashes.contains(&e.hash) + }) + .map(|e| e.hash) + .collect() + } + None => { + warn!("SSE channel closed, stopping push loop"); + break; + } + } + } + + // Fallback: periodic incremental poll + _ = tokio::time::sleep(fallback_interval) => { + debug!("Fallback incremental poll (no SSE events in {}s)", fallback_interval.as_secs()); + + while let Ok(hash) = received_rx.try_recv() { + known_hashes.insert(hash); + } + + match can.get_hashes_since(max_timestamp).await { + Ok(resp) => { + resp.assets + .into_iter() + .filter(|a| { + if a.timestamp > max_timestamp { + max_timestamp = a.timestamp; + } + !known_hashes.contains(&a.hash) + }) + .map(|a| a.hash) + .collect() + } + Err(e) => { + warn!("Fallback poll failed: {:#}", e); + continue; + } + } } }; - let current_hashes: HashSet = - resp.assets.iter().map(|a| a.hash.clone()).collect(); - let new_hashes: Vec = current_hashes - .difference(&known_hashes) - .cloned() - .collect(); + if new_hashes.is_empty() { + continue; + } - if !new_hashes.is_empty() { - info!( - "Detected {} new local assets, pushing to {}", - new_hashes.len(), - short_id - ); + info!( + "Pushing {} new assets to peer {}", + new_hashes.len(), + short_id + ); - // Open a new stream for this batch - match conn.open_bi().await { - Ok((mut send, _recv)) => { - if let Err(e) = send_assets(&can, &mut send, &new_hashes, &short_id).await { - error!("Failed to push new assets to {}: {:#}", short_id, e); - } - // Send done marker - let done_frame = encode_frame(MSG_DONE, &[]); - let _ = send.write_all(&done_frame).await; - let _ = send.flush().await; - let _ = send.finish(); - } - Err(e) => { - warn!("Failed to open stream to {}: {:#}", short_id, e); - break; // Connection probably dead + // Open a new QUIC stream for this batch + match conn.open_bi().await { + Ok((mut send, _recv)) => { + if let Err(e) = send_assets(&can, &mut send, &new_hashes, &short_id).await { + error!("Failed to push new assets to {}: {:#}", short_id, e); } + let done_frame = encode_frame(MSG_DONE, &[]); + let _ = send.write_all(&done_frame).await; + let _ = send.flush().await; + let _ = send.finish(); + } + Err(e) => { + warn!("Failed to open stream to {}: {:#}", short_id, e); + break; // Connection probably dead } } - // Update our known set (union with current so we keep peer-received hashes too) - for h in current_hashes { + // Update known set + for h in new_hashes { known_hashes.insert(h); } } @@ -393,10 +434,6 @@ async fn live_push_loop( } /// Accept incoming QUIC bi-streams from the peer and receive assets. -/// -/// The peer's live_push_loop opens new bi-streams for each batch of assets. -/// This loop accepts those streams, ingests assets into the local CAN service, -/// and notifies the push loop via channel to prevent echo. async fn live_receive_loop( conn: Connection, can: CanSyncClient, @@ -412,7 +449,6 @@ async fn live_receive_loop( info!("Accepted live sync stream from peer {}", short_id); match receive_assets(&can, &mut recv, &short_id).await { Ok(received_hashes) => { - // Notify push loop about received hashes to prevent echo for hash in received_hashes { let _ = received_tx.send(hash); } diff --git a/src/db.rs b/src/db.rs index 08227d9..0c6542d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -457,6 +457,36 @@ pub fn get_all_assets(conn: &Connection) -> rusqlite::Result> { Ok(assets) } +/// Get assets with `timestamp > since` (for incremental sync queries). +pub fn get_assets_since(conn: &Connection, since: i64) -> rusqlite::Result> { + let mut stmt = conn.prepare( + "SELECT id, timestamp, hash, mime_type, application, user_identity, description, + actual_filename, human_filename, human_path, is_trashed, is_corrupted, size + FROM assets WHERE timestamp > ?1 + ORDER BY timestamp ASC", + )?; + let assets = stmt + .query_map([since], |row| { + Ok(Asset { + id: row.get(0)?, + timestamp: row.get(1)?, + hash: row.get(2)?, + mime_type: row.get(3)?, + application: row.get(4)?, + user_identity: row.get(5)?, + description: row.get(6)?, + actual_filename: row.get(7)?, + human_filename: row.get(8)?, + human_path: row.get(9)?, + is_trashed: row.get(10)?, + is_corrupted: row.get(11)?, + size: row.get(12)?, + }) + })? + .collect::>>()?; + Ok(assets) +} + /// Get all non-trashed asset records (for verifier startup scan). pub fn get_all_active_assets(conn: &Connection) -> rusqlite::Result> { let mut stmt = conn.prepare( diff --git a/src/lib.rs b/src/lib.rs index 199acf1..5eb0db7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,8 +13,13 @@ use std::sync::Arc; use crate::config::Config; use crate::db::Db; +/// Broadcast channel for notifying sync subscribers about new assets. +/// Each message is `"hash:timestamp"` (e.g. `"abc123def456:1710000000000"`). +pub type SyncEventSender = tokio::sync::broadcast::Sender; + #[derive(Clone)] pub struct AppState { pub config: Arc, pub db: Db, + pub sync_events: SyncEventSender, } diff --git a/src/main.rs b/src/main.rs index b5a3b4c..7d3c68a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,9 +41,14 @@ async fn main() -> anyhow::Result<()> { // Start background verifier verifier::start((*config).clone(), db.clone()); + // Broadcast channel for SSE sync events (capacity doesn't matter much — + // slow receivers just miss events and do a full reconciliation on reconnect) + let (sync_events, _) = tokio::sync::broadcast::channel::(256); + let state = AppState { config: config.clone(), db, + sync_events, }; // Build router diff --git a/src/routes/ingest.rs b/src/routes/ingest.rs index 8bd6151..1b1c683 100644 --- a/src/routes/ingest.rs +++ b/src/routes/ingest.rs @@ -85,6 +85,13 @@ fn do_ingest(state: &AppState, input: IngestInput) -> Result Router { .route("/sync/pull", post(sync_pull)) .route("/sync/push", post(sync_push)) .route("/sync/meta", post(sync_meta)) + .route("/sync/events", get(sync_events)) +} + +/// Query params for /sync/hashes (optional `since` timestamp for incremental queries). +#[derive(serde::Deserialize, Default)] +struct HashesQuery { + /// Only return assets with `timestamp > since`. Omit or 0 for full list. + since: Option, } // ── Auth ──────────────────────────────────────────────────────────────── @@ -157,14 +173,22 @@ fn proto_response(buf: Vec) -> (StatusCode, [(&'static str, &'static str); 1 async fn sync_hashes( State(state): State, headers: HeaderMap, + query: Query, _body: Bytes, ) -> Result { check_sync_key(&state, &headers)?; + let since = query.since.unwrap_or(0); + let assets = { let conn = state.db.lock().unwrap(); - db::get_all_assets(&conn) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))? + if since > 0 { + db::get_assets_since(&conn, since) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))? + } else { + db::get_all_assets(&conn) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))? + } }; let resp = HashListResponse { @@ -333,6 +357,13 @@ async fn sync_push( tracing::info!("Sync push: ingested {} ({}B)", &bundle.hash[..12], bundle.content.len()); + // Notify SSE subscribers about the new asset + let event_data = format!( + r#"{{"hash":"{}","timestamp":{}}}"#, + bundle.hash, bundle.timestamp + ); + let _ = state.sync_events.send(event_data); + Ok(proto_response(encode_proto(&PushResponse { hash: bundle.hash, already_existed: false, @@ -379,3 +410,57 @@ async fn sync_meta( success: true, })?)) } + +// ── GET /sync/events (SSE) ──────────────────────────────────────────── + +/// Server-Sent Events endpoint. Streams `new_asset` events whenever a file is +/// ingested (via public API or sync push). Requires `X-Sync-Key` as a query +/// param (`?key=...`) since SSE/EventSource doesn't support custom headers. +/// +/// Each event is: +/// ```text +/// event: new_asset +/// data: {"hash":"abc...","timestamp":1710000000000} +/// ``` +async fn sync_events( + State(state): State, + headers: HeaderMap, + query: Query, +) -> Result>>, (StatusCode, String)> +{ + // SSE clients (EventSource) can't set custom headers, so accept key from query param too + let key_ok = check_sync_key(&state, &headers).is_ok() + || query + .key + .as_deref() + .map(|k| { + state + .config + .sync_api_key + .as_deref() + .map(|expected| k == expected) + .unwrap_or(false) + }) + .unwrap_or(false); + + if !key_ok { + return Err((StatusCode::UNAUTHORIZED, "Invalid sync key".into())); + } + + let rx = state.sync_events.subscribe(); + let stream = BroadcastStream::new(rx).filter_map(|result| match result { + Ok(data) => Some(Ok(Event::default().event("new_asset").data(data))), + Err(_) => None, // lagged — skip missed events, client will reconcile + }); + + Ok(Sse::new(stream).keep_alive( + axum::response::sse::KeepAlive::new() + .interval(std::time::Duration::from_secs(15)) + .text("ping"), + )) +} + +#[derive(serde::Deserialize, Default)] +struct SseQuery { + key: Option, +}