Add internet peer discovery via pkarr relay rendezvous

Peers sharing the same sync_passphrase can now find each other
automatically over the internet without manual ticket exchange or
port forwarding. Uses n0's public pkarr relay servers as a
rendezvous point.

How it works:
- Derive 8 deterministic Ed25519 keypair "slots" from the passphrase
- Each peer claims a slot by publishing its EndpointId as a TXT record
- All peers scan all 8 slots every 15s to discover new peers
- Re-publish every 60s with 5min TTL to stay visible
- Discovered EndpointIds feed into the same peer channel as gossip

This runs alongside the existing gossip discovery (which still needs
bootstrap peers) and direct ticket-file connections (used by tests).

All 6 stress tests pass (102 assets, 63+ MB/s bidirectional).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jason Tudisco 2026-03-13 13:53:34 -06:00
parent 71002939a1
commit 69e4f13c22
4 changed files with 215 additions and 3 deletions

View File

@ -229,17 +229,20 @@ dependencies = [
"anyhow",
"blake3",
"bytes",
"ed25519-dalek",
"futures-util",
"hex",
"iroh",
"iroh-gossip",
"n0-future 0.1.3",
"pkarr",
"prost",
"rand",
"reqwest 0.12.28",
"serde",
"serde_json",
"serde_yaml",
"simple-dns",
"tempfile",
"tokio",
"tokio-stream",

View File

@ -29,6 +29,13 @@ serde_yaml = "0.9"
# Crypto
blake3 = "1"
ed25519-dalek = "3.0.0-pre.1"
# Pkarr (internet rendezvous via relay servers — relay only, no DHT to avoid digest conflict)
pkarr = { version = "5", default-features = false, features = ["relays"] }
# DNS record parsing (used by pkarr)
simple-dns = "0.9"
# Async runtime
tokio = { version = "1", features = ["full"] }

View File

@ -11,6 +11,7 @@ mod config;
mod discovery;
mod peer;
mod protocol;
mod rendezvous;
use std::path::Path;
@ -24,6 +25,7 @@ use tracing::{error, info, warn};
use crate::can_client::CanSyncClient;
use crate::config::SyncConfig;
use crate::discovery::Discovery;
use crate::rendezvous::Rendezvous;
/// ALPN protocol identifier for CAN sync peer connections.
const SYNC_ALPN: &[u8] = b"can-sync/1";
@ -94,11 +96,22 @@ async fn main() -> Result<()> {
// Channel for discovered peers
let (peer_tx, mut peer_rx) = mpsc::channel::<EndpointId>(32);
// Spawn discovery via gossip
// Spawn discovery via gossip (works once bootstrap peers are known)
let disc = Discovery::new(endpoint.clone(), gossip.clone(), &config.sync_passphrase);
let peer_tx_gossip = peer_tx.clone();
tokio::spawn(async move {
if let Err(e) = disc.run(peer_tx.clone()).await {
error!("Discovery failed: {:#}", e);
if let Err(e) = disc.run(peer_tx_gossip).await {
error!("Gossip discovery failed: {:#}", e);
}
});
// Spawn internet rendezvous via pkarr relay (discovers peers worldwide)
let rendezvous = Rendezvous::new(&config.sync_passphrase, node_id)
.context("creating rendezvous")?;
let peer_tx_rdv = peer_tx.clone();
tokio::spawn(async move {
if let Err(e) = rendezvous.run(peer_tx_rdv).await {
error!("Rendezvous discovery failed: {:#}", e);
}
});

View File

@ -0,0 +1,189 @@
//! Internet peer discovery via pkarr relay servers.
//!
//! Derives deterministic keypair "slots" from the shared passphrase.
//! Each peer claims a slot by publishing its EndpointId as a TXT record.
//! All peers scan all slots periodically to discover each other.
//!
//! This works over the internet — no LAN, no port forwarding needed.
//! Uses n0's public pkarr relay servers (same infrastructure as iroh).
use std::collections::HashSet;
use std::time::Duration;
use anyhow::{Context, Result};
use iroh::EndpointId;
use pkarr::{Client as PkarrClient, Keypair, SignedPacket};
use simple_dns::rdata::RData;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
const NUM_SLOTS: usize = 8;
const PUBLISH_INTERVAL: Duration = Duration::from_secs(60);
const SCAN_INTERVAL: Duration = Duration::from_secs(15);
const RECORD_NAME: &str = "_can_sync";
/// Derive a deterministic pkarr keypair for a given slot index.
fn derive_slot_keypair(passphrase: &str, slot: usize) -> Keypair {
let seed = blake3::hash(
format!("can-sync-rendezvous:{}:{}", passphrase, slot).as_bytes(),
);
let seed_bytes: [u8; 32] = *seed.as_bytes();
let secret = ed25519_dalek::SecretKey::from(seed_bytes);
Keypair::from_secret_key(&secret)
}
/// Internet peer discovery via pkarr relay.
pub struct Rendezvous {
slots: Vec<Keypair>,
our_id: EndpointId,
client: PkarrClient,
}
impl Rendezvous {
pub fn new(passphrase: &str, our_id: EndpointId) -> Result<Self> {
let slots: Vec<Keypair> = (0..NUM_SLOTS)
.map(|i| derive_slot_keypair(passphrase, i))
.collect();
let client = PkarrClient::builder()
.build()
.context("creating pkarr client")?;
Ok(Self {
slots,
our_id,
client,
})
}
/// Run the rendezvous loop: claim a slot, periodically re-publish and scan.
pub async fn run(self, tx: mpsc::Sender<EndpointId>) -> Result<()> {
let our_id_hex = hex::encode(self.our_id.as_bytes());
info!(
"Rendezvous: starting internet discovery ({} slots, publish every {}s, scan every {}s)",
NUM_SLOTS,
PUBLISH_INTERVAL.as_secs(),
SCAN_INTERVAL.as_secs(),
);
// Claim our slot (first empty, or hash-based fallback)
let our_slot = self.claim_slot(&our_id_hex).await;
info!("Rendezvous: claimed slot {}", our_slot);
let mut known_peers: HashSet<EndpointId> = HashSet::new();
let mut publish_tick = tokio::time::interval(PUBLISH_INTERVAL);
let mut scan_tick = tokio::time::interval(SCAN_INTERVAL);
// Do an initial scan immediately
self.scan_all_slots(&mut known_peers, &tx).await;
loop {
tokio::select! {
_ = publish_tick.tick() => {
if let Err(e) = self.publish_slot(our_slot, &our_id_hex).await {
warn!("Rendezvous: failed to re-publish slot {}: {:#}", our_slot, e);
}
}
_ = scan_tick.tick() => {
self.scan_all_slots(&mut known_peers, &tx).await;
}
}
}
}
async fn scan_all_slots(
&self,
known_peers: &mut HashSet<EndpointId>,
tx: &mpsc::Sender<EndpointId>,
) {
for i in 0..NUM_SLOTS {
match self.read_slot(i).await {
Some(peer_id) if peer_id != self.our_id && known_peers.insert(peer_id) => {
info!(
"Rendezvous: discovered peer {} in slot {}",
peer_id.fmt_short(),
i
);
let _ = tx.send(peer_id).await;
}
_ => {}
}
}
}
async fn claim_slot(&self, our_id_hex: &str) -> usize {
// Check if we already own a slot (from a previous run)
for i in 0..NUM_SLOTS {
if let Some(peer_id) = self.read_slot(i).await {
if peer_id == self.our_id {
debug!("Rendezvous: already own slot {}", i);
return i;
}
}
}
// Claim first empty slot
for i in 0..NUM_SLOTS {
if self.read_slot(i).await.is_none() {
if let Err(e) = self.publish_slot(i, our_id_hex).await {
warn!("Rendezvous: failed to claim slot {}: {:#}", i, e);
continue;
}
return i;
}
}
// All slots occupied — use deterministic slot based on our ID
let slot = {
let h = blake3::hash(self.our_id.as_bytes());
let bytes: [u8; 8] = h.as_bytes()[..8].try_into().unwrap();
u64::from_le_bytes(bytes) as usize % NUM_SLOTS
};
let _ = self.publish_slot(slot, our_id_hex).await;
slot
}
async fn publish_slot(&self, slot: usize, our_id_hex: &str) -> Result<()> {
let keypair = &self.slots[slot];
let packet = SignedPacket::builder()
.txt(
RECORD_NAME.try_into().context("invalid record name")?,
our_id_hex.try_into().context("invalid txt value")?,
300, // 5 min TTL
)
.sign(keypair)
.context("signing pkarr packet")?;
self.client
.publish(&packet, None)
.await
.context("publishing to pkarr relay")?;
debug!("Rendezvous: published slot {}", slot);
Ok(())
}
async fn read_slot(&self, slot: usize) -> Option<EndpointId> {
let public_key = self.slots[slot].public_key();
let packet = self.client.resolve(&public_key).await?;
// Use pkarr's resource_records iterator to find our TXT record
for record in packet.resource_records(RECORD_NAME) {
if let RData::TXT(txt) = &record.rdata {
// Try to extract the hex-encoded EndpointId from TXT attributes
if let Ok(txt_string) = String::try_from(txt.clone()) {
let hex_str = txt_string.trim();
if let Ok(bytes) = hex::decode(hex_str) {
if bytes.len() == 32 {
if let Ok(arr) = <[u8; 32]>::try_from(bytes.as_slice()) {
return EndpointId::from_bytes(&arr).ok();
}
}
}
}
}
}
None
}
}