From ccce1474d45553f2f7edeb4dc37031659373780d Mon Sep 17 00:00:00 2001 From: Jason Tudisco Date: Thu, 12 Mar 2026 16:11:47 -0600 Subject: [PATCH] Fix echo bounce and initial sync deadlock in peer protocol Add mpsc channel between live receive and push loops so hashes received from a peer aren't pushed right back (echo prevention). Change initial reconciliation to use tokio::join! for concurrent send/receive, avoiding QUIC flow-control deadlock when both sides have large transfers. Update known_hashes to union-insert so peer-received hashes persist across poll cycles. Co-Authored-By: Claude Opus 4.6 --- examples/can-sync/src/peer.rs | 96 ++++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 25 deletions(-) diff --git a/examples/can-sync/src/peer.rs b/examples/can-sync/src/peer.rs index d4c8795..1b0f926 100644 --- a/examples/can-sync/src/peer.rs +++ b/examples/can-sync/src/peer.rs @@ -3,8 +3,12 @@ //! When two sync agents connect, they: //! 1. Exchange hash lists (from their local CAN services) //! 2. Compute the diff (what each side is missing) -//! 3. Send missing assets to each other +//! 3. Send/receive missing assets concurrently (avoids deadlock) //! 4. Continue polling for new assets and pushing them +//! +//! The live sync uses an unbounded channel to share received hashes +//! from the receive loop to the push loop, preventing "echo" where +//! an asset received from a peer gets pushed right back to them. 
use std::collections::{HashMap, HashSet}; @@ -12,6 +16,7 @@ use anyhow::{Context, Result}; use iroh::endpoint::Connection; use prost::Message; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; use crate::can_client::CanSyncClient; @@ -52,8 +57,9 @@ async fn read_frame(recv: &mut iroh::endpoint::RecvStream) -> Result<(u8, Vec(()) + }; - // Send DONE marker - let done_frame = encode_frame(MSG_DONE, &[]); - send.write_all(&done_frame).await?; - send.flush().await?; + let recv_fut = receive_assets(&can, &mut recv, &short_id); - // Step 6: Receive assets we're missing - receive_assets(&can, &mut recv, &short_id).await?; + let (send_result, recv_result) = tokio::join!(send_fut, recv_fut); + send_result.context("sending assets to peer")?; + recv_result.context("receiving assets from peer")?; info!("Sync session with {} complete", short_id); Ok(()) @@ -179,11 +193,14 @@ async fn send_assets( } /// Receive assets from peer and push them to local CAN service. +/// Returns the list of hashes that were successfully ingested. 
async fn receive_assets( can: &CanSyncClient, recv: &mut iroh::endpoint::RecvStream, peer_short: &str, -) -> Result<()> { +) -> Result<Vec<String>> { + let mut received = Vec::new(); + loop { let (msg_type, payload) = read_frame(recv).await.context("reading asset from peer")?; @@ -195,7 +212,8 @@ async fn receive_assets( MSG_ASSET_BUNDLE => { let bundle = AssetBundle::decode(payload.as_slice()).context("decoding asset bundle")?; - let hash_short = bundle.hash[..bundle.hash.len().min(12)].to_string(); + let hash = bundle.hash.clone(); + let hash_short = hash[..hash.len().min(12)].to_string(); info!("Received asset {} from peer {}", hash_short, peer_short); match can.push(bundle).await { @@ -205,6 +223,7 @@ async fn receive_assets( } else { info!("Ingested asset {} from peer {}", resp.hash, peer_short); } + received.push(hash); } Err(e) => { error!("Failed to push asset {} to CAN: {:#}", hash_short, e); @@ -237,7 +256,7 @@ async fn receive_assets( } } } - Ok(()) + Ok(received) } /// Handle an incoming connection from a peer who connected to us. @@ -263,7 +282,9 @@ pub async fn handle_incoming( /// Run both live sync loops (push + receive) concurrently. /// -/// This should be called after initial reconciliation is complete. +/// Uses an unbounded channel to prevent the "echo" problem: when we receive +/// an asset from the peer, we tell the push loop about it so it doesn't +/// push the same asset right back. pub async fn run_live_sync( conn: Connection, can: CanSyncClient, @@ -271,14 +292,17 @@ pub async fn run_live_sync( ) { let short_id = conn.remote_id().fmt_short().to_string(); + // Channel for receive loop to notify push loop about received hashes + let (received_tx, received_rx) = mpsc::unbounded_channel::<String>(); + // Run push loop and receive loop concurrently — when either ends, we're done tokio::select! 
{ - result = live_push_loop(conn.clone(), can.clone(), poll_interval) => { + result = live_push_loop(conn.clone(), can.clone(), poll_interval, received_rx) => { if let Err(e) = result { warn!("Live push loop with {} ended: {:#}", short_id, e); } } - result = live_receive_loop(conn, can) => { + result = live_receive_loop(conn, can, received_tx) => { if let Err(e) = result { warn!("Live receive loop with {} ended: {:#}", short_id, e); } @@ -287,16 +311,20 @@ pub async fn run_live_sync( } /// Poll for new local assets and push them to the peer via new QUIC streams. +/// +/// Drains the `received_rx` channel each tick to learn about hashes that arrived +/// from the peer, so we don't echo them back. async fn live_push_loop( conn: Connection, can: CanSyncClient, poll_interval: std::time::Duration, + mut received_rx: mpsc::UnboundedReceiver<String>, ) -> Result<()> { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); info!("Starting live push loop with {}", short_id); - // Track what we've already synced + // Track what we've already synced (local + received from peer) let mut known_hashes: HashSet<String> = { let resp = can.get_hashes().await?; resp.assets.into_iter().map(|a| a.hash).collect() @@ -307,6 +335,12 @@ async fn live_push_loop( loop { interval.tick().await; + // Drain any hashes that the receive loop got from the peer. + // This prevents us from pushing them right back (echo prevention). + while let Ok(hash) = received_rx.try_recv() { + known_hashes.insert(hash); + } + // Poll for new assets let resp = match can.get_hashes().await { Ok(r) => r, @@ -349,8 +383,10 @@ async fn live_push_loop( } } - // Update our known set - known_hashes = current_hashes; + // Update our known set (union with current so we keep peer-received hashes too) + for h in current_hashes { + known_hashes.insert(h); + } } Ok(()) @@ -359,10 +395,12 @@ async fn live_push_loop( /// Accept incoming QUIC bi-streams from the peer and receive assets. 
/// /// The peer's live_push_loop opens new bi-streams for each batch of assets. -/// This loop accepts those streams and ingests the assets into the local CAN service. +/// This loop accepts those streams, ingests assets into the local CAN service, +/// and notifies the push loop via channel to prevent echo. async fn live_receive_loop( conn: Connection, can: CanSyncClient, + received_tx: mpsc::UnboundedSender<String>, ) -> Result<()> { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); @@ -372,8 +410,16 @@ async fn live_receive_loop( match conn.accept_bi().await { Ok((_send, mut recv)) => { info!("Accepted live sync stream from peer {}", short_id); - if let Err(e) = receive_assets(&can, &mut recv, &short_id).await { - warn!("Error receiving live assets from {}: {:#}", short_id, e); + match receive_assets(&can, &mut recv, &short_id).await { + Ok(received_hashes) => { + // Notify push loop about received hashes to prevent echo + for hash in received_hashes { + let _ = received_tx.send(hash); + } + } + Err(e) => { + warn!("Error receiving live assets from {}: {:#}", short_id, e); + } } } Err(e) => {