//! Per-peer sync: reconciliation and live bidirectional asset transfer.
//!
//! When two sync agents connect, they:
//! 1. Exchange hash lists (from their local CAN services)
//! 2. Compute the diff (what each side is missing)
//! 3. Send/receive missing assets concurrently (avoids deadlock)
//! 4. Subscribe to SSE events from local CAN for instant push on new assets
//!
//! The live sync uses:
//! - **SSE events** from local CAN service to detect new assets instantly
//!   (replaces the old polling loop — no more wasted hash-list queries)
//! - An unbounded channel to share received hashes from the receive loop
//!   to the push loop, preventing "echo" where an asset received from a
//!   peer gets pushed right back to them.
//! - A fallback incremental poll on timeout for catch-up if SSE was briefly down.

use std::collections::{HashMap, HashSet};

use anyhow::{Context, Result};
use iroh::endpoint::Connection;
use prost::Message;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};

use crate::can_client::{CanSyncClient, SyncEvent};
use crate::protocol::*;

// Message type tags for QUIC stream framing
const MSG_HASH_SET: u8 = 0x01;
const MSG_ASSET_BUNDLE: u8 = 0x02;
const MSG_META_UPDATE: u8 = 0x03;
const MSG_DONE: u8 = 0x04;

/// Frame a protobuf message with a type tag and length prefix.
///
/// Wire format: 1 tag byte (`msg_type`), 4 bytes big-endian payload
/// length, then the payload bytes. `read_frame` is the matching decoder.
fn encode_frame(msg_type: u8, payload: &[u8]) -> Vec<u8> {
    let len = payload.len() as u32;
    // 1 tag byte + 4 length bytes + payload, allocated in one shot.
    let mut frame = Vec::with_capacity(5 + payload.len());
    frame.push(msg_type);
    frame.extend_from_slice(&len.to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Read a single framed message from a QUIC recv stream.
/// Returns (msg_type, payload_bytes).
async fn read_frame(recv: &mut iroh::endpoint::RecvStream) -> Result<(u8, Vec)> { let msg_type = recv.read_u8().await.context("reading message type")?; let len = recv.read_u32().await.context("reading message length")?; if len > 256 * 1024 * 1024 { anyhow::bail!("Message too large: {} bytes", len); } let mut payload = vec![0u8; len as usize]; recv.read_exact(&mut payload) .await .context("reading message payload")?; Ok((msg_type, payload)) } /// Run a full sync session with a connected peer. /// /// This handles initial reconciliation: exchange hash lists, compute diffs, /// then send/receive missing assets **concurrently** to avoid deadlock when /// both sides have large amounts of data to transfer. pub async fn run_sync_session( conn: Connection, can: CanSyncClient, is_initiator: bool, ) -> Result<()> { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); info!("Starting sync session with {} (initiator={})", short_id, is_initiator); // Initiator opens the stream, responder accepts it let (mut send, mut recv) = if is_initiator { conn.open_bi().await.context("opening bi stream")? } else { conn.accept_bi().await.context("accepting bi stream")? 
}; // Step 1: Get our local hash list from CAN service let our_hashes = can.get_hashes().await.context("getting local hashes")?; let our_hash_map: HashMap = our_hashes .assets .iter() .map(|a| (a.hash.clone(), a)) .collect(); info!( "Local state: {} assets, sending to peer {}", our_hashes.assets.len(), short_id ); // Step 2: Send our hash set to peer let hash_set_msg = PeerHashSet { assets: our_hashes.assets.clone(), }; let mut buf = Vec::with_capacity(hash_set_msg.encoded_len()); hash_set_msg.encode(&mut buf)?; let frame = encode_frame(MSG_HASH_SET, &buf); send.write_all(&frame).await.context("sending hash set")?; send.flush().await?; // Step 3: Receive peer's hash set let (msg_type, payload) = read_frame(&mut recv).await.context("reading peer hash set")?; if msg_type != MSG_HASH_SET { anyhow::bail!("Expected hash set message, got type {}", msg_type); } let peer_hash_set = PeerHashSet::decode(payload.as_slice()).context("decoding peer hash set")?; let peer_hash_map: HashMap = peer_hash_set .assets .iter() .map(|a| (a.hash.clone(), a)) .collect(); info!( "Peer {} has {} assets", short_id, peer_hash_set.assets.len() ); // Step 4: Compute diffs let our_hashes_set: HashSet<&String> = our_hash_map.keys().collect(); let peer_hashes_set: HashSet<&String> = peer_hash_map.keys().collect(); let we_need: Vec = peer_hashes_set .difference(&our_hashes_set) .map(|h| (*h).clone()) .collect(); let they_need: Vec = our_hashes_set .difference(&peer_hashes_set) .map(|h| (*h).clone()) .collect(); info!( "Diff with {}: we need {}, they need {}", short_id, we_need.len(), they_need.len() ); // Step 5+6: Send and receive assets CONCURRENTLY to avoid deadlock. 
let send_fut = async { if !they_need.is_empty() { send_assets(&can, &mut send, &they_need, &short_id).await?; } let done_frame = encode_frame(MSG_DONE, &[]); send.write_all(&done_frame).await.context("sending DONE")?; send.flush().await.context("flushing after DONE")?; Ok::<_, anyhow::Error>(()) }; let recv_fut = receive_assets(&can, &mut recv, &short_id); let (send_result, recv_result) = tokio::join!(send_fut, recv_fut); send_result.context("sending assets to peer")?; recv_result.context("receiving assets from peer")?; info!("Sync session with {} complete", short_id); Ok(()) } /// Pull assets from local CAN service and send them to the peer. async fn send_assets( can: &CanSyncClient, send: &mut iroh::endpoint::SendStream, hashes: &[String], peer_short: &str, ) -> Result<()> { for chunk in hashes.chunks(10) { let pull_resp = can .pull(chunk.to_vec()) .await .context("pulling assets from CAN")?; for bundle in pull_resp.bundles { let hash_short = &bundle.hash[..bundle.hash.len().min(12)]; info!("Sending asset {} to peer {}", hash_short, peer_short); let mut buf = Vec::with_capacity(bundle.encoded_len()); bundle.encode(&mut buf)?; let frame = encode_frame(MSG_ASSET_BUNDLE, &buf); send.write_all(&frame).await?; send.flush().await?; } } Ok(()) } /// Receive assets from peer and push them to local CAN service. /// Returns the list of hashes that were successfully ingested. 
async fn receive_assets( can: &CanSyncClient, recv: &mut iroh::endpoint::RecvStream, peer_short: &str, ) -> Result> { let mut received = Vec::new(); loop { let (msg_type, payload) = read_frame(recv).await.context("reading asset from peer")?; match msg_type { MSG_DONE => { debug!("Peer {} finished sending assets", peer_short); break; } MSG_ASSET_BUNDLE => { let bundle = AssetBundle::decode(payload.as_slice()).context("decoding asset bundle")?; let hash = bundle.hash.clone(); let hash_short = hash[..hash.len().min(12)].to_string(); info!("Received asset {} from peer {}", hash_short, peer_short); match can.push(bundle).await { Ok(resp) => { if resp.already_existed { debug!("Asset {} already existed locally", hash_short); } else { info!("Ingested asset {} from peer {}", resp.hash, peer_short); } received.push(hash); } Err(e) => { error!("Failed to push asset {} to CAN: {:#}", hash_short, e); } } } MSG_META_UPDATE => { let meta = MetaUpdateRequest::decode(payload.as_slice()) .context("decoding meta update")?; let hash_short = meta.hash[..meta.hash.len().min(12)].to_string(); debug!( "Received meta update for {} from peer {}", hash_short, peer_short ); if let Err(e) = can .update_meta( meta.hash.clone(), meta.description.clone(), meta.tags.clone(), meta.is_trashed, ) .await { error!("Failed to update meta for {}: {:#}", hash_short, e); } } other => { warn!("Unknown message type {} from peer {}", other, peer_short); } } } Ok(received) } /// Handle an incoming connection from a peer who connected to us. 
pub async fn handle_incoming(
    conn: Connection,
    can: CanSyncClient,
    // Retained for interface compatibility; polling has been replaced by SSE.
    _poll_interval: std::time::Duration,
) {
    let peer_id = conn.remote_id();
    let short_id = peer_id.fmt_short().to_string();
    info!("Incoming sync connection from {}", short_id);

    // Initial reconciliation first; only start live sync if it succeeded.
    if let Err(e) = run_sync_session(conn.clone(), can.clone(), false).await {
        error!("Sync session with {} failed: {:#}", short_id, e);
        return;
    }
    info!("Initial sync with {} complete, starting live sync", short_id);
    run_live_sync(conn, can).await;
}

/// Run both live sync loops (push + receive) concurrently.
///
/// Uses SSE events from CAN service for instant push (no polling).
/// Uses an unbounded channel to prevent the "echo" problem.
pub async fn run_live_sync(conn: Connection, can: CanSyncClient) {
    let short_id = conn.remote_id().fmt_short().to_string();

    // Channel for receive loop to notify push loop about received hashes
    let (received_tx, received_rx) = mpsc::unbounded_channel::<String>();

    // Subscribe to SSE events from local CAN service
    let sse_rx = can.subscribe_events();

    // Run push loop and receive loop concurrently — when either ends, we're done
    tokio::select! {
        result = live_push_loop(conn.clone(), can.clone(), received_rx, sse_rx) => {
            if let Err(e) = result {
                warn!("Live push loop with {} ended: {:#}", short_id, e);
            }
        }
        result = live_receive_loop(conn, can, received_tx) => {
            if let Err(e) = result {
                warn!("Live receive loop with {} ended: {:#}", short_id, e);
            }
        }
    }
}

/// Wait for SSE events from local CAN service and push new assets to the peer.
///
/// Drains the `received_rx` channel to learn about hashes that arrived from
/// the peer, so we don't echo them back.
///
/// Falls back to incremental poll if no SSE events arrive within 30s.
async fn live_push_loop( conn: Connection, can: CanSyncClient, mut received_rx: mpsc::UnboundedReceiver, mut sse_rx: mpsc::UnboundedReceiver, ) -> Result<()> { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); info!("Starting live push loop with {} (SSE-driven)", short_id); // Track what we've already synced (local + received from peer) let resp = can.get_hashes().await?; let mut max_timestamp: i64 = resp.assets.iter().map(|a| a.timestamp).max().unwrap_or(0); let mut known_hashes: HashSet = resp.assets.into_iter().map(|a| a.hash).collect(); // Fallback: if no SSE event in 30s, do an incremental poll to catch gaps let fallback_interval = std::time::Duration::from_secs(30); loop { // Wait for SSE event, or fallback timeout let new_hashes: Vec = tokio::select! { event = sse_rx.recv() => { match event { Some(evt) => { // Drain any additional events that arrived at the same time let mut batch = vec![evt]; while let Ok(more) = sse_rx.try_recv() { batch.push(more); } // Drain received-from-peer hashes (echo prevention) while let Ok(hash) = received_rx.try_recv() { known_hashes.insert(hash); } // Filter to only truly new hashes batch .into_iter() .filter(|e| { if e.timestamp > max_timestamp { max_timestamp = e.timestamp; } !known_hashes.contains(&e.hash) }) .map(|e| e.hash) .collect() } None => { warn!("SSE channel closed, stopping push loop"); break; } } } // Fallback: periodic incremental poll _ = tokio::time::sleep(fallback_interval) => { debug!("Fallback incremental poll (no SSE events in {}s)", fallback_interval.as_secs()); while let Ok(hash) = received_rx.try_recv() { known_hashes.insert(hash); } match can.get_hashes_since(max_timestamp).await { Ok(resp) => { resp.assets .into_iter() .filter(|a| { if a.timestamp > max_timestamp { max_timestamp = a.timestamp; } !known_hashes.contains(&a.hash) }) .map(|a| a.hash) .collect() } Err(e) => { warn!("Fallback poll failed: {:#}", e); continue; } } } }; if new_hashes.is_empty() { continue; } 
info!( "Pushing {} new assets to peer {}", new_hashes.len(), short_id ); // Open a new QUIC stream for this batch match conn.open_bi().await { Ok((mut send, _recv)) => { if let Err(e) = send_assets(&can, &mut send, &new_hashes, &short_id).await { error!("Failed to push new assets to {}: {:#}", short_id, e); } let done_frame = encode_frame(MSG_DONE, &[]); let _ = send.write_all(&done_frame).await; let _ = send.flush().await; let _ = send.finish(); } Err(e) => { warn!("Failed to open stream to {}: {:#}", short_id, e); break; // Connection probably dead } } // Update known set for h in new_hashes { known_hashes.insert(h); } } Ok(()) } /// Accept incoming QUIC bi-streams from the peer and receive assets. async fn live_receive_loop( conn: Connection, can: CanSyncClient, received_tx: mpsc::UnboundedSender, ) -> Result<()> { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); info!("Starting live receive loop with {}", short_id); loop { match conn.accept_bi().await { Ok((_send, mut recv)) => { info!("Accepted live sync stream from peer {}", short_id); match receive_assets(&can, &mut recv, &short_id).await { Ok(received_hashes) => { for hash in received_hashes { let _ = received_tx.send(hash); } } Err(e) => { warn!("Error receiving live assets from {}: {:#}", short_id, e); } } } Err(e) => { info!("Live receive loop: connection to {} closed: {:#}", short_id, e); break; } } } Ok(()) }