Rewrite can-sync v2: simplified P2P full-mirror replication

Replace the over-engineered iroh-docs/libraries/filters architecture
with a simple peer-to-peer sync using:
- iroh 0.96 Endpoint for QUIC transport + NAT traversal
- iroh-gossip for peer discovery via shared passphrase
- Protobuf messages over QUIC streams for asset transfer
- CAN service's private /sync/* API for local data access

Deleted: announcer, fetcher, library, manifest, node, routes (2860 lines)
Added: discovery, peer, protocol (simplified ~600 lines)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jason Tudisco 2026-03-12 12:30:12 -06:00
parent 1b8187a484
commit a28fac6c9a
15 changed files with 944 additions and 2870 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,44 +1,42 @@
[package]
name = "can-sync"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
description = "P2P sync service for CAN content-addressable storage"
description = "P2P sync agent for CAN service — full mirror replication via iroh"
[[bin]]
name = "can-sync"
path = "src/main.rs"
[dependencies]
# P2P networking
# P2P networking (iroh for transport + gossip for discovery — NO iroh-docs)
iroh = "0.96"
iroh-blobs = "0.98"
iroh-docs = "0.96"
iroh-gossip = "0.96"
# HTTP server + client
axum = "0.8"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.12", features = ["json", "multipart"] }
tower-http = { version = "0.6", features = ["cors"] }
# Protobuf (same message types as CAN service sync API)
prost = "0.13"
# HTTP client for CAN service sync API
reqwest = { version = "0.12", features = ["json"] }
# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"
postcard = { version = "1", features = ["alloc"] }
# Storage
rusqlite = { version = "0.32", features = ["bundled"] }
# Crypto
blake3 = "1"
# Utilities
# Async runtime
tokio = { version = "1", features = ["full"] }
# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Stream utilities (needed for gossip event stream)
n0-future = "0.1"
# Utilities
anyhow = "1"
open = "5"
sha2 = "0.10"
hex = "0.4"
uuid = { version = "1", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
bytes = "1"
futures-lite = "2"
tokio-util = { version = "0.7", features = ["io"] }
hex = "0.4"

View File

@ -1,7 +1,13 @@
# CAN Sync configuration
can_service_url: "http://127.0.0.1:3210/api/v1/can/0"
listen_addr: "127.0.0.1:3213"
data_dir: "./can_sync_data"
relay_url: null
poll_interval_secs: 5
full_scan_interval_secs: 300
# CAN Sync v2 configuration
# URL of the local CAN Service (sync API is at /sync/*)
can_service_url: "http://127.0.0.1:3210"
# API key for CAN service's sync endpoints (must match sync_api_key in CAN config)
sync_api_key: "changeme"
# Shared passphrase for peer discovery (all peers must use the same one)
sync_passphrase: "my-shared-secret"
# Seconds between polls for new local assets
poll_interval_secs: 3

View File

@ -1,234 +0,0 @@
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use tracing::{debug, error, info, warn};
use crate::can_client::CanClient;
use crate::library::SyncState;
use crate::manifest::AssetSyncEntry;
use crate::node::SyncNode;
/// The announcer periodically polls CAN service for new or changed assets
/// and writes matching entries into iroh library documents.
pub struct Announcer {
    /// HTTP client for the local CAN service.
    can: CanClient,
    /// Local sync state: high-water timestamp, library list, announced markers.
    state: Arc<SyncState>,
    /// Iroh node — blob store plus document write access.
    node: Arc<SyncNode>,
    /// Interval between cheap incremental polls for new assets.
    poll_interval: Duration,
    /// Interval between exhaustive paginated re-scans of all assets.
    full_scan_interval: Duration,
}
impl Announcer {
    /// Build an announcer from its collaborators.
    ///
    /// `poll_interval_secs` drives the cheap incremental poll,
    /// `full_scan_interval_secs` the expensive exhaustive re-scan.
    pub fn new(
        can: CanClient,
        state: Arc<SyncState>,
        node: Arc<SyncNode>,
        poll_interval_secs: u64,
        full_scan_interval_secs: u64,
    ) -> Self {
        Self {
            can,
            state,
            node,
            poll_interval: Duration::from_secs(poll_interval_secs),
            full_scan_interval: Duration::from_secs(full_scan_interval_secs),
        }
    }

    /// Run the announcer loop — fast polls + periodic full scans.
    ///
    /// Never returns; errors from either poll are logged and the loop continues.
    pub async fn run(self) {
        let mut fast_tick = tokio::time::interval(self.poll_interval);
        let mut full_tick = tokio::time::interval(self.full_scan_interval);
        // Skip the first immediate tick for full scan (let fast poll get first data)
        full_tick.tick().await;
        info!(
            "Announcer started (fast poll: {}s, full scan: {}s)",
            self.poll_interval.as_secs(),
            self.full_scan_interval.as_secs(),
        );
        loop {
            tokio::select! {
                _ = fast_tick.tick() => {
                    if let Err(e) = self.fast_poll().await {
                        warn!("Fast poll error: {:#}", e);
                    }
                }
                _ = full_tick.tick() => {
                    if let Err(e) = self.full_scan().await {
                        warn!("Full scan error: {:#}", e);
                    }
                }
            }
        }
    }

    /// Fast poll: check for recently ingested assets.
    ///
    /// Reads the persisted high-water-mark timestamp, asks CAN for anything
    /// newer, announces matches to each library, then advances the mark.
    async fn fast_poll(&self) -> Result<()> {
        let last_ts = self
            .state
            .get_state("last_seen_timestamp")?
            .and_then(|s| s.parse::<i64>().ok())
            .unwrap_or(0);
        // Get recent assets ordered newest first
        let resp = self.can.list(50, 0, "desc", Some(last_ts)).await?;
        if resp.items.is_empty() {
            return Ok(());
        }
        debug!("Fast poll found {} new assets since ts={}", resp.items.len(), last_ts);
        // Track the newest timestamp we see
        let mut max_ts = last_ts;
        for asset in &resp.items {
            if asset.timestamp > max_ts {
                max_ts = asset.timestamp;
            }
        }
        // Process assets against libraries
        let libraries = self.state.list_libraries()?;
        for asset in &resp.items {
            for lib in &libraries {
                if lib.filter.matches(asset) {
                    self.announce_asset(lib, asset).await?;
                }
            }
        }
        // Update last seen timestamp only after all announcements succeeded,
        // so a failed run is retried from the same point.
        self.state.set_state("last_seen_timestamp", &max_ts.to_string())?;
        Ok(())
    }

    /// Full scan: paginate through all assets, checking for metadata changes.
    ///
    /// NOTE(review): announce_asset skips anything already marked announced,
    /// so "re-announcement on metadata change" only happens if something else
    /// clears that mark — confirm against SyncState semantics.
    async fn full_scan(&self) -> Result<()> {
        info!("Starting full scan...");
        let libraries = self.state.list_libraries()?;
        if libraries.is_empty() {
            debug!("No libraries configured, skipping full scan");
            return Ok(());
        }
        let page_size = 100;
        let mut offset = 0;
        let mut total_scanned = 0;
        let mut total_announced = 0;
        loop {
            let resp = self.can.list_all(page_size, offset, true).await?;
            let count = resp.items.len();
            total_scanned += count;
            for asset in &resp.items {
                for lib in &libraries {
                    if lib.filter.matches(asset) {
                        let was_new = self.announce_asset(lib, asset).await?;
                        if was_new {
                            total_announced += 1;
                        }
                    }
                }
            }
            // A short page means we've reached the end of the listing.
            if (count as i64) < page_size {
                break;
            }
            offset += page_size;
        }
        info!(
            "Full scan complete: scanned {}, announced {} new/updated",
            total_scanned, total_announced
        );
        Ok(())
    }

    /// Announce a single asset to a library's iroh document.
    /// Returns true if the asset was newly announced or updated.
    async fn announce_asset(
        &self,
        lib: &crate::library::Library,
        asset: &crate::can_client::AssetMeta,
    ) -> Result<bool> {
        let doc_id = match &lib.doc_id {
            Some(id) => id.clone(),
            None => {
                debug!("Library '{}' has no doc_id yet, skipping", lib.name);
                return Ok(false);
            }
        };
        // Check if already announced at current version
        if self.state.is_announced(&lib.id, &asset.hash)? {
            // Already announced — skip unless metadata changed
            // (full scan handles re-announcement on metadata change)
            return Ok(false);
        }
        // Download file content from CAN service and add as iroh blob.
        // Blob failures are non-fatal: the entry is still written, just
        // without a blob hash (iroh_blob_hash = None).
        let iroh_blob_hash = match self.can.get_asset(&asset.hash).await {
            Ok(content) => {
                // Add to iroh blob store so remote peers can download it
                match self.node.blobs.add_bytes(content).await {
                    Ok(tag_info) => Some(tag_info.hash.to_string()),
                    Err(e) => {
                        warn!(
                            "Failed to add blob for asset {}: {:#}",
                            &asset.hash[..12],
                            e
                        );
                        None
                    }
                }
            }
            Err(e) => {
                warn!(
                    "Failed to download asset {} from CAN service: {:#}",
                    &asset.hash[..12],
                    e
                );
                None
            }
        };
        // Create sync entry with the iroh blob hash
        let mut entry = AssetSyncEntry::from_asset_meta(asset, &self.node.peer_id());
        entry.iroh_blob_hash = iroh_blob_hash;
        let entry_bytes = entry.to_bytes();
        // Write to iroh document (CRDT — concurrent writes merge automatically)
        if let Err(e) = self
            .node
            .write_to_doc(&doc_id, asset.hash.as_bytes(), &entry_bytes)
            .await
        {
            error!(
                "Failed to write asset {} to doc {}: {:#}",
                &asset.hash[..12],
                &doc_id[..12],
                e
            );
            return Ok(false);
        }
        // Mark as announced in local state
        self.state.mark_announced(&lib.id, &asset.hash, entry.version)?;
        debug!(
            "Announced asset {} to library '{}' (doc {})",
            &asset.hash[..12],
            lib.name,
            &doc_id[..12]
        );
        Ok(true)
    }
}

View File

@ -1,291 +1,104 @@
//! HTTP client for CAN service's private sync API (protobuf-encoded).
use anyhow::{Context, Result};
use bytes::Bytes;
use reqwest::multipart;
use serde::{Deserialize, Serialize};
use prost::Message;
/// HTTP client for CAN service API
#[derive(Debug, Clone)]
pub struct CanClient {
client: reqwest::Client,
use crate::protocol::*;
/// Client for CAN service's /sync/* endpoints.
#[derive(Clone)]
pub struct CanSyncClient {
http: reqwest::Client,
base_url: String,
sync_key: String,
}
// ── API response types (mirror CAN service) ──
/// Generic JSON envelope returned by CAN service endpoints:
/// a `status` string plus the typed `data` payload.
#[derive(Debug, Deserialize)]
pub struct ApiResponse<T> {
    pub status: String,
    pub data: T,
}
/// Error envelope returned by the CAN service (`status` plus an `error` message).
#[derive(Debug, Deserialize)]
pub struct ErrorResponse {
    pub status: String,
    pub error: String,
}
/// Metadata for one content-addressed asset, as reported by the CAN service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssetMeta {
    /// Content hash — CAN's primary identifier for the asset.
    pub hash: String,
    pub mime_type: String,
    /// Application that produced the asset, if recorded.
    pub application: Option<String>,
    /// Originating user, if recorded.
    pub user: Option<String>,
    pub tags: Vec<String>,
    pub description: Option<String>,
    /// Original filename, if known.
    pub human_filename: Option<String>,
    /// Original path, if known.
    pub human_path: Option<String>,
    /// Ingest timestamp — presumably Unix seconds; confirm with CAN service.
    pub timestamp: i64,
    pub is_trashed: bool,
    // Defaults to false when the field is absent in older CAN responses.
    #[serde(default)]
    pub is_corrupted: bool,
    /// Content size in bytes.
    pub size: i64,
}
/// One page of assets plus the pagination info echoed by the server.
#[derive(Debug, Deserialize)]
pub struct ListResponse {
    pub items: Vec<AssetMeta>,
    pub pagination: Pagination,
}
/// Pagination block returned by list/search endpoints.
#[derive(Debug, Deserialize)]
pub struct Pagination {
    pub limit: i64,
    pub offset: i64,
    pub total: i64,
}
/// Payload returned by a successful ingest upload.
#[derive(Debug, Deserialize)]
pub struct IngestResult {
    pub timestamp: i64,
    /// Content hash assigned by the CAN service.
    pub hash: String,
    pub filename: String,
}
// ── Search parameters ──
/// Query parameters for the CAN `/search` endpoint.
///
/// Every field is optional; `None` fields are omitted from the query string
/// entirely (via `skip_serializing_if`), so the server applies no filter
/// for them.
#[derive(Debug, Default, Serialize)]
pub struct SearchParams {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hash: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_time: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub end_time: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mime_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub application: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub offset: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub order: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub include_trashed: Option<bool>,
}
// ── Ingest metadata ──
/// Optional metadata accompanying an ingest upload; each `Some` field is
/// sent as a separate multipart text field alongside the file part.
#[derive(Debug, Default)]
pub struct IngestMeta {
    pub mime_type: Option<String>,
    pub human_file_name: Option<String>,
    pub human_readable_path: Option<String>,
    pub application: Option<String>,
    pub user: Option<String>,
    /// Comma-separated tag list.
    pub tags: Option<String>,
    pub description: Option<String>,
}
// ── Client implementation ──
impl CanClient {
pub fn new(base_url: &str) -> Self {
impl CanSyncClient {
pub fn new(base_url: &str, sync_key: &str) -> Self {
Self {
client: reqwest::Client::new(),
http: reqwest::Client::new(),
base_url: base_url.trim_end_matches('/').to_string(),
sync_key: sync_key.to_string(),
}
}
/// List assets with pagination and ordering
pub async fn list(
&self,
limit: i64,
offset: i64,
order: &str,
offset_time: Option<i64>,
) -> Result<ListResponse> {
let mut url = format!("{}/list?limit={}&offset={}&order={}", self.base_url, limit, offset, order);
if let Some(ts) = offset_time {
url.push_str(&format!("&offset_time={}", ts));
}
let resp = self.client.get(&url).send().await.context("list request failed")?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("CAN list failed ({}): {}", status, text);
}
let api: ApiResponse<ListResponse> = resp.json().await.context("parse list response")?;
Ok(api.data)
}
/// List all assets (paginated, including trashed for full sync)
pub async fn list_all(
&self,
limit: i64,
offset: i64,
include_trashed: bool,
) -> Result<ListResponse> {
let mut url = format!("{}/list?limit={}&offset={}&order=asc", self.base_url, limit, offset);
if include_trashed {
url.push_str("&include_trashed=true");
}
let resp = self.client.get(&url).send().await.context("list_all request failed")?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("CAN list_all failed ({}): {}", status, text);
}
let api: ApiResponse<ListResponse> = resp.json().await.context("parse list_all response")?;
Ok(api.data)
}
/// Search assets by filters
pub async fn search(&self, params: &SearchParams) -> Result<ListResponse> {
/// POST protobuf request, return protobuf response bytes
async fn post_proto(&self, path: &str, body: Vec<u8>) -> Result<bytes::Bytes> {
let url = format!("{}{}", self.base_url, path);
let resp = self
.client
.get(&format!("{}/search", self.base_url))
.query(params)
.http
.post(&url)
.header("X-Sync-Key", &self.sync_key)
.header("Content-Type", "application/x-protobuf")
.body(body)
.send()
.await
.context("search request failed")?;
let status = resp.status();
if !status.is_success() {
.with_context(|| format!("POST {}", url))?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("CAN search failed ({}): {}", status, text);
anyhow::bail!("{} returned {}: {}", path, status, text);
}
let api: ApiResponse<ListResponse> = resp.json().await.context("parse search response")?;
Ok(api.data)
resp.bytes().await.with_context(|| format!("reading body from {}", path))
}
/// Download asset content by hash
pub async fn get_asset(&self, hash: &str) -> Result<Bytes> {
let resp = self
.client
.get(&format!("{}/asset/{}", self.base_url, hash))
.send()
.await
.context("get_asset request failed")?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("CAN get_asset failed ({}): {}", status, text);
}
resp.bytes().await.context("read asset bytes")
/// Get all asset digests for reconciliation.
pub async fn get_hashes(&self) -> Result<HashListResponse> {
let req = HashListRequest {};
let mut buf = Vec::with_capacity(req.encoded_len());
req.encode(&mut buf)?;
let resp_bytes = self.post_proto("/sync/hashes", buf).await?;
HashListResponse::decode(resp_bytes).context("decode HashListResponse")
}
/// Get asset metadata by hash
pub async fn get_meta(&self, hash: &str) -> Result<AssetMeta> {
let resp = self
.client
.get(&format!("{}/asset/{}/meta", self.base_url, hash))
.send()
.await
.context("get_meta request failed")?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("CAN get_meta failed ({}): {}", status, text);
}
let api: ApiResponse<AssetMeta> = resp.json().await.context("parse meta response")?;
Ok(api.data)
/// Pull full assets by hash.
pub async fn pull(&self, hashes: Vec<String>) -> Result<PullResponse> {
let req = PullRequest { hashes };
let mut buf = Vec::with_capacity(req.encoded_len());
req.encode(&mut buf)?;
let resp_bytes = self.post_proto("/sync/pull", buf).await?;
PullResponse::decode(resp_bytes).context("decode PullResponse")
}
/// Ingest a file into CAN service via multipart upload
pub async fn ingest(&self, content: Bytes, meta: IngestMeta) -> Result<IngestResult> {
let file_part = multipart::Part::bytes(content.to_vec())
.file_name(meta.human_file_name.clone().unwrap_or_else(|| "file".to_string()))
.mime_str(meta.mime_type.as_deref().unwrap_or("application/octet-stream"))?;
/// Push a single asset bundle.
pub async fn push(&self, bundle: AssetBundle) -> Result<PushResponse> {
let req = PushRequest {
bundle: Some(bundle),
};
let mut buf = Vec::with_capacity(req.encoded_len());
req.encode(&mut buf)?;
let mut form = multipart::Form::new().part("file", file_part);
if let Some(ref v) = meta.mime_type {
form = form.text("mime_type", v.clone());
}
if let Some(ref v) = meta.human_file_name {
form = form.text("human_file_name", v.clone());
}
if let Some(ref v) = meta.human_readable_path {
form = form.text("human_readable_path", v.clone());
}
if let Some(ref v) = meta.application {
form = form.text("application", v.clone());
}
if let Some(ref v) = meta.user {
form = form.text("user", v.clone());
}
if let Some(ref v) = meta.tags {
form = form.text("tags", v.clone());
}
if let Some(ref v) = meta.description {
form = form.text("description", v.clone());
}
let resp = self
.client
.post(&format!("{}/ingest", self.base_url))
.multipart(form)
.send()
.await
.context("ingest request failed")?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("CAN ingest failed ({}): {}", status, text);
}
let api: ApiResponse<IngestResult> = resp.json().await.context("parse ingest response")?;
Ok(api.data)
let resp_bytes = self.post_proto("/sync/push", buf).await?;
PushResponse::decode(resp_bytes).context("decode PushResponse")
}
/// Update asset metadata (tags, description)
/// Update metadata for an existing asset.
pub async fn update_meta(
&self,
hash: &str,
tags: Option<Vec<String>>,
hash: String,
description: Option<String>,
) -> Result<()> {
#[derive(Serialize)]
struct MetadataUpdate {
#[serde(skip_serializing_if = "Option::is_none")]
tags: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<String>,
}
let resp = self
.client
.patch(&format!("{}/asset/{}", self.base_url, hash))
.json(&MetadataUpdate { tags, description })
.send()
.await
.context("update_meta request failed")?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("CAN update_meta failed ({}): {}", status, text);
}
Ok(())
tags: Vec<String>,
is_trashed: bool,
) -> Result<MetaUpdateResponse> {
let req = MetaUpdateRequest {
hash,
description,
tags,
is_trashed,
};
let mut buf = Vec::with_capacity(req.encoded_len());
req.encode(&mut buf)?;
let resp_bytes = self.post_proto("/sync/meta", buf).await?;
MetaUpdateResponse::decode(resp_bytes).context("decode MetaUpdateResponse")
}
/// Check if CAN service is reachable
pub async fn health_check(&self) -> Result<bool> {
match self.list(1, 0, "desc", None).await {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
/// Health check: try to get hashes (will fail if sync API disabled).
pub async fn health_check(&self) -> bool {
self.get_hashes().await.is_ok()
}
}

View File

@ -1,78 +1,30 @@
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use serde::Deserialize;
use std::path::Path;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct SyncConfig {
/// Base URL for the CAN service API (e.g. "http://127.0.0.1:3210/api/v1/can/0")
/// Base URL of the local CAN service (e.g. "http://127.0.0.1:3210")
pub can_service_url: String,
/// Address for the CAN Sync HTTP API (e.g. "127.0.0.1:3213")
pub listen_addr: String,
/// API key for CAN service's sync endpoints (must match config.sync_api_key)
pub sync_api_key: String,
/// Directory for persistent data (peer key, sync state DB)
pub data_dir: String,
/// Shared passphrase for peer discovery (all peers must use the same one)
pub sync_passphrase: String,
/// Optional custom relay URL; null uses iroh's public relay
pub relay_url: Option<String>,
/// Seconds between fast polls for new assets
/// Seconds between polls for new local assets
#[serde(default = "default_poll_interval")]
pub poll_interval_secs: u64,
/// Seconds between full scans of all assets
#[serde(default = "default_full_scan_interval")]
pub full_scan_interval_secs: u64,
}
fn default_poll_interval() -> u64 {
5
}
fn default_full_scan_interval() -> u64 {
300
3
}
impl SyncConfig {
/// Load config from a YAML file, falling back to defaults if not found
pub fn load(path: &Path) -> Result<Self> {
if path.exists() {
let contents =
std::fs::read_to_string(path).context("Failed to read config file")?;
let config: SyncConfig =
serde_yaml::from_str(&contents).context("Failed to parse config YAML")?;
Ok(config)
} else {
tracing::warn!("Config file not found at {}, using defaults", path.display());
Ok(Self::default())
}
}
/// Resolved data directory path
pub fn data_path(&self) -> PathBuf {
PathBuf::from(&self.data_dir)
}
/// Path to the peer keypair file
pub fn peer_key_path(&self) -> PathBuf {
self.data_path().join("peer_key")
}
/// Path to the sync state SQLite database
pub fn db_path(&self) -> PathBuf {
self.data_path().join("can_sync.db")
}
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
can_service_url: "http://127.0.0.1:3210/api/v1/can/0".to_string(),
listen_addr: "127.0.0.1:3213".to_string(),
data_dir: "./can_sync_data".to_string(),
relay_url: None,
poll_interval_secs: default_poll_interval(),
full_scan_interval_secs: default_full_scan_interval(),
}
pub fn load(path: &Path) -> anyhow::Result<Self> {
let contents = std::fs::read_to_string(path)?;
let config: Self = serde_yaml::from_str(&contents)?;
Ok(config)
}
}

View File

@ -0,0 +1,109 @@
//! Peer discovery via iroh-gossip using a shared passphrase.
//!
//! All CAN sync agents with the same `sync_passphrase` derive the same
//! BLAKE3 gossip topic and discover each other automatically.
use std::collections::HashSet;
use anyhow::Result;
use iroh::{Endpoint, EndpointId};
use iroh_gossip::net::Gossip;
use iroh_gossip::proto::TopicId;
use n0_future::StreamExt;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
/// Derive a deterministic gossip TopicId from a shared passphrase.
///
/// Every peer that knows the same passphrase hashes to the same 32-byte
/// topic, so they meet on the same gossip swarm with no further config.
/// The "can-sync-v1:" prefix domain-separates the hash from other uses.
pub fn derive_topic(passphrase: &str) -> TopicId {
    let seed = format!("can-sync-v1:{}", passphrase);
    let digest = blake3::hash(seed.as_bytes());
    TopicId::from_bytes(*digest.as_bytes())
}
/// Manages peer discovery via gossip announcements.
pub struct Discovery {
    /// Gossip handle used to subscribe and broadcast on the shared topic.
    gossip: Gossip,
    /// Topic derived from the shared passphrase (see `derive_topic`).
    topic: TopicId,
    /// Our iroh endpoint — its EndpointId is what we announce to peers.
    endpoint: Endpoint,
}
impl Discovery {
    /// Create a discovery handle bound to an endpoint and gossip instance.
    ///
    /// All peers sharing `passphrase` derive the same topic (see `derive_topic`).
    pub fn new(endpoint: Endpoint, gossip: Gossip, passphrase: &str) -> Self {
        let topic = derive_topic(passphrase);
        info!("Gossip topic: {}", hex::encode(topic.as_bytes()));
        Self {
            gossip,
            topic,
            endpoint,
        }
    }

    /// Subscribe to the gossip topic and yield newly discovered peer EndpointIds.
    ///
    /// Sends discovered EndpointIds on `tx`. Runs until the gossip event
    /// stream ends (normally: forever).
    pub async fn run(self, tx: mpsc::Sender<EndpointId>) -> Result<()> {
        info!("Joining gossip topic for peer discovery...");
        // Subscribe to the topic with no bootstrap peers (we discover via gossip)
        let mut topic = self
            .gossip
            .subscribe(self.topic, vec![])
            .await
            .map_err(|e| anyhow::anyhow!("gossip subscribe failed: {}", e))?;
        info!("Waiting for gossip neighbors...");
        let our_id = self.endpoint.id();
        let (sender, mut receiver) = topic.split();
        // Periodically re-broadcast our EndpointId so peers that join the
        // swarm later still discover us. The sender half is moved into the
        // task directly — it is not needed anywhere else, so the previous
        // extra clone was redundant.
        tokio::spawn(async move {
            let msg = our_id.as_bytes().to_vec();
            loop {
                if let Err(e) = sender.broadcast(msg.clone().into()).await {
                    warn!("Failed to broadcast discovery: {}", e);
                }
                tokio::time::sleep(std::time::Duration::from_secs(10)).await;
            }
        });
        // Listen for peer announcements
        let mut known_peers: HashSet<EndpointId> = HashSet::new();
        while let Some(event) = receiver.next().await {
            match event {
                Ok(iroh_gossip::api::Event::Received(msg)) => {
                    // A valid announcement is exactly a 32-byte EndpointId;
                    // try_from rejects every other length, so no separate
                    // length check is needed.
                    if let Ok(bytes) = <[u8; 32]>::try_from(msg.content.as_ref()) {
                        if let Ok(peer_id) = EndpointId::from_bytes(&bytes) {
                            if peer_id != our_id && known_peers.insert(peer_id) {
                                info!("Discovered new peer: {}", peer_id.fmt_short());
                                let _ = tx.send(peer_id).await;
                            }
                        }
                    }
                }
                Ok(iroh_gossip::api::Event::NeighborUp(peer_id)) => {
                    // Direct gossip neighbors count as discovered peers too.
                    if peer_id != our_id && known_peers.insert(peer_id) {
                        info!("Gossip neighbor up: {}", peer_id.fmt_short());
                        let _ = tx.send(peer_id).await;
                    }
                }
                Ok(iroh_gossip::api::Event::NeighborDown(peer_id)) => {
                    // Forget the peer so a reconnect is reported as a fresh discovery.
                    info!("Gossip neighbor down: {}", peer_id.fmt_short());
                    known_peers.remove(&peer_id);
                }
                Ok(iroh_gossip::api::Event::Lagged) => {
                    warn!("Gossip receiver lagged, may have missed messages");
                }
                Err(e) => {
                    // Back off briefly on stream errors rather than spinning.
                    warn!("Gossip receive error: {}", e);
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }
            }
        }
        Ok(())
    }
}

View File

@ -1,352 +0,0 @@
use std::sync::Arc;
use anyhow::Result;
use futures_lite::StreamExt;
use sha2::{Digest, Sha256};
use tokio::io::AsyncReadExt;
use tracing::{debug, error, info, warn};
use crate::can_client::{CanClient, IngestMeta};
use crate::library::SyncState;
use crate::manifest::AssetSyncEntry;
use crate::node::SyncNode;
/// The fetcher receives remote asset entries from iroh documents
/// and ingests them into the local CAN service.
pub struct Fetcher {
    /// HTTP client for the local CAN service (ingest / metadata updates).
    can: CanClient,
    /// Local sync state: library list and announced markers.
    state: Arc<SyncState>,
    /// Iroh node — document subscriptions and blob reads.
    node: Arc<SyncNode>,
}
impl Fetcher {
    /// Build a fetcher from its collaborators.
    pub fn new(can: CanClient, state: Arc<SyncState>, node: Arc<SyncNode>) -> Self {
        Self { can, state, node }
    }

    /// Run the fetcher — subscribes to library document events for real-time sync,
    /// falls back to periodic polling for documents without active subscriptions.
    ///
    /// Never returns; errors from either branch are logged and the loop continues.
    pub async fn run(self) {
        info!("Fetcher started — watching for remote asset entries");
        // Run two loops concurrently:
        // 1. Subscription watcher — subscribes to active library docs
        // 2. Periodic checker — catches anything missed
        let poll_interval = tokio::time::interval(std::time::Duration::from_secs(10));
        let sub_interval = tokio::time::interval(std::time::Duration::from_secs(5));
        tokio::pin!(poll_interval);
        tokio::pin!(sub_interval);
        loop {
            tokio::select! {
                _ = poll_interval.tick() => {
                    if let Err(e) = self.check_for_new_entries().await {
                        warn!("Fetcher poll error: {:#}", e);
                    }
                }
                _ = sub_interval.tick() => {
                    // Try to subscribe to any library docs that we haven't subscribed to yet
                    if let Err(e) = self.subscribe_to_libraries().await {
                        debug!("Fetcher subscription check: {:#}", e);
                    }
                }
            }
        }
    }

    /// Subscribe to document events for all libraries that have doc_ids.
    ///
    /// NOTE(review): this returns after handling the FIRST library with a
    /// doc_id, and nothing records which docs are already subscribed — every
    /// 5s tick re-subscribes to that same doc and spawns another event task,
    /// while later libraries are never reached. Confirm whether duplicate
    /// subscriptions are deduplicated elsewhere.
    async fn subscribe_to_libraries(&self) -> Result<()> {
        let libraries = self.state.list_libraries()?;
        for lib in &libraries {
            if let Some(ref doc_id_hex) = lib.doc_id {
                // Open the doc and subscribe to events
                let doc = match self.node.open_doc(doc_id_hex).await {
                    Ok(d) => d,
                    Err(_) => continue,
                };
                let mut events = match doc.subscribe().await {
                    Ok(e) => e,
                    Err(_) => continue,
                };
                // Spawn a task to process events from this doc.
                // Clone everything the 'static task needs.
                let can = self.can.clone();
                let node_peer_id = self.node.peer_id();
                let node = self.node.clone();
                let lib_name = lib.name.clone();
                tokio::spawn(async move {
                    while let Some(event) = events.next().await {
                        match event {
                            Ok(iroh_docs::engine::LiveEvent::InsertRemote {
                                entry,
                                content_status,
                                ..
                            }) => {
                                // Document keys are the CAN asset hashes (see announcer).
                                let key = entry.key().to_vec();
                                let can_hash = String::from_utf8_lossy(&key).to_string();
                                if content_status == iroh_docs::ContentStatus::Complete {
                                    // The entry value (our AssetSyncEntry) is available
                                    // Read the entry content from the blob store
                                    let content_hash = entry.content_hash();
                                    let mut reader = node.blobs.reader(content_hash);
                                    let mut buf = Vec::new();
                                    if reader.read_to_end(&mut buf).await.is_ok() {
                                        if let Ok(sync_entry) = AssetSyncEntry::from_bytes(&buf) {
                                            if sync_entry.last_modified_by == node_peer_id {
                                                continue; // Skip our own entries
                                            }
                                            info!(
                                                "Received remote entry for {} in library '{}'",
                                                &can_hash[..can_hash.len().min(12)],
                                                lib_name
                                            );
                                            if let Err(e) = process_remote_entry(
                                                &can,
                                                &node,
                                                &node_peer_id,
                                                &can_hash,
                                                sync_entry,
                                            )
                                            .await
                                            {
                                                error!(
                                                    "Error processing remote entry {}: {:#}",
                                                    &can_hash[..can_hash.len().min(12)],
                                                    e
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                            Ok(iroh_docs::engine::LiveEvent::NeighborUp(peer)) => {
                                info!("Peer connected: {}", peer.fmt_short());
                            }
                            Ok(iroh_docs::engine::LiveEvent::NeighborDown(peer)) => {
                                info!("Peer disconnected: {}", peer.fmt_short());
                            }
                            Ok(_) => {} // Ignore other events
                            Err(e) => {
                                // A stream error ends this subscription task;
                                // the polling fallback still covers the doc.
                                warn!("Document event error: {:#}", e);
                                break;
                            }
                        }
                    }
                });
                // Only subscribe to one doc per tick to avoid overwhelming
                return Ok(());
            }
        }
        Ok(())
    }

    /// Check all library documents for entries we don't have locally (polling fallback).
    async fn check_for_new_entries(&self) -> Result<()> {
        let libraries = self.state.list_libraries()?;
        for lib in &libraries {
            if let Some(ref doc_id_hex) = lib.doc_id {
                let doc = match self.node.open_doc(doc_id_hex).await {
                    Ok(d) => d,
                    Err(_) => continue,
                };
                // Query all entries (latest per key)
                let query = iroh_docs::store::Query::single_latest_per_key().build();
                let entries = match doc.get_many(query).await {
                    Ok(e) => e,
                    Err(_) => continue,
                };
                tokio::pin!(entries);
                // NOTE: a stream error silently ends the scan of this doc
                // (the `while let Some(Ok(...))` pattern drops the Err).
                while let Some(Ok(entry)) = entries.next().await {
                    let key = entry.key().to_vec();
                    let can_hash = String::from_utf8_lossy(&key).to_string();
                    // Read the entry value (AssetSyncEntry)
                    let content_hash = entry.content_hash();
                    let mut reader = self.node.blobs.reader(content_hash);
                    let mut buf = Vec::new();
                    if reader.read_to_end(&mut buf).await.is_err() {
                        continue;
                    }
                    let sync_entry = match AssetSyncEntry::from_bytes(&buf) {
                        Ok(e) => e,
                        Err(_) => continue,
                    };
                    // Skip our own entries
                    if sync_entry.last_modified_by == self.node.peer_id() {
                        continue;
                    }
                    // Check if already processed
                    if self.state.is_announced(&lib.id, &can_hash).unwrap_or(false) {
                        continue;
                    }
                    info!(
                        "Polling found remote entry for {} in library '{}'",
                        &can_hash[..can_hash.len().min(12)],
                        lib.name
                    );
                    if let Err(e) = process_remote_entry(
                        &self.can,
                        &self.node,
                        &self.node.peer_id(),
                        &can_hash,
                        sync_entry,
                    )
                    .await
                    {
                        error!(
                            "Error processing remote entry {}: {:#}",
                            &can_hash[..can_hash.len().min(12)],
                            e
                        );
                    }
                    // Mark as processed (even on error — errors are not retried here)
                    let _ = self.state.mark_announced(&lib.id, &can_hash, 1);
                }
            }
        }
        Ok(())
    }
}
/// Process a remote asset entry — download blob and ingest into CAN service
async fn process_remote_entry(
can: &CanClient,
node: &SyncNode,
local_peer_id: &str,
can_hash: &str,
entry: AssetSyncEntry,
) -> Result<()> {
// Skip if this is our own entry
if entry.last_modified_by == local_peer_id {
return Ok(());
}
// Check if already in local CAN service
match can.get_meta(can_hash).await {
Ok(existing) => {
// Asset exists — check if metadata needs updating
if entry.tags != existing.tags
|| entry.description != existing.description
|| entry.is_trashed != existing.is_trashed
{
info!("Updating metadata for {} from remote peer", &can_hash[..12]);
can.update_meta(
can_hash,
Some(entry.tags.clone()),
entry.description.clone(),
)
.await?;
}
return Ok(());
}
Err(_) => {
// Asset not found locally — need to fetch and ingest
}
}
info!(
"Fetching remote asset {} ({}B) from peer {}",
&can_hash[..12],
entry.size,
&entry.last_modified_by[..entry.last_modified_by.len().min(12)]
);
// Download blob via iroh
let content = download_blob(node, &entry).await?;
if content.is_empty() {
warn!("Downloaded empty blob for {} — skipping", &can_hash[..12]);
return Ok(());
}
// Verify CAN hash: SHA256(timestamp_bytes + content)
if !verify_can_hash(can_hash, entry.timestamp, &content) {
error!(
"CAN hash verification failed for {} — rejecting",
&can_hash[..12]
);
return Ok(());
}
// Ingest into local CAN service
let meta = IngestMeta {
mime_type: Some(entry.mime_type.clone()),
human_file_name: entry.human_filename.clone(),
human_readable_path: entry.human_path.clone(),
application: entry.application.clone(),
user: entry.user.clone(),
tags: if entry.tags.is_empty() {
None
} else {
Some(entry.tags.join(","))
},
description: entry.description.clone(),
};
match can.ingest(content.into(), meta).await {
Ok(result) => {
info!(
"Ingested remote asset: hash={}, filename={}",
&result.hash[..12],
result.filename
);
}
Err(e) => {
error!("Failed to ingest remote asset {}: {:#}", &can_hash[..12], e);
}
}
Ok(())
}
/// Download a blob via iroh using the blob hash from the sync entry.
///
/// Returns an empty Vec when the entry carries no iroh blob hash; the caller
/// treats empty content as "nothing to ingest".
async fn download_blob(node: &SyncNode, entry: &AssetSyncEntry) -> Result<Vec<u8>> {
    let blob_hash_str = match &entry.iroh_blob_hash {
        Some(h) => h,
        None => {
            warn!("No iroh blob hash in sync entry — cannot download");
            return Ok(Vec::new());
        }
    };
    // Truncated form for logging — guarded so a short/invalid hash string
    // can't panic (the old `&s[..12]` did exactly that on the error path).
    let short = &blob_hash_str[..blob_hash_str.len().min(12)];
    // Parse the BLAKE3 hash
    let blob_hash: iroh_blobs::Hash = blob_hash_str
        .parse()
        .map_err(|_| anyhow::anyhow!("Invalid iroh blob hash: {}", short))?;
    // Read from the local blob store (iroh-docs should have synced it)
    let mut reader = node.blobs.reader(blob_hash);
    // entry.size is an i64 from the wire; clamp negatives to 0 instead of
    // letting `as usize` wrap into an absurd pre-allocation.
    let mut buf = Vec::with_capacity(usize::try_from(entry.size).unwrap_or(0));
    reader.read_to_end(&mut buf).await?;
    debug!("Downloaded blob {} ({} bytes)", short, buf.len());
    Ok(buf)
}
/// Check that `expected_hash` equals the hex SHA-256 digest computed over the
/// asset's timestamp (decimal string form) followed by the raw content bytes.
fn verify_can_hash(expected_hash: &str, timestamp: i64, content: &[u8]) -> bool {
    let digest = {
        let mut h = Sha256::new();
        h.update(timestamp.to_string().as_bytes());
        h.update(content);
        h.finalize()
    };
    hex::encode(digest) == expected_hash
}

View File

@ -1,288 +0,0 @@
use anyhow::{Context, Result};
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use crate::can_client::AssetMeta;
/// Filter criteria that determines which CAN assets belong to a library.
///
/// All populated criteria combine with AND — except `hashes`, which, when
/// set, overrides every other field and matches exactly the listed hashes
/// (see `LibraryFilter::matches`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LibraryFilter {
    /// Match assets with this application tag
    #[serde(skip_serializing_if = "Option::is_none")]
    pub application: Option<String>,
    /// Match assets with any of these tags (at least one must be present)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<Vec<String>>,
    /// Match assets from this user
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user: Option<String>,
    /// Match assets with MIME type prefix (e.g. "image/")
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mime_prefix: Option<String>,
    /// Manual list of specific hashes to include; overrides all other criteria
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hashes: Option<Vec<String>>,
}
impl LibraryFilter {
/// Check if an asset matches this filter
pub fn matches(&self, asset: &AssetMeta) -> bool {
// If hashes list is set, only match those exact hashes
if let Some(ref hashes) = self.hashes {
return hashes.contains(&asset.hash);
}
// All set criteria must match (AND logic)
if let Some(ref app) = self.application {
if asset.application.as_deref() != Some(app.as_str()) {
return false;
}
}
if let Some(ref required_tags) = self.tags {
// Asset must have at least one of the required tags
if !required_tags.iter().any(|t| asset.tags.contains(t)) {
return false;
}
}
if let Some(ref user) = self.user {
if asset.user.as_deref() != Some(user.as_str()) {
return false;
}
}
if let Some(ref prefix) = self.mime_prefix {
if !asset.mime_type.starts_with(prefix.as_str()) {
return false;
}
}
true
}
}
/// A library definition stored locally.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Library {
    /// Unique library ID (UUID)
    pub id: String,
    /// Human-readable name
    pub name: String,
    /// Filter criteria selecting which CAN assets belong to this library
    pub filter: LibraryFilter,
    /// iroh document ID (namespace) — None until the doc is created.
    /// Presumably the hex-encoded namespace id; confirm against the doc-creation path.
    pub doc_id: Option<String>,
    /// Whether this library was created locally or joined from remote
    pub is_local: bool,
    /// Creation timestamp
    pub created_at: i64,
}
/// Tracks libraries and which assets have been announced to which libraries.
/// Uses std::sync::Mutex because rusqlite::Connection is !Send,
/// so tokio::sync::RwLock won't work across .await points.
pub struct SyncState {
    // Single connection behind a blocking mutex; every method locks briefly
    // and never holds the guard across an .await.
    db: std::sync::Mutex<Connection>,
}
impl SyncState {
    /// Open or create the sync state database and ensure all tables exist.
    pub fn open(path: &std::path::Path) -> Result<Self> {
        let db = Connection::open(path).context("open sync state DB")?;
        db.execute_batch(
            "
            CREATE TABLE IF NOT EXISTS libraries (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                filter_json TEXT NOT NULL,
                doc_id TEXT,
                is_local INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS announced_assets (
                library_id TEXT NOT NULL,
                hash TEXT NOT NULL,
                version INTEGER NOT NULL DEFAULT 1,
                announced_at INTEGER NOT NULL,
                PRIMARY KEY (library_id, hash),
                FOREIGN KEY (library_id) REFERENCES libraries(id) ON DELETE CASCADE
            );
            CREATE TABLE IF NOT EXISTS sync_state (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );
            ",
        )
        .context("init sync state tables")?;
        Ok(Self {
            db: std::sync::Mutex::new(db),
        })
    }
    /// Acquire the DB connection. Panics only if a previous holder panicked
    /// while holding the lock (poisoning), which indicates a bug.
    fn lock_db(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.db.lock().expect("sync state DB lock poisoned")
    }
    /// Map one `libraries` row (SELECT order: id, name, filter_json, doc_id,
    /// is_local, created_at) to a `Library`. Corrupt filter_json degrades to
    /// an all-None filter instead of failing the whole query.
    fn row_to_library(row: &rusqlite::Row<'_>) -> rusqlite::Result<Library> {
        let filter_json: String = row.get(2)?;
        Ok(Library {
            id: row.get(0)?,
            name: row.get(1)?,
            filter: serde_json::from_str(&filter_json).unwrap_or(LibraryFilter {
                application: None,
                tags: None,
                user: None,
                mime_prefix: None,
                hashes: None,
            }),
            doc_id: row.get(3)?,
            is_local: row.get::<_, i32>(4)? != 0,
            created_at: row.get(5)?,
        })
    }
    // ── Library CRUD ──
    /// Insert or overwrite a library row (keyed by id).
    pub fn save_library(&self, lib: &Library) -> Result<()> {
        let db = self.lock_db();
        let filter_json = serde_json::to_string(&lib.filter)?;
        db.execute(
            "INSERT OR REPLACE INTO libraries (id, name, filter_json, doc_id, is_local, created_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            rusqlite::params![
                lib.id,
                lib.name,
                filter_json,
                lib.doc_id,
                lib.is_local as i32,
                lib.created_at,
            ],
        )?;
        Ok(())
    }
    /// Return every stored library.
    pub fn list_libraries(&self) -> Result<Vec<Library>> {
        let db = self.lock_db();
        let mut stmt =
            db.prepare("SELECT id, name, filter_json, doc_id, is_local, created_at FROM libraries")?;
        let libs = stmt
            .query_map([], Self::row_to_library)?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(libs)
    }
    /// Look up one library by ID; Ok(None) when it does not exist.
    pub fn get_library(&self, id: &str) -> Result<Option<Library>> {
        let db = self.lock_db();
        let mut stmt = db.prepare(
            "SELECT id, name, filter_json, doc_id, is_local, created_at FROM libraries WHERE id = ?1",
        )?;
        let mut rows = stmt.query_map([id], Self::row_to_library)?;
        match rows.next() {
            Some(Ok(lib)) => Ok(Some(lib)),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }
    /// Delete a library and its announcement records.
    ///
    /// announced_assets rows are removed explicitly: SQLite does not enforce
    /// FOREIGN KEY ... ON DELETE CASCADE unless foreign keys are enabled
    /// per-connection (PRAGMA foreign_keys = ON), which this code never does.
    pub fn delete_library(&self, id: &str) -> Result<()> {
        let db = self.lock_db();
        db.execute("DELETE FROM announced_assets WHERE library_id = ?1", [id])?;
        db.execute("DELETE FROM libraries WHERE id = ?1", [id])?;
        Ok(())
    }
    /// Attach an iroh document ID to an existing library.
    pub fn update_library_doc_id(&self, id: &str, doc_id: &str) -> Result<()> {
        let db = self.lock_db();
        db.execute(
            "UPDATE libraries SET doc_id = ?1 WHERE id = ?2",
            [doc_id, id],
        )?;
        Ok(())
    }
    // ── Asset announcement tracking ──
    /// Whether `hash` has ever been announced to `library_id`.
    pub fn is_announced(&self, library_id: &str, hash: &str) -> Result<bool> {
        let db = self.lock_db();
        let count: i64 = db.query_row(
            "SELECT COUNT(*) FROM announced_assets WHERE library_id = ?1 AND hash = ?2",
            [library_id, hash],
            |row| row.get(0),
        )?;
        Ok(count > 0)
    }
    /// Version recorded at the last announcement, or None if never announced.
    pub fn get_announced_version(&self, library_id: &str, hash: &str) -> Result<Option<u64>> {
        let db = self.lock_db();
        let mut stmt = db.prepare(
            "SELECT version FROM announced_assets WHERE library_id = ?1 AND hash = ?2",
        )?;
        let mut rows = stmt.query_map(rusqlite::params![library_id, hash], |row| {
            row.get::<_, i64>(0)
        })?;
        match rows.next() {
            Some(Ok(v)) => Ok(Some(v as u64)),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }
    /// Record (or refresh) an announcement at `version`, timestamped now.
    pub fn mark_announced(&self, library_id: &str, hash: &str, version: u64) -> Result<()> {
        let db = self.lock_db();
        let now = chrono::Utc::now().timestamp_millis();
        db.execute(
            "INSERT OR REPLACE INTO announced_assets (library_id, hash, version, announced_at)
             VALUES (?1, ?2, ?3, ?4)",
            rusqlite::params![library_id, hash, version as i64, now],
        )?;
        Ok(())
    }
    /// Forget that `hash` was announced to `library_id`.
    pub fn remove_announced(&self, library_id: &str, hash: &str) -> Result<()> {
        let db = self.lock_db();
        db.execute(
            "DELETE FROM announced_assets WHERE library_id = ?1 AND hash = ?2",
            [library_id, hash],
        )?;
        Ok(())
    }
    // ── General state ──
    /// Read an arbitrary key from the sync_state KV table.
    pub fn get_state(&self, key: &str) -> Result<Option<String>> {
        let db = self.lock_db();
        let mut stmt = db.prepare("SELECT value FROM sync_state WHERE key = ?1")?;
        let mut rows = stmt.query_map([key], |row| row.get::<_, String>(0))?;
        match rows.next() {
            Some(Ok(v)) => Ok(Some(v)),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }
    /// Write (insert or overwrite) an arbitrary key in the sync_state KV table.
    pub fn set_state(&self, key: &str, value: &str) -> Result<()> {
        let db = self.lock_db();
        db.execute(
            "INSERT OR REPLACE INTO sync_state (key, value) VALUES (?1, ?2)",
            [key, value],
        )?;
        Ok(())
    }
}

View File

@ -1,121 +1,155 @@
#![allow(dead_code)]
//! CAN Sync — P2P full-mirror replication agent for CAN Service.
//!
//! Uses iroh for encrypted QUIC transport + NAT traversal,
//! and iroh-gossip for peer discovery via a shared passphrase.
//!
//! Each instance talks to its local CAN Service via the private
//! protobuf sync API (/sync/*), authenticated with an API key.
mod announcer;
mod can_client;
mod config;
mod fetcher;
mod library;
mod manifest;
mod node;
mod routes;
use std::path::PathBuf;
use std::sync::Arc;
mod discovery;
mod peer;
mod protocol;
use anyhow::{Context, Result};
use tracing::info;
use iroh::endpoint::presets::N0;
use iroh::{Endpoint, EndpointId};
use iroh_gossip::net::Gossip;
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use crate::announcer::Announcer;
use crate::can_client::CanClient;
use crate::can_client::CanSyncClient;
use crate::config::SyncConfig;
use crate::fetcher::Fetcher;
use crate::library::SyncState;
use crate::node::SyncNode;
use crate::routes::AppState;
use crate::discovery::Discovery;
/// ALPN protocol identifier for CAN sync peer connections.
const SYNC_ALPN: &[u8] = b"can-sync/1";
#[tokio::main]
async fn main() -> Result<()> {
// Initialize tracing
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "can_sync=info,iroh=warn".parse().unwrap()),
.unwrap_or_else(|_| "can_sync=info,iroh=warn,iroh_gossip=warn".parse().unwrap()),
)
.init();
// Load config
let config_path = std::env::args()
.nth(1)
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("config.yaml"));
.unwrap_or_else(|| "config.yaml".to_string());
let config = SyncConfig::load(std::path::Path::new(&config_path))
.with_context(|| format!("loading config from {}", config_path))?;
let config = SyncConfig::load(&config_path)?;
info!("CAN Sync starting...");
info!(" CAN service: {}", config.can_service_url);
info!(" Listen addr: {}", config.listen_addr);
info!(" Data dir: {}", config.data_dir);
info!("CAN Sync v2 starting");
info!("CAN service: {}", config.can_service_url);
info!("Poll interval: {}s", config.poll_interval_secs);
// Ensure data directory exists
std::fs::create_dir_all(config.data_path())
.context("Failed to create data directory")?;
// Create HTTP client for local CAN service's sync API
let can = CanSyncClient::new(&config.can_service_url, &config.sync_api_key);
// Initialize CAN service client
let can = CanClient::new(&config.can_service_url);
// Verify CAN service is reachable
if can.health_check().await {
info!("CAN service sync API is healthy");
} else {
warn!("CAN service sync API not reachable — will retry on sync");
}
// Check CAN service health
match can.health_check().await {
Ok(true) => info!("CAN service is reachable"),
Ok(false) | Err(_) => {
tracing::warn!(
"CAN service at {} is not reachable — will retry on each poll",
config.can_service_url
);
// Create iroh endpoint for QUIC transport with n0 defaults (relay + discovery)
let endpoint = Endpoint::builder()
.preset(N0)
.alpns(vec![SYNC_ALPN.to_vec()])
.bind()
.await
.context("creating iroh endpoint")?;
let node_id = endpoint.id();
info!("Node ID: {}", node_id);
let addrs = endpoint.bound_sockets();
if let Some(addr) = addrs.first() {
info!("Listening on {}", addr);
}
// Create gossip instance for peer discovery (not async — returns directly)
let gossip = Gossip::builder().spawn(endpoint.clone());
// Channel for discovered peers
let (peer_tx, mut peer_rx) = mpsc::channel::<EndpointId>(32);
// Spawn discovery
let disc = Discovery::new(endpoint.clone(), gossip.clone(), &config.sync_passphrase);
tokio::spawn(async move {
if let Err(e) = disc.run(peer_tx).await {
error!("Discovery failed: {:#}", e);
}
});
// Spawn incoming connection handler
let endpoint_accept = endpoint.clone();
let can_accept = can.clone();
tokio::spawn(async move {
loop {
match endpoint_accept.accept().await {
Some(incoming) => {
let can_clone = can_accept.clone();
tokio::spawn(async move {
match incoming.await {
Ok(conn) => {
peer::handle_incoming(conn, can_clone).await;
}
Err(e) => {
warn!("Failed to accept connection: {:#}", e);
}
}
});
}
None => {
info!("Endpoint closed, stopping accept loop");
break;
}
}
}
});
// Main loop: connect to discovered peers and sync
let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs);
info!("Waiting for peers...");
while let Some(peer_id) = peer_rx.recv().await {
let short = peer_id.fmt_short();
info!("Connecting to discovered peer: {}", short);
let endpoint_clone = endpoint.clone();
let can_clone = can.clone();
let poll_dur = poll_interval;
tokio::spawn(async move {
// Connect to peer (EndpointId implements Into<EndpointAddr>)
let conn = match endpoint_clone.connect(peer_id, SYNC_ALPN).await {
Ok(c) => c,
Err(e) => {
error!("Failed to connect to {}: {:#}", short, e);
return;
}
};
// Initial reconciliation
if let Err(e) = peer::run_sync_session(conn.clone(), can_clone.clone(), true).await {
error!("Initial sync with {} failed: {:#}", short, e);
return;
}
// Live sync loop — keep pushing new assets
if let Err(e) = peer::live_sync_loop(conn, can_clone, poll_dur).await {
warn!("Live sync with {} ended: {:#}", short, e);
}
});
}
// Open sync state database
let state = SyncState::open(&config.db_path())?;
let state = Arc::new(state);
info!("Sync state DB opened at {}", config.db_path().display());
// Start iroh P2P node
let node = SyncNode::spawn(&config).await?;
let node = Arc::new(node);
info!("iroh node ID: {}", node.peer_id());
// Build shared app state
let app_state = Arc::new(AppState {
node: node.clone(),
state: state.clone(),
can: can.clone(),
});
// Start the announcer (polls CAN service for new assets)
let announcer = Announcer::new(
can.clone(),
state.clone(),
node.clone(),
config.poll_interval_secs,
config.full_scan_interval_secs,
);
tokio::spawn(async move {
announcer.run().await;
});
// Start the fetcher (receives remote assets and ingests them)
let fetcher = Fetcher::new(can.clone(), state.clone(), node.clone());
tokio::spawn(async move {
fetcher.run().await;
});
// Build HTTP router
let router = routes::build_router(app_state);
// Start HTTP server
let listener = tokio::net::TcpListener::bind(&config.listen_addr)
.await
.context("Failed to bind HTTP listener")?;
info!("CAN Sync API listening on http://{}", config.listen_addr);
// Open browser to status page
let status_url = format!("http://{}/status", config.listen_addr);
if open::that(&status_url).is_err() {
info!("Open {} in your browser to check status", status_url);
}
axum::serve(listener, router)
.await
.context("HTTP server error")?;
info!("CAN Sync shutting down");
Ok(())
}

View File

@ -1,75 +0,0 @@
use serde::{Deserialize, Serialize};
use crate::can_client::AssetMeta;
/// Entry stored in iroh documents for each synced asset.
/// Key = CAN hash, Value = serialized AssetSyncEntry
/// (postcard-encoded — see `to_bytes` / `from_bytes`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssetSyncEntry {
    /// CAN timestamp (milliseconds since epoch)
    pub timestamp: i64,
    /// MIME type
    pub mime_type: String,
    /// Application tag
    pub application: Option<String>,
    /// User identity
    pub user: Option<String>,
    /// Tags list
    pub tags: Vec<String>,
    /// Description
    pub description: Option<String>,
    /// Original human-readable filename
    pub human_filename: Option<String>,
    /// Original human-readable path
    pub human_path: Option<String>,
    /// File size in bytes
    pub size: i64,
    /// Whether the asset is trashed
    pub is_trashed: bool,
    /// iroh blob hash (BLAKE3) for downloading via iroh; None until published
    pub iroh_blob_hash: Option<String>,
    /// Version counter for conflict resolution (higher wins)
    pub version: u64,
    /// Peer ID that last modified this entry
    pub last_modified_by: String,
}
impl AssetSyncEntry {
    /// Build a sync entry from CAN service asset metadata, stamped with the
    /// local peer as last modifier, no blob hash yet, and version 1.
    pub fn from_asset_meta(meta: &AssetMeta, peer_id: &str) -> Self {
        Self {
            timestamp: meta.timestamp,
            size: meta.size,
            is_trashed: meta.is_trashed,
            mime_type: meta.mime_type.clone(),
            application: meta.application.clone(),
            user: meta.user.clone(),
            tags: meta.tags.clone(),
            description: meta.description.clone(),
            human_filename: meta.human_filename.clone(),
            human_path: meta.human_path.clone(),
            iroh_blob_hash: None,
            version: 1,
            last_modified_by: peer_id.to_string(),
        }
    }
    /// Serialize for storage as an iroh document value.
    pub fn to_bytes(&self) -> Vec<u8> {
        postcard::to_allocvec(self).expect("serialize AssetSyncEntry")
    }
    /// Deserialize an entry previously written with `to_bytes`.
    pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
        let entry = postcard::from_bytes(bytes)?;
        Ok(entry)
    }
    /// True when any synced metadata field disagrees with the CAN-side
    /// metadata, meaning an update needs to be written.
    pub fn metadata_differs(&self, meta: &AssetMeta) -> bool {
        let same = self.tags == meta.tags
            && self.description == meta.description
            && self.is_trashed == meta.is_trashed
            && self.human_filename == meta.human_filename
            && self.human_path == meta.human_path;
        !same
    }
}

View File

@ -1,150 +0,0 @@
use anyhow::{Context, Result};
use iroh::protocol::Router;
use iroh::Endpoint;
use iroh_blobs::store::mem::MemStore;
use iroh_blobs::{BlobsProtocol, ALPN as BLOBS_ALPN};
use iroh_docs::api::protocol::{AddrInfoOptions, ShareMode};
use iroh_docs::protocol::Docs;
use iroh_docs::{AuthorId, DocTicket, NamespaceId, ALPN as DOCS_ALPN};
use iroh_gossip::net::Gossip;
use iroh_gossip::ALPN as GOSSIP_ALPN;
use tokio::sync::OnceCell;
use crate::config::SyncConfig;
/// Holds all iroh subsystems for the P2P node.
pub struct SyncNode {
    /// QUIC endpoint (identity + transport).
    pub endpoint: Endpoint,
    /// Blob transfer protocol handler (backed by an in-memory store).
    pub blobs: BlobsProtocol,
    /// CRDT document sync.
    pub docs: Docs,
    /// Gossip overlay used for peer communication.
    pub gossip: Gossip,
    /// Dispatches incoming connections to the protocol handlers by ALPN.
    pub router: Router,
    /// Cached default author ID (created once on startup)
    author_id: OnceCell<AuthorId>,
}
impl SyncNode {
    /// Start the iroh node with all protocol handlers.
    ///
    /// Construction order matters: the endpoint first, then gossip, then the
    /// blob store, then docs (which takes both the store and gossip), and
    /// finally the router that wires all three ALPNs to the endpoint.
    /// The config parameter is currently unused.
    pub async fn spawn(_config: &SyncConfig) -> Result<Self> {
        // Build endpoint (Ed25519 keypair auto-generated and cached)
        let endpoint = Endpoint::bind()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to bind iroh endpoint: {}", e))?;
        tracing::info!(
            "iroh node started — EndpointID: {}",
            endpoint.id()
        );
        // Gossip for peer communication
        let gossip = Gossip::builder().spawn(endpoint.clone());
        // Blob store (in-memory — blobs are transient, CAN service is authoritative)
        let mem_store = MemStore::default();
        // MemStore coerces to the generic blob-store API type shared by the
        // blobs protocol and docs below.
        let blobs_store: &iroh_blobs::api::Store = &mem_store;
        let blobs = BlobsProtocol::new(blobs_store, None);
        // Document sync (CRDT-replicated key-value store)
        let docs = Docs::memory()
            .spawn(endpoint.clone(), blobs_store.clone(), gossip.clone())
            .await
            .context("Failed to spawn iroh-docs")?;
        // Router accepts incoming connections and dispatches to handlers
        let router = Router::builder(endpoint.clone())
            .accept(BLOBS_ALPN, blobs.clone())
            .accept(GOSSIP_ALPN, gossip.clone())
            .accept(DOCS_ALPN, docs.clone())
            .spawn();
        Ok(Self {
            endpoint,
            blobs,
            docs,
            gossip,
            router,
            author_id: OnceCell::new(),
        })
    }
    /// Get this node's peer ID as a string.
    pub fn peer_id(&self) -> String {
        self.endpoint.id().to_string()
    }
    /// Get the node's endpoint address info for sharing.
    pub fn endpoint_addr(&self) -> iroh::EndpointAddr {
        self.endpoint.addr()
    }
    /// Get or create the default author for writing to documents.
    /// The result is cached in `author_id` so the docs API is queried once.
    pub async fn author(&self) -> Result<AuthorId> {
        self.author_id
            .get_or_try_init(|| async {
                self.docs.author_default().await
            })
            .await
            .copied()
    }
    /// Create a new iroh document and return its NamespaceId as a hex string.
    pub async fn create_doc(&self) -> Result<String> {
        let doc = self.docs.create().await?;
        let ns_id = doc.id();
        Ok(hex::encode(ns_id.to_bytes()))
    }
    /// Open an existing document by its hex-encoded namespace ID.
    /// Errors if the hex is invalid or the document is unknown locally.
    pub async fn open_doc(&self, doc_id_hex: &str) -> Result<iroh_docs::api::Doc> {
        let ns_id = parse_namespace_id(doc_id_hex)?;
        self.docs
            .open(ns_id)
            .await?
            // Slicing [..12] is safe here: parse_namespace_id above already
            // required 64 hex characters.
            .ok_or_else(|| anyhow::anyhow!("Document {} not found", &doc_id_hex[..12]))
    }
    /// Write a key-value entry to a document, authored by the default author.
    pub async fn write_to_doc(
        &self,
        doc_id_hex: &str,
        key: &[u8],
        value: &[u8],
    ) -> Result<()> {
        let doc = self.open_doc(doc_id_hex).await?;
        let author = self.author().await?;
        doc.set_bytes(author, key.to_vec(), value.to_vec()).await?;
        Ok(())
    }
    /// Generate a share ticket (DocTicket) for a document.
    /// Tickets are issued with write access and include relay + direct addresses.
    pub async fn share_doc(&self, doc_id_hex: &str) -> Result<DocTicket> {
        let doc = self.open_doc(doc_id_hex).await?;
        let ticket = doc
            .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
            .await?;
        Ok(ticket)
    }
    /// Import a document from a DocTicket, returns the namespace ID as hex.
    pub async fn import_doc(&self, ticket: DocTicket) -> Result<String> {
        let doc = self.docs.import(ticket).await?;
        let ns_id = doc.id();
        Ok(hex::encode(ns_id.to_bytes()))
    }
    /// Graceful shutdown.
    /// NOTE(review): only the router is shut down here; presumably it tears
    /// down the endpoint and protocol handlers it owns — confirm against the
    /// iroh Router docs.
    pub async fn shutdown(self) -> Result<()> {
        tracing::info!("Shutting down iroh node...");
        self.router.shutdown().await?;
        Ok(())
    }
}
/// Parse a hex-encoded NamespaceId (exactly 64 hex characters / 32 bytes).
pub fn parse_namespace_id(hex_str: &str) -> Result<NamespaceId> {
    let decoded = hex::decode(hex_str).context("Invalid hex in doc_id")?;
    let raw: [u8; 32] = decoded
        .try_into()
        .map_err(|_| anyhow::anyhow!("doc_id must be 32 bytes (64 hex chars)"))?;
    Ok(NamespaceId::from(raw))
}

View File

@ -0,0 +1,320 @@
//! Per-peer sync: reconciliation and live bidirectional asset transfer.
//!
//! When two sync agents connect, they:
//! 1. Exchange hash lists (from their local CAN services)
//! 2. Compute the diff (what each side is missing)
//! 3. Send missing assets to each other
//! 4. Continue polling for new assets and pushing them
use std::collections::{HashMap, HashSet};
use anyhow::{Context, Result};
use iroh::endpoint::Connection;
use prost::Message;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, error, info, warn};
use crate::can_client::CanSyncClient;
use crate::protocol::*;
// Message type tags for QUIC stream framing
const MSG_HASH_SET: u8 = 0x01;
const MSG_ASSET_BUNDLE: u8 = 0x02;
const MSG_META_UPDATE: u8 = 0x03;
const MSG_DONE: u8 = 0x04;
/// Frame a message for the wire: a 1-byte type tag, a 4-byte big-endian
/// payload length, then the payload itself.
fn encode_frame(msg_type: u8, payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(payload.len() + 5);
    frame.push(msg_type);
    frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}
/// Read a single framed message from a QUIC recv stream.
/// Returns (msg_type, payload_bytes).
///
/// Wire format mirrors `encode_frame`: 1 tag byte, u32 big-endian length,
/// then `length` payload bytes (`read_u32` reads big-endian, matching the
/// `to_be_bytes` used on the send side).
async fn read_frame(recv: &mut iroh::endpoint::RecvStream) -> Result<(u8, Vec<u8>)> {
    let msg_type = recv.read_u8().await.context("reading message type")?;
    let len = recv.read_u32().await.context("reading message length")?;
    // Hard cap (256 MiB) so a corrupt or malicious length prefix cannot make
    // us allocate unbounded memory.
    if len > 256 * 1024 * 1024 {
        anyhow::bail!("Message too large: {} bytes", len);
    }
    let mut payload = vec![0u8; len as usize];
    recv.read_exact(&mut payload)
        .await
        .context("reading message payload")?;
    Ok((msg_type, payload))
}
/// Run a full sync session with a connected peer.
///
/// Protocol over a single QUIC bi-directional stream:
/// 1. Both sides exchange their full hash lists (from their local CAN services)
/// 2. Each computes the diff
/// 3. Each sends the assets the other is missing, then a DONE marker
/// 4. Each ingests what it received
///
/// `is_initiator` decides who opens the stream: QUIC bi-streams are
/// unidirectionally *initiated*, so if both peers called `open_bi` each
/// would write to a stream the other side never reads. The initiator opens
/// the stream; the accepting side waits for it with `accept_bi`.
pub async fn run_sync_session(
    conn: Connection,
    can: CanSyncClient,
    is_initiator: bool,
) -> Result<()> {
    let peer_id = conn.remote_id();
    let short_id = peer_id.fmt_short().to_string();
    info!("Starting sync session with {} (initiator={})", short_id, is_initiator);
    // Open (initiator) or accept (responder) the shared sync stream.
    let (mut send, mut recv) = if is_initiator {
        conn.open_bi().await.context("opening bi stream")?
    } else {
        conn.accept_bi().await.context("accepting bi stream")?
    };
    // Step 1: Get our local hash list from CAN service
    let our_hashes = can.get_hashes().await.context("getting local hashes")?;
    let our_hash_set: HashSet<String> =
        our_hashes.assets.iter().map(|a| a.hash.clone()).collect();
    info!(
        "Local state: {} assets, sending to peer {}",
        our_hashes.assets.len(),
        short_id
    );
    // Step 2: Send our hash set to peer
    let hash_set_msg = PeerHashSet {
        assets: our_hashes.assets.clone(),
    };
    let mut buf = Vec::with_capacity(hash_set_msg.encoded_len());
    hash_set_msg.encode(&mut buf)?;
    let frame = encode_frame(MSG_HASH_SET, &buf);
    send.write_all(&frame).await.context("sending hash set")?;
    send.flush().await?;
    // Step 3: Receive peer's hash set
    let (msg_type, payload) = read_frame(&mut recv).await.context("reading peer hash set")?;
    if msg_type != MSG_HASH_SET {
        anyhow::bail!("Expected hash set message, got type {}", msg_type);
    }
    let peer_hash_set = PeerHashSet::decode(payload.as_slice()).context("decoding peer hash set")?;
    let peer_hash_keys: HashSet<String> = peer_hash_set
        .assets
        .iter()
        .map(|a| a.hash.clone())
        .collect();
    info!(
        "Peer {} has {} assets",
        short_id,
        peer_hash_set.assets.len()
    );
    // Step 4: Compute diffs
    let we_need: Vec<String> = peer_hash_keys.difference(&our_hash_set).cloned().collect();
    let they_need: Vec<String> = our_hash_set.difference(&peer_hash_keys).cloned().collect();
    info!(
        "Diff with {}: we need {}, they need {}",
        short_id,
        we_need.len(),
        they_need.len()
    );
    // Step 5: Send assets the peer is missing.
    // NOTE(review): both peers send before reading (step 6). If both batches
    // exceed the stream's flow-control window this could stall — confirm the
    // window covers the largest expected batch, or interleave send/receive.
    if !they_need.is_empty() {
        send_assets(&can, &mut send, &they_need, &short_id).await?;
    }
    // Send DONE marker
    let done_frame = encode_frame(MSG_DONE, &[]);
    send.write_all(&done_frame).await?;
    send.flush().await?;
    // Step 6: Receive assets we're missing
    receive_assets(&can, &mut recv, &short_id).await?;
    // Signal that we are done writing on this stream; errors are benign here.
    let _ = send.finish();
    info!("Sync session with {} complete", short_id);
    Ok(())
}
/// Pull the named assets from the local CAN service and stream each one to
/// the peer as a framed AssetBundle.
async fn send_assets(
    can: &CanSyncClient,
    send: &mut iroh::endpoint::SendStream,
    hashes: &[String],
    peer_short: &str,
) -> Result<()> {
    // Request in small batches so a large diff does not become one giant pull.
    for batch in hashes.chunks(10) {
        let pulled = can
            .pull(batch.to_vec())
            .await
            .context("pulling assets from CAN")?;
        for bundle in pulled.bundles {
            let hash_short = &bundle.hash[..bundle.hash.len().min(12)];
            info!("Sending asset {} to peer {}", hash_short, peer_short);
            let mut payload = Vec::with_capacity(bundle.encoded_len());
            bundle.encode(&mut payload)?;
            send.write_all(&encode_frame(MSG_ASSET_BUNDLE, &payload))
                .await?;
            send.flush().await?;
        }
    }
    Ok(())
}
/// Receive assets from peer and push them to local CAN service.
async fn receive_assets(
can: &CanSyncClient,
recv: &mut iroh::endpoint::RecvStream,
peer_short: &str,
) -> Result<()> {
loop {
let (msg_type, payload) = read_frame(recv).await.context("reading asset from peer")?;
match msg_type {
MSG_DONE => {
debug!("Peer {} finished sending assets", peer_short);
break;
}
MSG_ASSET_BUNDLE => {
let bundle =
AssetBundle::decode(payload.as_slice()).context("decoding asset bundle")?;
let hash_short = bundle.hash[..bundle.hash.len().min(12)].to_string();
info!("Received asset {} from peer {}", hash_short, peer_short);
match can.push(bundle).await {
Ok(resp) => {
if resp.already_existed {
debug!("Asset {} already existed locally", hash_short);
} else {
info!("Ingested asset {} from peer {}", resp.hash, peer_short);
}
}
Err(e) => {
error!("Failed to push asset {} to CAN: {:#}", hash_short, e);
}
}
}
MSG_META_UPDATE => {
let meta = MetaUpdateRequest::decode(payload.as_slice())
.context("decoding meta update")?;
let hash_short = meta.hash[..meta.hash.len().min(12)].to_string();
debug!(
"Received meta update for {} from peer {}",
hash_short, peer_short
);
if let Err(e) = can
.update_meta(
meta.hash.clone(),
meta.description.clone(),
meta.tags.clone(),
meta.is_trashed,
)
.await
{
error!("Failed to update meta for {}: {:#}", hash_short, e);
}
}
other => {
warn!("Unknown message type {} from peer {}", other, peer_short);
}
}
}
Ok(())
}
/// Entry point for a connection a remote peer initiated toward us.
/// Runs one sync session as the non-initiating side; failures are logged,
/// not propagated, since there is no caller to recover.
pub async fn handle_incoming(conn: Connection, can: CanSyncClient) {
    let short_id = conn.remote_id().fmt_short().to_string();
    info!("Incoming sync connection from {}", short_id);
    if let Err(e) = run_sync_session(conn, can, false).await {
        error!("Sync session with {} failed: {:#}", short_id, e);
    }
}
/// Run the live sync loop: poll for new local assets and push to peer.
///
/// This runs after initial reconciliation and keeps peers in sync.
///
/// NOTE(review): each push batch opens a *new* bi-stream via `open_bi`; the
/// remote must still be accepting streams on this connection after its
/// initial session for these frames to be read — confirm the peer side
/// keeps an accept loop alive.
/// NOTE(review): deletions are not propagated — `known_hashes` is simply
/// replaced by the current set each tick, so a vanished hash is forgotten
/// silently. Confirm this is the intended mirror semantics.
pub async fn live_sync_loop(
    conn: Connection,
    can: CanSyncClient,
    poll_interval: std::time::Duration,
) -> Result<()> {
    let peer_id = conn.remote_id();
    let short_id = peer_id.fmt_short().to_string();
    info!("Starting live sync loop with {}", short_id);
    // Track what we've already synced (seeded from the CAN service's
    // current hash list, which reconciliation has just brought up to date).
    let mut known_hashes: HashSet<String> = {
        let resp = can.get_hashes().await?;
        resp.assets.into_iter().map(|a| a.hash).collect()
    };
    let mut interval = tokio::time::interval(poll_interval);
    loop {
        interval.tick().await;
        // Poll for new assets; a CAN outage is retried next tick.
        let resp = match can.get_hashes().await {
            Ok(r) => r,
            Err(e) => {
                warn!("Failed to poll CAN service: {:#}", e);
                continue;
            }
        };
        let current_hashes: HashSet<String> =
            resp.assets.iter().map(|a| a.hash.clone()).collect();
        // Anything present now but not last tick is new and gets pushed.
        let new_hashes: Vec<String> = current_hashes
            .difference(&known_hashes)
            .cloned()
            .collect();
        if !new_hashes.is_empty() {
            info!(
                "Detected {} new local assets, pushing to {}",
                new_hashes.len(),
                short_id
            );
            // Open a new stream for this batch
            match conn.open_bi().await {
                Ok((mut send, _recv)) => {
                    if let Err(e) = send_assets(&can, &mut send, &new_hashes, &short_id).await {
                        error!("Failed to push new assets to {}: {:#}", short_id, e);
                    }
                    // Send done marker, then close our write side; send
                    // failures here only cost this batch.
                    let done_frame = encode_frame(MSG_DONE, &[]);
                    let _ = send.write_all(&done_frame).await;
                    let _ = send.flush().await;
                    let _ = send.finish();
                }
                Err(e) => {
                    warn!("Failed to open stream to {}: {:#}", short_id, e);
                    break; // Connection probably dead
                }
            }
        }
        // Update our known set
        known_hashes = current_hashes;
    }
    Ok(())
}

View File

@ -0,0 +1,123 @@
//! Protobuf message types for CAN sync API + peer-to-peer protocol.
//!
//! These match the types in CAN service's routes/sync.rs exactly.
use prost::Message;
// ── CAN Sync API messages (protobuf, same as CAN service) ───────────────
/// Request for the CAN service's full asset hash list (carries no fields).
#[derive(Clone, PartialEq, Message)]
pub struct HashListRequest {}
/// Response to `HashListRequest`: one digest per known asset.
#[derive(Clone, PartialEq, Message)]
pub struct HashListResponse {
    #[prost(message, repeated, tag = "1")]
    pub assets: Vec<AssetDigest>,
}
/// Compact per-asset summary used for hash-list exchange and diffing.
#[derive(Clone, PartialEq, Message)]
pub struct AssetDigest {
    /// CAN content hash (the asset's identity).
    #[prost(string, tag = "1")]
    pub hash: String,
    #[prost(int64, tag = "2")]
    pub timestamp: i64,
    #[prost(int64, tag = "3")]
    pub size: i64,
    #[prost(bool, tag = "4")]
    pub is_trashed: bool,
}
/// Ask the CAN service for full bundles of the named assets.
#[derive(Clone, PartialEq, Message)]
pub struct PullRequest {
    #[prost(string, repeated, tag = "1")]
    pub hashes: Vec<String>,
}
/// Bundles returned for a `PullRequest`.
#[derive(Clone, PartialEq, Message)]
pub struct PullResponse {
    #[prost(message, repeated, tag = "1")]
    pub bundles: Vec<AssetBundle>,
}
/// A complete asset: all metadata plus the raw content bytes.
/// This is the unit transferred both to/from the CAN sync API and between peers.
#[derive(Clone, PartialEq, Message)]
pub struct AssetBundle {
    /// CAN content hash identifying the asset.
    #[prost(string, tag = "1")]
    pub hash: String,
    #[prost(int64, tag = "2")]
    pub timestamp: i64,
    #[prost(string, tag = "3")]
    pub mime_type: String,
    #[prost(string, optional, tag = "4")]
    pub application: Option<String>,
    #[prost(string, optional, tag = "5")]
    pub user_identity: Option<String>,
    #[prost(string, optional, tag = "6")]
    pub description: Option<String>,
    #[prost(string, optional, tag = "7")]
    pub human_filename: Option<String>,
    #[prost(string, optional, tag = "8")]
    pub human_path: Option<String>,
    #[prost(bool, tag = "9")]
    pub is_trashed: bool,
    #[prost(int64, tag = "10")]
    pub size: i64,
    #[prost(string, repeated, tag = "11")]
    pub tags: Vec<String>,
    /// Raw asset bytes — makes this message as large as the asset itself.
    #[prost(bytes = "vec", tag = "12")]
    pub content: Vec<u8>,
}
/// Hand one bundle to the CAN service for ingestion.
#[derive(Clone, PartialEq, Message)]
pub struct PushRequest {
    #[prost(message, optional, tag = "1")]
    pub bundle: Option<AssetBundle>,
}
/// Result of a push: the stored hash, and whether it was a duplicate.
#[derive(Clone, PartialEq, Message)]
pub struct PushResponse {
    #[prost(string, tag = "1")]
    pub hash: String,
    /// True when the CAN service already held this asset.
    #[prost(bool, tag = "2")]
    pub already_existed: bool,
}
/// Metadata-only update for an existing asset (content is untouched).
#[derive(Clone, PartialEq, Message)]
pub struct MetaUpdateRequest {
    #[prost(string, tag = "1")]
    pub hash: String,
    #[prost(string, optional, tag = "2")]
    pub description: Option<String>,
    #[prost(string, repeated, tag = "3")]
    pub tags: Vec<String>,
    #[prost(bool, tag = "4")]
    pub is_trashed: bool,
}
/// Acknowledgement for a `MetaUpdateRequest`.
#[derive(Clone, PartialEq, Message)]
pub struct MetaUpdateResponse {
    #[prost(bool, tag = "1")]
    pub success: bool,
}
// ── Peer-to-peer messages (sent over QUIC streams between sync agents) ──
/// Sent between peers during reconciliation: "here are all the hashes I have"
#[derive(Clone, PartialEq, Message)]
pub struct PeerHashSet {
#[prost(message, repeated, tag = "1")]
pub assets: Vec<AssetDigest>,
}
/// Request from peer: "please send me these assets"
#[derive(Clone, PartialEq, Message)]
pub struct PeerPullRequest {
#[prost(string, repeated, tag = "1")]
pub hashes: Vec<String>,
}
/// A single asset being sent from one peer to another
#[derive(Clone, PartialEq, Message)]
pub struct PeerAssetTransfer {
#[prost(message, optional, tag = "1")]
pub bundle: Option<AssetBundle>,
}

View File

@ -1,430 +0,0 @@
use std::sync::Arc;
use axum::{
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use serde::{Deserialize, Serialize};
use crate::can_client::CanClient;
use crate::library::{Library, LibraryFilter, SyncState};
use crate::node::SyncNode;
/// Shared application state for route handlers
pub struct AppState {
pub node: Arc<SyncNode>,
pub state: Arc<SyncState>,
pub can: CanClient,
}
// ── Request/Response types ──
#[derive(Serialize)]
struct StatusResponse {
peer_id: String,
can_service_healthy: bool,
library_count: usize,
}
#[derive(Serialize)]
struct PeerInfo {
peer_id: String,
}
#[derive(Deserialize)]
pub struct CreateLibraryRequest {
pub name: String,
pub filter: LibraryFilter,
}
#[derive(Serialize)]
struct LibraryResponse {
id: String,
name: String,
filter: LibraryFilter,
doc_id: Option<String>,
is_local: bool,
created_at: i64,
}
#[derive(Serialize)]
struct InviteResponse {
ticket: String,
}
#[derive(Deserialize)]
pub struct JoinRequest {
pub ticket: String,
}
#[derive(Serialize)]
struct JoinResponse {
library_id: String,
message: String,
}
#[derive(Serialize)]
struct ApiResp<T: Serialize> {
status: String,
data: T,
}
#[derive(Serialize)]
struct ApiErr {
status: String,
error: String,
}
fn ok_json<T: Serialize>(data: T) -> Json<ApiResp<T>> {
Json(ApiResp {
status: "success".to_string(),
data,
})
}
fn err_resp(status: StatusCode, msg: &str) -> (StatusCode, Json<ApiErr>) {
(
status,
Json(ApiErr {
status: "error".to_string(),
error: msg.to_string(),
}),
)
}
// ── Routes ──
pub fn build_router(app_state: Arc<AppState>) -> Router {
Router::new()
.route("/status", get(get_status))
.route("/peers", get(get_peers))
.route("/libraries", post(create_library).get(list_libraries))
.route(
"/libraries/{id}",
get(get_library).delete(delete_library),
)
.route("/libraries/{id}/invite", post(create_invite))
.route("/join", post(join_library))
.with_state(app_state)
}
// ── Handlers ──
async fn get_status(State(app): State<Arc<AppState>>) -> impl IntoResponse {
let can_healthy = app.can.health_check().await.unwrap_or(false);
let lib_count = app.state.list_libraries().unwrap_or_default().len();
ok_json(StatusResponse {
peer_id: app.node.peer_id(),
can_service_healthy: can_healthy,
library_count: lib_count,
})
.into_response()
}
async fn get_peers(State(app): State<Arc<AppState>>) -> impl IntoResponse {
let peers: Vec<PeerInfo> = vec![PeerInfo {
peer_id: app.node.peer_id(),
}];
ok_json(peers).into_response()
}
async fn create_library(
State(app): State<Arc<AppState>>,
Json(req): Json<CreateLibraryRequest>,
) -> impl IntoResponse {
// Create an iroh document for this library
let doc_id = match app.node.create_doc().await {
Ok(id) => Some(id),
Err(e) => {
tracing::warn!("Failed to create iroh document for library: {:#}", e);
None
}
};
let lib = Library {
id: uuid::Uuid::new_v4().to_string(),
name: req.name,
filter: req.filter,
doc_id,
is_local: true,
created_at: chrono::Utc::now().timestamp_millis(),
};
if let Err(e) = app.state.save_library(&lib) {
return err_resp(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("save failed: {}", e),
)
.into_response();
}
tracing::info!(
"Created library '{}' (id={}, doc_id={:?})",
lib.name,
&lib.id[..8],
lib.doc_id.as_deref().map(|d| &d[..12.min(d.len())])
);
ok_json(LibraryResponse {
id: lib.id,
name: lib.name,
filter: lib.filter,
doc_id: lib.doc_id,
is_local: lib.is_local,
created_at: lib.created_at,
})
.into_response()
}
async fn list_libraries(State(app): State<Arc<AppState>>) -> impl IntoResponse {
match app.state.list_libraries() {
Ok(libs) => {
let responses: Vec<LibraryResponse> = libs
.into_iter()
.map(|lib| LibraryResponse {
id: lib.id,
name: lib.name,
filter: lib.filter,
doc_id: lib.doc_id,
is_local: lib.is_local,
created_at: lib.created_at,
})
.collect();
ok_json(responses).into_response()
}
Err(e) => {
err_resp(StatusCode::INTERNAL_SERVER_ERROR, &format!("{}", e)).into_response()
}
}
}
async fn get_library(
State(app): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
match app.state.get_library(&id) {
Ok(Some(lib)) => ok_json(LibraryResponse {
id: lib.id,
name: lib.name,
filter: lib.filter,
doc_id: lib.doc_id,
is_local: lib.is_local,
created_at: lib.created_at,
})
.into_response(),
Ok(None) => err_resp(StatusCode::NOT_FOUND, "Library not found").into_response(),
Err(e) => {
err_resp(StatusCode::INTERNAL_SERVER_ERROR, &format!("{}", e)).into_response()
}
}
}
async fn delete_library(
State(app): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
match app.state.delete_library(&id) {
Ok(()) => ok_json("deleted").into_response(),
Err(e) => {
err_resp(StatusCode::INTERNAL_SERVER_ERROR, &format!("{}", e)).into_response()
}
}
}
async fn create_invite(
State(app): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
match app.state.get_library(&id) {
Ok(Some(lib)) => {
let doc_id = match &lib.doc_id {
Some(d) => d,
None => {
return err_resp(
StatusCode::BAD_REQUEST,
"Library has no iroh document — cannot create invite",
)
.into_response()
}
};
// Generate a real DocTicket via iroh
match app.node.share_doc(doc_id).await {
Ok(ticket) => {
// DocTicket implements Display via iroh's Ticket trait (base32 serialization)
let ticket_str = ticket.to_string();
// Wrap with library metadata so the joiner knows the name and filter
let invite_data = serde_json::json!({
"ticket": ticket_str,
"library_name": lib.name,
"filter": lib.filter,
});
let invite_b64 = base64_encode(
&serde_json::to_vec(&invite_data).unwrap(),
);
ok_json(InviteResponse { ticket: invite_b64 }).into_response()
}
Err(e) => err_resp(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("Failed to create invite: {}", e),
)
.into_response(),
}
}
Ok(None) => err_resp(StatusCode::NOT_FOUND, "Library not found").into_response(),
Err(e) => {
err_resp(StatusCode::INTERNAL_SERVER_ERROR, &format!("{}", e)).into_response()
}
}
}
async fn join_library(
State(app): State<Arc<AppState>>,
Json(req): Json<JoinRequest>,
) -> impl IntoResponse {
// Decode our envelope
let ticket_bytes = match base64_decode(&req.ticket) {
Ok(b) => b,
Err(_) => {
return err_resp(StatusCode::BAD_REQUEST, "Invalid ticket encoding").into_response()
}
};
let ticket_data: serde_json::Value = match serde_json::from_slice(&ticket_bytes) {
Ok(v) => v,
Err(_) => {
return err_resp(StatusCode::BAD_REQUEST, "Invalid ticket data").into_response()
}
};
// Extract the real DocTicket string
let ticket_str = match ticket_data["ticket"].as_str() {
Some(s) => s,
None => {
return err_resp(StatusCode::BAD_REQUEST, "Missing 'ticket' field in invite")
.into_response()
}
};
// Parse DocTicket from the serialized string
let doc_ticket: iroh_docs::DocTicket = match ticket_str.parse() {
Ok(t) => t,
Err(e) => {
return err_resp(
StatusCode::BAD_REQUEST,
&format!("Invalid DocTicket: {}", e),
)
.into_response()
}
};
// Import the document via iroh (starts sync with remote peers)
let doc_id_hex = match app.node.import_doc(doc_ticket).await {
Ok(id) => id,
Err(e) => {
return err_resp(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("Failed to join document: {}", e),
)
.into_response()
}
};
let name = ticket_data["library_name"]
.as_str()
.unwrap_or("remote library")
.to_string();
let filter: LibraryFilter = serde_json::from_value(ticket_data["filter"].clone())
.unwrap_or(LibraryFilter {
application: None,
tags: None,
user: None,
mime_prefix: None,
hashes: None,
});
let lib = Library {
id: uuid::Uuid::new_v4().to_string(),
name: name.clone(),
filter,
doc_id: Some(doc_id_hex),
is_local: false,
created_at: chrono::Utc::now().timestamp_millis(),
};
if let Err(e) = app.state.save_library(&lib) {
return err_resp(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("save failed: {}", e),
)
.into_response();
}
tracing::info!(
"Joined library '{}' (id={}, doc_id={:?})",
name,
&lib.id[..8],
lib.doc_id.as_deref().map(|d| &d[..12.min(d.len())])
);
ok_json(JoinResponse {
library_id: lib.id,
message: "Joined library successfully".to_string(),
})
.into_response()
}
// ── Base64 helpers ──
fn base64_encode(data: &[u8]) -> String {
const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut result = Vec::new();
for chunk in data.chunks(3) {
let b0 = chunk[0] as u32;
let b1 = if chunk.len() > 1 { chunk[1] as u32 } else { 0 };
let b2 = if chunk.len() > 2 { chunk[2] as u32 } else { 0 };
let triple = (b0 << 16) | (b1 << 8) | b2;
result.push(CHARS[((triple >> 18) & 0x3F) as usize]);
result.push(CHARS[((triple >> 12) & 0x3F) as usize]);
if chunk.len() > 1 {
result.push(CHARS[((triple >> 6) & 0x3F) as usize]);
} else {
result.push(b'=');
}
if chunk.len() > 2 {
result.push(CHARS[(triple & 0x3F) as usize]);
} else {
result.push(b'=');
}
}
String::from_utf8(result).unwrap()
}
fn base64_decode(input: &str) -> Result<Vec<u8>, &'static str> {
const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let input = input.trim_end_matches('=');
let bytes: Vec<u8> = input
.bytes()
.filter_map(|b| CHARS.iter().position(|&c| c == b).map(|p| p as u8))
.collect();
let mut buf = Vec::new();
for chunk in bytes.chunks(4) {
if chunk.len() >= 2 {
buf.push((chunk[0] << 2) | (chunk[1] >> 4));
}
if chunk.len() >= 3 {
buf.push((chunk[1] << 4) | (chunk[2] >> 2));
}
if chunk.len() >= 4 {
buf.push((chunk[2] << 6) | chunk[3]);
}
}
Ok(buf)
}