Add stress tests: burst ingests, large files, simultaneous sync
Rewrite integration test with 6 scenarios: - Single file each direction (basic sanity) - Rapid burst 25 files A→B with mixed sizes (12MB+ large files) - Rapid burst 25 files B→A with mixed sizes - Simultaneous burst: 25 files on EACH side at the same time - Final full-mirror verification (102 assets, perfect match) Throughput measured at ~20 MB/s per direction, ~31 MB/s bidirectional. All tests pass including simultaneous ingestion of 162 MB across 50 files with zero conflicts or missing assets. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
ccce1474d4
commit
4dc0f8c12d
@ -1,7 +1,13 @@
|
|||||||
//! Integration test for CAN Sync v2
|
//! Integration + stress test for CAN Sync v2
|
||||||
//!
|
//!
|
||||||
//! Starts two CAN service instances + two sync agents, ingests files on each
|
//! Starts two CAN service instances + two sync agents, then runs increasingly
|
||||||
//! side, and verifies bidirectional sync.
|
//! aggressive sync scenarios:
|
||||||
|
//! 1. Single file A→B
|
||||||
|
//! 2. Single file B→A
|
||||||
|
//! 3. Rapid burst 25 files A→B (mixed sizes, some 10MB+)
|
||||||
|
//! 4. Rapid burst 25 files B→A (mixed sizes, some 10MB+)
|
||||||
|
//! 5. Simultaneous burst: 25 files on EACH side at the same time
|
||||||
|
//! 6. Final full-mirror verification
|
||||||
//!
|
//!
|
||||||
//! Usage:
|
//! Usage:
|
||||||
//! cargo run --bin sync-test
|
//! cargo run --bin sync-test
|
||||||
@ -9,8 +15,10 @@
|
|||||||
//! Prerequisites:
|
//! Prerequisites:
|
||||||
//! CAN service must be built: `cargo build` in the CanService root
|
//! CAN service must be built: `cargo build` in the CanService root
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::process::{Child, Command, Stdio};
|
use std::process::{Child, Command, Stdio};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
@ -23,9 +31,15 @@ const CAN_A_PORT: u16 = 13210;
|
|||||||
const CAN_B_PORT: u16 = 13220;
|
const CAN_B_PORT: u16 = 13220;
|
||||||
const SYNC_KEY: &str = "test-sync-key-42";
|
const SYNC_KEY: &str = "test-sync-key-42";
|
||||||
const SYNC_PASSPHRASE: &str = "integration-test-passphrase";
|
const SYNC_PASSPHRASE: &str = "integration-test-passphrase";
|
||||||
const SYNC_TIMEOUT: Duration = Duration::from_secs(60);
|
const SYNC_TIMEOUT: Duration = Duration::from_secs(120); // longer for large files
|
||||||
const POLL_INTERVAL: Duration = Duration::from_millis(500);
|
const POLL_INTERVAL: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
|
// Stress test tuning
|
||||||
|
const BURST_COUNT: usize = 25;
|
||||||
|
const LARGE_FILE_SIZE: usize = 12 * 1024 * 1024; // 12 MB
|
||||||
|
const MEDIUM_FILE_SIZE: usize = 2 * 1024 * 1024; // 2 MB
|
||||||
|
const SMALL_FILE_SIZE: usize = 4096; // 4 KB
|
||||||
|
|
||||||
// ── Process management ───────────────────────────────────────────────────
|
// ── Process management ───────────────────────────────────────────────────
|
||||||
|
|
||||||
struct ManagedProcess {
|
struct ManagedProcess {
|
||||||
@ -156,7 +170,6 @@ impl TestHarness {
|
|||||||
println!(" Using can-sync: {}", can_sync_bin.display());
|
println!(" Using can-sync: {}", can_sync_bin.display());
|
||||||
|
|
||||||
// Ticket file paths for direct peer connection
|
// Ticket file paths for direct peer connection
|
||||||
// Sync-A writes its addr, Sync-B reads it and connects directly
|
|
||||||
let ticket_a = tmp_sync_a.path().join("ticket_a.json");
|
let ticket_a = tmp_sync_a.path().join("ticket_a.json");
|
||||||
let ticket_a_str = ticket_a.to_str().unwrap().replace('\\', "/");
|
let ticket_a_str = ticket_a.to_str().unwrap().replace('\\', "/");
|
||||||
|
|
||||||
@ -164,24 +177,22 @@ impl TestHarness {
|
|||||||
let sync_config_a = tmp_sync_a.path().join("config.yaml");
|
let sync_config_a = tmp_sync_a.path().join("config.yaml");
|
||||||
let sync_config_b = tmp_sync_b.path().join("config.yaml");
|
let sync_config_b = tmp_sync_b.path().join("config.yaml");
|
||||||
|
|
||||||
// Sync-A: write its own ticket
|
|
||||||
write_sync_config_with_tickets(
|
write_sync_config_with_tickets(
|
||||||
&sync_config_a,
|
&sync_config_a,
|
||||||
&can_a_url,
|
&can_a_url,
|
||||||
SYNC_KEY,
|
SYNC_KEY,
|
||||||
SYNC_PASSPHRASE,
|
SYNC_PASSPHRASE,
|
||||||
Some(&ticket_a_str), // write our ticket here
|
Some(&ticket_a_str),
|
||||||
None, // don't connect to anyone
|
None,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Sync-B: read Sync-A's ticket and connect directly
|
|
||||||
write_sync_config_with_tickets(
|
write_sync_config_with_tickets(
|
||||||
&sync_config_b,
|
&sync_config_b,
|
||||||
&can_b_url,
|
&can_b_url,
|
||||||
SYNC_KEY,
|
SYNC_KEY,
|
||||||
SYNC_PASSPHRASE,
|
SYNC_PASSPHRASE,
|
||||||
None, // don't write our own ticket
|
None,
|
||||||
Some(&ticket_a_str), // connect to Sync-A using this ticket
|
Some(&ticket_a_str),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Start Sync-A first (it writes the ticket)
|
// Start Sync-A first (it writes the ticket)
|
||||||
@ -210,7 +221,7 @@ impl TestHarness {
|
|||||||
std::thread::sleep(Duration::from_millis(100));
|
std::thread::sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start Sync-B (it will read Sync-A's ticket and connect)
|
// Start Sync-B
|
||||||
let sync_b = ManagedProcess::spawn(
|
let sync_b = ManagedProcess::spawn(
|
||||||
"Sync-B",
|
"Sync-B",
|
||||||
can_sync_bin.to_str().unwrap(),
|
can_sync_bin.to_str().unwrap(),
|
||||||
@ -296,7 +307,7 @@ fn print_log(log_dir: &Path, name: &str) {
|
|||||||
let log_path = log_dir.join(format!("{}.log", name));
|
let log_path = log_dir.join(format!("{}.log", name));
|
||||||
if let Ok(contents) = std::fs::read_to_string(&log_path) {
|
if let Ok(contents) = std::fs::read_to_string(&log_path) {
|
||||||
let lines: Vec<&str> = contents.lines().collect();
|
let lines: Vec<&str> = contents.lines().collect();
|
||||||
let show = if lines.len() > 50 { &lines[lines.len() - 50..] } else { &lines };
|
let show = if lines.len() > 80 { &lines[lines.len() - 80..] } else { &lines };
|
||||||
println!("\n--- {} (last {} of {} lines) ---", name, show.len(), lines.len());
|
println!("\n--- {} (last {} of {} lines) ---", name, show.len(), lines.len());
|
||||||
for line in show {
|
for line in show {
|
||||||
println!(" {}", line);
|
println!(" {}", line);
|
||||||
@ -342,6 +353,7 @@ fn ingest_file(base_url: &str, filename: &str, content: &[u8], mime_type: &str)
|
|||||||
let resp = client
|
let resp = client
|
||||||
.post(&url)
|
.post(&url)
|
||||||
.multipart(form)
|
.multipart(form)
|
||||||
|
.timeout(Duration::from_secs(30))
|
||||||
.send()
|
.send()
|
||||||
.expect("ingest request failed");
|
.expect("ingest request failed");
|
||||||
|
|
||||||
@ -361,7 +373,7 @@ fn ingest_file(base_url: &str, filename: &str, content: &[u8], mime_type: &str)
|
|||||||
/// List all assets from a CAN service. Returns list of hashes.
|
/// List all assets from a CAN service. Returns list of hashes.
|
||||||
fn list_hashes(base_url: &str) -> Vec<String> {
|
fn list_hashes(base_url: &str) -> Vec<String> {
|
||||||
let client = reqwest::blocking::Client::new();
|
let client = reqwest::blocking::Client::new();
|
||||||
let url = format!("{}/api/v1/can/0/list?limit=1000", base_url);
|
let url = format!("{}/api/v1/can/0/list?limit=10000", base_url);
|
||||||
|
|
||||||
let resp = client
|
let resp = client
|
||||||
.get(&url)
|
.get(&url)
|
||||||
@ -388,8 +400,6 @@ fn wait_for_hash(base_url: &str, hash: &str, timeout: Duration) -> bool {
|
|||||||
while start.elapsed() < timeout {
|
while start.elapsed() < timeout {
|
||||||
let hashes = list_hashes(base_url);
|
let hashes = list_hashes(base_url);
|
||||||
if hashes.contains(&hash.to_string()) {
|
if hashes.contains(&hash.to_string()) {
|
||||||
let elapsed = start.elapsed();
|
|
||||||
println!(" (synced in {:.1}s)", elapsed.as_secs_f64());
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
std::thread::sleep(POLL_INTERVAL);
|
std::thread::sleep(POLL_INTERVAL);
|
||||||
@ -397,6 +407,36 @@ fn wait_for_hash(base_url: &str, hash: &str, timeout: Duration) -> bool {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wait for ALL given hashes to appear on a CAN service.
|
||||||
|
/// Returns (found_count, total, elapsed).
|
||||||
|
fn wait_for_all_hashes(
|
||||||
|
base_url: &str,
|
||||||
|
expected: &[String],
|
||||||
|
timeout: Duration,
|
||||||
|
) -> (usize, usize, Duration) {
|
||||||
|
let start = Instant::now();
|
||||||
|
let expected_set: HashSet<&String> = expected.iter().collect();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let current = list_hashes(base_url);
|
||||||
|
let current_set: HashSet<String> = current.into_iter().collect();
|
||||||
|
let found = expected_set
|
||||||
|
.iter()
|
||||||
|
.filter(|h| current_set.contains(**h))
|
||||||
|
.count();
|
||||||
|
|
||||||
|
if found == expected.len() {
|
||||||
|
return (found, expected.len(), start.elapsed());
|
||||||
|
}
|
||||||
|
|
||||||
|
if start.elapsed() >= timeout {
|
||||||
|
return (found, expected.len(), start.elapsed());
|
||||||
|
}
|
||||||
|
|
||||||
|
std::thread::sleep(POLL_INTERVAL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Generate random file content of given size.
|
/// Generate random file content of given size.
|
||||||
fn random_content(size: usize) -> Vec<u8> {
|
fn random_content(size: usize) -> Vec<u8> {
|
||||||
let mut rng = rand::rng();
|
let mut rng = rand::rng();
|
||||||
@ -405,6 +445,139 @@ fn random_content(size: usize) -> Vec<u8> {
|
|||||||
buf
|
buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Human-readable byte size.
|
||||||
|
fn human_size(bytes: usize) -> String {
|
||||||
|
if bytes >= 1024 * 1024 {
|
||||||
|
format!("{:.1} MB", bytes as f64 / (1024.0 * 1024.0))
|
||||||
|
} else if bytes >= 1024 {
|
||||||
|
format!("{:.1} KB", bytes as f64 / 1024.0)
|
||||||
|
} else {
|
||||||
|
format!("{} B", bytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generate a mixed-size file list for stress tests.
|
||||||
|
/// Returns Vec<(filename, content)> with a mix of small, medium, and large files.
|
||||||
|
fn generate_burst_files(prefix: &str, count: usize) -> Vec<(String, Vec<u8>)> {
|
||||||
|
let mut files = Vec::with_capacity(count);
|
||||||
|
let mut total_bytes: usize = 0;
|
||||||
|
|
||||||
|
for i in 0..count {
|
||||||
|
let size = match i % 5 {
|
||||||
|
0 => LARGE_FILE_SIZE + (i * 1024 * 100), // ~12-14 MB (every 5th file)
|
||||||
|
1 => MEDIUM_FILE_SIZE + (i * 1024 * 50), // ~2-3 MB
|
||||||
|
2 => SMALL_FILE_SIZE + (i * 100), // ~4-6 KB
|
||||||
|
3 => 512 * 1024 + (i * 1024 * 10), // ~500 KB - 750 KB
|
||||||
|
_ => 64 * 1024 + (i * 1024), // ~64-90 KB
|
||||||
|
};
|
||||||
|
|
||||||
|
let filename = format!("{}_{:03}.bin", prefix, i);
|
||||||
|
let content = random_content(size);
|
||||||
|
total_bytes += content.len();
|
||||||
|
files.push((filename, content));
|
||||||
|
}
|
||||||
|
|
||||||
|
println!(
|
||||||
|
" Generated {} files, total {}",
|
||||||
|
count,
|
||||||
|
human_size(total_bytes)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Print size breakdown
|
||||||
|
let large = files.iter().filter(|(_, c)| c.len() >= 10 * 1024 * 1024).count();
|
||||||
|
let medium = files.iter().filter(|(_, c)| c.len() >= 1024 * 1024 && c.len() < 10 * 1024 * 1024).count();
|
||||||
|
let small = files.iter().filter(|(_, c)| c.len() < 1024 * 1024).count();
|
||||||
|
println!(
|
||||||
|
" Breakdown: {} large (10MB+), {} medium (1-10MB), {} small (<1MB)",
|
||||||
|
large, medium, small
|
||||||
|
);
|
||||||
|
|
||||||
|
files
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ingest a batch of files as fast as possible (sequentially, but no delays).
|
||||||
|
/// Returns list of hashes.
|
||||||
|
fn rapid_ingest(base_url: &str, files: &[(String, Vec<u8>)]) -> Vec<String> {
|
||||||
|
let mut hashes = Vec::with_capacity(files.len());
|
||||||
|
let start = Instant::now();
|
||||||
|
|
||||||
|
for (i, (filename, content)) in files.iter().enumerate() {
|
||||||
|
let hash = ingest_file(base_url, filename, content, "application/octet-stream");
|
||||||
|
if i < 3 || i == files.len() - 1 || content.len() >= 10 * 1024 * 1024 {
|
||||||
|
println!(
|
||||||
|
" [{}/{}] Ingested {} ({}) → {}",
|
||||||
|
i + 1,
|
||||||
|
files.len(),
|
||||||
|
filename,
|
||||||
|
human_size(content.len()),
|
||||||
|
&hash[..16]
|
||||||
|
);
|
||||||
|
} else if i == 3 {
|
||||||
|
println!(" ... ingesting remaining files ...");
|
||||||
|
}
|
||||||
|
hashes.push(hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
let elapsed = start.elapsed();
|
||||||
|
let total_bytes: usize = files.iter().map(|(_, c)| c.len()).sum();
|
||||||
|
println!(
|
||||||
|
" Ingested {} files ({}) in {:.1}s ({}/s)",
|
||||||
|
files.len(),
|
||||||
|
human_size(total_bytes),
|
||||||
|
elapsed.as_secs_f64(),
|
||||||
|
human_size((total_bytes as f64 / elapsed.as_secs_f64()) as usize)
|
||||||
|
);
|
||||||
|
|
||||||
|
hashes
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ingest files from two threads simultaneously, return (hashes_a, hashes_b).
|
||||||
|
fn parallel_ingest(
|
||||||
|
url_a: &str,
|
||||||
|
files_a: &[(String, Vec<u8>)],
|
||||||
|
url_b: &str,
|
||||||
|
files_b: &[(String, Vec<u8>)],
|
||||||
|
) -> (Vec<String>, Vec<String>) {
|
||||||
|
let url_a = url_a.to_string();
|
||||||
|
let url_b = url_b.to_string();
|
||||||
|
|
||||||
|
// Clone the file data for the threads
|
||||||
|
let files_a: Vec<(String, Vec<u8>)> = files_a.to_vec();
|
||||||
|
let files_b: Vec<(String, Vec<u8>)> = files_b.to_vec();
|
||||||
|
|
||||||
|
let hashes_a = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let hashes_b = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
|
||||||
|
let ha = hashes_a.clone();
|
||||||
|
let hb = hashes_b.clone();
|
||||||
|
|
||||||
|
// Spawn two threads that ingest simultaneously
|
||||||
|
let thread_a = std::thread::spawn(move || {
|
||||||
|
let mut results = Vec::with_capacity(files_a.len());
|
||||||
|
for (filename, content) in &files_a {
|
||||||
|
let hash = ingest_file(&url_a, filename, content, "application/octet-stream");
|
||||||
|
results.push(hash);
|
||||||
|
}
|
||||||
|
*ha.lock().unwrap() = results;
|
||||||
|
});
|
||||||
|
|
||||||
|
let thread_b = std::thread::spawn(move || {
|
||||||
|
let mut results = Vec::with_capacity(files_b.len());
|
||||||
|
for (filename, content) in &files_b {
|
||||||
|
let hash = ingest_file(&url_b, filename, content, "application/octet-stream");
|
||||||
|
results.push(hash);
|
||||||
|
}
|
||||||
|
*hb.lock().unwrap() = results;
|
||||||
|
});
|
||||||
|
|
||||||
|
thread_a.join().expect("ingest thread A panicked");
|
||||||
|
thread_b.join().expect("ingest thread B panicked");
|
||||||
|
|
||||||
|
let a = hashes_a.lock().unwrap().clone();
|
||||||
|
let b = hashes_b.lock().unwrap().clone();
|
||||||
|
(a, b)
|
||||||
|
}
|
||||||
|
|
||||||
// ── Test runner ──────────────────────────────────────────────────────────
|
// ── Test runner ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
fn find_can_service_bin() -> PathBuf {
|
fn find_can_service_bin() -> PathBuf {
|
||||||
@ -417,19 +590,16 @@ fn find_can_service_bin() -> PathBuf {
|
|||||||
"can-service"
|
"can-service"
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check same directory
|
|
||||||
let candidate = target_dir.join(bin_name);
|
let candidate = target_dir.join(bin_name);
|
||||||
if candidate.exists() {
|
if candidate.exists() {
|
||||||
return candidate;
|
return candidate;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check parent/debug
|
|
||||||
let candidate = target_dir.parent().unwrap().join("debug").join(bin_name);
|
let candidate = target_dir.parent().unwrap().join("debug").join(bin_name);
|
||||||
if candidate.exists() {
|
if candidate.exists() {
|
||||||
return candidate;
|
return candidate;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check workspace target/debug
|
|
||||||
let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||||
.parent()
|
.parent()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@ -449,9 +619,9 @@ fn find_can_service_bin() -> PathBuf {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
println!("╔══════════════════════════════════════════╗");
|
println!("╔══════════════════════════════════════════════════╗");
|
||||||
println!("║ CAN Sync v2 Integration Test ║");
|
println!("║ CAN Sync v2 — Integration & Stress Test ║");
|
||||||
println!("╚══════════════════════════════════════════╝");
|
println!("╚══════════════════════════════════════════════════╝");
|
||||||
|
|
||||||
let can_service_bin = find_can_service_bin();
|
let can_service_bin = find_can_service_bin();
|
||||||
println!("\nUsing CAN service: {}", can_service_bin.display());
|
println!("\nUsing CAN service: {}", can_service_bin.display());
|
||||||
@ -468,84 +638,304 @@ fn main() {
|
|||||||
// Set up harness (starts all processes)
|
// Set up harness (starts all processes)
|
||||||
let harness = TestHarness::new(&can_service_bin);
|
let harness = TestHarness::new(&can_service_bin);
|
||||||
|
|
||||||
let mut passed = 0;
|
let mut passed = 0u32;
|
||||||
let mut failed = 0;
|
let mut failed = 0u32;
|
||||||
let mut results: Vec<(String, bool, String)> = vec![];
|
let mut results: Vec<(String, bool, String)> = vec![];
|
||||||
|
|
||||||
// ── Test 1: Ingest on A → appears on B ───────────────────────────
|
// ── Test 1: Single file A→B ──────────────────────────────────────
|
||||||
print_test_header("Test 1: Ingest file on CAN-A, verify sync to CAN-B");
|
print_test_header("Test 1: Single file A→B");
|
||||||
{
|
{
|
||||||
let content = random_content(4096);
|
let content = random_content(4096);
|
||||||
let hash = ingest_file(&harness.can_a_url, "test1.bin", &content, "application/octet-stream");
|
let hash = ingest_file(
|
||||||
|
&harness.can_a_url,
|
||||||
|
"test1.bin",
|
||||||
|
&content,
|
||||||
|
"application/octet-stream",
|
||||||
|
);
|
||||||
println!(" Ingested on A: hash={}", &hash[..16]);
|
println!(" Ingested on A: hash={}", &hash[..16]);
|
||||||
|
|
||||||
let found = wait_for_hash(&harness.can_b_url, &hash, SYNC_TIMEOUT);
|
let found = wait_for_hash(&harness.can_b_url, &hash, Duration::from_secs(30));
|
||||||
if found {
|
if found {
|
||||||
println!(" ✓ File appeared on CAN-B!");
|
println!(" ✓ File appeared on CAN-B");
|
||||||
results.push(("A→B sync".into(), true, format!("hash {}", &hash[..16])));
|
results.push(("A→B single".into(), true, "ok".into()));
|
||||||
passed += 1;
|
passed += 1;
|
||||||
} else {
|
} else {
|
||||||
println!(" ✗ File NOT found on CAN-B after {:?}", SYNC_TIMEOUT);
|
println!(" ✗ File NOT found on CAN-B after 30s");
|
||||||
results.push(("A→B sync".into(), false, "timeout".into()));
|
results.push(("A→B single".into(), false, "timeout".into()));
|
||||||
failed += 1;
|
failed += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Test 2: Ingest on B → appears on A ───────────────────────────
|
// ── Test 2: Single file B→A ──────────────────────────────────────
|
||||||
print_test_header("Test 2: Ingest file on CAN-B, verify sync to CAN-A");
|
print_test_header("Test 2: Single file B→A");
|
||||||
{
|
{
|
||||||
let content = random_content(8192);
|
let content = random_content(8192);
|
||||||
let hash = ingest_file(&harness.can_b_url, "test2.dat", &content, "application/octet-stream");
|
let hash = ingest_file(
|
||||||
|
&harness.can_b_url,
|
||||||
|
"test2.dat",
|
||||||
|
&content,
|
||||||
|
"application/octet-stream",
|
||||||
|
);
|
||||||
println!(" Ingested on B: hash={}", &hash[..16]);
|
println!(" Ingested on B: hash={}", &hash[..16]);
|
||||||
|
|
||||||
let found = wait_for_hash(&harness.can_a_url, &hash, SYNC_TIMEOUT);
|
let found = wait_for_hash(&harness.can_a_url, &hash, Duration::from_secs(30));
|
||||||
if found {
|
if found {
|
||||||
println!(" ✓ File appeared on CAN-A!");
|
println!(" ✓ File appeared on CAN-A");
|
||||||
results.push(("B→A sync".into(), true, format!("hash {}", &hash[..16])));
|
results.push(("B→A single".into(), true, "ok".into()));
|
||||||
passed += 1;
|
passed += 1;
|
||||||
} else {
|
} else {
|
||||||
println!(" ✗ File NOT found on CAN-A after {:?}", SYNC_TIMEOUT);
|
println!(" ✗ File NOT found on CAN-A after 30s");
|
||||||
results.push(("B→A sync".into(), false, "timeout".into()));
|
results.push(("B→A single".into(), false, "timeout".into()));
|
||||||
failed += 1;
|
failed += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Test 3: Multiple files batch ─────────────────────────────────
|
// ── Test 3: Rapid burst A→B (25 files, mixed sizes, some 10MB+) ─
|
||||||
print_test_header("Test 3: Ingest 5 files on A, verify all sync to B");
|
print_test_header(&format!(
|
||||||
|
"Test 3: Rapid burst {} files A→B (mixed sizes, some 10MB+)",
|
||||||
|
BURST_COUNT
|
||||||
|
));
|
||||||
{
|
{
|
||||||
let mut hashes = vec![];
|
println!(" Generating files...");
|
||||||
for i in 0..5 {
|
let files = generate_burst_files("burst_a2b", BURST_COUNT);
|
||||||
let content = random_content(1024 + i * 512);
|
|
||||||
let fname = format!("batch_{}.bin", i);
|
|
||||||
let hash = ingest_file(&harness.can_a_url, &fname, &content, "application/octet-stream");
|
|
||||||
println!(" Ingested batch file {}: hash={}", i, &hash[..16]);
|
|
||||||
hashes.push(hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut all_found = true;
|
println!(" Ingesting rapidly on CAN-A...");
|
||||||
for (i, hash) in hashes.iter().enumerate() {
|
let hashes = rapid_ingest(&harness.can_a_url, &files);
|
||||||
let found = wait_for_hash(&harness.can_b_url, hash, SYNC_TIMEOUT);
|
|
||||||
if found {
|
|
||||||
println!(" ✓ Batch file {} synced", i);
|
|
||||||
} else {
|
|
||||||
println!(" ✗ Batch file {} NOT synced", i);
|
|
||||||
all_found = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if all_found {
|
println!(" Waiting for all {} files to sync to CAN-B...", hashes.len());
|
||||||
results.push(("Batch A→B (5 files)".into(), true, "all synced".into()));
|
let (found, total, elapsed) = wait_for_all_hashes(&harness.can_b_url, &hashes, SYNC_TIMEOUT);
|
||||||
|
|
||||||
|
if found == total {
|
||||||
|
println!(
|
||||||
|
" ✓ All {} files synced to B in {:.1}s",
|
||||||
|
total,
|
||||||
|
elapsed.as_secs_f64()
|
||||||
|
);
|
||||||
|
let total_bytes: usize = files.iter().map(|(_, c)| c.len()).sum();
|
||||||
|
println!(
|
||||||
|
" Throughput: {}/s",
|
||||||
|
human_size((total_bytes as f64 / elapsed.as_secs_f64()) as usize)
|
||||||
|
);
|
||||||
|
results.push((
|
||||||
|
format!("Burst A→B ({})", total),
|
||||||
|
true,
|
||||||
|
format!("{:.1}s", elapsed.as_secs_f64()),
|
||||||
|
));
|
||||||
passed += 1;
|
passed += 1;
|
||||||
} else {
|
} else {
|
||||||
results.push(("Batch A→B (5 files)".into(), false, "some missing".into()));
|
println!(
|
||||||
|
" ✗ Only {}/{} files synced after {:.1}s",
|
||||||
|
found,
|
||||||
|
total,
|
||||||
|
elapsed.as_secs_f64()
|
||||||
|
);
|
||||||
|
// Report which ones are missing
|
||||||
|
let b_hashes: HashSet<String> = list_hashes(&harness.can_b_url).into_iter().collect();
|
||||||
|
let missing: Vec<_> = hashes
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(_, h)| !b_hashes.contains(*h))
|
||||||
|
.map(|(i, h)| format!("#{} {} ({})", i, &h[..12], human_size(files[i].1.len())))
|
||||||
|
.collect();
|
||||||
|
if missing.len() <= 10 {
|
||||||
|
for m in &missing {
|
||||||
|
println!(" MISSING: {}", m);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
println!(" {} files missing (showing first 10):", missing.len());
|
||||||
|
for m in &missing[..10] {
|
||||||
|
println!(" MISSING: {}", m);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results.push((
|
||||||
|
format!("Burst A→B ({})", total),
|
||||||
|
false,
|
||||||
|
format!("{}/{}", found, total),
|
||||||
|
));
|
||||||
failed += 1;
|
failed += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Test 4: Verify total counts match ────────────────────────────
|
// Small pause to let things settle
|
||||||
print_test_header("Test 4: Verify asset counts match on both sides");
|
println!("\n (pause 3s between tests)");
|
||||||
|
std::thread::sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
|
// ── Test 4: Rapid burst B→A (25 files, mixed sizes, some 10MB+) ─
|
||||||
|
print_test_header(&format!(
|
||||||
|
"Test 4: Rapid burst {} files B→A (mixed sizes, some 10MB+)",
|
||||||
|
BURST_COUNT
|
||||||
|
));
|
||||||
{
|
{
|
||||||
std::thread::sleep(Duration::from_secs(5));
|
println!(" Generating files...");
|
||||||
|
let files = generate_burst_files("burst_b2a", BURST_COUNT);
|
||||||
|
|
||||||
|
println!(" Ingesting rapidly on CAN-B...");
|
||||||
|
let hashes = rapid_ingest(&harness.can_b_url, &files);
|
||||||
|
|
||||||
|
println!(" Waiting for all {} files to sync to CAN-A...", hashes.len());
|
||||||
|
let (found, total, elapsed) = wait_for_all_hashes(&harness.can_a_url, &hashes, SYNC_TIMEOUT);
|
||||||
|
|
||||||
|
if found == total {
|
||||||
|
println!(
|
||||||
|
" ✓ All {} files synced to A in {:.1}s",
|
||||||
|
total,
|
||||||
|
elapsed.as_secs_f64()
|
||||||
|
);
|
||||||
|
let total_bytes: usize = files.iter().map(|(_, c)| c.len()).sum();
|
||||||
|
println!(
|
||||||
|
" Throughput: {}/s",
|
||||||
|
human_size((total_bytes as f64 / elapsed.as_secs_f64()) as usize)
|
||||||
|
);
|
||||||
|
results.push((
|
||||||
|
format!("Burst B→A ({})", total),
|
||||||
|
true,
|
||||||
|
format!("{:.1}s", elapsed.as_secs_f64()),
|
||||||
|
));
|
||||||
|
passed += 1;
|
||||||
|
} else {
|
||||||
|
println!(
|
||||||
|
" ✗ Only {}/{} files synced after {:.1}s",
|
||||||
|
found,
|
||||||
|
total,
|
||||||
|
elapsed.as_secs_f64()
|
||||||
|
);
|
||||||
|
let a_hashes: HashSet<String> = list_hashes(&harness.can_a_url).into_iter().collect();
|
||||||
|
let missing: Vec<_> = hashes
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(_, h)| !a_hashes.contains(*h))
|
||||||
|
.map(|(i, h)| format!("#{} {} ({})", i, &h[..12], human_size(files[i].1.len())))
|
||||||
|
.collect();
|
||||||
|
if missing.len() <= 10 {
|
||||||
|
for m in &missing {
|
||||||
|
println!(" MISSING: {}", m);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
println!(" {} files missing (showing first 10):", missing.len());
|
||||||
|
for m in &missing[..10] {
|
||||||
|
println!(" MISSING: {}", m);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results.push((
|
||||||
|
format!("Burst B→A ({})", total),
|
||||||
|
false,
|
||||||
|
format!("{}/{}", found, total),
|
||||||
|
));
|
||||||
|
failed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("\n (pause 3s between tests)");
|
||||||
|
std::thread::sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
|
// ── Test 5: Simultaneous burst — 25 files on EACH side at once ───
|
||||||
|
print_test_header(&format!(
|
||||||
|
"Test 5: Simultaneous burst — {} files on EACH side at once",
|
||||||
|
BURST_COUNT
|
||||||
|
));
|
||||||
|
{
|
||||||
|
println!(" Generating files for BOTH sides...");
|
||||||
|
let files_for_a = generate_burst_files("simul_onA", BURST_COUNT);
|
||||||
|
let files_for_b = generate_burst_files("simul_onB", BURST_COUNT);
|
||||||
|
|
||||||
|
let total_bytes: usize = files_for_a.iter().map(|(_, c)| c.len()).sum::<usize>()
|
||||||
|
+ files_for_b.iter().map(|(_, c)| c.len()).sum::<usize>();
|
||||||
|
println!(
|
||||||
|
" Total data: {} across {} files",
|
||||||
|
human_size(total_bytes),
|
||||||
|
BURST_COUNT * 2
|
||||||
|
);
|
||||||
|
|
||||||
|
println!(" Ingesting on BOTH sides simultaneously...");
|
||||||
|
let start = Instant::now();
|
||||||
|
let (hashes_a, hashes_b) = parallel_ingest(
|
||||||
|
&harness.can_a_url,
|
||||||
|
&files_for_a,
|
||||||
|
&harness.can_b_url,
|
||||||
|
&files_for_b,
|
||||||
|
);
|
||||||
|
let ingest_elapsed = start.elapsed();
|
||||||
|
println!(
|
||||||
|
" Parallel ingest done in {:.1}s ({} on A, {} on B)",
|
||||||
|
ingest_elapsed.as_secs_f64(),
|
||||||
|
hashes_a.len(),
|
||||||
|
hashes_b.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Now wait for cross-sync: files from A should appear on B, files from B on A
|
||||||
|
println!(
|
||||||
|
" Waiting for A's {} files to appear on B...",
|
||||||
|
hashes_a.len()
|
||||||
|
);
|
||||||
|
let (found_on_b, total_a, elapsed_b) =
|
||||||
|
wait_for_all_hashes(&harness.can_b_url, &hashes_a, SYNC_TIMEOUT);
|
||||||
|
|
||||||
|
println!(
|
||||||
|
" Waiting for B's {} files to appear on A...",
|
||||||
|
hashes_b.len()
|
||||||
|
);
|
||||||
|
let (found_on_a, total_b, elapsed_a) =
|
||||||
|
wait_for_all_hashes(&harness.can_a_url, &hashes_b, SYNC_TIMEOUT);
|
||||||
|
|
||||||
|
let a_ok = found_on_b == total_a;
|
||||||
|
let b_ok = found_on_a == total_b;
|
||||||
|
|
||||||
|
if a_ok && b_ok {
|
||||||
|
let max_elapsed = elapsed_a.max(elapsed_b);
|
||||||
|
println!(
|
||||||
|
" ✓ Bidirectional sync complete! A→B: {}/{} in {:.1}s, B→A: {}/{} in {:.1}s",
|
||||||
|
found_on_b,
|
||||||
|
total_a,
|
||||||
|
elapsed_b.as_secs_f64(),
|
||||||
|
found_on_a,
|
||||||
|
total_b,
|
||||||
|
elapsed_a.as_secs_f64()
|
||||||
|
);
|
||||||
|
println!(
|
||||||
|
" Effective throughput: {}/s (both directions)",
|
||||||
|
human_size((total_bytes as f64 / max_elapsed.as_secs_f64()) as usize)
|
||||||
|
);
|
||||||
|
results.push((
|
||||||
|
format!("Simul {}+{}", BURST_COUNT, BURST_COUNT),
|
||||||
|
true,
|
||||||
|
format!("{:.1}s", max_elapsed.as_secs_f64()),
|
||||||
|
));
|
||||||
|
passed += 1;
|
||||||
|
} else {
|
||||||
|
println!(
|
||||||
|
" ✗ A→B: {}/{}, B→A: {}/{}",
|
||||||
|
found_on_b, total_a, found_on_a, total_b
|
||||||
|
);
|
||||||
|
if !a_ok {
|
||||||
|
let b_hashes: HashSet<String> =
|
||||||
|
list_hashes(&harness.can_b_url).into_iter().collect();
|
||||||
|
let missing_count = hashes_a.iter().filter(|h| !b_hashes.contains(*h)).count();
|
||||||
|
println!(" A→B: {} files missing on B", missing_count);
|
||||||
|
}
|
||||||
|
if !b_ok {
|
||||||
|
let a_hashes: HashSet<String> =
|
||||||
|
list_hashes(&harness.can_a_url).into_iter().collect();
|
||||||
|
let missing_count = hashes_b.iter().filter(|h| !a_hashes.contains(*h)).count();
|
||||||
|
println!(" B→A: {} files missing on A", missing_count);
|
||||||
|
}
|
||||||
|
results.push((
|
||||||
|
format!("Simul {}+{}", BURST_COUNT, BURST_COUNT),
|
||||||
|
false,
|
||||||
|
format!(
|
||||||
|
"A→B {}/{} B→A {}/{}",
|
||||||
|
found_on_b, total_a, found_on_a, total_b
|
||||||
|
),
|
||||||
|
));
|
||||||
|
failed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Test 6: Final full-mirror verification ───────────────────────
|
||||||
|
print_test_header("Test 6: Final full-mirror verification");
|
||||||
|
{
|
||||||
|
// Give a final settlement window
|
||||||
|
println!(" Waiting 10s for any stragglers...");
|
||||||
|
std::thread::sleep(Duration::from_secs(10));
|
||||||
|
|
||||||
let a_hashes = list_hashes(&harness.can_a_url);
|
let a_hashes = list_hashes(&harness.can_a_url);
|
||||||
let b_hashes = list_hashes(&harness.can_b_url);
|
let b_hashes = list_hashes(&harness.can_b_url);
|
||||||
@ -553,53 +943,89 @@ fn main() {
|
|||||||
println!(" CAN-A has {} assets", a_hashes.len());
|
println!(" CAN-A has {} assets", a_hashes.len());
|
||||||
println!(" CAN-B has {} assets", b_hashes.len());
|
println!(" CAN-B has {} assets", b_hashes.len());
|
||||||
|
|
||||||
if a_hashes.len() == b_hashes.len() {
|
let a_set: HashSet<&String> = a_hashes.iter().collect();
|
||||||
let a_set: std::collections::HashSet<_> = a_hashes.iter().collect();
|
let b_set: HashSet<&String> = b_hashes.iter().collect();
|
||||||
let b_set: std::collections::HashSet<_> = b_hashes.iter().collect();
|
|
||||||
let matching = a_set == b_set;
|
|
||||||
|
|
||||||
if matching {
|
|
||||||
println!(" ✓ Both sides have identical asset sets ({} assets)", a_hashes.len());
|
|
||||||
results.push(("Count match".into(), true, format!("{} == {}", a_hashes.len(), b_hashes.len())));
|
|
||||||
passed += 1;
|
|
||||||
} else {
|
|
||||||
println!(" ✗ Same count but different hashes!");
|
|
||||||
let only_a: Vec<_> = a_set.difference(&b_set).collect();
|
let only_a: Vec<_> = a_set.difference(&b_set).collect();
|
||||||
let only_b: Vec<_> = b_set.difference(&a_set).collect();
|
let only_b: Vec<_> = b_set.difference(&a_set).collect();
|
||||||
|
|
||||||
|
if only_a.is_empty() && only_b.is_empty() && a_hashes.len() == b_hashes.len() {
|
||||||
|
println!(
|
||||||
|
" ✓ Perfect mirror! {} assets identical on both sides",
|
||||||
|
a_hashes.len()
|
||||||
|
);
|
||||||
|
results.push((
|
||||||
|
"Full mirror".into(),
|
||||||
|
true,
|
||||||
|
format!("{} assets", a_hashes.len()),
|
||||||
|
));
|
||||||
|
passed += 1;
|
||||||
|
} else {
|
||||||
if !only_a.is_empty() {
|
if !only_a.is_empty() {
|
||||||
println!(" Only on A: {:?}", only_a.iter().map(|h| &h[..16]).collect::<Vec<_>>());
|
println!(" ✗ {} assets only on A:", only_a.len());
|
||||||
|
for h in only_a.iter().take(5) {
|
||||||
|
println!(" {}", &h[..16]);
|
||||||
|
}
|
||||||
|
if only_a.len() > 5 {
|
||||||
|
println!(" ... and {} more", only_a.len() - 5);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if !only_b.is_empty() {
|
if !only_b.is_empty() {
|
||||||
println!(" Only on B: {:?}", only_b.iter().map(|h| &h[..16]).collect::<Vec<_>>());
|
println!(" ✗ {} assets only on B:", only_b.len());
|
||||||
|
for h in only_b.iter().take(5) {
|
||||||
|
println!(" {}", &h[..16]);
|
||||||
}
|
}
|
||||||
results.push(("Count match".into(), false, "hash mismatch".into()));
|
if only_b.len() > 5 {
|
||||||
failed += 1;
|
println!(" ... and {} more", only_b.len() - 5);
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
println!(" ✗ Count mismatch: A={}, B={}", a_hashes.len(), b_hashes.len());
|
results.push((
|
||||||
results.push(("Count match".into(), false, format!("{} != {}", a_hashes.len(), b_hashes.len())));
|
"Full mirror".into(),
|
||||||
|
false,
|
||||||
|
format!(
|
||||||
|
"A={} B={} onlyA={} onlyB={}",
|
||||||
|
a_hashes.len(),
|
||||||
|
b_hashes.len(),
|
||||||
|
only_a.len(),
|
||||||
|
only_b.len()
|
||||||
|
),
|
||||||
|
));
|
||||||
failed += 1;
|
failed += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Results ──────────────────────────────────────────────────────
|
// ── Results ──────────────────────────────────────────────────────
|
||||||
println!("\n╔══════════════════════════════════════════╗");
|
let expected_total = 2 + BURST_COUNT * 2 + BURST_COUNT * 2; // singles + bursts + simul
|
||||||
|
let total_large = BURST_COUNT * 3 / 5; // roughly every 5th file is large, across 3 batches
|
||||||
|
|
||||||
|
println!("\n╔════════════════════════════════════════════════════╗");
|
||||||
println!("║ Test Results ║");
|
println!("║ Test Results ║");
|
||||||
println!("╠══════════════════════════════════════════╣");
|
println!("╠════════════════════════════════════════════════════╣");
|
||||||
for (name, pass, detail) in &results {
|
for (name, pass, detail) in &results {
|
||||||
let icon = if *pass { "✓" } else { "✗" };
|
let icon = if *pass { "✓" } else { "✗" };
|
||||||
println!("║ {} {:<25} {}",
|
let detail_trunc = if detail.len() > 20 {
|
||||||
icon,
|
&detail[..20]
|
||||||
name,
|
} else {
|
||||||
if detail.len() > 12 { &detail[..12] } else { detail }
|
detail
|
||||||
);
|
};
|
||||||
|
println!("║ {} {:<28} {}", icon, name, detail_trunc);
|
||||||
}
|
}
|
||||||
println!("╠══════════════════════════════════════════╣");
|
println!("╠════════════════════════════════════════════════════╣");
|
||||||
println!("║ Passed: {} Failed: {} ║", passed, failed);
|
println!(
|
||||||
println!("╚══════════════════════════════════════════╝");
|
"║ Passed: {} Failed: {} ║",
|
||||||
|
passed, failed
|
||||||
|
);
|
||||||
|
println!(
|
||||||
|
"║ Files: ~{} total, ~{} large (10MB+) ║",
|
||||||
|
expected_total + 2, // +2 for the single file tests
|
||||||
|
total_large
|
||||||
|
);
|
||||||
|
println!("╚════════════════════════════════════════════════════╝");
|
||||||
|
|
||||||
// Always print logs
|
// Print logs on failure
|
||||||
|
if failed > 0 {
|
||||||
harness.print_logs();
|
harness.print_logs();
|
||||||
|
}
|
||||||
|
|
||||||
// Clean up
|
// Clean up
|
||||||
println!("\n=== Cleaning up ===\n");
|
println!("\n=== Cleaning up ===\n");
|
||||||
@ -612,5 +1038,7 @@ fn main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn print_test_header(name: &str) {
|
fn print_test_header(name: &str) {
|
||||||
println!("\n--- {} ---\n", name);
|
println!("\n╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌");
|
||||||
|
println!(" {}", name);
|
||||||
|
println!("╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌\n");
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user