From 06c01baf44ae6c46062f421e763fe5580e3a16fc Mon Sep 17 00:00:00 2001 From: Jason Tudisco Date: Thu, 12 Mar 2026 15:20:49 -0600 Subject: [PATCH] Fix QUIC stream protocol bugs and add integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix bidirectional stream handling: responder uses accept_bi() instead of open_bi() so both sides communicate on the same stream - Add live_receive_loop to accept incoming bi-streams during ongoing sync (peer's push loop opens new streams per batch) - Split live_sync_loop into live_push_loop + live_receive_loop running concurrently via tokio::select in new run_live_sync() - Update handle_incoming to run live sync after initial reconciliation - Add direct peer connection via ticket files (EndpointAddr JSON exchange) for local testing without gossip bootstrap - Add CAN_PORT env var override for running multiple CAN instances - Add integration test binary (sync_test.rs): starts 2 CAN services + 2 sync agents, ingests files on each side, verifies bidirectional sync with 4 test cases (A→B, B→A, batch, count match) - Add PowerShell script (run-integration-test.ps1) for one-command test Co-Authored-By: Claude Opus 4.6 --- examples/can-sync/Cargo.lock | 21 + examples/can-sync/Cargo.toml | 9 +- examples/can-sync/run-integration-test.ps1 | 75 +++ examples/can-sync/src/config.rs | 10 + examples/can-sync/src/main.rs | 91 ++- examples/can-sync/src/peer.rs | 85 ++- examples/can-sync/tests/sync_test.rs | 616 +++++++++++++++++++++ src/main.rs | 6 +- 8 files changed, 890 insertions(+), 23 deletions(-) create mode 100644 examples/can-sync/run-integration-test.ps1 create mode 100644 examples/can-sync/tests/sync_test.rs diff --git a/examples/can-sync/Cargo.lock b/examples/can-sync/Cargo.lock index 20c99d8..512b845 100644 --- a/examples/can-sync/Cargo.lock +++ b/examples/can-sync/Cargo.lock @@ -234,9 +234,12 @@ dependencies = [ "iroh-gossip", "n0-future 0.1.3", "prost", + "rand", "reqwest 0.12.28", "serde", + "serde_json", "serde_yaml", + "tempfile", "tokio", "tracing", "tracing-subscriber", @@ -1928,6 +1931,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mio" version = "1.1.1" @@ -2794,6 +2807,7 @@ dependencies = [ "base64", "bytes", "encoding_rs", + "futures-channel", "futures-core", "futures-util", "h2", @@ -2807,6 +2821,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", @@ -3769,6 +3784,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/examples/can-sync/Cargo.toml b/examples/can-sync/Cargo.toml index 1f6d75e..78842a6 100644 --- a/examples/can-sync/Cargo.toml +++ b/examples/can-sync/Cargo.toml @@ -8,6 +8,10 @@ description = "P2P sync agent for CAN service — full mirror replication via ir name = "can-sync" path = "src/main.rs" +[[bin]] +name = "sync-test" +path = "tests/sync_test.rs" + [dependencies] # P2P networking (iroh for transport + gossip for discovery — NO iroh-docs) iroh = "0.96" @@ -17,7 +21,7 @@ iroh-gossip = "0.96" prost = "0.13" # HTTP client for CAN service sync API -reqwest = { version = "0.12", features = ["json"] } +reqwest = { version = "0.12", features = ["json", "multipart", "blocking"] } # Serialization serde = { version = "1", features = ["derive"] } @@ -40,3 +44,6 @@ n0-future = "0.1" anyhow = "1" bytes = "1" hex = "0.4" +serde_json = "1" +tempfile = "3" +rand = "0.9" diff --git a/examples/can-sync/run-integration-test.ps1 b/examples/can-sync/run-integration-test.ps1 new file mode 100644 index 0000000..82db5aa --- /dev/null +++ b/examples/can-sync/run-integration-test.ps1 @@ -0,0 +1,75 @@ +#!/usr/bin/env pwsh +# CAN Sync v2 Integration Test Runner +# +# Usage: +# .\run-integration-test.ps1 # Build + run test +# .\run-integration-test.ps1 -NoBuild # Skip building, just run + +param( + [switch]$NoBuild +) + +$ErrorActionPreference = "Stop" +$canServiceRoot = Resolve-Path (Join-Path $PSScriptRoot "../..") +$canSyncRoot = $PSScriptRoot + +Write-Host "" +Write-Host "========================================" -ForegroundColor Cyan +Write-Host " CAN Sync v2 - Integration Test Runner" -ForegroundColor Cyan +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "" + +# Step 1: Build CAN service +if (-not $NoBuild) { + Write-Host "[1/3] Building CAN service..." -ForegroundColor Yellow + Push-Location $canServiceRoot + try { + cargo build 2>&1 | ForEach-Object { Write-Host " $_" -ForegroundColor DarkGray } + if ($LASTEXITCODE -ne 0) { + Write-Host "FAILED: CAN service build failed!" -ForegroundColor Red + exit 1 + } + Write-Host " CAN service built OK" -ForegroundColor Green + } finally { + Pop-Location + } + + # Step 2: Build can-sync + sync-test + Write-Host "" + Write-Host "[2/3] Building can-sync and sync-test..." -ForegroundColor Yellow + Push-Location $canSyncRoot + try { + cargo build --bin can-sync --bin sync-test 2>&1 | ForEach-Object { Write-Host " $_" -ForegroundColor DarkGray } + if ($LASTEXITCODE -ne 0) { + Write-Host "FAILED: can-sync build failed!" -ForegroundColor Red + exit 1 + } + Write-Host " can-sync built OK" -ForegroundColor Green + } finally { + Pop-Location + } +} else { + Write-Host "[SKIP] Builds skipped (-NoBuild)" -ForegroundColor DarkYellow +} + +# Step 3: Run integration test +Write-Host "" +Write-Host "[3/3] Running integration test..." -ForegroundColor Yellow +Write-Host "" + +Push-Location $canSyncRoot +try { + cargo run --bin sync-test + $testResult = $LASTEXITCODE +} finally { + Pop-Location +} + +Write-Host "" +if ($testResult -eq 0) { + Write-Host "ALL TESTS PASSED" -ForegroundColor Green +} else { + Write-Host "SOME TESTS FAILED (exit code: $testResult)" -ForegroundColor Red +} + +exit $testResult diff --git a/examples/can-sync/src/config.rs b/examples/can-sync/src/config.rs index 5ac9a7a..8c00932 100644 --- a/examples/can-sync/src/config.rs +++ b/examples/can-sync/src/config.rs @@ -15,6 +15,16 @@ pub struct SyncConfig { /// Seconds between polls for new local assets #[serde(default = "default_poll_interval")] pub poll_interval_secs: u64, + + /// Optional: path to write this node's ticket to (for direct connection) + #[serde(default)] + pub ticket_file: Option, + + /// Optional: path to a file containing a peer's node ticket (for direct connection). + /// If set, the agent will read this ticket and connect directly instead of waiting + /// for gossip discovery. The file is polled until it exists and is non-empty. + #[serde(default)] + pub connect_ticket_file: Option, } fn default_poll_interval() -> u64 { diff --git a/examples/can-sync/src/main.rs b/examples/can-sync/src/main.rs index 7b6c3d2..71f0681 100644 --- a/examples/can-sync/src/main.rs +++ b/examples/can-sync/src/main.rs @@ -12,9 +12,11 @@ mod discovery; mod peer; mod protocol; +use std::path::Path; + use anyhow::{Context, Result}; use iroh::endpoint::presets::N0; -use iroh::{Endpoint, EndpointId}; +use iroh::{Endpoint, EndpointAddr, EndpointId}; use iroh_gossip::net::Gossip; use tokio::sync::mpsc; use tracing::{error, info, warn}; @@ -40,7 +42,7 @@ async fn main() -> Result<()> { let config_path = std::env::args() .nth(1) .unwrap_or_else(|| "config.yaml".to_string()); - let config = SyncConfig::load(std::path::Path::new(&config_path)) + let config = SyncConfig::load(Path::new(&config_path)) .with_context(|| format!("loading config from {}", config_path))?; info!("CAN Sync v2 starting"); @@ -73,32 +75,102 @@ async fn main() -> Result<()> { info!("Listening on {}", addr); } + // Write our EndpointAddr to file if configured (for direct peer connection in tests) + if let Some(ref ticket_path) = config.ticket_file { + // Wait briefly for the endpoint to register with relay + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let addr = endpoint.addr(); + let addr_json = serde_json::to_string(&addr) + .context("serializing EndpointAddr")?; + std::fs::write(ticket_path, &addr_json) + .with_context(|| format!("writing addr to {}", ticket_path))?; + info!("Wrote EndpointAddr to {}", ticket_path); + } + // Create gossip instance for peer discovery (not async — returns directly) let gossip = Gossip::builder().spawn(endpoint.clone()); // Channel for discovered peers let (peer_tx, mut peer_rx) = mpsc::channel::(32); - // Spawn discovery + // Spawn discovery via gossip let disc = Discovery::new(endpoint.clone(), gossip.clone(), &config.sync_passphrase); tokio::spawn(async move { - if let Err(e) = disc.run(peer_tx).await { + if let Err(e) = disc.run(peer_tx.clone()).await { error!("Discovery failed: {:#}", e); } }); + // If a direct connect ticket file is specified, spawn a task to read it and connect + if let Some(ref ticket_path) = config.connect_ticket_file { + let ticket_path = ticket_path.clone(); + let endpoint_direct = endpoint.clone(); + let can_direct = can.clone(); + let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs); + + tokio::spawn(async move { + info!("Waiting for peer addr file: {}", ticket_path); + + // Poll until the file exists and is non-empty + let addr_json = loop { + match std::fs::read_to_string(&ticket_path) { + Ok(s) if !s.trim().is_empty() => break s.trim().to_string(), + _ => tokio::time::sleep(std::time::Duration::from_millis(200)).await, + } + }; + + info!("Read peer addr from {}", ticket_path); + + let peer_addr: EndpointAddr = match serde_json::from_str(&addr_json) { + Ok(a) => a, + Err(e) => { + error!("Invalid EndpointAddr JSON: {:#}", e); + return; + } + }; + + let peer_id = peer_addr.id; + let short = peer_id.fmt_short().to_string(); + info!("Direct connecting to peer: {} (from addr file)", short); + + match endpoint_direct.connect(peer_addr, SYNC_ALPN).await { + Ok(conn) => { + info!("Direct connection to {} established!", short); + + // Initial reconciliation + if let Err(e) = peer::run_sync_session(conn.clone(), can_direct.clone(), true).await { + error!("Initial sync with {} failed: {:#}", short, e); + return; + } + + info!("Initial sync with {} complete, starting live sync", short); + + // Live sync: push new local assets + accept incoming streams + peer::run_live_sync(conn, can_direct, poll_interval).await; + } + Err(e) => { + error!("Failed to connect to {}: {:#}", short, e); + } + } + }); + } + // Spawn incoming connection handler let endpoint_accept = endpoint.clone(); let can_accept = can.clone(); + let accept_poll_interval = std::time::Duration::from_secs(config.poll_interval_secs); tokio::spawn(async move { loop { match endpoint_accept.accept().await { Some(incoming) => { let can_clone = can_accept.clone(); + let poll_dur = accept_poll_interval; tokio::spawn(async move { match incoming.await { Ok(conn) => { - peer::handle_incoming(conn, can_clone).await; + info!("Accepted incoming connection from {}", conn.remote_id().fmt_short()); + peer::handle_incoming(conn, can_clone, poll_dur).await; } Err(e) => { warn!("Failed to accept connection: {:#}", e); @@ -114,7 +186,7 @@ async fn main() -> Result<()> { } }); - // Main loop: connect to discovered peers and sync + // Main loop: connect to discovered peers (from gossip) and sync let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs); info!("Waiting for peers..."); @@ -128,7 +200,6 @@ async fn main() -> Result<()> { let poll_dur = poll_interval; tokio::spawn(async move { - // Connect to peer (EndpointId implements Into) let conn = match endpoint_clone.connect(peer_id, SYNC_ALPN).await { Ok(c) => c, Err(e) => { @@ -137,16 +208,12 @@ async fn main() -> Result<()> { } }; - // Initial reconciliation if let Err(e) = peer::run_sync_session(conn.clone(), can_clone.clone(), true).await { error!("Initial sync with {} failed: {:#}", short, e); return; } - // Live sync loop — keep pushing new assets - if let Err(e) = peer::live_sync_loop(conn, can_clone, poll_dur).await { - warn!("Live sync with {} ended: {:#}", short, e); - } + peer::run_live_sync(conn, can_clone, poll_dur).await; }); } diff --git a/examples/can-sync/src/peer.rs b/examples/can-sync/src/peer.rs index d583a28..d4c8795 100644 --- a/examples/can-sync/src/peer.rs +++ b/examples/can-sync/src/peer.rs @@ -63,8 +63,12 @@ pub async fn run_sync_session( let short_id = peer_id.fmt_short().to_string(); info!("Starting sync session with {} (initiator={})", short_id, is_initiator); - // Open a bi-directional stream for the sync protocol - let (mut send, mut recv) = conn.open_bi().await.context("opening bi stream")?; + // Initiator opens the stream, responder accepts it + let (mut send, mut recv) = if is_initiator { + conn.open_bi().await.context("opening bi stream")? + } else { + conn.accept_bi().await.context("accepting bi stream")? + }; // Step 1: Get our local hash list from CAN service let our_hashes = can.get_hashes().await.context("getting local hashes")?; @@ -237,27 +241,60 @@ async fn receive_assets( } /// Handle an incoming connection from a peer who connected to us. -pub async fn handle_incoming(conn: Connection, can: CanSyncClient) { +pub async fn handle_incoming( + conn: Connection, + can: CanSyncClient, + poll_interval: std::time::Duration, +) { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); info!("Incoming sync connection from {}", short_id); - if let Err(e) = run_sync_session(conn, can, false).await { + if let Err(e) = run_sync_session(conn.clone(), can.clone(), false).await { error!("Sync session with {} failed: {:#}", short_id, e); + return; + } + + info!("Initial sync with {} complete, starting live sync", short_id); + + // Run both live loops concurrently + run_live_sync(conn, can, poll_interval).await; +} + +/// Run both live sync loops (push + receive) concurrently. +/// +/// This should be called after initial reconciliation is complete. +pub async fn run_live_sync( + conn: Connection, + can: CanSyncClient, + poll_interval: std::time::Duration, +) { + let short_id = conn.remote_id().fmt_short().to_string(); + + // Run push loop and receive loop concurrently — when either ends, we're done + tokio::select! { + result = live_push_loop(conn.clone(), can.clone(), poll_interval) => { + if let Err(e) = result { + warn!("Live push loop with {} ended: {:#}", short_id, e); + } + } + result = live_receive_loop(conn, can) => { + if let Err(e) = result { + warn!("Live receive loop with {} ended: {:#}", short_id, e); + } + } } } -/// Run the live sync loop: poll for new local assets and push to peer. -/// -/// This runs after initial reconciliation and keeps peers in sync. -pub async fn live_sync_loop( +/// Poll for new local assets and push them to the peer via new QUIC streams. +async fn live_push_loop( conn: Connection, can: CanSyncClient, poll_interval: std::time::Duration, ) -> Result<()> { let peer_id = conn.remote_id(); let short_id = peer_id.fmt_short().to_string(); - info!("Starting live sync loop with {}", short_id); + info!("Starting live push loop with {}", short_id); // Track what we've already synced let mut known_hashes: HashSet = { @@ -318,3 +355,33 @@ pub async fn live_sync_loop( Ok(()) } + +/// Accept incoming QUIC bi-streams from the peer and receive assets. +/// +/// The peer's live_push_loop opens new bi-streams for each batch of assets. +/// This loop accepts those streams and ingests the assets into the local CAN service. +async fn live_receive_loop( + conn: Connection, + can: CanSyncClient, +) -> Result<()> { + let peer_id = conn.remote_id(); + let short_id = peer_id.fmt_short().to_string(); + info!("Starting live receive loop with {}", short_id); + + loop { + match conn.accept_bi().await { + Ok((_send, mut recv)) => { + info!("Accepted live sync stream from peer {}", short_id); + if let Err(e) = receive_assets(&can, &mut recv, &short_id).await { + warn!("Error receiving live assets from {}: {:#}", short_id, e); + } + } + Err(e) => { + info!("Live receive loop: connection to {} closed: {:#}", short_id, e); + break; + } + } + } + + Ok(()) +} diff --git a/examples/can-sync/tests/sync_test.rs b/examples/can-sync/tests/sync_test.rs new file mode 100644 index 0000000..cdebea0 --- /dev/null +++ b/examples/can-sync/tests/sync_test.rs @@ -0,0 +1,616 @@ +//! Integration test for CAN Sync v2 +//! +//! Starts two CAN service instances + two sync agents, ingests files on each +//! side, and verifies bidirectional sync. +//! +//! Usage: +//! cargo run --bin sync-test +//! +//! Prerequisites: +//! CAN service must be built: `cargo build` in the CanService root + +use std::path::{Path, PathBuf}; +use std::process::{Child, Command, Stdio}; +use std::time::{Duration, Instant}; + +use rand::Rng; +use serde_json::Value; +use tempfile::TempDir; + +// ── Configuration ──────────────────────────────────────────────────────── + +const CAN_A_PORT: u16 = 13210; +const CAN_B_PORT: u16 = 13220; +const SYNC_KEY: &str = "test-sync-key-42"; +const SYNC_PASSPHRASE: &str = "integration-test-passphrase"; +const SYNC_TIMEOUT: Duration = Duration::from_secs(60); +const POLL_INTERVAL: Duration = Duration::from_millis(500); + +// ── Process management ─────────────────────────────────────────────────── + +struct ManagedProcess { + child: Child, + name: String, +} + +impl ManagedProcess { + fn spawn( + name: &str, + cmd: &str, + args: &[&str], + envs: &[(&str, &str)], + log_dir: &Path, + ) -> Self { + println!(" Starting {} ...", name); + let mut command = Command::new(cmd); + + let log_file = std::fs::File::create(log_dir.join(format!("{}.log", name))) + .expect("create log file"); + let log_file_clone = log_file.try_clone().expect("clone log file handle"); + command + .args(args) + .stdout(Stdio::from(log_file)) + .stderr(Stdio::from(log_file_clone)) + .env("RUST_LOG", "can_sync=debug,can_service=debug,iroh=info,iroh_gossip=info"); + + for (k, v) in envs { + command.env(k, v); + } + let child = command + .spawn() + .unwrap_or_else(|e| panic!("Failed to start {}: {}", name, e)); + println!(" {} started (pid={})", name, child.id()); + Self { + child, + name: name.to_string(), + } + } + + fn kill(&mut self) { + let _ = self.child.kill(); + let _ = self.child.wait(); + println!(" {} stopped", self.name); + } +} + +impl Drop for ManagedProcess { + fn drop(&mut self) { + self.kill(); + } +} + +// ── Test harness ───────────────────────────────────────────────────────── + +struct TestHarness { + _can_a: ManagedProcess, + _can_b: ManagedProcess, + _sync_a: ManagedProcess, + _sync_b: ManagedProcess, + _tmp_a: TempDir, + _tmp_b: TempDir, + _tmp_sync_a: TempDir, + _tmp_sync_b: TempDir, + log_dir: TempDir, + can_a_url: String, + can_b_url: String, +} + +impl TestHarness { + fn new(can_service_bin: &Path) -> Self { + println!("\n=== Setting up test harness ===\n"); + + // Create temp directories + let tmp_a = TempDir::new().expect("create temp dir A"); + let tmp_b = TempDir::new().expect("create temp dir B"); + let tmp_sync_a = TempDir::new().expect("create temp dir sync A"); + let tmp_sync_b = TempDir::new().expect("create temp dir sync B"); + let log_dir = TempDir::new().expect("create log dir"); + + println!(" Logs: {}", log_dir.path().display()); + println!(" CAN A storage: {}", tmp_a.path().display()); + println!(" CAN B storage: {}", tmp_b.path().display()); + + // Write CAN service configs + let config_a = tmp_a.path().join("config.yaml"); + let config_b = tmp_b.path().join("config.yaml"); + write_can_config(&config_a, tmp_a.path(), SYNC_KEY); + write_can_config(&config_b, tmp_b.path(), SYNC_KEY); + + // Start CAN services + let can_a = ManagedProcess::spawn( + "CAN-A", + can_service_bin.to_str().unwrap(), + &[config_a.to_str().unwrap()], + &[("CAN_PORT", &CAN_A_PORT.to_string())], + log_dir.path(), + ); + let can_b = ManagedProcess::spawn( + "CAN-B", + can_service_bin.to_str().unwrap(), + &[config_b.to_str().unwrap()], + &[("CAN_PORT", &CAN_B_PORT.to_string())], + log_dir.path(), + ); + + let can_a_url = format!("http://127.0.0.1:{}", CAN_A_PORT); + let can_b_url = format!("http://127.0.0.1:{}", CAN_B_PORT); + + // Wait for CAN services to be ready + println!("\n Waiting for CAN services to start..."); + wait_for_http(&can_a_url, Duration::from_secs(10), log_dir.path(), "CAN-A"); + wait_for_http(&can_b_url, Duration::from_secs(10), log_dir.path(), "CAN-B"); + println!(" Both CAN services ready!"); + + // Find can-sync binary + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let can_sync_bin = manifest_dir + .join("target") + .join("debug") + .join("can-sync") + .with_extension(std::env::consts::EXE_EXTENSION); + assert!( + can_sync_bin.exists(), + "can-sync binary not found at {}", + can_sync_bin.display() + ); + println!(" Using can-sync: {}", can_sync_bin.display()); + + // Ticket file paths for direct peer connection + // Sync-A writes its addr, Sync-B reads it and connects directly + let ticket_a = tmp_sync_a.path().join("ticket_a.json"); + let ticket_a_str = ticket_a.to_str().unwrap().replace('\\', "/"); + + // Write sync agent configs with ticket file exchange + let sync_config_a = tmp_sync_a.path().join("config.yaml"); + let sync_config_b = tmp_sync_b.path().join("config.yaml"); + + // Sync-A: write its own ticket + write_sync_config_with_tickets( + &sync_config_a, + &can_a_url, + SYNC_KEY, + SYNC_PASSPHRASE, + Some(&ticket_a_str), // write our ticket here + None, // don't connect to anyone + ); + + // Sync-B: read Sync-A's ticket and connect directly + write_sync_config_with_tickets( + &sync_config_b, + &can_b_url, + SYNC_KEY, + SYNC_PASSPHRASE, + None, // don't write our own ticket + Some(&ticket_a_str), // connect to Sync-A using this ticket + ); + + // Start Sync-A first (it writes the ticket) + let sync_a = ManagedProcess::spawn( + "Sync-A", + can_sync_bin.to_str().unwrap(), + &[sync_config_a.to_str().unwrap()], + &[], + log_dir.path(), + ); + + // Wait for Sync-A to write its ticket file + println!(" Waiting for Sync-A to write ticket..."); + let ticket_start = Instant::now(); + loop { + if ticket_start.elapsed() > Duration::from_secs(15) { + print_log(log_dir.path(), "Sync-A"); + panic!("Timeout waiting for Sync-A ticket file"); + } + if let Ok(contents) = std::fs::read_to_string(&ticket_a) { + if !contents.trim().is_empty() { + println!(" Sync-A ticket ready ({} bytes)", contents.len()); + break; + } + } + std::thread::sleep(Duration::from_millis(100)); + } + + // Start Sync-B (it will read Sync-A's ticket and connect) + let sync_b = ManagedProcess::spawn( + "Sync-B", + can_sync_bin.to_str().unwrap(), + &[sync_config_b.to_str().unwrap()], + &[], + log_dir.path(), + ); + + // Wait for peers to connect + println!(" Waiting for sync agents to connect..."); + std::thread::sleep(Duration::from_secs(5)); + + println!(" Test harness ready!\n"); + + Self { + _can_a: can_a, + _can_b: can_b, + _sync_a: sync_a, + _sync_b: sync_b, + _tmp_a: tmp_a, + _tmp_b: tmp_b, + _tmp_sync_a: tmp_sync_a, + _tmp_sync_b: tmp_sync_b, + log_dir, + can_a_url, + can_b_url, + } + } + + fn print_logs(&self) { + println!("\n=== Process Logs ==="); + for name in &["Sync-A", "Sync-B", "CAN-A", "CAN-B"] { + print_log(self.log_dir.path(), name); + } + } +} + +// ── Config writers ─────────────────────────────────────────────────────── + +fn write_can_config(path: &Path, storage_root: &Path, sync_key: &str) { + let storage_str = storage_root.to_str().unwrap().replace('\\', "/"); + let content = format!( + r#"storage_root: "{}" +admin_token: "test-admin-token" +enable_thumbnail_cache: false +sync_api_key: "{}" +"#, + storage_str, sync_key + ); + std::fs::write(path, content).expect("write CAN config"); +} + +fn write_sync_config_with_tickets( + path: &Path, + can_url: &str, + sync_key: &str, + passphrase: &str, + ticket_file: Option<&str>, + connect_ticket_file: Option<&str>, +) { + let mut content = format!( + r#"can_service_url: "{}" +sync_api_key: "{}" +sync_passphrase: "{}" +poll_interval_secs: 2 +"#, + can_url, sync_key, passphrase + ); + + if let Some(tf) = ticket_file { + content.push_str(&format!("ticket_file: \"{}\"\n", tf)); + } + if let Some(ctf) = connect_ticket_file { + content.push_str(&format!("connect_ticket_file: \"{}\"\n", ctf)); + } + + std::fs::write(path, content).expect("write sync config"); +} + +// ── Logging helpers ───────────────────────────────────────────────────── + +fn print_log(log_dir: &Path, name: &str) { + let log_path = log_dir.join(format!("{}.log", name)); + if let Ok(contents) = std::fs::read_to_string(&log_path) { + let lines: Vec<&str> = contents.lines().collect(); + let show = if lines.len() > 50 { &lines[lines.len() - 50..] } else { &lines }; + println!("\n--- {} (last {} of {} lines) ---", name, show.len(), lines.len()); + for line in show { + println!(" {}", line); + } + } else { + println!("\n--- {} (no log file) ---", name); + } +} + +// ── HTTP helpers ───────────────────────────────────────────────────────── + +fn wait_for_http(base_url: &str, timeout: Duration, log_dir: &Path, name: &str) { + let client = reqwest::blocking::Client::new(); + let start = Instant::now(); + let url = format!("{}/api/v1/can/0/list?limit=1", base_url); + + loop { + if start.elapsed() > timeout { + print_log(log_dir, name); + panic!("Timeout waiting for {} to become ready", base_url); + } + match client.get(&url).timeout(Duration::from_secs(1)).send() { + Ok(resp) if resp.status().is_success() => return, + _ => std::thread::sleep(Duration::from_millis(200)), + } + } +} + +/// Ingest a file into a CAN service instance. Returns the asset hash. +fn ingest_file(base_url: &str, filename: &str, content: &[u8], mime_type: &str) -> String { + let client = reqwest::blocking::Client::new(); + let url = format!("{}/api/v1/can/0/ingest", base_url); + + let part = reqwest::blocking::multipart::Part::bytes(content.to_vec()) + .file_name(filename.to_string()) + .mime_str(mime_type) + .unwrap(); + + let form = reqwest::blocking::multipart::Form::new() + .part("file", part) + .text("mime_type", mime_type.to_string()); + + let resp = client + .post(&url) + .multipart(form) + .send() + .expect("ingest request failed"); + + assert!( + resp.status().is_success(), + "Ingest failed with status {}", + resp.status() + ); + + let body: Value = resp.json().expect("parse ingest response"); + body["data"]["hash"] + .as_str() + .expect("no hash in response") + .to_string() +} + +/// List all assets from a CAN service. Returns list of hashes. +fn list_hashes(base_url: &str) -> Vec { + let client = reqwest::blocking::Client::new(); + let url = format!("{}/api/v1/can/0/list?limit=1000", base_url); + + let resp = client + .get(&url) + .timeout(Duration::from_secs(5)) + .send() + .expect("list request failed"); + + if !resp.status().is_success() { + return vec![]; + } + + let body: Value = resp.json().expect("parse list response"); + body["data"]["items"] + .as_array() + .unwrap_or(&vec![]) + .iter() + .filter_map(|item| item["hash"].as_str().map(String::from)) + .collect() +} + +/// Wait for a specific hash to appear on a CAN service. +fn wait_for_hash(base_url: &str, hash: &str, timeout: Duration) -> bool { + let start = Instant::now(); + while start.elapsed() < timeout { + let hashes = list_hashes(base_url); + if hashes.contains(&hash.to_string()) { + let elapsed = start.elapsed(); + println!(" (synced in {:.1}s)", elapsed.as_secs_f64()); + return true; + } + std::thread::sleep(POLL_INTERVAL); + } + false +} + +/// Generate random file content of given size. +fn random_content(size: usize) -> Vec { + let mut rng = rand::rng(); + let mut buf = vec![0u8; size]; + rng.fill(&mut buf[..]); + buf +} + +// ── Test runner ────────────────────────────────────────────────────────── + +fn find_can_service_bin() -> PathBuf { + let self_exe = std::env::current_exe().expect("get current exe path"); + let target_dir = self_exe.parent().unwrap(); + + let bin_name = if cfg!(windows) { + "can-service.exe" + } else { + "can-service" + }; + + // Check same directory + let candidate = target_dir.join(bin_name); + if candidate.exists() { + return candidate; + } + + // Check parent/debug + let candidate = target_dir.parent().unwrap().join("debug").join(bin_name); + if candidate.exists() { + return candidate; + } + + // Check workspace target/debug + let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .parent() + .unwrap() + .to_path_buf(); + let candidate = workspace_root.join("target").join("debug").join(bin_name); + if candidate.exists() { + return candidate; + } + + panic!( + "Cannot find {} binary. Build it first:\n cd {} && cargo build", + bin_name, + workspace_root.display() + ); +} + +fn main() { + println!("╔══════════════════════════════════════════╗"); + println!("║ CAN Sync v2 Integration Test ║"); + println!("╚══════════════════════════════════════════╝"); + + let can_service_bin = find_can_service_bin(); + println!("\nUsing CAN service: {}", can_service_bin.display()); + + // Build can-sync if needed + println!("\nBuilding can-sync..."); + let build_status = Command::new("cargo") + .args(["build", "--bin", "can-sync"]) + .current_dir(env!("CARGO_MANIFEST_DIR")) + .status() + .expect("cargo build failed"); + assert!(build_status.success(), "Failed to build can-sync"); + + // Set up harness (starts all processes) + let harness = TestHarness::new(&can_service_bin); + + let mut passed = 0; + let mut failed = 0; + let mut results: Vec<(String, bool, String)> = vec![]; + + // ── Test 1: Ingest on A → appears on B ─────────────────────────── + print_test_header("Test 1: Ingest file on CAN-A, verify sync to CAN-B"); + { + let content = random_content(4096); + let hash = ingest_file(&harness.can_a_url, "test1.bin", &content, "application/octet-stream"); + println!(" Ingested on A: hash={}", &hash[..16]); + + let found = wait_for_hash(&harness.can_b_url, &hash, SYNC_TIMEOUT); + if found { + println!(" ✓ File appeared on CAN-B!"); + results.push(("A→B sync".into(), true, format!("hash {}", &hash[..16]))); + passed += 1; + } else { + println!(" ✗ File NOT found on CAN-B after {:?}", SYNC_TIMEOUT); + results.push(("A→B sync".into(), false, "timeout".into())); + failed += 1; + } + } + + // ── Test 2: Ingest on B → appears on A ─────────────────────────── + print_test_header("Test 2: Ingest file on CAN-B, verify sync to CAN-A"); + { + let content = random_content(8192); + let hash = ingest_file(&harness.can_b_url, "test2.dat", &content, "application/octet-stream"); + println!(" Ingested on B: hash={}", &hash[..16]); + + let found = wait_for_hash(&harness.can_a_url, &hash, SYNC_TIMEOUT); + if found { + println!(" ✓ File appeared on CAN-A!"); + results.push(("B→A sync".into(), true, format!("hash {}", &hash[..16]))); + passed += 1; + } else { + println!(" ✗ File NOT found on CAN-A after {:?}", SYNC_TIMEOUT); + results.push(("B→A sync".into(), false, "timeout".into())); + failed += 1; + } + } + + // ── Test 3: Multiple files batch ───────────────────────────────── + print_test_header("Test 3: Ingest 5 files on A, verify all sync to B"); + { + let mut hashes = vec![]; + for i in 0..5 { + let content = random_content(1024 + i * 512); + let fname = format!("batch_{}.bin", i); + let hash = ingest_file(&harness.can_a_url, &fname, &content, "application/octet-stream"); + println!(" Ingested batch file {}: hash={}", i, &hash[..16]); + hashes.push(hash); + } + + let mut all_found = true; + for (i, hash) in hashes.iter().enumerate() { + let found = wait_for_hash(&harness.can_b_url, hash, SYNC_TIMEOUT); + if found { + println!(" ✓ Batch file {} synced", i); + } else { + println!(" ✗ Batch file {} NOT synced", i); + all_found = false; + } + } + + if all_found { + results.push(("Batch A→B (5 files)".into(), true, "all synced".into())); + passed += 1; + } else { + results.push(("Batch A→B (5 files)".into(), false, "some missing".into())); + failed += 1; + } + } + + // ── Test 4: Verify total counts match ──────────────────────────── + print_test_header("Test 4: Verify asset counts match on both sides"); + { + std::thread::sleep(Duration::from_secs(5)); + + let a_hashes = list_hashes(&harness.can_a_url); + let b_hashes = list_hashes(&harness.can_b_url); + + println!(" CAN-A has {} assets", a_hashes.len()); + println!(" CAN-B has {} assets", b_hashes.len()); + + if a_hashes.len() == b_hashes.len() { + let a_set: std::collections::HashSet<_> = a_hashes.iter().collect(); + let b_set: std::collections::HashSet<_> = b_hashes.iter().collect(); + let matching = a_set == b_set; + + if matching { + println!(" ✓ Both sides have identical asset sets ({} assets)", a_hashes.len()); + results.push(("Count match".into(), true, format!("{} == {}", a_hashes.len(), b_hashes.len()))); + passed += 1; + } else { + println!(" ✗ Same count but different hashes!"); + let only_a: Vec<_> = a_set.difference(&b_set).collect(); + let only_b: Vec<_> = b_set.difference(&a_set).collect(); + if !only_a.is_empty() { + println!(" Only on A: {:?}", only_a.iter().map(|h| &h[..16]).collect::>()); + } + if !only_b.is_empty() { + println!(" Only on B: {:?}", only_b.iter().map(|h| &h[..16]).collect::>()); + } + results.push(("Count match".into(), false, "hash mismatch".into())); + failed += 1; + } + } else { + println!(" ✗ Count mismatch: A={}, B={}", a_hashes.len(), b_hashes.len()); + results.push(("Count match".into(), false, format!("{} != {}", a_hashes.len(), b_hashes.len()))); + failed += 1; + } + } + + // ── Results ────────────────────────────────────────────────────── + println!("\n╔══════════════════════════════════════════╗"); + println!("║ Test Results ║"); + println!("╠══════════════════════════════════════════╣"); + for (name, pass, detail) in &results { + let icon = if *pass { "✓" } else { "✗" }; + println!("║ {} {:<25} {}", + icon, + name, + if detail.len() > 12 { &detail[..12] } else { detail } + ); + } + println!("╠══════════════════════════════════════════╣"); + println!("║ Passed: {} Failed: {} ║", passed, failed); + println!("╚══════════════════════════════════════════╝"); + + // Always print logs + harness.print_logs(); + + // Clean up + println!("\n=== Cleaning up ===\n"); + drop(harness); + println!(" All temp directories removed."); + + if failed > 0 { + std::process::exit(1); + } +} + +fn print_test_header(name: &str) { + println!("\n--- {} ---\n", name); +} diff --git a/src/main.rs b/src/main.rs index 05fdab6..b5a3b4c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,7 +54,11 @@ async fn main() -> anyhow::Result<()> { .layer(CorsLayer::permissive()) .with_state(state); - let addr = SocketAddr::from(([0, 0, 0, 0], 3210)); + let port: u16 = std::env::var("CAN_PORT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(3210); + let addr = SocketAddr::from(([0, 0, 0, 0], port)); tracing::info!("CAN service listening on {}", addr); let listener = tokio::net::TcpListener::bind(addr).await?;