Fix QUIC stream protocol bugs and add integration tests
- Fix bidirectional stream handling: responder uses accept_bi() instead of open_bi() so both sides communicate on the same stream - Add live_receive_loop to accept incoming bi-streams during ongoing sync (peer's push loop opens new streams per batch) - Split live_sync_loop into live_push_loop + live_receive_loop running concurrently via tokio::select in new run_live_sync() - Update handle_incoming to run live sync after initial reconciliation - Add direct peer connection via ticket files (EndpointAddr JSON exchange) for local testing without gossip bootstrap - Add CAN_PORT env var override for running multiple CAN instances - Add integration test binary (sync_test.rs): starts 2 CAN services + 2 sync agents, ingests files on each side, verifies bidirectional sync with 4 test cases (A→B, B→A, batch, count match) - Add PowerShell script (run-integration-test.ps1) for one-command test Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
a28fac6c9a
commit
06c01baf44
21
examples/can-sync/Cargo.lock
generated
21
examples/can-sync/Cargo.lock
generated
@ -234,9 +234,12 @@ dependencies = [
|
|||||||
"iroh-gossip",
|
"iroh-gossip",
|
||||||
"n0-future 0.1.3",
|
"n0-future 0.1.3",
|
||||||
"prost",
|
"prost",
|
||||||
|
"rand",
|
||||||
"reqwest 0.12.28",
|
"reqwest 0.12.28",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_json",
|
||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
|
"tempfile",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
@ -1928,6 +1931,16 @@ version = "0.3.17"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mime_guess"
|
||||||
|
version = "2.0.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
|
||||||
|
dependencies = [
|
||||||
|
"mime",
|
||||||
|
"unicase",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "1.1.1"
|
version = "1.1.1"
|
||||||
@ -2794,6 +2807,7 @@ dependencies = [
|
|||||||
"base64",
|
"base64",
|
||||||
"bytes",
|
"bytes",
|
||||||
"encoding_rs",
|
"encoding_rs",
|
||||||
|
"futures-channel",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"h2",
|
"h2",
|
||||||
@ -2807,6 +2821,7 @@ dependencies = [
|
|||||||
"js-sys",
|
"js-sys",
|
||||||
"log",
|
"log",
|
||||||
"mime",
|
"mime",
|
||||||
|
"mime_guess",
|
||||||
"native-tls",
|
"native-tls",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
@ -3769,6 +3784,12 @@ version = "1.19.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
|
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicase"
|
||||||
|
version = "2.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "unicode-ident"
|
name = "unicode-ident"
|
||||||
version = "1.0.24"
|
version = "1.0.24"
|
||||||
|
|||||||
@ -8,6 +8,10 @@ description = "P2P sync agent for CAN service — full mirror replication via ir
|
|||||||
name = "can-sync"
|
name = "can-sync"
|
||||||
path = "src/main.rs"
|
path = "src/main.rs"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "sync-test"
|
||||||
|
path = "tests/sync_test.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
# P2P networking (iroh for transport + gossip for discovery — NO iroh-docs)
|
# P2P networking (iroh for transport + gossip for discovery — NO iroh-docs)
|
||||||
iroh = "0.96"
|
iroh = "0.96"
|
||||||
@ -17,7 +21,7 @@ iroh-gossip = "0.96"
|
|||||||
prost = "0.13"
|
prost = "0.13"
|
||||||
|
|
||||||
# HTTP client for CAN service sync API
|
# HTTP client for CAN service sync API
|
||||||
reqwest = { version = "0.12", features = ["json"] }
|
reqwest = { version = "0.12", features = ["json", "multipart", "blocking"] }
|
||||||
|
|
||||||
# Serialization
|
# Serialization
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
@ -40,3 +44,6 @@ n0-future = "0.1"
|
|||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
|
serde_json = "1"
|
||||||
|
tempfile = "3"
|
||||||
|
rand = "0.9"
|
||||||
|
|||||||
75
examples/can-sync/run-integration-test.ps1
Normal file
75
examples/can-sync/run-integration-test.ps1
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
#!/usr/bin/env pwsh
|
||||||
|
# CAN Sync v2 Integration Test Runner
|
||||||
|
#
|
||||||
|
# Usage:
|
||||||
|
# .\run-integration-test.ps1 # Build + run test
|
||||||
|
# .\run-integration-test.ps1 -NoBuild # Skip building, just run
|
||||||
|
|
||||||
|
param(
|
||||||
|
[switch]$NoBuild
|
||||||
|
)
|
||||||
|
|
||||||
|
$ErrorActionPreference = "Stop"
|
||||||
|
$canServiceRoot = Resolve-Path (Join-Path $PSScriptRoot "../..")
|
||||||
|
$canSyncRoot = $PSScriptRoot
|
||||||
|
|
||||||
|
Write-Host ""
|
||||||
|
Write-Host "========================================" -ForegroundColor Cyan
|
||||||
|
Write-Host " CAN Sync v2 - Integration Test Runner" -ForegroundColor Cyan
|
||||||
|
Write-Host "========================================" -ForegroundColor Cyan
|
||||||
|
Write-Host ""
|
||||||
|
|
||||||
|
# Step 1: Build CAN service
|
||||||
|
if (-not $NoBuild) {
|
||||||
|
Write-Host "[1/3] Building CAN service..." -ForegroundColor Yellow
|
||||||
|
Push-Location $canServiceRoot
|
||||||
|
try {
|
||||||
|
cargo build 2>&1 | ForEach-Object { Write-Host " $_" -ForegroundColor DarkGray }
|
||||||
|
if ($LASTEXITCODE -ne 0) {
|
||||||
|
Write-Host "FAILED: CAN service build failed!" -ForegroundColor Red
|
||||||
|
exit 1
|
||||||
|
}
|
||||||
|
Write-Host " CAN service built OK" -ForegroundColor Green
|
||||||
|
} finally {
|
||||||
|
Pop-Location
|
||||||
|
}
|
||||||
|
|
||||||
|
# Step 2: Build can-sync + sync-test
|
||||||
|
Write-Host ""
|
||||||
|
Write-Host "[2/3] Building can-sync and sync-test..." -ForegroundColor Yellow
|
||||||
|
Push-Location $canSyncRoot
|
||||||
|
try {
|
||||||
|
cargo build --bin can-sync --bin sync-test 2>&1 | ForEach-Object { Write-Host " $_" -ForegroundColor DarkGray }
|
||||||
|
if ($LASTEXITCODE -ne 0) {
|
||||||
|
Write-Host "FAILED: can-sync build failed!" -ForegroundColor Red
|
||||||
|
exit 1
|
||||||
|
}
|
||||||
|
Write-Host " can-sync built OK" -ForegroundColor Green
|
||||||
|
} finally {
|
||||||
|
Pop-Location
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Write-Host "[SKIP] Builds skipped (-NoBuild)" -ForegroundColor DarkYellow
|
||||||
|
}
|
||||||
|
|
||||||
|
# Step 3: Run integration test
|
||||||
|
Write-Host ""
|
||||||
|
Write-Host "[3/3] Running integration test..." -ForegroundColor Yellow
|
||||||
|
Write-Host ""
|
||||||
|
|
||||||
|
Push-Location $canSyncRoot
|
||||||
|
try {
|
||||||
|
cargo run --bin sync-test
|
||||||
|
$testResult = $LASTEXITCODE
|
||||||
|
} finally {
|
||||||
|
Pop-Location
|
||||||
|
}
|
||||||
|
|
||||||
|
Write-Host ""
|
||||||
|
if ($testResult -eq 0) {
|
||||||
|
Write-Host "ALL TESTS PASSED" -ForegroundColor Green
|
||||||
|
} else {
|
||||||
|
Write-Host "SOME TESTS FAILED (exit code: $testResult)" -ForegroundColor Red
|
||||||
|
}
|
||||||
|
|
||||||
|
exit $testResult
|
||||||
@ -15,6 +15,16 @@ pub struct SyncConfig {
|
|||||||
/// Seconds between polls for new local assets
|
/// Seconds between polls for new local assets
|
||||||
#[serde(default = "default_poll_interval")]
|
#[serde(default = "default_poll_interval")]
|
||||||
pub poll_interval_secs: u64,
|
pub poll_interval_secs: u64,
|
||||||
|
|
||||||
|
/// Optional: path to write this node's ticket to (for direct connection)
|
||||||
|
#[serde(default)]
|
||||||
|
pub ticket_file: Option<String>,
|
||||||
|
|
||||||
|
/// Optional: path to a file containing a peer's node ticket (for direct connection).
|
||||||
|
/// If set, the agent will read this ticket and connect directly instead of waiting
|
||||||
|
/// for gossip discovery. The file is polled until it exists and is non-empty.
|
||||||
|
#[serde(default)]
|
||||||
|
pub connect_ticket_file: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_poll_interval() -> u64 {
|
fn default_poll_interval() -> u64 {
|
||||||
|
|||||||
@ -12,9 +12,11 @@ mod discovery;
|
|||||||
mod peer;
|
mod peer;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
|
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use iroh::endpoint::presets::N0;
|
use iroh::endpoint::presets::N0;
|
||||||
use iroh::{Endpoint, EndpointId};
|
use iroh::{Endpoint, EndpointAddr, EndpointId};
|
||||||
use iroh_gossip::net::Gossip;
|
use iroh_gossip::net::Gossip;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
@ -40,7 +42,7 @@ async fn main() -> Result<()> {
|
|||||||
let config_path = std::env::args()
|
let config_path = std::env::args()
|
||||||
.nth(1)
|
.nth(1)
|
||||||
.unwrap_or_else(|| "config.yaml".to_string());
|
.unwrap_or_else(|| "config.yaml".to_string());
|
||||||
let config = SyncConfig::load(std::path::Path::new(&config_path))
|
let config = SyncConfig::load(Path::new(&config_path))
|
||||||
.with_context(|| format!("loading config from {}", config_path))?;
|
.with_context(|| format!("loading config from {}", config_path))?;
|
||||||
|
|
||||||
info!("CAN Sync v2 starting");
|
info!("CAN Sync v2 starting");
|
||||||
@ -73,32 +75,102 @@ async fn main() -> Result<()> {
|
|||||||
info!("Listening on {}", addr);
|
info!("Listening on {}", addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write our EndpointAddr to file if configured (for direct peer connection in tests)
|
||||||
|
if let Some(ref ticket_path) = config.ticket_file {
|
||||||
|
// Wait briefly for the endpoint to register with relay
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||||
|
|
||||||
|
let addr = endpoint.addr();
|
||||||
|
let addr_json = serde_json::to_string(&addr)
|
||||||
|
.context("serializing EndpointAddr")?;
|
||||||
|
std::fs::write(ticket_path, &addr_json)
|
||||||
|
.with_context(|| format!("writing addr to {}", ticket_path))?;
|
||||||
|
info!("Wrote EndpointAddr to {}", ticket_path);
|
||||||
|
}
|
||||||
|
|
||||||
// Create gossip instance for peer discovery (not async — returns directly)
|
// Create gossip instance for peer discovery (not async — returns directly)
|
||||||
let gossip = Gossip::builder().spawn(endpoint.clone());
|
let gossip = Gossip::builder().spawn(endpoint.clone());
|
||||||
|
|
||||||
// Channel for discovered peers
|
// Channel for discovered peers
|
||||||
let (peer_tx, mut peer_rx) = mpsc::channel::<EndpointId>(32);
|
let (peer_tx, mut peer_rx) = mpsc::channel::<EndpointId>(32);
|
||||||
|
|
||||||
// Spawn discovery
|
// Spawn discovery via gossip
|
||||||
let disc = Discovery::new(endpoint.clone(), gossip.clone(), &config.sync_passphrase);
|
let disc = Discovery::new(endpoint.clone(), gossip.clone(), &config.sync_passphrase);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = disc.run(peer_tx).await {
|
if let Err(e) = disc.run(peer_tx.clone()).await {
|
||||||
error!("Discovery failed: {:#}", e);
|
error!("Discovery failed: {:#}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// If a direct connect ticket file is specified, spawn a task to read it and connect
|
||||||
|
if let Some(ref ticket_path) = config.connect_ticket_file {
|
||||||
|
let ticket_path = ticket_path.clone();
|
||||||
|
let endpoint_direct = endpoint.clone();
|
||||||
|
let can_direct = can.clone();
|
||||||
|
let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
info!("Waiting for peer addr file: {}", ticket_path);
|
||||||
|
|
||||||
|
// Poll until the file exists and is non-empty
|
||||||
|
let addr_json = loop {
|
||||||
|
match std::fs::read_to_string(&ticket_path) {
|
||||||
|
Ok(s) if !s.trim().is_empty() => break s.trim().to_string(),
|
||||||
|
_ => tokio::time::sleep(std::time::Duration::from_millis(200)).await,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("Read peer addr from {}", ticket_path);
|
||||||
|
|
||||||
|
let peer_addr: EndpointAddr = match serde_json::from_str(&addr_json) {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Invalid EndpointAddr JSON: {:#}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let peer_id = peer_addr.id;
|
||||||
|
let short = peer_id.fmt_short().to_string();
|
||||||
|
info!("Direct connecting to peer: {} (from addr file)", short);
|
||||||
|
|
||||||
|
match endpoint_direct.connect(peer_addr, SYNC_ALPN).await {
|
||||||
|
Ok(conn) => {
|
||||||
|
info!("Direct connection to {} established!", short);
|
||||||
|
|
||||||
|
// Initial reconciliation
|
||||||
|
if let Err(e) = peer::run_sync_session(conn.clone(), can_direct.clone(), true).await {
|
||||||
|
error!("Initial sync with {} failed: {:#}", short, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Initial sync with {} complete, starting live sync", short);
|
||||||
|
|
||||||
|
// Live sync: push new local assets + accept incoming streams
|
||||||
|
peer::run_live_sync(conn, can_direct, poll_interval).await;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to connect to {}: {:#}", short, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Spawn incoming connection handler
|
// Spawn incoming connection handler
|
||||||
let endpoint_accept = endpoint.clone();
|
let endpoint_accept = endpoint.clone();
|
||||||
let can_accept = can.clone();
|
let can_accept = can.clone();
|
||||||
|
let accept_poll_interval = std::time::Duration::from_secs(config.poll_interval_secs);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
match endpoint_accept.accept().await {
|
match endpoint_accept.accept().await {
|
||||||
Some(incoming) => {
|
Some(incoming) => {
|
||||||
let can_clone = can_accept.clone();
|
let can_clone = can_accept.clone();
|
||||||
|
let poll_dur = accept_poll_interval;
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
match incoming.await {
|
match incoming.await {
|
||||||
Ok(conn) => {
|
Ok(conn) => {
|
||||||
peer::handle_incoming(conn, can_clone).await;
|
info!("Accepted incoming connection from {}", conn.remote_id().fmt_short());
|
||||||
|
peer::handle_incoming(conn, can_clone, poll_dur).await;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to accept connection: {:#}", e);
|
warn!("Failed to accept connection: {:#}", e);
|
||||||
@ -114,7 +186,7 @@ async fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Main loop: connect to discovered peers and sync
|
// Main loop: connect to discovered peers (from gossip) and sync
|
||||||
let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs);
|
let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs);
|
||||||
|
|
||||||
info!("Waiting for peers...");
|
info!("Waiting for peers...");
|
||||||
@ -128,7 +200,6 @@ async fn main() -> Result<()> {
|
|||||||
let poll_dur = poll_interval;
|
let poll_dur = poll_interval;
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Connect to peer (EndpointId implements Into<EndpointAddr>)
|
|
||||||
let conn = match endpoint_clone.connect(peer_id, SYNC_ALPN).await {
|
let conn = match endpoint_clone.connect(peer_id, SYNC_ALPN).await {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -137,16 +208,12 @@ async fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Initial reconciliation
|
|
||||||
if let Err(e) = peer::run_sync_session(conn.clone(), can_clone.clone(), true).await {
|
if let Err(e) = peer::run_sync_session(conn.clone(), can_clone.clone(), true).await {
|
||||||
error!("Initial sync with {} failed: {:#}", short, e);
|
error!("Initial sync with {} failed: {:#}", short, e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Live sync loop — keep pushing new assets
|
peer::run_live_sync(conn, can_clone, poll_dur).await;
|
||||||
if let Err(e) = peer::live_sync_loop(conn, can_clone, poll_dur).await {
|
|
||||||
warn!("Live sync with {} ended: {:#}", short, e);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -63,8 +63,12 @@ pub async fn run_sync_session(
|
|||||||
let short_id = peer_id.fmt_short().to_string();
|
let short_id = peer_id.fmt_short().to_string();
|
||||||
info!("Starting sync session with {} (initiator={})", short_id, is_initiator);
|
info!("Starting sync session with {} (initiator={})", short_id, is_initiator);
|
||||||
|
|
||||||
// Open a bi-directional stream for the sync protocol
|
// Initiator opens the stream, responder accepts it
|
||||||
let (mut send, mut recv) = conn.open_bi().await.context("opening bi stream")?;
|
let (mut send, mut recv) = if is_initiator {
|
||||||
|
conn.open_bi().await.context("opening bi stream")?
|
||||||
|
} else {
|
||||||
|
conn.accept_bi().await.context("accepting bi stream")?
|
||||||
|
};
|
||||||
|
|
||||||
// Step 1: Get our local hash list from CAN service
|
// Step 1: Get our local hash list from CAN service
|
||||||
let our_hashes = can.get_hashes().await.context("getting local hashes")?;
|
let our_hashes = can.get_hashes().await.context("getting local hashes")?;
|
||||||
@ -237,27 +241,60 @@ async fn receive_assets(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Handle an incoming connection from a peer who connected to us.
|
/// Handle an incoming connection from a peer who connected to us.
|
||||||
pub async fn handle_incoming(conn: Connection, can: CanSyncClient) {
|
pub async fn handle_incoming(
|
||||||
|
conn: Connection,
|
||||||
|
can: CanSyncClient,
|
||||||
|
poll_interval: std::time::Duration,
|
||||||
|
) {
|
||||||
let peer_id = conn.remote_id();
|
let peer_id = conn.remote_id();
|
||||||
let short_id = peer_id.fmt_short().to_string();
|
let short_id = peer_id.fmt_short().to_string();
|
||||||
info!("Incoming sync connection from {}", short_id);
|
info!("Incoming sync connection from {}", short_id);
|
||||||
|
|
||||||
if let Err(e) = run_sync_session(conn, can, false).await {
|
if let Err(e) = run_sync_session(conn.clone(), can.clone(), false).await {
|
||||||
error!("Sync session with {} failed: {:#}", short_id, e);
|
error!("Sync session with {} failed: {:#}", short_id, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Initial sync with {} complete, starting live sync", short_id);
|
||||||
|
|
||||||
|
// Run both live loops concurrently
|
||||||
|
run_live_sync(conn, can, poll_interval).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run both live sync loops (push + receive) concurrently.
|
||||||
|
///
|
||||||
|
/// This should be called after initial reconciliation is complete.
|
||||||
|
pub async fn run_live_sync(
|
||||||
|
conn: Connection,
|
||||||
|
can: CanSyncClient,
|
||||||
|
poll_interval: std::time::Duration,
|
||||||
|
) {
|
||||||
|
let short_id = conn.remote_id().fmt_short().to_string();
|
||||||
|
|
||||||
|
// Run push loop and receive loop concurrently — when either ends, we're done
|
||||||
|
tokio::select! {
|
||||||
|
result = live_push_loop(conn.clone(), can.clone(), poll_interval) => {
|
||||||
|
if let Err(e) = result {
|
||||||
|
warn!("Live push loop with {} ended: {:#}", short_id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result = live_receive_loop(conn, can) => {
|
||||||
|
if let Err(e) = result {
|
||||||
|
warn!("Live receive loop with {} ended: {:#}", short_id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run the live sync loop: poll for new local assets and push to peer.
|
/// Poll for new local assets and push them to the peer via new QUIC streams.
|
||||||
///
|
async fn live_push_loop(
|
||||||
/// This runs after initial reconciliation and keeps peers in sync.
|
|
||||||
pub async fn live_sync_loop(
|
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
can: CanSyncClient,
|
can: CanSyncClient,
|
||||||
poll_interval: std::time::Duration,
|
poll_interval: std::time::Duration,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let peer_id = conn.remote_id();
|
let peer_id = conn.remote_id();
|
||||||
let short_id = peer_id.fmt_short().to_string();
|
let short_id = peer_id.fmt_short().to_string();
|
||||||
info!("Starting live sync loop with {}", short_id);
|
info!("Starting live push loop with {}", short_id);
|
||||||
|
|
||||||
// Track what we've already synced
|
// Track what we've already synced
|
||||||
let mut known_hashes: HashSet<String> = {
|
let mut known_hashes: HashSet<String> = {
|
||||||
@ -318,3 +355,33 @@ pub async fn live_sync_loop(
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Accept incoming QUIC bi-streams from the peer and receive assets.
|
||||||
|
///
|
||||||
|
/// The peer's live_push_loop opens new bi-streams for each batch of assets.
|
||||||
|
/// This loop accepts those streams and ingests the assets into the local CAN service.
|
||||||
|
async fn live_receive_loop(
|
||||||
|
conn: Connection,
|
||||||
|
can: CanSyncClient,
|
||||||
|
) -> Result<()> {
|
||||||
|
let peer_id = conn.remote_id();
|
||||||
|
let short_id = peer_id.fmt_short().to_string();
|
||||||
|
info!("Starting live receive loop with {}", short_id);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match conn.accept_bi().await {
|
||||||
|
Ok((_send, mut recv)) => {
|
||||||
|
info!("Accepted live sync stream from peer {}", short_id);
|
||||||
|
if let Err(e) = receive_assets(&can, &mut recv, &short_id).await {
|
||||||
|
warn!("Error receiving live assets from {}: {:#}", short_id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
info!("Live receive loop: connection to {} closed: {:#}", short_id, e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
616
examples/can-sync/tests/sync_test.rs
Normal file
616
examples/can-sync/tests/sync_test.rs
Normal file
@ -0,0 +1,616 @@
|
|||||||
|
//! Integration test for CAN Sync v2
|
||||||
|
//!
|
||||||
|
//! Starts two CAN service instances + two sync agents, ingests files on each
|
||||||
|
//! side, and verifies bidirectional sync.
|
||||||
|
//!
|
||||||
|
//! Usage:
|
||||||
|
//! cargo run --bin sync-test
|
||||||
|
//!
|
||||||
|
//! Prerequisites:
|
||||||
|
//! CAN service must be built: `cargo build` in the CanService root
|
||||||
|
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::process::{Child, Command, Stdio};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use rand::Rng;
|
||||||
|
use serde_json::Value;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
|
// ── Configuration ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
const CAN_A_PORT: u16 = 13210;
|
||||||
|
const CAN_B_PORT: u16 = 13220;
|
||||||
|
const SYNC_KEY: &str = "test-sync-key-42";
|
||||||
|
const SYNC_PASSPHRASE: &str = "integration-test-passphrase";
|
||||||
|
const SYNC_TIMEOUT: Duration = Duration::from_secs(60);
|
||||||
|
const POLL_INTERVAL: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
|
// ── Process management ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
struct ManagedProcess {
|
||||||
|
child: Child,
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ManagedProcess {
|
||||||
|
fn spawn(
|
||||||
|
name: &str,
|
||||||
|
cmd: &str,
|
||||||
|
args: &[&str],
|
||||||
|
envs: &[(&str, &str)],
|
||||||
|
log_dir: &Path,
|
||||||
|
) -> Self {
|
||||||
|
println!(" Starting {} ...", name);
|
||||||
|
let mut command = Command::new(cmd);
|
||||||
|
|
||||||
|
let log_file = std::fs::File::create(log_dir.join(format!("{}.log", name)))
|
||||||
|
.expect("create log file");
|
||||||
|
let log_file_clone = log_file.try_clone().expect("clone log file handle");
|
||||||
|
command
|
||||||
|
.args(args)
|
||||||
|
.stdout(Stdio::from(log_file))
|
||||||
|
.stderr(Stdio::from(log_file_clone))
|
||||||
|
.env("RUST_LOG", "can_sync=debug,can_service=debug,iroh=info,iroh_gossip=info");
|
||||||
|
|
||||||
|
for (k, v) in envs {
|
||||||
|
command.env(k, v);
|
||||||
|
}
|
||||||
|
let child = command
|
||||||
|
.spawn()
|
||||||
|
.unwrap_or_else(|e| panic!("Failed to start {}: {}", name, e));
|
||||||
|
println!(" {} started (pid={})", name, child.id());
|
||||||
|
Self {
|
||||||
|
child,
|
||||||
|
name: name.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn kill(&mut self) {
|
||||||
|
let _ = self.child.kill();
|
||||||
|
let _ = self.child.wait();
|
||||||
|
println!(" {} stopped", self.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for ManagedProcess {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.kill();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Test harness ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
struct TestHarness {
|
||||||
|
_can_a: ManagedProcess,
|
||||||
|
_can_b: ManagedProcess,
|
||||||
|
_sync_a: ManagedProcess,
|
||||||
|
_sync_b: ManagedProcess,
|
||||||
|
_tmp_a: TempDir,
|
||||||
|
_tmp_b: TempDir,
|
||||||
|
_tmp_sync_a: TempDir,
|
||||||
|
_tmp_sync_b: TempDir,
|
||||||
|
log_dir: TempDir,
|
||||||
|
can_a_url: String,
|
||||||
|
can_b_url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestHarness {
|
||||||
|
fn new(can_service_bin: &Path) -> Self {
|
||||||
|
println!("\n=== Setting up test harness ===\n");
|
||||||
|
|
||||||
|
// Create temp directories
|
||||||
|
let tmp_a = TempDir::new().expect("create temp dir A");
|
||||||
|
let tmp_b = TempDir::new().expect("create temp dir B");
|
||||||
|
let tmp_sync_a = TempDir::new().expect("create temp dir sync A");
|
||||||
|
let tmp_sync_b = TempDir::new().expect("create temp dir sync B");
|
||||||
|
let log_dir = TempDir::new().expect("create log dir");
|
||||||
|
|
||||||
|
println!(" Logs: {}", log_dir.path().display());
|
||||||
|
println!(" CAN A storage: {}", tmp_a.path().display());
|
||||||
|
println!(" CAN B storage: {}", tmp_b.path().display());
|
||||||
|
|
||||||
|
// Write CAN service configs
|
||||||
|
let config_a = tmp_a.path().join("config.yaml");
|
||||||
|
let config_b = tmp_b.path().join("config.yaml");
|
||||||
|
write_can_config(&config_a, tmp_a.path(), SYNC_KEY);
|
||||||
|
write_can_config(&config_b, tmp_b.path(), SYNC_KEY);
|
||||||
|
|
||||||
|
// Start CAN services
|
||||||
|
let can_a = ManagedProcess::spawn(
|
||||||
|
"CAN-A",
|
||||||
|
can_service_bin.to_str().unwrap(),
|
||||||
|
&[config_a.to_str().unwrap()],
|
||||||
|
&[("CAN_PORT", &CAN_A_PORT.to_string())],
|
||||||
|
log_dir.path(),
|
||||||
|
);
|
||||||
|
let can_b = ManagedProcess::spawn(
|
||||||
|
"CAN-B",
|
||||||
|
can_service_bin.to_str().unwrap(),
|
||||||
|
&[config_b.to_str().unwrap()],
|
||||||
|
&[("CAN_PORT", &CAN_B_PORT.to_string())],
|
||||||
|
log_dir.path(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let can_a_url = format!("http://127.0.0.1:{}", CAN_A_PORT);
|
||||||
|
let can_b_url = format!("http://127.0.0.1:{}", CAN_B_PORT);
|
||||||
|
|
||||||
|
// Wait for CAN services to be ready
|
||||||
|
println!("\n Waiting for CAN services to start...");
|
||||||
|
wait_for_http(&can_a_url, Duration::from_secs(10), log_dir.path(), "CAN-A");
|
||||||
|
wait_for_http(&can_b_url, Duration::from_secs(10), log_dir.path(), "CAN-B");
|
||||||
|
println!(" Both CAN services ready!");
|
||||||
|
|
||||||
|
// Find can-sync binary
|
||||||
|
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||||
|
let can_sync_bin = manifest_dir
|
||||||
|
.join("target")
|
||||||
|
.join("debug")
|
||||||
|
.join("can-sync")
|
||||||
|
.with_extension(std::env::consts::EXE_EXTENSION);
|
||||||
|
assert!(
|
||||||
|
can_sync_bin.exists(),
|
||||||
|
"can-sync binary not found at {}",
|
||||||
|
can_sync_bin.display()
|
||||||
|
);
|
||||||
|
println!(" Using can-sync: {}", can_sync_bin.display());
|
||||||
|
|
||||||
|
// Ticket file paths for direct peer connection
|
||||||
|
// Sync-A writes its addr, Sync-B reads it and connects directly
|
||||||
|
let ticket_a = tmp_sync_a.path().join("ticket_a.json");
|
||||||
|
let ticket_a_str = ticket_a.to_str().unwrap().replace('\\', "/");
|
||||||
|
|
||||||
|
// Write sync agent configs with ticket file exchange
|
||||||
|
let sync_config_a = tmp_sync_a.path().join("config.yaml");
|
||||||
|
let sync_config_b = tmp_sync_b.path().join("config.yaml");
|
||||||
|
|
||||||
|
// Sync-A: write its own ticket
|
||||||
|
write_sync_config_with_tickets(
|
||||||
|
&sync_config_a,
|
||||||
|
&can_a_url,
|
||||||
|
SYNC_KEY,
|
||||||
|
SYNC_PASSPHRASE,
|
||||||
|
Some(&ticket_a_str), // write our ticket here
|
||||||
|
None, // don't connect to anyone
|
||||||
|
);
|
||||||
|
|
||||||
|
// Sync-B: read Sync-A's ticket and connect directly
|
||||||
|
write_sync_config_with_tickets(
|
||||||
|
&sync_config_b,
|
||||||
|
&can_b_url,
|
||||||
|
SYNC_KEY,
|
||||||
|
SYNC_PASSPHRASE,
|
||||||
|
None, // don't write our own ticket
|
||||||
|
Some(&ticket_a_str), // connect to Sync-A using this ticket
|
||||||
|
);
|
||||||
|
|
||||||
|
// Start Sync-A first (it writes the ticket)
|
||||||
|
let sync_a = ManagedProcess::spawn(
|
||||||
|
"Sync-A",
|
||||||
|
can_sync_bin.to_str().unwrap(),
|
||||||
|
&[sync_config_a.to_str().unwrap()],
|
||||||
|
&[],
|
||||||
|
log_dir.path(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for Sync-A to write its ticket file
|
||||||
|
println!(" Waiting for Sync-A to write ticket...");
|
||||||
|
let ticket_start = Instant::now();
|
||||||
|
loop {
|
||||||
|
if ticket_start.elapsed() > Duration::from_secs(15) {
|
||||||
|
print_log(log_dir.path(), "Sync-A");
|
||||||
|
panic!("Timeout waiting for Sync-A ticket file");
|
||||||
|
}
|
||||||
|
if let Ok(contents) = std::fs::read_to_string(&ticket_a) {
|
||||||
|
if !contents.trim().is_empty() {
|
||||||
|
println!(" Sync-A ticket ready ({} bytes)", contents.len());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::thread::sleep(Duration::from_millis(100));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start Sync-B (it will read Sync-A's ticket and connect)
|
||||||
|
let sync_b = ManagedProcess::spawn(
|
||||||
|
"Sync-B",
|
||||||
|
can_sync_bin.to_str().unwrap(),
|
||||||
|
&[sync_config_b.to_str().unwrap()],
|
||||||
|
&[],
|
||||||
|
log_dir.path(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for peers to connect
|
||||||
|
println!(" Waiting for sync agents to connect...");
|
||||||
|
std::thread::sleep(Duration::from_secs(5));
|
||||||
|
|
||||||
|
println!(" Test harness ready!\n");
|
||||||
|
|
||||||
|
Self {
|
||||||
|
_can_a: can_a,
|
||||||
|
_can_b: can_b,
|
||||||
|
_sync_a: sync_a,
|
||||||
|
_sync_b: sync_b,
|
||||||
|
_tmp_a: tmp_a,
|
||||||
|
_tmp_b: tmp_b,
|
||||||
|
_tmp_sync_a: tmp_sync_a,
|
||||||
|
_tmp_sync_b: tmp_sync_b,
|
||||||
|
log_dir,
|
||||||
|
can_a_url,
|
||||||
|
can_b_url,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn print_logs(&self) {
|
||||||
|
println!("\n=== Process Logs ===");
|
||||||
|
for name in &["Sync-A", "Sync-B", "CAN-A", "CAN-B"] {
|
||||||
|
print_log(self.log_dir.path(), name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Config writers ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
fn write_can_config(path: &Path, storage_root: &Path, sync_key: &str) {
|
||||||
|
let storage_str = storage_root.to_str().unwrap().replace('\\', "/");
|
||||||
|
let content = format!(
|
||||||
|
r#"storage_root: "{}"
|
||||||
|
admin_token: "test-admin-token"
|
||||||
|
enable_thumbnail_cache: false
|
||||||
|
sync_api_key: "{}"
|
||||||
|
"#,
|
||||||
|
storage_str, sync_key
|
||||||
|
);
|
||||||
|
std::fs::write(path, content).expect("write CAN config");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_sync_config_with_tickets(
|
||||||
|
path: &Path,
|
||||||
|
can_url: &str,
|
||||||
|
sync_key: &str,
|
||||||
|
passphrase: &str,
|
||||||
|
ticket_file: Option<&str>,
|
||||||
|
connect_ticket_file: Option<&str>,
|
||||||
|
) {
|
||||||
|
let mut content = format!(
|
||||||
|
r#"can_service_url: "{}"
|
||||||
|
sync_api_key: "{}"
|
||||||
|
sync_passphrase: "{}"
|
||||||
|
poll_interval_secs: 2
|
||||||
|
"#,
|
||||||
|
can_url, sync_key, passphrase
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(tf) = ticket_file {
|
||||||
|
content.push_str(&format!("ticket_file: \"{}\"\n", tf));
|
||||||
|
}
|
||||||
|
if let Some(ctf) = connect_ticket_file {
|
||||||
|
content.push_str(&format!("connect_ticket_file: \"{}\"\n", ctf));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::fs::write(path, content).expect("write sync config");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Logging helpers ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
fn print_log(log_dir: &Path, name: &str) {
|
||||||
|
let log_path = log_dir.join(format!("{}.log", name));
|
||||||
|
if let Ok(contents) = std::fs::read_to_string(&log_path) {
|
||||||
|
let lines: Vec<&str> = contents.lines().collect();
|
||||||
|
let show = if lines.len() > 50 { &lines[lines.len() - 50..] } else { &lines };
|
||||||
|
println!("\n--- {} (last {} of {} lines) ---", name, show.len(), lines.len());
|
||||||
|
for line in show {
|
||||||
|
println!(" {}", line);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
println!("\n--- {} (no log file) ---", name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── HTTP helpers ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
fn wait_for_http(base_url: &str, timeout: Duration, log_dir: &Path, name: &str) {
|
||||||
|
let client = reqwest::blocking::Client::new();
|
||||||
|
let start = Instant::now();
|
||||||
|
let url = format!("{}/api/v1/can/0/list?limit=1", base_url);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if start.elapsed() > timeout {
|
||||||
|
print_log(log_dir, name);
|
||||||
|
panic!("Timeout waiting for {} to become ready", base_url);
|
||||||
|
}
|
||||||
|
match client.get(&url).timeout(Duration::from_secs(1)).send() {
|
||||||
|
Ok(resp) if resp.status().is_success() => return,
|
||||||
|
_ => std::thread::sleep(Duration::from_millis(200)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ingest a file into a CAN service instance. Returns the asset hash.
|
||||||
|
fn ingest_file(base_url: &str, filename: &str, content: &[u8], mime_type: &str) -> String {
|
||||||
|
let client = reqwest::blocking::Client::new();
|
||||||
|
let url = format!("{}/api/v1/can/0/ingest", base_url);
|
||||||
|
|
||||||
|
let part = reqwest::blocking::multipart::Part::bytes(content.to_vec())
|
||||||
|
.file_name(filename.to_string())
|
||||||
|
.mime_str(mime_type)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let form = reqwest::blocking::multipart::Form::new()
|
||||||
|
.part("file", part)
|
||||||
|
.text("mime_type", mime_type.to_string());
|
||||||
|
|
||||||
|
let resp = client
|
||||||
|
.post(&url)
|
||||||
|
.multipart(form)
|
||||||
|
.send()
|
||||||
|
.expect("ingest request failed");
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
resp.status().is_success(),
|
||||||
|
"Ingest failed with status {}",
|
||||||
|
resp.status()
|
||||||
|
);
|
||||||
|
|
||||||
|
let body: Value = resp.json().expect("parse ingest response");
|
||||||
|
body["data"]["hash"]
|
||||||
|
.as_str()
|
||||||
|
.expect("no hash in response")
|
||||||
|
.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all assets from a CAN service. Returns list of hashes.
|
||||||
|
fn list_hashes(base_url: &str) -> Vec<String> {
|
||||||
|
let client = reqwest::blocking::Client::new();
|
||||||
|
let url = format!("{}/api/v1/can/0/list?limit=1000", base_url);
|
||||||
|
|
||||||
|
let resp = client
|
||||||
|
.get(&url)
|
||||||
|
.timeout(Duration::from_secs(5))
|
||||||
|
.send()
|
||||||
|
.expect("list request failed");
|
||||||
|
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
return vec![];
|
||||||
|
}
|
||||||
|
|
||||||
|
let body: Value = resp.json().expect("parse list response");
|
||||||
|
body["data"]["items"]
|
||||||
|
.as_array()
|
||||||
|
.unwrap_or(&vec![])
|
||||||
|
.iter()
|
||||||
|
.filter_map(|item| item["hash"].as_str().map(String::from))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for a specific hash to appear on a CAN service.
|
||||||
|
fn wait_for_hash(base_url: &str, hash: &str, timeout: Duration) -> bool {
|
||||||
|
let start = Instant::now();
|
||||||
|
while start.elapsed() < timeout {
|
||||||
|
let hashes = list_hashes(base_url);
|
||||||
|
if hashes.contains(&hash.to_string()) {
|
||||||
|
let elapsed = start.elapsed();
|
||||||
|
println!(" (synced in {:.1}s)", elapsed.as_secs_f64());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
std::thread::sleep(POLL_INTERVAL);
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generate random file content of given size.
|
||||||
|
fn random_content(size: usize) -> Vec<u8> {
|
||||||
|
let mut rng = rand::rng();
|
||||||
|
let mut buf = vec![0u8; size];
|
||||||
|
rng.fill(&mut buf[..]);
|
||||||
|
buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Test runner ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
fn find_can_service_bin() -> PathBuf {
|
||||||
|
let self_exe = std::env::current_exe().expect("get current exe path");
|
||||||
|
let target_dir = self_exe.parent().unwrap();
|
||||||
|
|
||||||
|
let bin_name = if cfg!(windows) {
|
||||||
|
"can-service.exe"
|
||||||
|
} else {
|
||||||
|
"can-service"
|
||||||
|
};
|
||||||
|
|
||||||
|
// Check same directory
|
||||||
|
let candidate = target_dir.join(bin_name);
|
||||||
|
if candidate.exists() {
|
||||||
|
return candidate;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check parent/debug
|
||||||
|
let candidate = target_dir.parent().unwrap().join("debug").join(bin_name);
|
||||||
|
if candidate.exists() {
|
||||||
|
return candidate;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check workspace target/debug
|
||||||
|
let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||||
|
.parent()
|
||||||
|
.unwrap()
|
||||||
|
.parent()
|
||||||
|
.unwrap()
|
||||||
|
.to_path_buf();
|
||||||
|
let candidate = workspace_root.join("target").join("debug").join(bin_name);
|
||||||
|
if candidate.exists() {
|
||||||
|
return candidate;
|
||||||
|
}
|
||||||
|
|
||||||
|
panic!(
|
||||||
|
"Cannot find {} binary. Build it first:\n cd {} && cargo build",
|
||||||
|
bin_name,
|
||||||
|
workspace_root.display()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
println!("╔══════════════════════════════════════════╗");
|
||||||
|
println!("║ CAN Sync v2 Integration Test ║");
|
||||||
|
println!("╚══════════════════════════════════════════╝");
|
||||||
|
|
||||||
|
let can_service_bin = find_can_service_bin();
|
||||||
|
println!("\nUsing CAN service: {}", can_service_bin.display());
|
||||||
|
|
||||||
|
// Build can-sync if needed
|
||||||
|
println!("\nBuilding can-sync...");
|
||||||
|
let build_status = Command::new("cargo")
|
||||||
|
.args(["build", "--bin", "can-sync"])
|
||||||
|
.current_dir(env!("CARGO_MANIFEST_DIR"))
|
||||||
|
.status()
|
||||||
|
.expect("cargo build failed");
|
||||||
|
assert!(build_status.success(), "Failed to build can-sync");
|
||||||
|
|
||||||
|
// Set up harness (starts all processes)
|
||||||
|
let harness = TestHarness::new(&can_service_bin);
|
||||||
|
|
||||||
|
let mut passed = 0;
|
||||||
|
let mut failed = 0;
|
||||||
|
let mut results: Vec<(String, bool, String)> = vec![];
|
||||||
|
|
||||||
|
// ── Test 1: Ingest on A → appears on B ───────────────────────────
|
||||||
|
print_test_header("Test 1: Ingest file on CAN-A, verify sync to CAN-B");
|
||||||
|
{
|
||||||
|
let content = random_content(4096);
|
||||||
|
let hash = ingest_file(&harness.can_a_url, "test1.bin", &content, "application/octet-stream");
|
||||||
|
println!(" Ingested on A: hash={}", &hash[..16]);
|
||||||
|
|
||||||
|
let found = wait_for_hash(&harness.can_b_url, &hash, SYNC_TIMEOUT);
|
||||||
|
if found {
|
||||||
|
println!(" ✓ File appeared on CAN-B!");
|
||||||
|
results.push(("A→B sync".into(), true, format!("hash {}", &hash[..16])));
|
||||||
|
passed += 1;
|
||||||
|
} else {
|
||||||
|
println!(" ✗ File NOT found on CAN-B after {:?}", SYNC_TIMEOUT);
|
||||||
|
results.push(("A→B sync".into(), false, "timeout".into()));
|
||||||
|
failed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Test 2: Ingest on B → appears on A ───────────────────────────
|
||||||
|
print_test_header("Test 2: Ingest file on CAN-B, verify sync to CAN-A");
|
||||||
|
{
|
||||||
|
let content = random_content(8192);
|
||||||
|
let hash = ingest_file(&harness.can_b_url, "test2.dat", &content, "application/octet-stream");
|
||||||
|
println!(" Ingested on B: hash={}", &hash[..16]);
|
||||||
|
|
||||||
|
let found = wait_for_hash(&harness.can_a_url, &hash, SYNC_TIMEOUT);
|
||||||
|
if found {
|
||||||
|
println!(" ✓ File appeared on CAN-A!");
|
||||||
|
results.push(("B→A sync".into(), true, format!("hash {}", &hash[..16])));
|
||||||
|
passed += 1;
|
||||||
|
} else {
|
||||||
|
println!(" ✗ File NOT found on CAN-A after {:?}", SYNC_TIMEOUT);
|
||||||
|
results.push(("B→A sync".into(), false, "timeout".into()));
|
||||||
|
failed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Test 3: Multiple files batch ─────────────────────────────────
|
||||||
|
print_test_header("Test 3: Ingest 5 files on A, verify all sync to B");
|
||||||
|
{
|
||||||
|
let mut hashes = vec![];
|
||||||
|
for i in 0..5 {
|
||||||
|
let content = random_content(1024 + i * 512);
|
||||||
|
let fname = format!("batch_{}.bin", i);
|
||||||
|
let hash = ingest_file(&harness.can_a_url, &fname, &content, "application/octet-stream");
|
||||||
|
println!(" Ingested batch file {}: hash={}", i, &hash[..16]);
|
||||||
|
hashes.push(hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut all_found = true;
|
||||||
|
for (i, hash) in hashes.iter().enumerate() {
|
||||||
|
let found = wait_for_hash(&harness.can_b_url, hash, SYNC_TIMEOUT);
|
||||||
|
if found {
|
||||||
|
println!(" ✓ Batch file {} synced", i);
|
||||||
|
} else {
|
||||||
|
println!(" ✗ Batch file {} NOT synced", i);
|
||||||
|
all_found = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if all_found {
|
||||||
|
results.push(("Batch A→B (5 files)".into(), true, "all synced".into()));
|
||||||
|
passed += 1;
|
||||||
|
} else {
|
||||||
|
results.push(("Batch A→B (5 files)".into(), false, "some missing".into()));
|
||||||
|
failed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Test 4: Verify total counts match ────────────────────────────
|
||||||
|
print_test_header("Test 4: Verify asset counts match on both sides");
|
||||||
|
{
|
||||||
|
std::thread::sleep(Duration::from_secs(5));
|
||||||
|
|
||||||
|
let a_hashes = list_hashes(&harness.can_a_url);
|
||||||
|
let b_hashes = list_hashes(&harness.can_b_url);
|
||||||
|
|
||||||
|
println!(" CAN-A has {} assets", a_hashes.len());
|
||||||
|
println!(" CAN-B has {} assets", b_hashes.len());
|
||||||
|
|
||||||
|
if a_hashes.len() == b_hashes.len() {
|
||||||
|
let a_set: std::collections::HashSet<_> = a_hashes.iter().collect();
|
||||||
|
let b_set: std::collections::HashSet<_> = b_hashes.iter().collect();
|
||||||
|
let matching = a_set == b_set;
|
||||||
|
|
||||||
|
if matching {
|
||||||
|
println!(" ✓ Both sides have identical asset sets ({} assets)", a_hashes.len());
|
||||||
|
results.push(("Count match".into(), true, format!("{} == {}", a_hashes.len(), b_hashes.len())));
|
||||||
|
passed += 1;
|
||||||
|
} else {
|
||||||
|
println!(" ✗ Same count but different hashes!");
|
||||||
|
let only_a: Vec<_> = a_set.difference(&b_set).collect();
|
||||||
|
let only_b: Vec<_> = b_set.difference(&a_set).collect();
|
||||||
|
if !only_a.is_empty() {
|
||||||
|
println!(" Only on A: {:?}", only_a.iter().map(|h| &h[..16]).collect::<Vec<_>>());
|
||||||
|
}
|
||||||
|
if !only_b.is_empty() {
|
||||||
|
println!(" Only on B: {:?}", only_b.iter().map(|h| &h[..16]).collect::<Vec<_>>());
|
||||||
|
}
|
||||||
|
results.push(("Count match".into(), false, "hash mismatch".into()));
|
||||||
|
failed += 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
println!(" ✗ Count mismatch: A={}, B={}", a_hashes.len(), b_hashes.len());
|
||||||
|
results.push(("Count match".into(), false, format!("{} != {}", a_hashes.len(), b_hashes.len())));
|
||||||
|
failed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Results ──────────────────────────────────────────────────────
|
||||||
|
println!("\n╔══════════════════════════════════════════╗");
|
||||||
|
println!("║ Test Results ║");
|
||||||
|
println!("╠══════════════════════════════════════════╣");
|
||||||
|
for (name, pass, detail) in &results {
|
||||||
|
let icon = if *pass { "✓" } else { "✗" };
|
||||||
|
println!("║ {} {:<25} {}",
|
||||||
|
icon,
|
||||||
|
name,
|
||||||
|
if detail.len() > 12 { &detail[..12] } else { detail }
|
||||||
|
);
|
||||||
|
}
|
||||||
|
println!("╠══════════════════════════════════════════╣");
|
||||||
|
println!("║ Passed: {} Failed: {} ║", passed, failed);
|
||||||
|
println!("╚══════════════════════════════════════════╝");
|
||||||
|
|
||||||
|
// Always print logs
|
||||||
|
harness.print_logs();
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
println!("\n=== Cleaning up ===\n");
|
||||||
|
drop(harness);
|
||||||
|
println!(" All temp directories removed.");
|
||||||
|
|
||||||
|
if failed > 0 {
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn print_test_header(name: &str) {
|
||||||
|
println!("\n--- {} ---\n", name);
|
||||||
|
}
|
||||||
@ -54,7 +54,11 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.layer(CorsLayer::permissive())
|
.layer(CorsLayer::permissive())
|
||||||
.with_state(state);
|
.with_state(state);
|
||||||
|
|
||||||
let addr = SocketAddr::from(([0, 0, 0, 0], 3210));
|
let port: u16 = std::env::var("CAN_PORT")
|
||||||
|
.ok()
|
||||||
|
.and_then(|p| p.parse().ok())
|
||||||
|
.unwrap_or(3210);
|
||||||
|
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
||||||
tracing::info!("CAN service listening on {}", addr);
|
tracing::info!("CAN service listening on {}", addr);
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user