Fix echo bounce and initial sync deadlock in peer protocol

Add mpsc channel between live receive and push loops so hashes
received from a peer aren't pushed right back (echo prevention).
Change initial reconciliation to use tokio::join! for concurrent
send/receive, avoiding QUIC flow-control deadlock when both sides
have large transfers. Update known_hashes to union-insert so
peer-received hashes persist across poll cycles.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jason Tudisco 2026-03-12 16:11:47 -06:00
parent 06c01baf44
commit ccce1474d4

View File

@ -3,8 +3,12 @@
//! When two sync agents connect, they:
//! 1. Exchange hash lists (from their local CAN services)
//! 2. Compute the diff (what each side is missing)
//! 3. Send missing assets to each other
//! 3. Send/receive missing assets concurrently (avoids deadlock)
//! 4. Continue polling for new assets and pushing them
//!
//! The live sync uses an unbounded channel to share received hashes
//! from the receive loop to the push loop, preventing "echo" where
//! an asset received from a peer gets pushed right back to them.
use std::collections::{HashMap, HashSet};
@ -12,6 +16,7 @@ use anyhow::{Context, Result};
use iroh::endpoint::Connection;
use prost::Message;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use crate::can_client::CanSyncClient;
@ -52,8 +57,9 @@ async fn read_frame(recv: &mut iroh::endpoint::RecvStream) -> Result<(u8, Vec<u8
/// Run a full sync session with a connected peer.
///
/// This handles both initial reconciliation and ongoing transfer.
/// Called for both outgoing connections (we initiated) and incoming connections.
/// This handles initial reconciliation: exchange hash lists, compute diffs,
/// then send/receive missing assets **concurrently** to avoid deadlock when
/// both sides have large amounts of data to transfer.
pub async fn run_sync_session(
conn: Connection,
can: CanSyncClient,
@ -133,18 +139,26 @@ pub async fn run_sync_session(
they_need.len()
);
// Step 5: Send assets the peer is missing
// Step 5+6: Send and receive assets CONCURRENTLY to avoid deadlock.
//
// If both sides have many large assets, doing send-then-receive sequentially
// can deadlock: QUIC flow control stalls the writer because the reader hasn't
// drained its buffer (it's busy writing). Running both in parallel avoids this.
let send_fut = async {
if !they_need.is_empty() {
send_assets(&can, &mut send, &they_need, &short_id).await?;
}
// Send DONE marker
let done_frame = encode_frame(MSG_DONE, &[]);
send.write_all(&done_frame).await?;
send.flush().await?;
send.write_all(&done_frame).await.context("sending DONE")?;
send.flush().await.context("flushing after DONE")?;
Ok::<_, anyhow::Error>(())
};
// Step 6: Receive assets we're missing
receive_assets(&can, &mut recv, &short_id).await?;
let recv_fut = receive_assets(&can, &mut recv, &short_id);
let (send_result, recv_result) = tokio::join!(send_fut, recv_fut);
send_result.context("sending assets to peer")?;
recv_result.context("receiving assets from peer")?;
info!("Sync session with {} complete", short_id);
Ok(())
@ -179,11 +193,14 @@ async fn send_assets(
}
/// Receive assets from peer and push them to local CAN service.
/// Returns the list of hashes that were successfully ingested.
async fn receive_assets(
can: &CanSyncClient,
recv: &mut iroh::endpoint::RecvStream,
peer_short: &str,
) -> Result<()> {
) -> Result<Vec<String>> {
let mut received = Vec::new();
loop {
let (msg_type, payload) = read_frame(recv).await.context("reading asset from peer")?;
@ -195,7 +212,8 @@ async fn receive_assets(
MSG_ASSET_BUNDLE => {
let bundle =
AssetBundle::decode(payload.as_slice()).context("decoding asset bundle")?;
let hash_short = bundle.hash[..bundle.hash.len().min(12)].to_string();
let hash = bundle.hash.clone();
let hash_short = hash[..hash.len().min(12)].to_string();
info!("Received asset {} from peer {}", hash_short, peer_short);
match can.push(bundle).await {
@ -205,6 +223,7 @@ async fn receive_assets(
} else {
info!("Ingested asset {} from peer {}", resp.hash, peer_short);
}
received.push(hash);
}
Err(e) => {
error!("Failed to push asset {} to CAN: {:#}", hash_short, e);
@ -237,7 +256,7 @@ async fn receive_assets(
}
}
}
Ok(())
Ok(received)
}
/// Handle an incoming connection from a peer who connected to us.
@ -263,7 +282,9 @@ pub async fn handle_incoming(
/// Run both live sync loops (push + receive) concurrently.
///
/// This should be called after initial reconciliation is complete.
/// Uses an unbounded channel to prevent the "echo" problem: when we receive
/// an asset from the peer, we tell the push loop about it so it doesn't
/// push the same asset right back.
pub async fn run_live_sync(
conn: Connection,
can: CanSyncClient,
@ -271,14 +292,17 @@ pub async fn run_live_sync(
) {
let short_id = conn.remote_id().fmt_short().to_string();
// Channel for receive loop to notify push loop about received hashes
let (received_tx, received_rx) = mpsc::unbounded_channel::<String>();
// Run push loop and receive loop concurrently — when either ends, we're done
tokio::select! {
result = live_push_loop(conn.clone(), can.clone(), poll_interval) => {
result = live_push_loop(conn.clone(), can.clone(), poll_interval, received_rx) => {
if let Err(e) = result {
warn!("Live push loop with {} ended: {:#}", short_id, e);
}
}
result = live_receive_loop(conn, can) => {
result = live_receive_loop(conn, can, received_tx) => {
if let Err(e) = result {
warn!("Live receive loop with {} ended: {:#}", short_id, e);
}
@ -287,16 +311,20 @@ pub async fn run_live_sync(
}
/// Poll for new local assets and push them to the peer via new QUIC streams.
///
/// Drains the `received_rx` channel each tick to learn about hashes that arrived
/// from the peer, so we don't echo them back.
async fn live_push_loop(
conn: Connection,
can: CanSyncClient,
poll_interval: std::time::Duration,
mut received_rx: mpsc::UnboundedReceiver<String>,
) -> Result<()> {
let peer_id = conn.remote_id();
let short_id = peer_id.fmt_short().to_string();
info!("Starting live push loop with {}", short_id);
// Track what we've already synced
// Track what we've already synced (local + received from peer)
let mut known_hashes: HashSet<String> = {
let resp = can.get_hashes().await?;
resp.assets.into_iter().map(|a| a.hash).collect()
@ -307,6 +335,12 @@ async fn live_push_loop(
loop {
interval.tick().await;
// Drain any hashes that the receive loop got from the peer.
// This prevents us from pushing them right back (echo prevention).
while let Ok(hash) = received_rx.try_recv() {
known_hashes.insert(hash);
}
// Poll for new assets
let resp = match can.get_hashes().await {
Ok(r) => r,
@ -349,8 +383,10 @@ async fn live_push_loop(
}
}
// Update our known set
known_hashes = current_hashes;
// Update our known set (union with current so we keep peer-received hashes too)
for h in current_hashes {
known_hashes.insert(h);
}
}
Ok(())
@ -359,10 +395,12 @@ async fn live_push_loop(
/// Accept incoming QUIC bi-streams from the peer and receive assets.
///
/// The peer's live_push_loop opens new bi-streams for each batch of assets.
/// This loop accepts those streams and ingests the assets into the local CAN service.
/// This loop accepts those streams, ingests assets into the local CAN service,
/// and notifies the push loop via channel to prevent echo.
async fn live_receive_loop(
conn: Connection,
can: CanSyncClient,
received_tx: mpsc::UnboundedSender<String>,
) -> Result<()> {
let peer_id = conn.remote_id();
let short_id = peer_id.fmt_short().to_string();
@ -372,10 +410,18 @@ async fn live_receive_loop(
match conn.accept_bi().await {
Ok((_send, mut recv)) => {
info!("Accepted live sync stream from peer {}", short_id);
if let Err(e) = receive_assets(&can, &mut recv, &short_id).await {
match receive_assets(&can, &mut recv, &short_id).await {
Ok(received_hashes) => {
// Notify push loop about received hashes to prevent echo
for hash in received_hashes {
let _ = received_tx.send(hash);
}
}
Err(e) => {
warn!("Error receiving live assets from {}: {:#}", short_id, e);
}
}
}
Err(e) => {
info!("Live receive loop: connection to {} closed: {:#}", short_id, e);
break;