From 69e4f13c2281aa22f4214c2c0d13ad291916659a Mon Sep 17 00:00:00 2001 From: Jason Tudisco Date: Fri, 13 Mar 2026 13:53:34 -0600 Subject: [PATCH] Add internet peer discovery via pkarr relay rendezvous Peers sharing the same sync_passphrase can now find each other automatically over the internet without manual ticket exchange or port forwarding. Uses n0's public pkarr relay servers as a rendezvous point. How it works: - Derive 8 deterministic Ed25519 keypair "slots" from the passphrase - Each peer claims a slot by publishing its EndpointId as a TXT record - All peers scan all 8 slots every 15s to discover new peers - Re-publish every 60s with 5min TTL to stay visible - Discovered EndpointIds feed into the same peer channel as gossip This runs alongside the existing gossip discovery (which still needs bootstrap peers) and direct ticket-file connections (used by tests). All 6 stress tests pass (102 assets, 63+ MB/s bidirectional). Co-Authored-By: Claude Opus 4.6 --- examples/can-sync/Cargo.lock | 3 + examples/can-sync/Cargo.toml | 7 ++ examples/can-sync/src/main.rs | 19 ++- examples/can-sync/src/rendezvous.rs | 189 ++++++++++++++++++++++++++++ 4 files changed, 215 insertions(+), 3 deletions(-) create mode 100644 examples/can-sync/src/rendezvous.rs diff --git a/examples/can-sync/Cargo.lock b/examples/can-sync/Cargo.lock index 36a79b2..6dd5f78 100644 --- a/examples/can-sync/Cargo.lock +++ b/examples/can-sync/Cargo.lock @@ -229,17 +229,20 @@ dependencies = [ "anyhow", "blake3", "bytes", + "ed25519-dalek", "futures-util", "hex", "iroh", "iroh-gossip", "n0-future 0.1.3", + "pkarr", "prost", "rand", "reqwest 0.12.28", "serde", "serde_json", "serde_yaml", + "simple-dns", "tempfile", "tokio", "tokio-stream", diff --git a/examples/can-sync/Cargo.toml b/examples/can-sync/Cargo.toml index 700948f..4221293 100644 --- a/examples/can-sync/Cargo.toml +++ b/examples/can-sync/Cargo.toml @@ -29,6 +29,13 @@ serde_yaml = "0.9" # Crypto blake3 = "1" +ed25519-dalek = "3.0.0-pre.1" + +# 
Pkarr (internet rendezvous via relay servers — relay only, no DHT to avoid digest conflict) +pkarr = { version = "5", default-features = false, features = ["relays"] } + +# DNS record parsing (used by pkarr) +simple-dns = "0.9" # Async runtime tokio = { version = "1", features = ["full"] } diff --git a/examples/can-sync/src/main.rs b/examples/can-sync/src/main.rs index b5a4605..c7a4287 100644 --- a/examples/can-sync/src/main.rs +++ b/examples/can-sync/src/main.rs @@ -11,6 +11,7 @@ mod config; mod discovery; mod peer; mod protocol; +mod rendezvous; use std::path::Path; @@ -24,6 +25,7 @@ use tracing::{error, info, warn}; use crate::can_client::CanSyncClient; use crate::config::SyncConfig; use crate::discovery::Discovery; +use crate::rendezvous::Rendezvous; /// ALPN protocol identifier for CAN sync peer connections. const SYNC_ALPN: &[u8] = b"can-sync/1"; @@ -94,11 +96,22 @@ async fn main() -> Result<()> { // Channel for discovered peers let (peer_tx, mut peer_rx) = mpsc::channel::(32); - // Spawn discovery via gossip + // Spawn discovery via gossip (works once bootstrap peers are known) let disc = Discovery::new(endpoint.clone(), gossip.clone(), &config.sync_passphrase); + let peer_tx_gossip = peer_tx.clone(); tokio::spawn(async move { - if let Err(e) = disc.run(peer_tx.clone()).await { - error!("Discovery failed: {:#}", e); + if let Err(e) = disc.run(peer_tx_gossip).await { + error!("Gossip discovery failed: {:#}", e); + } + }); + + // Spawn internet rendezvous via pkarr relay (discovers peers worldwide) + let rendezvous = Rendezvous::new(&config.sync_passphrase, node_id) + .context("creating rendezvous")?; + let peer_tx_rdv = peer_tx.clone(); + tokio::spawn(async move { + if let Err(e) = rendezvous.run(peer_tx_rdv).await { + error!("Rendezvous discovery failed: {:#}", e); } }); diff --git a/examples/can-sync/src/rendezvous.rs b/examples/can-sync/src/rendezvous.rs new file mode 100644 index 0000000..44687ab --- /dev/null +++ b/examples/can-sync/src/rendezvous.rs @@ 
-0,0 +1,233 @@
+//! Internet peer discovery via pkarr relay servers.
+//!
+//! Derives deterministic keypair "slots" from the shared passphrase.
+//! Each peer claims a slot by publishing its EndpointId as a TXT record.
+//! All peers scan all slots periodically to discover each other.
+//!
+//! This works over the internet — no LAN, no port forwarding needed.
+//! Relays are the pkarr client's builder defaults; no relay list is set here.
+//! NOTE(review): the commit message says n0's public relays — confirm the defaults.
+
+use std::collections::HashSet;
+use std::time::Duration;
+
+use anyhow::{Context, Result};
+use iroh::EndpointId;
+use pkarr::{Client as PkarrClient, Keypair, SignedPacket};
+use simple_dns::rdata::RData;
+use tokio::sync::mpsc;
+use tracing::{debug, info, warn};
+
+/// Number of rendezvous slots derived from the passphrase.
+const NUM_SLOTS: usize = 8;
+/// How often our own slot record is re-published.
+const PUBLISH_INTERVAL: Duration = Duration::from_secs(60);
+/// How often all slots are scanned for peers.
+const SCAN_INTERVAL: Duration = Duration::from_secs(15);
+/// Name of the TXT record carrying the hex-encoded EndpointId.
+const RECORD_NAME: &str = "_can_sync";
+/// TTL of the published record, in seconds (5 min).
+const RECORD_TTL_SECS: u32 = 300;
+
+/// Derive a deterministic pkarr keypair for a given slot index.
+///
+/// The seed is the BLAKE3 hash of `can-sync-rendezvous:<passphrase>:<slot>`, so the
+/// same passphrase and slot always yield the same keypair.
+fn derive_slot_keypair(passphrase: &str, slot: usize) -> Keypair {
+    let seed = blake3::hash(
+        format!("can-sync-rendezvous:{}:{}", passphrase, slot).as_bytes(),
+    );
+    let seed_bytes: [u8; 32] = *seed.as_bytes();
+    let secret = ed25519_dalek::SecretKey::from(seed_bytes);
+    Keypair::from_secret_key(&secret)
+}
+
+/// Internet peer discovery via pkarr relay.
+pub struct Rendezvous {
+    /// One keypair per slot, indexed by slot number.
+    slots: Vec<Keypair>,
+    /// Our own endpoint id; never reported as a discovered peer.
+    our_id: EndpointId,
+    client: PkarrClient,
+}
+
+impl Rendezvous {
+    /// Derive all slot keypairs from `passphrase` and build the pkarr client.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the pkarr client cannot be built.
+    pub fn new(passphrase: &str, our_id: EndpointId) -> Result<Self> {
+        let slots: Vec<Keypair> = (0..NUM_SLOTS)
+            .map(|i| derive_slot_keypair(passphrase, i))
+            .collect();
+
+        let client = PkarrClient::builder()
+            .build()
+            .context("creating pkarr client")?;
+
+        Ok(Self {
+            slots,
+            our_id,
+            client,
+        })
+    }
+
+    /// Run the rendezvous loop: claim a slot, periodically re-publish and scan.
+    ///
+    /// Peers not seen before are sent on `tx`. Returns `Ok(())` once the receiving
+    /// half of `tx` is gone; until then it loops.
+    pub async fn run(self, tx: mpsc::Sender<EndpointId>) -> Result<()> {
+        let our_id_hex = hex::encode(self.our_id.as_bytes());
+        info!(
+            "Rendezvous: starting internet discovery ({} slots, publish every {}s, scan every {}s)",
+            NUM_SLOTS,
+            PUBLISH_INTERVAL.as_secs(),
+            SCAN_INTERVAL.as_secs(),
+        );
+
+        // Claim our slot (first empty, or hash-based fallback)
+        let our_slot = self.claim_slot(&our_id_hex).await;
+        info!("Rendezvous: claimed slot {}", our_slot);
+
+        let mut known_peers: HashSet<EndpointId> = HashSet::new();
+        let mut publish_tick = tokio::time::interval(PUBLISH_INTERVAL);
+        let mut scan_tick = tokio::time::interval(SCAN_INTERVAL);
+        // A scan awaits one relay lookup per slot; if it overruns the period, push the
+        // next tick back instead of firing catch-up ticks back to back.
+        publish_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+        scan_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+        // No separate initial scan: an interval's first tick completes immediately, so
+        // the first scan (and a refresh of our record) happen right away.
+        loop {
+            tokio::select! {
+                _ = tx.closed() => break,
+                _ = publish_tick.tick() => {
+                    if let Err(e) = self.publish_slot(our_slot, &our_id_hex).await {
+                        warn!("Rendezvous: failed to re-publish slot {}: {:#}", our_slot, e);
+                    }
+                }
+                _ = scan_tick.tick() => {
+                    if self.scan_all_slots(&mut known_peers, &tx).await.is_err() {
+                        break;
+                    }
+                }
+            }
+        }
+
+        info!("Rendezvous: peer channel closed, stopping");
+        Ok(())
+    }
+
+    /// Read every slot once and send peers not seen before on `tx`.
+    ///
+    /// # Errors
+    ///
+    /// Returns the send error when the receiving half of `tx` has been dropped.
+    async fn scan_all_slots(
+        &self,
+        known_peers: &mut HashSet<EndpointId>,
+        tx: &mpsc::Sender<EndpointId>,
+    ) -> std::result::Result<(), mpsc::error::SendError<EndpointId>> {
+        for slot in 0..NUM_SLOTS {
+            let Some(peer_id) = self.read_slot(slot).await else {
+                continue;
+            };
+            // Skip ourselves and peers that were already announced.
+            if peer_id == self.our_id || !known_peers.insert(peer_id) {
+                continue;
+            }
+            info!(
+                "Rendezvous: discovered peer {} in slot {}",
+                peer_id.fmt_short(),
+                slot
+            );
+            tx.send(peer_id).await?;
+        }
+        Ok(())
+    }
+
+    /// Choose the slot we publish into and publish our id there (best effort).
+    ///
+    /// Preference order: a slot already holding our id, then the first slot with no
+    /// readable id that accepts our publish, then a slot picked by hashing our id.
+    async fn claim_slot(&self, our_id_hex: &str) -> usize {
+        // Check if we already own a slot (from a previous run)
+        for i in 0..NUM_SLOTS {
+            if self.read_slot(i).await == Some(self.our_id) {
+                debug!("Rendezvous: already own slot {}", i);
+                return i;
+            }
+        }
+
+        // Claim first empty slot
+        for i in 0..NUM_SLOTS {
+            if self.read_slot(i).await.is_some() {
+                continue;
+            }
+            match self.publish_slot(i, our_id_hex).await {
+                Ok(()) => return i,
+                Err(e) => warn!("Rendezvous: failed to claim slot {}: {:#}", i, e),
+            }
+        }
+
+        // No slot could be claimed — use deterministic slot based on our ID.
+        // NOTE: this replaces whatever record that slot currently holds.
+        let digest = blake3::hash(self.our_id.as_bytes());
+        let mut prefix = [0u8; 8];
+        prefix.copy_from_slice(&digest.as_bytes()[..8]);
+        // Reduce in u64 before narrowing so the cast cannot truncate.
+        let slot = (u64::from_le_bytes(prefix) % NUM_SLOTS as u64) as usize;
+        if let Err(e) = self.publish_slot(slot, our_id_hex).await {
+            warn!("Rendezvous: failed to claim fallback slot {}: {:#}", slot, e);
+        }
+        slot
+    }
+
+    /// Sign a TXT record holding `our_id_hex` with the slot's key and publish it.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the record name or value is rejected, or if signing or publishing fails.
+    async fn publish_slot(&self, slot: usize, our_id_hex: &str) -> Result<()> {
+        let keypair = &self.slots[slot];
+        let packet = SignedPacket::builder()
+            .txt(
+                RECORD_NAME.try_into().context("invalid record name")?,
+                our_id_hex.try_into().context("invalid txt value")?,
+                RECORD_TTL_SECS,
+            )
+            .sign(keypair)
+            .context("signing pkarr packet")?;
+
+        self.client
+            .publish(&packet, None)
+            .await
+            .context("publishing to pkarr relay")?;
+
+        debug!("Rendezvous: published slot {}", slot);
+        Ok(())
+    }
+
+    /// Resolve a slot and parse the EndpointId stored in its TXT record, if any.
+    ///
+    /// Returns `None` when nothing resolves or no TXT record decodes to a valid id.
+    async fn read_slot(&self, slot: usize) -> Option<EndpointId> {
+        let public_key = self.slots[slot].public_key();
+        let packet = self.client.resolve(&public_key).await?;
+
+        // Skip malformed records rather than giving up on the first one.
+        for record in packet.resource_records(RECORD_NAME) {
+            let RData::TXT(txt) = &record.rdata else { continue };
+            let Ok(text) = String::try_from(txt.clone()) else { continue };
+            let Ok(bytes) = hex::decode(text.trim()) else { continue };
+            let Ok(arr) = <[u8; 32]>::try_from(bytes.as_slice()) else { continue };
+            if let Ok(peer_id) = EndpointId::from_bytes(&arr) {
+                return Some(peer_id);
+            }
+        }
+
+        None
+    }
+}