Jason Tudisco 4dc0f8c12d Add stress tests: burst ingests, large files, simultaneous sync
Rewrite integration test with 6 scenarios:
- Single file each direction (basic sanity)
- Rapid burst 25 files A→B with mixed sizes (12MB+ large files)
- Rapid burst 25 files B→A with mixed sizes
- Simultaneous burst: 25 files on EACH side at the same time
- Final full-mirror verification (102 assets, perfect match)

Throughput measured at ~20 MB/s per direction, ~31 MB/s
bidirectional. All tests pass including simultaneous ingestion
of 162 MB across 50 files with zero conflicts or missing assets.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 16:29:16 -06:00

1045 lines
36 KiB
Rust

//! Integration + stress test for CAN Sync v2
//!
//! Starts two CAN service instances + two sync agents, then runs increasingly
//! aggressive sync scenarios:
//! 1. Single file A→B
//! 2. Single file B→A
//! 3. Rapid burst 25 files A→B (mixed sizes, some 10MB+)
//! 4. Rapid burst 25 files B→A (mixed sizes, some 10MB+)
//! 5. Simultaneous burst: 25 files on EACH side at the same time
//! 6. Final full-mirror verification
//!
//! Usage:
//! cargo run --bin sync-test
//!
//! Prerequisites:
//! CAN service must be built: `cargo build` in the CanService root
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use rand::Rng;
use serde_json::Value;
use tempfile::TempDir;
// ── Configuration ────────────────────────────────────────────────────────
/// HTTP port the first CAN service instance ("A" side) listens on.
const CAN_A_PORT: u16 = 13210;
/// HTTP port the second CAN service instance ("B" side) listens on.
const CAN_B_PORT: u16 = 13220;
/// Shared API key written into both CAN configs and both sync configs.
const SYNC_KEY: &str = "test-sync-key-42";
/// Passphrase shared by the two sync agents so they join the same topic.
const SYNC_PASSPHRASE: &str = "integration-test-passphrase";
/// Max time to wait for a burst of files to replicate to the other side.
const SYNC_TIMEOUT: Duration = Duration::from_secs(120); // longer for large files
/// How often the list endpoint is re-polled while waiting for sync.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
// Stress test tuning
/// Files ingested per burst scenario (per side, in tests 3-5).
const BURST_COUNT: usize = 25;
const LARGE_FILE_SIZE: usize = 12 * 1024 * 1024; // 12 MB
const MEDIUM_FILE_SIZE: usize = 2 * 1024 * 1024; // 2 MB
const SMALL_FILE_SIZE: usize = 4096; // 4 KB
// ── Process management ───────────────────────────────────────────────────
/// A spawned child process that is killed and reaped when dropped, so no
/// test process outlives the run even if a test panics.
struct ManagedProcess {
    // Handle to the running child; terminated in `kill()` / on drop.
    child: Child,
    // Human-readable label used in console output ("CAN-A", "Sync-B", ...).
    name: String,
}
impl ManagedProcess {
    /// Spawn `cmd` with `args` and extra env vars `envs`, redirecting both
    /// stdout and stderr into `<log_dir>/<name>.log`.
    ///
    /// Panics if the log file cannot be created or the process fails to
    /// start — either is fatal for the harness.
    fn spawn(
        name: &str,
        cmd: &str,
        args: &[&str],
        envs: &[(&str, &str)],
        log_dir: &Path,
    ) -> Self {
        println!(" Starting {} ...", name);
        let mut command = Command::new(cmd);
        let log_file = std::fs::File::create(log_dir.join(format!("{}.log", name)))
            .expect("create log file");
        // stdout and stderr need independent handles to the same file.
        let log_file_clone = log_file.try_clone().expect("clone log file handle");
        command
            .args(args)
            .stdout(Stdio::from(log_file))
            .stderr(Stdio::from(log_file_clone))
            .env("RUST_LOG", "can_sync=debug,can_service=debug,iroh=info,iroh_gossip=info");
        for (k, v) in envs {
            command.env(k, v);
        }
        let child = command
            .spawn()
            .unwrap_or_else(|e| panic!("Failed to start {}: {}", name, e));
        println!(" {} started (pid={})", name, child.id());
        Self {
            child,
            name: name.to_string(),
        }
    }

    /// Best-effort terminate the child, then `wait()` to reap it so no
    /// zombie process is left behind. Errors are ignored: the child may
    /// already have exited.
    fn kill(&mut self) {
        let _ = self.child.kill();
        let _ = self.child.wait();
        println!(" {} stopped", self.name);
    }
}
/// Kill the child on drop so processes are cleaned up on every exit path,
/// including panics during a failing test.
impl Drop for ManagedProcess {
    fn drop(&mut self) {
        self.kill();
    }
}
// ── Test harness ─────────────────────────────────────────────────────────
/// Owns every process and temp directory for one full test run.
///
/// Underscore-prefixed fields are held only for their Drop impls. Rust
/// drops fields in declaration order, so the four processes are killed
/// before the temp directories they use are deleted.
struct TestHarness {
    _can_a: ManagedProcess,
    _can_b: ManagedProcess,
    _sync_a: ManagedProcess,
    _sync_b: ManagedProcess,
    // Storage roots for the two CAN services.
    _tmp_a: TempDir,
    _tmp_b: TempDir,
    // Config/ticket directories for the two sync agents.
    _tmp_sync_a: TempDir,
    _tmp_sync_b: TempDir,
    // Per-process log files; read back via `print_logs` on failure.
    log_dir: TempDir,
    can_a_url: String,
    can_b_url: String,
}
impl TestHarness {
    /// Boot the full topology: two CAN services, then Sync-A (which writes
    /// a connection ticket file), then Sync-B (which reads that ticket to
    /// dial A directly). Panics on any startup failure or timeout.
    fn new(can_service_bin: &Path) -> Self {
        println!("\n=== Setting up test harness ===\n");
        // Create temp directories
        let tmp_a = TempDir::new().expect("create temp dir A");
        let tmp_b = TempDir::new().expect("create temp dir B");
        let tmp_sync_a = TempDir::new().expect("create temp dir sync A");
        let tmp_sync_b = TempDir::new().expect("create temp dir sync B");
        let log_dir = TempDir::new().expect("create log dir");
        println!(" Logs: {}", log_dir.path().display());
        println!(" CAN A storage: {}", tmp_a.path().display());
        println!(" CAN B storage: {}", tmp_b.path().display());
        // Write CAN service configs
        let config_a = tmp_a.path().join("config.yaml");
        let config_b = tmp_b.path().join("config.yaml");
        write_can_config(&config_a, tmp_a.path(), SYNC_KEY);
        write_can_config(&config_b, tmp_b.path(), SYNC_KEY);
        // Start CAN services (port comes from env, config path from argv).
        let can_a = ManagedProcess::spawn(
            "CAN-A",
            can_service_bin.to_str().unwrap(),
            &[config_a.to_str().unwrap()],
            &[("CAN_PORT", &CAN_A_PORT.to_string())],
            log_dir.path(),
        );
        let can_b = ManagedProcess::spawn(
            "CAN-B",
            can_service_bin.to_str().unwrap(),
            &[config_b.to_str().unwrap()],
            &[("CAN_PORT", &CAN_B_PORT.to_string())],
            log_dir.path(),
        );
        let can_a_url = format!("http://127.0.0.1:{}", CAN_A_PORT);
        let can_b_url = format!("http://127.0.0.1:{}", CAN_B_PORT);
        // Wait for CAN services to answer their list endpoint.
        println!("\n Waiting for CAN services to start...");
        wait_for_http(&can_a_url, Duration::from_secs(10), log_dir.path(), "CAN-A");
        wait_for_http(&can_b_url, Duration::from_secs(10), log_dir.path(), "CAN-B");
        println!(" Both CAN services ready!");
        // Find can-sync binary (built in main() before the harness starts).
        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        let can_sync_bin = manifest_dir
            .join("target")
            .join("debug")
            .join("can-sync")
            .with_extension(std::env::consts::EXE_EXTENSION);
        assert!(
            can_sync_bin.exists(),
            "can-sync binary not found at {}",
            can_sync_bin.display()
        );
        println!(" Using can-sync: {}", can_sync_bin.display());
        // Ticket file paths for direct peer connection. A writes the ticket,
        // B reads it; forward slashes keep the YAML valid on Windows.
        let ticket_a = tmp_sync_a.path().join("ticket_a.json");
        let ticket_a_str = ticket_a.to_str().unwrap().replace('\\', "/");
        // Write sync agent configs with ticket file exchange
        let sync_config_a = tmp_sync_a.path().join("config.yaml");
        let sync_config_b = tmp_sync_b.path().join("config.yaml");
        write_sync_config_with_tickets(
            &sync_config_a,
            &can_a_url,
            SYNC_KEY,
            SYNC_PASSPHRASE,
            Some(&ticket_a_str),
            None,
        );
        write_sync_config_with_tickets(
            &sync_config_b,
            &can_b_url,
            SYNC_KEY,
            SYNC_PASSPHRASE,
            None,
            Some(&ticket_a_str),
        );
        // Start Sync-A first (it writes the ticket)
        let sync_a = ManagedProcess::spawn(
            "Sync-A",
            can_sync_bin.to_str().unwrap(),
            &[sync_config_a.to_str().unwrap()],
            &[],
            log_dir.path(),
        );
        // Wait for Sync-A to write a non-empty ticket file before starting
        // Sync-B, which needs it to connect.
        println!(" Waiting for Sync-A to write ticket...");
        let ticket_start = Instant::now();
        loop {
            if ticket_start.elapsed() > Duration::from_secs(15) {
                print_log(log_dir.path(), "Sync-A");
                panic!("Timeout waiting for Sync-A ticket file");
            }
            if let Ok(contents) = std::fs::read_to_string(&ticket_a) {
                if !contents.trim().is_empty() {
                    println!(" Sync-A ticket ready ({} bytes)", contents.len());
                    break;
                }
            }
            std::thread::sleep(Duration::from_millis(100));
        }
        // Start Sync-B
        let sync_b = ManagedProcess::spawn(
            "Sync-B",
            can_sync_bin.to_str().unwrap(),
            &[sync_config_b.to_str().unwrap()],
            &[],
            log_dir.path(),
        );
        // Wait for peers to connect. Fixed sleep: there is no readiness
        // endpoint on the sync agents to poll — TODO confirm none exists.
        println!(" Waiting for sync agents to connect...");
        std::thread::sleep(Duration::from_secs(5));
        println!(" Test harness ready!\n");
        Self {
            _can_a: can_a,
            _can_b: can_b,
            _sync_a: sync_a,
            _sync_b: sync_b,
            _tmp_a: tmp_a,
            _tmp_b: tmp_b,
            _tmp_sync_a: tmp_sync_a,
            _tmp_sync_b: tmp_sync_b,
            log_dir,
            can_a_url,
            can_b_url,
        }
    }

    /// Dump the tail of every process log — called when any test fails.
    fn print_logs(&self) {
        println!("\n=== Process Logs ===");
        for name in &["Sync-A", "Sync-B", "CAN-A", "CAN-B"] {
            print_log(self.log_dir.path(), name);
        }
    }
}
// ── Config writers ───────────────────────────────────────────────────────
/// Write a minimal CAN service YAML config to `path`.
///
/// The storage path is normalised to forward slashes so the emitted YAML
/// stays valid on Windows.
fn write_can_config(path: &Path, storage_root: &Path, sync_key: &str) {
    let storage_str = storage_root.to_str().unwrap().replace('\\', "/");
    let mut content = String::new();
    content.push_str(&format!("storage_root: \"{}\"\n", storage_str));
    content.push_str("admin_token: \"test-admin-token\"\n");
    content.push_str("enable_thumbnail_cache: false\n");
    content.push_str(&format!("sync_api_key: \"{}\"\n", sync_key));
    std::fs::write(path, content).expect("write CAN config");
}
/// Write a sync-agent YAML config to `path`.
///
/// `ticket_file` makes the agent WRITE its connection ticket there;
/// `connect_ticket_file` makes it READ a peer's ticket from there. Either,
/// both, or neither line may be emitted.
fn write_sync_config_with_tickets(
    path: &Path,
    can_url: &str,
    sync_key: &str,
    passphrase: &str,
    ticket_file: Option<&str>,
    connect_ticket_file: Option<&str>,
) {
    let mut content = format!(
        "can_service_url: \"{}\"\nsync_api_key: \"{}\"\nsync_passphrase: \"{}\"\npoll_interval_secs: 2\n",
        can_url, sync_key, passphrase
    );
    if let Some(tf) = ticket_file {
        content += &format!("ticket_file: \"{}\"\n", tf);
    }
    if let Some(ctf) = connect_ticket_file {
        content += &format!("connect_ticket_file: \"{}\"\n", ctf);
    }
    std::fs::write(path, content).expect("write sync config");
}
// ── Logging helpers ─────────────────────────────────────────────────────
/// Print the last 80 lines of `<log_dir>/<name>.log`, or a placeholder
/// when the log file is missing/unreadable.
fn print_log(log_dir: &Path, name: &str) {
    let log_path = log_dir.join(format!("{}.log", name));
    match std::fs::read_to_string(&log_path) {
        Ok(contents) => {
            let lines: Vec<&str> = contents.lines().collect();
            // Tail of at most 80 lines; saturating_sub handles short logs.
            let tail_start = lines.len().saturating_sub(80);
            let show = &lines[tail_start..];
            println!("\n--- {} (last {} of {} lines) ---", name, show.len(), lines.len());
            for line in show {
                println!(" {}", line);
            }
        }
        Err(_) => println!("\n--- {} (no log file) ---", name),
    }
}
// ── HTTP helpers ─────────────────────────────────────────────────────────
/// Block until `base_url`'s list endpoint answers 2xx, polling every 200ms.
/// On timeout, dumps the process log and panics.
fn wait_for_http(base_url: &str, timeout: Duration, log_dir: &Path, name: &str) {
    let client = reqwest::blocking::Client::new();
    let url = format!("{}/api/v1/can/0/list?limit=1", base_url);
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() > deadline {
            print_log(log_dir, name);
            panic!("Timeout waiting for {} to become ready", base_url);
        }
        let ready = client
            .get(&url)
            .timeout(Duration::from_secs(1))
            .send()
            .map(|resp| resp.status().is_success())
            .unwrap_or(false);
        if ready {
            return;
        }
        std::thread::sleep(Duration::from_millis(200));
    }
}
/// Ingest a file into a CAN service instance. Returns the asset hash.
///
/// Uploads `content` as a multipart form (`file` part + `mime_type` text
/// field) and extracts `data.hash` from the JSON response. Panics on any
/// transport, HTTP, or parse failure — a failed ingest fails the test.
fn ingest_file(base_url: &str, filename: &str, content: &[u8], mime_type: &str) -> String {
    let url = format!("{}/api/v1/can/0/ingest", base_url);
    let part = reqwest::blocking::multipart::Part::bytes(content.to_vec())
        .file_name(filename.to_string())
        .mime_str(mime_type)
        .unwrap();
    let form = reqwest::blocking::multipart::Form::new()
        .part("file", part)
        .text("mime_type", mime_type.to_string());
    let resp = reqwest::blocking::Client::new()
        .post(&url)
        .multipart(form)
        .timeout(Duration::from_secs(30))
        .send()
        .expect("ingest request failed");
    let status = resp.status();
    assert!(status.is_success(), "Ingest failed with status {}", status);
    let body: Value = resp.json().expect("parse ingest response");
    body["data"]["hash"]
        .as_str()
        .expect("no hash in response")
        .to_owned()
}
/// List all assets from a CAN service. Returns list of hashes.
///
/// Returns an empty list on any non-success HTTP status so pollers can
/// simply retry; panics only on transport or JSON-parse failure.
fn list_hashes(base_url: &str) -> Vec<String> {
    let client = reqwest::blocking::Client::new();
    let url = format!("{}/api/v1/can/0/list?limit=10000", base_url);
    let resp = client
        .get(&url)
        .timeout(Duration::from_secs(5))
        .send()
        .expect("list request failed");
    if !resp.status().is_success() {
        return vec![];
    }
    let body: Value = resp.json().expect("parse list response");
    // `as_array()` yields Option<&Vec<Value>>; `into_iter().flatten()`
    // walks the items when present and is a no-op otherwise — unlike the
    // previous `unwrap_or(&vec![])`, which allocated a temporary Vec.
    body["data"]["items"]
        .as_array()
        .into_iter()
        .flatten()
        .filter_map(|item| item["hash"].as_str().map(String::from))
        .collect()
}
/// Wait for a specific hash to appear on a CAN service.
///
/// Polls `list_hashes` every `POLL_INTERVAL` until the hash shows up or
/// `timeout` elapses. Returns `true` iff the hash was observed in time.
fn wait_for_hash(base_url: &str, hash: &str, timeout: Duration) -> bool {
    let start = Instant::now();
    while start.elapsed() < timeout {
        // Compare as &str — avoids allocating a fresh String on every poll
        // (previously `contains(&hash.to_string())`).
        if list_hashes(base_url).iter().any(|h| h == hash) {
            return true;
        }
        std::thread::sleep(POLL_INTERVAL);
    }
    false
}
/// Wait for ALL given hashes to appear on a CAN service.
/// Returns (found_count, total, elapsed).
///
/// Keeps polling until every expected hash is present or `timeout` is
/// reached, whichever comes first; the caller compares found vs total to
/// decide pass/fail.
fn wait_for_all_hashes(
    base_url: &str,
    expected: &[String],
    timeout: Duration,
) -> (usize, usize, Duration) {
    let start = Instant::now();
    let total = expected.len();
    loop {
        // Set membership makes each poll O(expected + listed).
        let on_remote: HashSet<String> = list_hashes(base_url).into_iter().collect();
        let found = expected.iter().filter(|h| on_remote.contains(*h)).count();
        let complete = found == total;
        let out_of_time = start.elapsed() >= timeout;
        if complete || out_of_time {
            return (found, total, start.elapsed());
        }
        std::thread::sleep(POLL_INTERVAL);
    }
}
/// Generate `size` bytes of random content (unique per call, so every
/// generated test file hashes differently).
fn random_content(size: usize) -> Vec<u8> {
    let mut buf = vec![0u8; size];
    rand::rng().fill(&mut buf[..]);
    buf
}
/// Human-readable byte size: "512 B", "4.0 KB", "12.0 MB".
fn human_size(bytes: usize) -> String {
    const KIB: usize = 1024;
    const MIB: usize = 1024 * 1024;
    if bytes >= MIB {
        format!("{:.1} MB", bytes as f64 / MIB as f64)
    } else if bytes >= KIB {
        format!("{:.1} KB", bytes as f64 / KIB as f64)
    } else {
        format!("{} B", bytes)
    }
}
/// Generate a mixed-size file list for stress tests.
/// Returns Vec<(filename, content)> with a mix of small, medium, and large files.
fn generate_burst_files(prefix: &str, count: usize) -> Vec<(String, Vec<u8>)> {
    // Size schedule cycles every 5 files (large, medium, tiny, ~0.5 MB,
    // ~64 KB) with an index-dependent bump so sizes also vary within a tier.
    let size_for = |i: usize| -> usize {
        match i % 5 {
            0 => LARGE_FILE_SIZE + (i * 1024 * 100), // ~12-14 MB (every 5th file)
            1 => MEDIUM_FILE_SIZE + (i * 1024 * 50), // ~2-3 MB
            2 => SMALL_FILE_SIZE + (i * 100),        // ~4-6 KB
            3 => 512 * 1024 + (i * 1024 * 10),       // ~500 KB - 750 KB
            _ => 64 * 1024 + (i * 1024),             // ~64-90 KB
        }
    };
    let files: Vec<(String, Vec<u8>)> = (0..count)
        .map(|i| (format!("{}_{:03}.bin", prefix, i), random_content(size_for(i))))
        .collect();
    let total_bytes: usize = files.iter().map(|(_, c)| c.len()).sum();
    println!(
        " Generated {} files, total {}",
        count,
        human_size(total_bytes)
    );
    // Print size breakdown
    let large = files.iter().filter(|(_, c)| c.len() >= 10 * 1024 * 1024).count();
    let medium = files
        .iter()
        .filter(|(_, c)| c.len() >= 1024 * 1024 && c.len() < 10 * 1024 * 1024)
        .count();
    let small = files.iter().filter(|(_, c)| c.len() < 1024 * 1024).count();
    println!(
        " Breakdown: {} large (10MB+), {} medium (1-10MB), {} small (<1MB)",
        large, medium, small
    );
    files
}
/// Ingest a batch of files as fast as possible (sequentially, but no delays).
/// Returns list of hashes.
///
/// Logs the first three ingests, the final one, and every 10 MB+ upload;
/// the rest are summarised with one "..." line to keep output readable.
/// Prints aggregate throughput when done.
fn rapid_ingest(base_url: &str, files: &[(String, Vec<u8>)]) -> Vec<String> {
    let mut hashes = Vec::with_capacity(files.len());
    let start = Instant::now();
    for (i, (filename, content)) in files.iter().enumerate() {
        let hash = ingest_file(base_url, filename, content, "application/octet-stream");
        // First 3, last file, and any large upload get their own line.
        if i < 3 || i == files.len() - 1 || content.len() >= 10 * 1024 * 1024 {
            println!(
                " [{}/{}] Ingested {} ({}) → {}",
                i + 1,
                files.len(),
                filename,
                human_size(content.len()),
                &hash[..16]
            );
        } else if i == 3 {
            println!(" ... ingesting remaining files ...");
        }
        hashes.push(hash);
    }
    let elapsed = start.elapsed();
    let total_bytes: usize = files.iter().map(|(_, c)| c.len()).sum();
    println!(
        " Ingested {} files ({}) in {:.1}s ({}/s)",
        files.len(),
        human_size(total_bytes),
        elapsed.as_secs_f64(),
        human_size((total_bytes as f64 / elapsed.as_secs_f64()) as usize)
    );
    hashes
}
/// Ingest files from two threads simultaneously, return (hashes_a, hashes_b).
fn parallel_ingest(
url_a: &str,
files_a: &[(String, Vec<u8>)],
url_b: &str,
files_b: &[(String, Vec<u8>)],
) -> (Vec<String>, Vec<String>) {
let url_a = url_a.to_string();
let url_b = url_b.to_string();
// Clone the file data for the threads
let files_a: Vec<(String, Vec<u8>)> = files_a.to_vec();
let files_b: Vec<(String, Vec<u8>)> = files_b.to_vec();
let hashes_a = Arc::new(Mutex::new(Vec::new()));
let hashes_b = Arc::new(Mutex::new(Vec::new()));
let ha = hashes_a.clone();
let hb = hashes_b.clone();
// Spawn two threads that ingest simultaneously
let thread_a = std::thread::spawn(move || {
let mut results = Vec::with_capacity(files_a.len());
for (filename, content) in &files_a {
let hash = ingest_file(&url_a, filename, content, "application/octet-stream");
results.push(hash);
}
*ha.lock().unwrap() = results;
});
let thread_b = std::thread::spawn(move || {
let mut results = Vec::with_capacity(files_b.len());
for (filename, content) in &files_b {
let hash = ingest_file(&url_b, filename, content, "application/octet-stream");
results.push(hash);
}
*hb.lock().unwrap() = results;
});
thread_a.join().expect("ingest thread A panicked");
thread_b.join().expect("ingest thread B panicked");
let a = hashes_a.lock().unwrap().clone();
let b = hashes_b.lock().unwrap().clone();
(a, b)
}
// ── Test runner ──────────────────────────────────────────────────────────
/// Locate the pre-built can-service binary, probing (in order) next to our
/// own executable, the sibling `debug` dir, and the workspace target dir.
/// Panics with a build hint when none exists.
fn find_can_service_bin() -> PathBuf {
    let self_exe = std::env::current_exe().expect("get current exe path");
    let target_dir = self_exe.parent().unwrap();
    let bin_name = if cfg!(windows) {
        "can-service.exe"
    } else {
        "can-service"
    };
    let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .parent()
        .unwrap()
        .parent()
        .unwrap()
        .to_path_buf();
    // Candidate locations, nearest first.
    let candidates = [
        target_dir.join(bin_name),
        target_dir.parent().unwrap().join("debug").join(bin_name),
        workspace_root.join("target").join("debug").join(bin_name),
    ];
    if let Some(found) = candidates.iter().find(|c| c.exists()) {
        return found.clone();
    }
    panic!(
        "Cannot find {} binary. Build it first:\n cd {} && cargo build",
        bin_name,
        workspace_root.display()
    );
}
/// Drive the full integration/stress run: boot the harness, execute the six
/// scenarios, print a summary table, dump logs on failure, and exit 1 if
/// anything failed.
fn main() {
    println!("╔══════════════════════════════════════════════════╗");
    println!("║ CAN Sync v2 — Integration & Stress Test ║");
    println!("╚══════════════════════════════════════════════════╝");
    let can_service_bin = find_can_service_bin();
    println!("\nUsing CAN service: {}", can_service_bin.display());
    // Build can-sync if needed (no-op when already up to date).
    println!("\nBuilding can-sync...");
    let build_status = Command::new("cargo")
        .args(["build", "--bin", "can-sync"])
        .current_dir(env!("CARGO_MANIFEST_DIR"))
        .status()
        .expect("cargo build failed");
    assert!(build_status.success(), "Failed to build can-sync");
    // Set up harness (starts all processes)
    let harness = TestHarness::new(&can_service_bin);
    let mut passed = 0u32;
    let mut failed = 0u32;
    // (scenario name, passed?, detail) rows for the summary table.
    let mut results: Vec<(String, bool, String)> = vec![];
    // ── Test 1: Single file A→B ──────────────────────────────────────
    print_test_header("Test 1: Single file A→B");
    {
        let content = random_content(4096);
        let hash = ingest_file(
            &harness.can_a_url,
            "test1.bin",
            &content,
            "application/octet-stream",
        );
        println!(" Ingested on A: hash={}", &hash[..16]);
        let found = wait_for_hash(&harness.can_b_url, &hash, Duration::from_secs(30));
        if found {
            println!(" ✓ File appeared on CAN-B");
            results.push(("A→B single".into(), true, "ok".into()));
            passed += 1;
        } else {
            println!(" ✗ File NOT found on CAN-B after 30s");
            results.push(("A→B single".into(), false, "timeout".into()));
            failed += 1;
        }
    }
    // ── Test 2: Single file B→A ──────────────────────────────────────
    print_test_header("Test 2: Single file B→A");
    {
        let content = random_content(8192);
        let hash = ingest_file(
            &harness.can_b_url,
            "test2.dat",
            &content,
            "application/octet-stream",
        );
        println!(" Ingested on B: hash={}", &hash[..16]);
        let found = wait_for_hash(&harness.can_a_url, &hash, Duration::from_secs(30));
        if found {
            println!(" ✓ File appeared on CAN-A");
            results.push(("B→A single".into(), true, "ok".into()));
            passed += 1;
        } else {
            println!(" ✗ File NOT found on CAN-A after 30s");
            results.push(("B→A single".into(), false, "timeout".into()));
            failed += 1;
        }
    }
    // ── Test 3: Rapid burst A→B (25 files, mixed sizes, some 10MB+) ─
    print_test_header(&format!(
        "Test 3: Rapid burst {} files A→B (mixed sizes, some 10MB+)",
        BURST_COUNT
    ));
    {
        println!(" Generating files...");
        let files = generate_burst_files("burst_a2b", BURST_COUNT);
        println!(" Ingesting rapidly on CAN-A...");
        let hashes = rapid_ingest(&harness.can_a_url, &files);
        println!(" Waiting for all {} files to sync to CAN-B...", hashes.len());
        let (found, total, elapsed) = wait_for_all_hashes(&harness.can_b_url, &hashes, SYNC_TIMEOUT);
        if found == total {
            println!(
                " ✓ All {} files synced to B in {:.1}s",
                total,
                elapsed.as_secs_f64()
            );
            let total_bytes: usize = files.iter().map(|(_, c)| c.len()).sum();
            println!(
                " Throughput: {}/s",
                human_size((total_bytes as f64 / elapsed.as_secs_f64()) as usize)
            );
            results.push((
                format!("Burst A→B ({})", total),
                true,
                format!("{:.1}s", elapsed.as_secs_f64()),
            ));
            passed += 1;
        } else {
            println!(
                " ✗ Only {}/{} files synced after {:.1}s",
                found,
                total,
                elapsed.as_secs_f64()
            );
            // Report which ones are missing (with their sizes — large files
            // are the likeliest stragglers).
            let b_hashes: HashSet<String> = list_hashes(&harness.can_b_url).into_iter().collect();
            let missing: Vec<_> = hashes
                .iter()
                .enumerate()
                .filter(|(_, h)| !b_hashes.contains(*h))
                .map(|(i, h)| format!("#{} {} ({})", i, &h[..12], human_size(files[i].1.len())))
                .collect();
            if missing.len() <= 10 {
                for m in &missing {
                    println!(" MISSING: {}", m);
                }
            } else {
                println!(" {} files missing (showing first 10):", missing.len());
                for m in &missing[..10] {
                    println!(" MISSING: {}", m);
                }
            }
            results.push((
                format!("Burst A→B ({})", total),
                false,
                format!("{}/{}", found, total),
            ));
            failed += 1;
        }
    }
    // Small pause to let things settle
    println!("\n (pause 3s between tests)");
    std::thread::sleep(Duration::from_secs(3));
    // ── Test 4: Rapid burst B→A (25 files, mixed sizes, some 10MB+) ─
    print_test_header(&format!(
        "Test 4: Rapid burst {} files B→A (mixed sizes, some 10MB+)",
        BURST_COUNT
    ));
    {
        println!(" Generating files...");
        let files = generate_burst_files("burst_b2a", BURST_COUNT);
        println!(" Ingesting rapidly on CAN-B...");
        let hashes = rapid_ingest(&harness.can_b_url, &files);
        println!(" Waiting for all {} files to sync to CAN-A...", hashes.len());
        let (found, total, elapsed) = wait_for_all_hashes(&harness.can_a_url, &hashes, SYNC_TIMEOUT);
        if found == total {
            println!(
                " ✓ All {} files synced to A in {:.1}s",
                total,
                elapsed.as_secs_f64()
            );
            let total_bytes: usize = files.iter().map(|(_, c)| c.len()).sum();
            println!(
                " Throughput: {}/s",
                human_size((total_bytes as f64 / elapsed.as_secs_f64()) as usize)
            );
            results.push((
                format!("Burst B→A ({})", total),
                true,
                format!("{:.1}s", elapsed.as_secs_f64()),
            ));
            passed += 1;
        } else {
            println!(
                " ✗ Only {}/{} files synced after {:.1}s",
                found,
                total,
                elapsed.as_secs_f64()
            );
            let a_hashes: HashSet<String> = list_hashes(&harness.can_a_url).into_iter().collect();
            let missing: Vec<_> = hashes
                .iter()
                .enumerate()
                .filter(|(_, h)| !a_hashes.contains(*h))
                .map(|(i, h)| format!("#{} {} ({})", i, &h[..12], human_size(files[i].1.len())))
                .collect();
            if missing.len() <= 10 {
                for m in &missing {
                    println!(" MISSING: {}", m);
                }
            } else {
                println!(" {} files missing (showing first 10):", missing.len());
                for m in &missing[..10] {
                    println!(" MISSING: {}", m);
                }
            }
            results.push((
                format!("Burst B→A ({})", total),
                false,
                format!("{}/{}", found, total),
            ));
            failed += 1;
        }
    }
    println!("\n (pause 3s between tests)");
    std::thread::sleep(Duration::from_secs(3));
    // ── Test 5: Simultaneous burst — 25 files on EACH side at once ───
    print_test_header(&format!(
        "Test 5: Simultaneous burst — {} files on EACH side at once",
        BURST_COUNT
    ));
    {
        println!(" Generating files for BOTH sides...");
        let files_for_a = generate_burst_files("simul_onA", BURST_COUNT);
        let files_for_b = generate_burst_files("simul_onB", BURST_COUNT);
        let total_bytes: usize = files_for_a.iter().map(|(_, c)| c.len()).sum::<usize>()
            + files_for_b.iter().map(|(_, c)| c.len()).sum::<usize>();
        println!(
            " Total data: {} across {} files",
            human_size(total_bytes),
            BURST_COUNT * 2
        );
        println!(" Ingesting on BOTH sides simultaneously...");
        let start = Instant::now();
        let (hashes_a, hashes_b) = parallel_ingest(
            &harness.can_a_url,
            &files_for_a,
            &harness.can_b_url,
            &files_for_b,
        );
        let ingest_elapsed = start.elapsed();
        println!(
            " Parallel ingest done in {:.1}s ({} on A, {} on B)",
            ingest_elapsed.as_secs_f64(),
            hashes_a.len(),
            hashes_b.len()
        );
        // Now wait for cross-sync: files from A should appear on B, files from B on A
        println!(
            " Waiting for A's {} files to appear on B...",
            hashes_a.len()
        );
        let (found_on_b, total_a, elapsed_b) =
            wait_for_all_hashes(&harness.can_b_url, &hashes_a, SYNC_TIMEOUT);
        println!(
            " Waiting for B's {} files to appear on A...",
            hashes_b.len()
        );
        let (found_on_a, total_b, elapsed_a) =
            wait_for_all_hashes(&harness.can_a_url, &hashes_b, SYNC_TIMEOUT);
        let a_ok = found_on_b == total_a;
        let b_ok = found_on_a == total_b;
        if a_ok && b_ok {
            let max_elapsed = elapsed_a.max(elapsed_b);
            println!(
                " ✓ Bidirectional sync complete! A→B: {}/{} in {:.1}s, B→A: {}/{} in {:.1}s",
                found_on_b,
                total_a,
                elapsed_b.as_secs_f64(),
                found_on_a,
                total_b,
                elapsed_a.as_secs_f64()
            );
            println!(
                " Effective throughput: {}/s (both directions)",
                human_size((total_bytes as f64 / max_elapsed.as_secs_f64()) as usize)
            );
            results.push((
                format!("Simul {}+{}", BURST_COUNT, BURST_COUNT),
                true,
                format!("{:.1}s", max_elapsed.as_secs_f64()),
            ));
            passed += 1;
        } else {
            println!(
                " ✗ A→B: {}/{}, B→A: {}/{}",
                found_on_b, total_a, found_on_a, total_b
            );
            if !a_ok {
                let b_hashes: HashSet<String> =
                    list_hashes(&harness.can_b_url).into_iter().collect();
                let missing_count = hashes_a.iter().filter(|h| !b_hashes.contains(*h)).count();
                println!(" A→B: {} files missing on B", missing_count);
            }
            if !b_ok {
                let a_hashes: HashSet<String> =
                    list_hashes(&harness.can_a_url).into_iter().collect();
                let missing_count = hashes_b.iter().filter(|h| !a_hashes.contains(*h)).count();
                println!(" B→A: {} files missing on A", missing_count);
            }
            results.push((
                format!("Simul {}+{}", BURST_COUNT, BURST_COUNT),
                false,
                format!(
                    "A→B {}/{} B→A {}/{}",
                    found_on_b, total_a, found_on_a, total_b
                ),
            ));
            failed += 1;
        }
    }
    // ── Test 6: Final full-mirror verification ───────────────────────
    print_test_header("Test 6: Final full-mirror verification");
    {
        // Give a final settlement window
        println!(" Waiting 10s for any stragglers...");
        std::thread::sleep(Duration::from_secs(10));
        let a_hashes = list_hashes(&harness.can_a_url);
        let b_hashes = list_hashes(&harness.can_b_url);
        println!(" CAN-A has {} assets", a_hashes.len());
        println!(" CAN-B has {} assets", b_hashes.len());
        let a_set: HashSet<&String> = a_hashes.iter().collect();
        let b_set: HashSet<&String> = b_hashes.iter().collect();
        let only_a: Vec<_> = a_set.difference(&b_set).collect();
        let only_b: Vec<_> = b_set.difference(&a_set).collect();
        if only_a.is_empty() && only_b.is_empty() && a_hashes.len() == b_hashes.len() {
            println!(
                " ✓ Perfect mirror! {} assets identical on both sides",
                a_hashes.len()
            );
            results.push((
                "Full mirror".into(),
                true,
                format!("{} assets", a_hashes.len()),
            ));
            passed += 1;
        } else {
            if !only_a.is_empty() {
                println!("{} assets only on A:", only_a.len());
                for h in only_a.iter().take(5) {
                    println!(" {}", &h[..16]);
                }
                if only_a.len() > 5 {
                    println!(" ... and {} more", only_a.len() - 5);
                }
            }
            if !only_b.is_empty() {
                println!("{} assets only on B:", only_b.len());
                for h in only_b.iter().take(5) {
                    println!(" {}", &h[..16]);
                }
                if only_b.len() > 5 {
                    println!(" ... and {} more", only_b.len() - 5);
                }
            }
            results.push((
                "Full mirror".into(),
                false,
                format!(
                    "A={} B={} onlyA={} onlyB={}",
                    a_hashes.len(),
                    b_hashes.len(),
                    only_a.len(),
                    only_b.len()
                ),
            ));
            failed += 1;
        }
    }
    // ── Results ──────────────────────────────────────────────────────
    // Assets each side should end with: 2 singles + two one-way bursts +
    // the simultaneous burst (BURST_COUNT per side) = 102 at defaults.
    let expected_total = 2 + BURST_COUNT * 2 + BURST_COUNT * 2;
    // ~1 in 5 generated files is large, across FOUR burst batches:
    // A→B, B→A, and one per side in the simultaneous test.
    let total_large = BURST_COUNT * 4 / 5;
    println!("\n╔════════════════════════════════════════════════════╗");
    println!("║ Test Results ║");
    println!("╠════════════════════════════════════════════════════╣");
    for (name, pass, detail) in &results {
        let icon = if *pass { "✓" } else { "✗" };
        // Truncate on char boundaries: details contain multi-byte '→', so
        // byte slicing (`&detail[..20]`) could panic mid-character.
        let detail_trunc: String = detail.chars().take(20).collect();
        println!("{} {:<28} {}", icon, name, detail_trunc);
    }
    println!("╠════════════════════════════════════════════════════╣");
    println!(
        "║ Passed: {} Failed: {}",
        passed, failed
    );
    println!(
        "║ Files: ~{} total, ~{} large (10MB+) ║",
        expected_total, // singles are already included above
        total_large
    );
    println!("╚════════════════════════════════════════════════════╝");
    // Print logs on failure
    if failed > 0 {
        harness.print_logs();
    }
    // Clean up: dropping the harness kills the processes, then removes dirs.
    println!("\n=== Cleaning up ===\n");
    drop(harness);
    println!(" All temp directories removed.");
    if failed > 0 {
        std::process::exit(1);
    }
}
/// Print a horizontal rule, the scenario name, and another rule — the
/// visual separator shown before each test.
fn print_test_header(name: &str) {
    const RULE: &str = "╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌";
    println!("\n{}", RULE);
    println!(" {}", name);
    println!("{}\n", RULE);
}