From a28fac6c9a64f79d013c5434ded4bb53d4f297ff Mon Sep 17 00:00:00 2001 From: Jason Tudisco Date: Thu, 12 Mar 2026 12:30:12 -0600 Subject: [PATCH] Rewrite can-sync v2: simplified P2P full-mirror replication Replace the over-engineered iroh-docs/libraries/filters architecture with a simple peer-to-peer sync using: - iroh 0.96 Endpoint for QUIC transport + NAT traversal - iroh-gossip for peer discovery via shared passphrase - Protobuf messages over QUIC streams for asset transfer - CAN service's private /sync/* API for local data access Deleted: announcer, fetcher, library, manifest, node, routes (2860 lines) Added: discovery, peer, protocol (simplified ~600 lines) Co-Authored-By: Claude Opus 4.6 --- examples/can-sync/Cargo.lock | 1042 ++++----------------------- examples/can-sync/Cargo.toml | 42 +- examples/can-sync/config.yaml | 20 +- examples/can-sync/src/announcer.rs | 234 ------ examples/can-sync/src/can_client.rs | 333 ++------- examples/can-sync/src/config.rs | 76 +- examples/can-sync/src/discovery.rs | 109 +++ examples/can-sync/src/fetcher.rs | 352 --------- examples/can-sync/src/library.rs | 288 -------- examples/can-sync/src/main.rs | 220 +++--- examples/can-sync/src/manifest.rs | 75 -- examples/can-sync/src/node.rs | 150 ---- examples/can-sync/src/peer.rs | 320 ++++++++ examples/can-sync/src/protocol.rs | 123 ++++ examples/can-sync/src/routes.rs | 430 ----------- 15 files changed, 944 insertions(+), 2870 deletions(-) delete mode 100644 examples/can-sync/src/announcer.rs create mode 100644 examples/can-sync/src/discovery.rs delete mode 100644 examples/can-sync/src/fetcher.rs delete mode 100644 examples/can-sync/src/library.rs delete mode 100644 examples/can-sync/src/manifest.rs delete mode 100644 examples/can-sync/src/node.rs create mode 100644 examples/can-sync/src/peer.rs create mode 100644 examples/can-sync/src/protocol.rs delete mode 100644 examples/can-sync/src/routes.rs diff --git a/examples/can-sync/Cargo.lock b/examples/can-sync/Cargo.lock index 
c36ce67..20c99d8 100644 --- a/examples/can-sync/Cargo.lock +++ b/examples/can-sync/Cargo.lock @@ -2,18 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "ahash" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" -dependencies = [ - "cfg-if", - "once_cell", - "version_check", - "zerocopy", -] - [[package]] name = "aho-corasick" version = "1.1.4" @@ -56,57 +44,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" -[[package]] -name = "asn1-rs" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56624a96882bb8c26d61312ae18cb45868e5a9992ea73c58e45c3101e56a1e60" -dependencies = [ - "asn1-rs-derive", - "asn1-rs-impl", - "displaydoc", - "nom", - "num-traits", - "rusticata-macros", - "thiserror 2.0.18", - "time", -] - -[[package]] -name = "asn1-rs-derive" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", - "synstructure", -] - -[[package]] -name = "asn1-rs-impl" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - -[[package]] -name = "async-channel" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-compat" version = "0.2.5" @@ -128,7 +65,7 @@ checksum = 
"9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -197,58 +134,6 @@ dependencies = [ "fs_extra", ] -[[package]] -name = "axum" -version = "0.8.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" -dependencies = [ - "axum-core", - "bytes", - "form_urlencoded", - "futures-util", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-util", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "serde_core", - "serde_json", - "serde_path_to_error", - "serde_urlencoded", - "sync_wrapper", - "tokio", - "tower", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "axum-core" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" -dependencies = [ - "bytes", - "futures-core", - "http", - "http-body", - "http-body-util", - "mime", - "pin-project-lite", - "sync_wrapper", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "backon" version = "1.6.0" @@ -260,25 +145,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "bao-tree" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06384416b1825e6e04fde63262fda2dc408f5b64c02d04e0d8b70ae72c17a52b" -dependencies = [ - "blake3", - "bytes", - "futures-lite", - "genawaiter", - "iroh-io", - "positioned-io", - "range-collections", - "self_cell", - "serde", - "smallvec", - "tokio", -] - [[package]] name = "base32" version = "0.5.1" @@ -297,12 +163,6 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" -[[package]] -name = "binary-merge" -version = "0.1.2" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "597bb81c80a54b6a4381b23faba8d7774b144c94cbd1d6fe3f1329bd776554ab" - [[package]] name = "bitflags" version = "2.11.0" @@ -323,15 +183,6 @@ dependencies = [ "cpufeatures", ] -[[package]] -name = "block-buffer" -version = "0.10.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" -dependencies = [ - "generic-array", -] - [[package]] name = "block-buffer" version = "0.11.0" @@ -373,32 +224,22 @@ dependencies = [ [[package]] name = "can-sync" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", - "axum", + "blake3", "bytes", - "chrono", - "futures-lite", "hex", "iroh", - "iroh-blobs", - "iroh-docs", "iroh-gossip", - "open", - "postcard", + "n0-future 0.1.3", + "prost", "reqwest 0.12.28", - "rusqlite", "serde", - "serde_json", "serde_yaml", - "sha2 0.10.9", "tokio", - "tokio-util", - "tower-http", "tracing", "tracing-subscriber", - "uuid", ] [[package]] @@ -438,10 +279,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", - "js-sys", "num-traits", "serde", - "wasm-bindgen", "windows-link", ] @@ -473,15 +312,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "concurrent-queue" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "const-oid" version = "0.10.2" @@ -578,16 +408,6 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" -[[package]] -name = "crypto-common" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" -dependencies = [ - "generic-array", - "typenum", -] - [[package]] name = "crypto-common" version = "0.2.1" @@ -606,7 +426,7 @@ dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", - "digest 0.11.0-rc.10", + "digest", "fiat-crypto", "rand_core", "rustc_version", @@ -623,7 +443,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -647,7 +467,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.117", + "syn", ] [[package]] @@ -658,7 +478,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ "darling_core", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -678,20 +498,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "der-parser" -version = "10.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07da5016415d5a3c4dd39b11ed26f915f52fc4e0dc197d87908bc916e51bc1a6" -dependencies = [ - "asn1-rs", - "displaydoc", - "nom", - "num-bigint", - "num-traits", - "rusticata-macros", -] - [[package]] name = "deranged" version = "0.5.8" @@ -719,7 +525,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -729,7 +535,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core", - "syn 2.0.117", + "syn", +] + +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl 1.0.0", ] [[package]] @@ -738,7 +553,19 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134" dependencies = [ - "derive_more-impl", + "derive_more-impl 2.1.1", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", ] [[package]] @@ -751,7 +578,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.117", + "syn", "unicode-xid", ] @@ -761,25 +588,15 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab03c107fafeb3ee9f5925686dbb7a73bc76e3932abb0d2b365cb64b169cf04c" -[[package]] -name = "digest" -version = "0.10.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" -dependencies = [ - "block-buffer 0.10.4", - "crypto-common 0.1.7", -] - [[package]] name = "digest" version = "0.11.0-rc.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afa94b64bfc6549e6e4b5a3216f22593224174083da7a90db47e951c4fb31725" dependencies = [ - "block-buffer 0.11.0", + "block-buffer", "const-oid", - "crypto-common 0.2.1", + "crypto-common", ] [[package]] @@ -802,7 +619,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -858,12 +675,18 @@ dependencies = [ "ed25519", "rand_core", "serde", - "sha2 0.11.0-rc.2", + "sha2", "signature", "subtle", "zeroize", ] +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "embedded-io" version = "0.4.0" @@ -894,7 +717,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] 
[[package]] @@ -905,7 +728,7 @@ checksum = "3ed8956bd5c1f0415200516e78ff07ec9e16415ade83c056c230d7b7ea0d55b7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -924,39 +747,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "event-listener" -version = "5.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" -dependencies = [ - "event-listener", - "pin-project-lite", -] - -[[package]] -name = "fallible-iterator" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" - -[[package]] -name = "fallible-streaming-iterator" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" - [[package]] name = "fastbloom" version = "0.14.1" @@ -1136,7 +926,7 @@ checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -1168,37 +958,6 @@ dependencies = [ "slab", ] -[[package]] -name = "genawaiter" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" -dependencies = [ - "futures-core", - "genawaiter-macro", - "genawaiter-proc-macro", - "proc-macro-hack", -] - -[[package]] -name = "genawaiter-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" - -[[package]] -name = "genawaiter-proc-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" -dependencies = [ - "proc-macro-error", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "generator" version = "0.8.8" @@ -1214,16 +973,6 @@ dependencies = [ "windows-result", ] -[[package]] -name = "generic-array" -version = "0.14.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" -dependencies = [ - "typenum", - "version_check", -] - [[package]] name = "getrandom" version = "0.2.17" @@ -1306,15 +1055,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "hashbrown" -version = "0.14.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -dependencies = [ - "ahash", -] - [[package]] name = "hashbrown" version = "0.15.5" @@ -1335,15 +1075,6 @@ dependencies = [ "foldhash 0.2.0", ] -[[package]] -name = "hashlink" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" -dependencies = [ - "hashbrown 0.14.5", -] - [[package]] name = "heapless" version = "0.7.17" @@ -1735,15 +1466,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "inplace-vec-builder" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf64c2edc8226891a71f127587a2861b132d2b942310843814d5001d99a1d307" -dependencies = [ - "smallvec", -] - [[package]] name = "ipconfig" version = "0.3.2" @@ -1782,21 +1504,21 @@ dependencies = [ "bytes", "cfg_aliases", "data-encoding", - "derive_more", + "derive_more 2.1.1", "ed25519-dalek", "futures-util", 
"getrandom 0.3.4", "hickory-resolver", "http", "igd-next", - "iroh-base 0.96.1", + "iroh-base", "iroh-metrics", "iroh-quinn", "iroh-quinn-proto", "iroh-quinn-udp", "iroh-relay", "n0-error", - "n0-future", + "n0-future 0.3.2", "n0-watcher", "netdev", "netwatch", @@ -1813,7 +1535,7 @@ dependencies = [ "rustls-webpki", "serde", "smallvec", - "strum 0.27.2", + "strum", "sync_wrapper", "time", "tokio", @@ -1825,24 +1547,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "iroh-base" -version = "0.95.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a8c5fb1cc65589f0d7ab44269a76f615a8c4458356952c9b0ef1c93ea45ff8" -dependencies = [ - "curve25519-dalek", - "data-encoding", - "derive_more", - "ed25519-dalek", - "n0-error", - "rand_core", - "serde", - "url", - "zeroize", - "zeroize_derive", -] - [[package]] name = "iroh-base" version = "0.96.1" @@ -1851,98 +1555,18 @@ checksum = "20c99d836a1c99e037e98d1bf3ef209c3a4df97555a00ce9510eb78eccdf5567" dependencies = [ "curve25519-dalek", "data-encoding", - "derive_more", - "digest 0.11.0-rc.10", + "derive_more 2.1.1", + "digest", "ed25519-dalek", "n0-error", "rand_core", "serde", - "sha2 0.11.0-rc.2", + "sha2", "url", "zeroize", "zeroize_derive", ] -[[package]] -name = "iroh-blobs" -version = "0.98.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f253ea06293e51e166a88a3faa019b67e187d12bd7c6a04369a0ec86f53272" -dependencies = [ - "arrayvec", - "bao-tree", - "bytes", - "cfg_aliases", - "chrono", - "data-encoding", - "derive_more", - "futures-lite", - "genawaiter", - "hex", - "iroh", - "iroh-base 0.96.1", - "iroh-io", - "iroh-metrics", - "iroh-quinn", - "iroh-tickets 0.3.0", - "irpc", - "n0-error", - "n0-future", - "nested_enum_utils", - "postcard", - "rand", - "range-collections", - "redb", - "ref-cast", - "reflink-copy", - "self_cell", - "serde", - "smallvec", - "tokio", - "tracing", -] - -[[package]] -name = "iroh-docs" -version = "0.96.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "42292da17d6be0b73c5897f1ff395ad7c6f858b107ff76a7605867fbdd6c2e72" -dependencies = [ - "anyhow", - "async-channel", - "blake3", - "bytes", - "derive_more", - "ed25519-dalek", - "futures-buffered", - "futures-lite", - "futures-util", - "hex", - "iroh", - "iroh-blobs", - "iroh-gossip", - "iroh-metrics", - "iroh-quinn", - "iroh-tickets 0.2.0", - "irpc", - "n0-error", - "n0-future", - "num_enum", - "postcard", - "rand", - "redb", - "self_cell", - "serde", - "serde-error", - "strum 0.26.3", - "tempfile", - "thiserror 2.0.18", - "tokio", - "tokio-stream", - "tokio-util", - "tracing", -] - [[package]] name = "iroh-gossip" version = "0.96.0" @@ -1952,7 +1576,7 @@ dependencies = [ "blake3", "bytes", "data-encoding", - "derive_more", + "derive_more 2.1.1", "ed25519-dalek", "futures-concurrency", "futures-lite", @@ -1960,11 +1584,11 @@ dependencies = [ "hex", "indexmap", "iroh", - "iroh-base 0.96.1", + "iroh-base", "iroh-metrics", "irpc", "n0-error", - "n0-future", + "n0-future 0.3.2", "postcard", "rand", "serde", @@ -1973,19 +1597,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "iroh-io" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a5feb781017b983ff1b155cd1faf8174da2acafd807aa482876da2d7e6577a" -dependencies = [ - "bytes", - "futures-lite", - "pin-project", - "smallvec", - "tokio", -] - [[package]] name = "iroh-metrics" version = "0.38.3" @@ -2011,7 +1622,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -2042,7 +1653,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0de99ad8adc878ee0e68509ad256152ce23b8bbe45f5539d04e179630aca40a9" dependencies = [ "bytes", - "derive_more", + "derive_more 2.1.1", "enum-assoc", "fastbloom", "getrandom 0.3.4", @@ -2053,7 +1664,6 @@ dependencies = [ "rustc-hash", "rustls", "rustls-pki-types", - "rustls-platform-verifier", "slab", 
"sorted-index-buffer", "thiserror 2.0.18", @@ -2085,20 +1695,20 @@ dependencies = [ "bytes", "cfg_aliases", "data-encoding", - "derive_more", + "derive_more 2.1.1", "getrandom 0.3.4", "hickory-resolver", "http", "http-body-util", "hyper", "hyper-util", - "iroh-base 0.96.1", + "iroh-base", "iroh-metrics", "iroh-quinn", "iroh-quinn-proto", "lru", "n0-error", - "n0-future", + "n0-future 0.3.2", "num_enum", "pin-project", "pkarr", @@ -2109,7 +1719,7 @@ dependencies = [ "rustls-pki-types", "serde", "serde_bytes", - "strum 0.27.2", + "strum", "tokio", "tokio-rustls", "tokio-util", @@ -2122,51 +1732,17 @@ dependencies = [ "z32", ] -[[package]] -name = "iroh-tickets" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a322053cacddeca222f0999ce3cf6aa45c64ae5ad8c8911eac9b66008ffbaa5" -dependencies = [ - "data-encoding", - "derive_more", - "iroh-base 0.95.1", - "n0-error", - "postcard", - "serde", -] - -[[package]] -name = "iroh-tickets" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09cd580bf680db919cbbce6886a47314acb0e9b4f7b639acebcea5e9f485d183" -dependencies = [ - "data-encoding", - "derive_more", - "iroh-base 0.96.1", - "n0-error", - "postcard", - "serde", -] - [[package]] name = "irpc" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bbc84aaeab13a6d7502bae4f40f2517b643924842e0230ea0bf807477cc208" dependencies = [ - "futures-buffered", "futures-util", - "iroh-quinn", "irpc-derive", "n0-error", - "n0-future", - "postcard", - "rcgen", - "rustls", + "n0-future 0.3.2", "serde", - "smallvec", "tokio", "tokio-util", "tracing", @@ -2180,26 +1756,16 @@ checksum = "58148196d2230183c9679431ac99b57e172000326d664e8456fa2cd27af6505a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] -name = "is-docker" -version = "0.2.0" +name = "itertools" +version = "0.14.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "928bae27f42bc99b60d9ac7334e3a21d10ad8f1835a4e12ec3ec0464765ed1b3" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" dependencies = [ - "once_cell", -] - -[[package]] -name = "is-wsl" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "173609498df190136aa7dea1a91db051746d339e18476eed5ca40521f02d7aa5" -dependencies = [ - "is-docker", - "once_cell", + "either", ] [[package]] @@ -2274,17 +1840,6 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" -[[package]] -name = "libsqlite3-sys" -version = "0.30.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" -dependencies = [ - "cc", - "pkg-config", - "vcpkg", -] - [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -2361,12 +1916,6 @@ dependencies = [ "regex-automata", ] -[[package]] -name = "matchit" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" - [[package]] name = "memchr" version = "2.8.0" @@ -2379,22 +1928,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" -dependencies = [ - "mime", - "unicase", -] - -[[package]] -name = "minimal-lexical" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" - [[package]] name = "mio" version = "1.1.1" 
@@ -2442,7 +1975,28 @@ checksum = "03755949235714b2b307e5ae89dd8c1c2531fb127d9b8b7b4adf9c876cd3ed18" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", +] + +[[package]] +name = "n0-future" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bb0e5d99e681ab3c938842b96fcb41bf8a7bb4bfdb11ccbd653a7e83e06c794" +dependencies = [ + "cfg_aliases", + "derive_more 1.0.0", + "futures-buffered", + "futures-lite", + "futures-util", + "js-sys", + "pin-project", + "send_wrapper", + "tokio", + "tokio-util", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-time", ] [[package]] @@ -2452,7 +2006,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2ab99dfb861450e68853d34ae665243a88b8c493d01ba957321a1e9b2312bbe" dependencies = [ "cfg_aliases", - "derive_more", + "derive_more 2.1.1", "futures-buffered", "futures-lite", "futures-util", @@ -2472,9 +2026,9 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38795f7932e6e9d1c6e989270ef5b3ff24ebb910e2c9d4bed2d28d8bae3007dc" dependencies = [ - "derive_more", + "derive_more 2.1.1", "n0-error", - "n0-future", + "n0-future 0.3.2", ] [[package]] @@ -2494,18 +2048,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nested_enum_utils" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1d5475271bdd36a4a2769eac1ef88df0f99428ea43e52dfd8b0ee5cb674695f" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "netdev" version = "0.40.1" @@ -2597,12 +2139,12 @@ dependencies = [ "atomic-waker", "bytes", "cfg_aliases", - "derive_more", + "derive_more 2.1.1", "iroh-quinn-udp", "js-sys", "libc", "n0-error", - "n0-future", + "n0-future 0.3.2", "n0-watcher", "netdev", "netlink-packet-core", @@ -2624,16 +2166,6 @@ dependencies = [ "wmi", ] -[[package]] -name = "nom" -version = "7.1.3" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" -dependencies = [ - "memchr", - "minimal-lexical", -] - [[package]] name = "ntimestamp" version = "1.0.0" @@ -2658,31 +2190,12 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "num-bigint" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" -dependencies = [ - "num-integer", - "num-traits", -] - [[package]] name = "num-conv" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" -[[package]] -name = "num-integer" -version = "0.1.46" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" -dependencies = [ - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.19" @@ -2711,7 +2224,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -2776,15 +2289,6 @@ dependencies = [ "objc2-security", ] -[[package]] -name = "oid-registry" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12f40cff3dde1b6087cc5d5f5d4d65712f34016a03ed60e9c08dcc392736b5b7" -dependencies = [ - "asn1-rs", -] - [[package]] name = "once_cell" version = "1.21.3" @@ -2795,17 +2299,6 @@ dependencies = [ "portable-atomic", ] -[[package]] -name = "open" -version = "5.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43bb73a7fa3799b198970490a51174027ba0d4ec504b03cd08caf513d40024bc" -dependencies = [ - "is-wsl", - "libc", - "pathdiff", -] - [[package]] name = "openssl" version = "0.10.76" @@ -2829,7 +2322,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ 
"proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -2895,22 +2388,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" -[[package]] -name = "pathdiff" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" - -[[package]] -name = "pem" -version = "3.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" -dependencies = [ - "base64", - "serde_core", -] - [[package]] name = "pem-rfc7468" version = "1.0.0" @@ -2953,7 +2430,7 @@ checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -3045,7 +2522,7 @@ checksum = "7d2a8825353ace3285138da3378b1e21860d60351942f7aa3b99b13b41f80318" dependencies = [ "base64", "bytes", - "derive_more", + "derive_more 2.1.1", "futures-lite", "futures-util", "hyper-util", @@ -3067,16 +2544,6 @@ dependencies = [ "url", ] -[[package]] -name = "positioned-io" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4ec4b80060f033312b99b6874025d9503d2af87aef2dd4c516e253fbfcdada7" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "postcard" version = "1.1.3" @@ -3099,7 +2566,7 @@ checksum = "e0232bd009a197ceec9cc881ba46f727fcd8060a2d8d6a9dde7a69030a6fe2bb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -3133,7 +2600,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.117", + "syn", ] [[package]] @@ -3145,38 +2612,6 @@ dependencies = [ "toml_edit", ] -[[package]] -name = "proc-macro-error" 
-version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", - "syn-mid", - "version_check", -] - -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.106" @@ -3186,6 +2621,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "quick-xml" version = "0.38.4" @@ -3301,42 +2759,6 @@ dependencies = [ "getrandom 0.3.4", ] -[[package]] -name = "range-collections" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "861706ea9c4aded7584c5cd1d241cec2ea7f5f50999f236c22b65409a1f1a0d0" -dependencies = [ - "binary-merge", - "inplace-vec-builder", - "ref-cast", - "serde", - "smallvec", -] - -[[package]] -name = "rcgen" -version = "0.14.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"10b99e0098aa4082912d4c649628623db6aba77335e4f4569ff5083a6448b32e" -dependencies = [ - "pem", - "ring", - "rustls-pki-types", - "time", - "x509-parser", - "yasna", -] - -[[package]] -name = "redb" -version = "2.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eca1e9d98d5a7e9002d0013e18d5a9b000aee942eb134883a82f06ebffb6c01" -dependencies = [ - "libc", -] - [[package]] name = "redox_syscall" version = "0.5.18" @@ -3346,38 +2768,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "ref-cast" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" -dependencies = [ - "ref-cast-impl", -] - -[[package]] -name = "ref-cast-impl" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - -[[package]] -name = "reflink-copy" -version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13362233b147e57674c37b802d216b7c5e3dcccbed8967c84f0d8d223868ae27" -dependencies = [ - "cfg-if", - "libc", - "rustix", - "windows", -] - [[package]] name = "regex-automata" version = "0.4.14" @@ -3417,7 +2807,6 @@ dependencies = [ "js-sys", "log", "mime", - "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", @@ -3498,20 +2887,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rusqlite" -version = "0.32.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" -dependencies = [ - "bitflags", - "fallible-iterator", - "fallible-streaming-iterator", - "hashlink", - "libsqlite3-sys", - "smallvec", -] - [[package]] name = "rustc-hash" version = "2.1.1" @@ -3527,15 +2902,6 @@ dependencies = [ "semver", ] -[[package]] -name = 
"rusticata-macros" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" -dependencies = [ - "nom", -] - [[package]] name = "rustix" version = "1.1.4" @@ -3729,15 +3095,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde-error" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "342110fb7a5d801060c885da03bf91bfa7c7ca936deafcc64bb6706375605d47" -dependencies = [ - "serde", -] - [[package]] name = "serde_bytes" version = "0.11.19" @@ -3765,7 +3122,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -3781,17 +3138,6 @@ dependencies = [ "zmij", ] -[[package]] -name = "serde_path_to_error" -version = "0.1.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" -dependencies = [ - "itoa", - "serde", - "serde_core", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3823,17 +3169,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" -[[package]] -name = "sha2" -version = "0.10.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.7", -] - [[package]] name = "sha2" version = "0.11.0-rc.2" @@ -3842,7 +3177,7 @@ checksum = "d1e3878ab0f98e35b2df35fe53201d088299b41a6bb63e3e34dada2ac4abd924" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.11.0-rc.10", + "digest", ] [[package]] @@ -3908,9 +3243,6 @@ name = "smallvec" version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" -dependencies = [ - "serde", -] [[package]] name = "socket2" @@ -3946,7 +3278,7 @@ checksum = "c87e960f4dca2788eeb86bbdde8dd246be8948790b7618d656e68f9b720a86e8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -3986,35 +3318,13 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" -[[package]] -name = "strum" -version = "0.26.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" -dependencies = [ - "strum_macros 0.26.4", -] - [[package]] name = "strum" version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" dependencies = [ - "strum_macros 0.27.2", -] - -[[package]] -name = "strum_macros" -version = "0.26.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" -dependencies = [ - "heck", - "proc-macro2", - "quote", - "rustversion", - "syn 2.0.117", + "strum_macros", ] [[package]] @@ -4026,7 +3336,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -4035,17 +3345,6 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.117" @@ -4057,17 +3356,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn-mid" -version = 
"0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "sync_wrapper" version = "1.0.2" @@ -4085,7 +3373,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -4154,7 +3442,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -4165,7 +3453,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -4261,7 +3549,7 @@ checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -4375,7 +3663,6 @@ dependencies = [ "tokio", "tower-layer", "tower-service", - "tracing", ] [[package]] @@ -4428,7 +3715,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -4482,12 +3769,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" -[[package]] -name = "unicase" -version = "2.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" - [[package]] name = "unicode-ident" version = "1.0.24" @@ -4608,12 +3889,6 @@ dependencies = [ "rustversion", ] -[[package]] -name = "version_check" -version = "0.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" - [[package]] name = 
"walkdir" version = "2.5.0" @@ -4703,7 +3978,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.117", + "syn", "wasm-bindgen-shared", ] @@ -4891,7 +4166,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -4902,7 +4177,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -5305,7 +4580,7 @@ dependencies = [ "heck", "indexmap", "prettyplease", - "syn 2.0.117", + "syn", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -5321,7 +4596,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.117", + "syn", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -5403,24 +4678,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "x509-parser" -version = "0.18.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d43b0f71ce057da06bc0851b23ee24f3f86190b07203dd8f567d0b706a185202" -dependencies = [ - "asn1-rs", - "data-encoding", - "der-parser", - "lazy_static", - "nom", - "oid-registry", - "ring", - "rusticata-macros", - "thiserror 2.0.18", - "time", -] - [[package]] name = "xml-rs" version = "0.8.28" @@ -5436,15 +4693,6 @@ dependencies = [ "xml-rs", ] -[[package]] -name = "yasna" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" -dependencies = [ - "time", -] - [[package]] name = "yoke" version = "0.8.1" @@ -5464,7 +4712,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", "synstructure", ] @@ -5491,7 +4739,7 @@ checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -5511,7 
+4759,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", "synstructure", ] @@ -5532,7 +4780,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -5565,7 +4813,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] diff --git a/examples/can-sync/Cargo.toml b/examples/can-sync/Cargo.toml index 06660aa..1f6d75e 100644 --- a/examples/can-sync/Cargo.toml +++ b/examples/can-sync/Cargo.toml @@ -1,44 +1,42 @@ [package] name = "can-sync" -version = "0.1.0" +version = "0.2.0" edition = "2021" -description = "P2P sync service for CAN content-addressable storage" +description = "P2P sync agent for CAN service — full mirror replication via iroh" [[bin]] name = "can-sync" path = "src/main.rs" [dependencies] -# P2P networking +# P2P networking (iroh for transport + gossip for discovery — NO iroh-docs) iroh = "0.96" -iroh-blobs = "0.98" -iroh-docs = "0.96" iroh-gossip = "0.96" -# HTTP server + client -axum = "0.8" -tokio = { version = "1", features = ["full"] } -reqwest = { version = "0.12", features = ["json", "multipart"] } -tower-http = { version = "0.6", features = ["cors"] } +# Protobuf (same message types as CAN service sync API) +prost = "0.13" + +# HTTP client for CAN service sync API +reqwest = { version = "0.12", features = ["json"] } # Serialization serde = { version = "1", features = ["derive"] } -serde_json = "1" serde_yaml = "0.9" -postcard = { version = "1", features = ["alloc"] } -# Storage -rusqlite = { version = "0.32", features = ["bundled"] } +# Crypto +blake3 = "1" -# Utilities +# Async runtime +tokio = { version = "1", features = ["full"] } + +# Logging tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +# 
Stream utilities (needed for gossip event stream) +n0-future = "0.1" + +# Utilities anyhow = "1" -open = "5" -sha2 = "0.10" -hex = "0.4" -uuid = { version = "1", features = ["v4"] } -chrono = { version = "0.4", features = ["serde"] } bytes = "1" -futures-lite = "2" -tokio-util = { version = "0.7", features = ["io"] } +hex = "0.4" diff --git a/examples/can-sync/config.yaml b/examples/can-sync/config.yaml index c5b804c..4638ff0 100644 --- a/examples/can-sync/config.yaml +++ b/examples/can-sync/config.yaml @@ -1,7 +1,13 @@ -# CAN Sync configuration -can_service_url: "http://127.0.0.1:3210/api/v1/can/0" -listen_addr: "127.0.0.1:3213" -data_dir: "./can_sync_data" -relay_url: null -poll_interval_secs: 5 -full_scan_interval_secs: 300 +# CAN Sync v2 configuration + +# URL of the local CAN Service (sync API is at /sync/*) +can_service_url: "http://127.0.0.1:3210" + +# API key for CAN service's sync endpoints (must match sync_api_key in CAN config) +sync_api_key: "changeme" + +# Shared passphrase for peer discovery (all peers must use the same one) +sync_passphrase: "my-shared-secret" + +# Seconds between polls for new local assets +poll_interval_secs: 3 diff --git a/examples/can-sync/src/announcer.rs b/examples/can-sync/src/announcer.rs deleted file mode 100644 index 58b9b68..0000000 --- a/examples/can-sync/src/announcer.rs +++ /dev/null @@ -1,234 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use anyhow::Result; -use tracing::{debug, error, info, warn}; - -use crate::can_client::CanClient; -use crate::library::SyncState; -use crate::manifest::AssetSyncEntry; -use crate::node::SyncNode; - -/// The announcer periodically polls CAN service for new or changed assets -/// and writes matching entries into iroh library documents. 
-pub struct Announcer { - can: CanClient, - state: Arc, - node: Arc, - poll_interval: Duration, - full_scan_interval: Duration, -} - -impl Announcer { - pub fn new( - can: CanClient, - state: Arc, - node: Arc, - poll_interval_secs: u64, - full_scan_interval_secs: u64, - ) -> Self { - Self { - can, - state, - node, - poll_interval: Duration::from_secs(poll_interval_secs), - full_scan_interval: Duration::from_secs(full_scan_interval_secs), - } - } - - /// Run the announcer loop — fast polls + periodic full scans - pub async fn run(self) { - let mut fast_tick = tokio::time::interval(self.poll_interval); - let mut full_tick = tokio::time::interval(self.full_scan_interval); - // Skip the first immediate tick for full scan (let fast poll get first data) - full_tick.tick().await; - - info!( - "Announcer started (fast poll: {}s, full scan: {}s)", - self.poll_interval.as_secs(), - self.full_scan_interval.as_secs(), - ); - - loop { - tokio::select! { - _ = fast_tick.tick() => { - if let Err(e) = self.fast_poll().await { - warn!("Fast poll error: {:#}", e); - } - } - _ = full_tick.tick() => { - if let Err(e) = self.full_scan().await { - warn!("Full scan error: {:#}", e); - } - } - } - } - } - - /// Fast poll: check for recently ingested assets - async fn fast_poll(&self) -> Result<()> { - let last_ts = self - .state - .get_state("last_seen_timestamp")? 
- .and_then(|s| s.parse::().ok()) - .unwrap_or(0); - - // Get recent assets ordered newest first - let resp = self.can.list(50, 0, "desc", Some(last_ts)).await?; - - if resp.items.is_empty() { - return Ok(()); - } - - debug!("Fast poll found {} new assets since ts={}", resp.items.len(), last_ts); - - // Track the newest timestamp we see - let mut max_ts = last_ts; - for asset in &resp.items { - if asset.timestamp > max_ts { - max_ts = asset.timestamp; - } - } - - // Process assets against libraries - let libraries = self.state.list_libraries()?; - - for asset in &resp.items { - for lib in &libraries { - if lib.filter.matches(asset) { - self.announce_asset(lib, asset).await?; - } - } - } - - // Update last seen timestamp - self.state.set_state("last_seen_timestamp", &max_ts.to_string())?; - - Ok(()) - } - - /// Full scan: paginate through all assets, checking for metadata changes - async fn full_scan(&self) -> Result<()> { - info!("Starting full scan..."); - - let libraries = self.state.list_libraries()?; - - if libraries.is_empty() { - debug!("No libraries configured, skipping full scan"); - return Ok(()); - } - - let page_size = 100; - let mut offset = 0; - let mut total_scanned = 0; - let mut total_announced = 0; - - loop { - let resp = self.can.list_all(page_size, offset, true).await?; - let count = resp.items.len(); - total_scanned += count; - - for asset in &resp.items { - for lib in &libraries { - if lib.filter.matches(asset) { - let was_new = self.announce_asset(lib, asset).await?; - if was_new { - total_announced += 1; - } - } - } - } - - if (count as i64) < page_size { - break; - } - offset += page_size; - } - - info!( - "Full scan complete: scanned {}, announced {} new/updated", - total_scanned, total_announced - ); - Ok(()) - } - - /// Announce a single asset to a library's iroh document. - /// Returns true if the asset was newly announced or updated. 
- async fn announce_asset( - &self, - lib: &crate::library::Library, - asset: &crate::can_client::AssetMeta, - ) -> Result { - let doc_id = match &lib.doc_id { - Some(id) => id.clone(), - None => { - debug!("Library '{}' has no doc_id yet, skipping", lib.name); - return Ok(false); - } - }; - - // Check if already announced at current version - if self.state.is_announced(&lib.id, &asset.hash)? { - // Already announced — skip unless metadata changed - // (full scan handles re-announcement on metadata change) - return Ok(false); - } - - // Download file content from CAN service and add as iroh blob - let iroh_blob_hash = match self.can.get_asset(&asset.hash).await { - Ok(content) => { - // Add to iroh blob store so remote peers can download it - match self.node.blobs.add_bytes(content).await { - Ok(tag_info) => Some(tag_info.hash.to_string()), - Err(e) => { - warn!( - "Failed to add blob for asset {}: {:#}", - &asset.hash[..12], - e - ); - None - } - } - } - Err(e) => { - warn!( - "Failed to download asset {} from CAN service: {:#}", - &asset.hash[..12], - e - ); - None - } - }; - - // Create sync entry with the iroh blob hash - let mut entry = AssetSyncEntry::from_asset_meta(asset, &self.node.peer_id()); - entry.iroh_blob_hash = iroh_blob_hash; - let entry_bytes = entry.to_bytes(); - - // Write to iroh document (CRDT — concurrent writes merge automatically) - if let Err(e) = self - .node - .write_to_doc(&doc_id, asset.hash.as_bytes(), &entry_bytes) - .await - { - error!( - "Failed to write asset {} to doc {}: {:#}", - &asset.hash[..12], - &doc_id[..12], - e - ); - return Ok(false); - } - - // Mark as announced in local state - self.state.mark_announced(&lib.id, &asset.hash, entry.version)?; - - debug!( - "Announced asset {} to library '{}' (doc {})", - &asset.hash[..12], - lib.name, - &doc_id[..12] - ); - Ok(true) - } -} diff --git a/examples/can-sync/src/can_client.rs b/examples/can-sync/src/can_client.rs index 12d1af6..8ab0bf8 100644 --- 
a/examples/can-sync/src/can_client.rs +++ b/examples/can-sync/src/can_client.rs @@ -1,291 +1,104 @@ +//! HTTP client for CAN service's private sync API (protobuf-encoded). + use anyhow::{Context, Result}; -use bytes::Bytes; -use reqwest::multipart; -use serde::{Deserialize, Serialize}; +use prost::Message; -/// HTTP client for CAN service API -#[derive(Debug, Clone)] -pub struct CanClient { - client: reqwest::Client, +use crate::protocol::*; + +/// Client for CAN service's /sync/* endpoints. +#[derive(Clone)] +pub struct CanSyncClient { + http: reqwest::Client, base_url: String, + sync_key: String, } -// ── API response types (mirror CAN service) ── - -#[derive(Debug, Deserialize)] -pub struct ApiResponse { - pub status: String, - pub data: T, -} - -#[derive(Debug, Deserialize)] -pub struct ErrorResponse { - pub status: String, - pub error: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AssetMeta { - pub hash: String, - pub mime_type: String, - pub application: Option, - pub user: Option, - pub tags: Vec, - pub description: Option, - pub human_filename: Option, - pub human_path: Option, - pub timestamp: i64, - pub is_trashed: bool, - #[serde(default)] - pub is_corrupted: bool, - pub size: i64, -} - -#[derive(Debug, Deserialize)] -pub struct ListResponse { - pub items: Vec, - pub pagination: Pagination, -} - -#[derive(Debug, Deserialize)] -pub struct Pagination { - pub limit: i64, - pub offset: i64, - pub total: i64, -} - -#[derive(Debug, Deserialize)] -pub struct IngestResult { - pub timestamp: i64, - pub hash: String, - pub filename: String, -} - -// ── Search parameters ── - -#[derive(Debug, Default, Serialize)] -pub struct SearchParams { - #[serde(skip_serializing_if = "Option::is_none")] - pub hash: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub start_time: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub end_time: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub tags: Option, - 
#[serde(skip_serializing_if = "Option::is_none")] - pub mime_type: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub user: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub application: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub limit: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub offset: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub order: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub include_trashed: Option, -} - -// ── Ingest metadata ── - -#[derive(Debug, Default)] -pub struct IngestMeta { - pub mime_type: Option, - pub human_file_name: Option, - pub human_readable_path: Option, - pub application: Option, - pub user: Option, - pub tags: Option, - pub description: Option, -} - -// ── Client implementation ── - -impl CanClient { - pub fn new(base_url: &str) -> Self { +impl CanSyncClient { + pub fn new(base_url: &str, sync_key: &str) -> Self { Self { - client: reqwest::Client::new(), + http: reqwest::Client::new(), base_url: base_url.trim_end_matches('/').to_string(), + sync_key: sync_key.to_string(), } } - /// List assets with pagination and ordering - pub async fn list( - &self, - limit: i64, - offset: i64, - order: &str, - offset_time: Option, - ) -> Result { - let mut url = format!("{}/list?limit={}&offset={}&order={}", self.base_url, limit, offset, order); - if let Some(ts) = offset_time { - url.push_str(&format!("&offset_time={}", ts)); - } - let resp = self.client.get(&url).send().await.context("list request failed")?; - let status = resp.status(); - if !status.is_success() { - let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("CAN list failed ({}): {}", status, text); - } - let api: ApiResponse = resp.json().await.context("parse list response")?; - Ok(api.data) - } - - /// List all assets (paginated, including trashed for full sync) - pub async fn list_all( - &self, - limit: i64, - offset: i64, - include_trashed: bool, - ) -> 
Result { - let mut url = format!("{}/list?limit={}&offset={}&order=asc", self.base_url, limit, offset); - if include_trashed { - url.push_str("&include_trashed=true"); - } - let resp = self.client.get(&url).send().await.context("list_all request failed")?; - let status = resp.status(); - if !status.is_success() { - let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("CAN list_all failed ({}): {}", status, text); - } - let api: ApiResponse = resp.json().await.context("parse list_all response")?; - Ok(api.data) - } - - /// Search assets by filters - pub async fn search(&self, params: &SearchParams) -> Result { + /// POST protobuf request, return protobuf response bytes + async fn post_proto(&self, path: &str, body: Vec) -> Result { + let url = format!("{}{}", self.base_url, path); let resp = self - .client - .get(&format!("{}/search", self.base_url)) - .query(params) + .http + .post(&url) + .header("X-Sync-Key", &self.sync_key) + .header("Content-Type", "application/x-protobuf") + .body(body) .send() .await - .context("search request failed")?; - let status = resp.status(); - if !status.is_success() { + .with_context(|| format!("POST {}", url))?; + + if !resp.status().is_success() { + let status = resp.status(); let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("CAN search failed ({}): {}", status, text); + anyhow::bail!("{} returned {}: {}", path, status, text); } - let api: ApiResponse = resp.json().await.context("parse search response")?; - Ok(api.data) + + resp.bytes().await.with_context(|| format!("reading body from {}", path)) } - /// Download asset content by hash - pub async fn get_asset(&self, hash: &str) -> Result { - let resp = self - .client - .get(&format!("{}/asset/{}", self.base_url, hash)) - .send() - .await - .context("get_asset request failed")?; - let status = resp.status(); - if !status.is_success() { - let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("CAN get_asset failed ({}): {}", status, text); - 
} - resp.bytes().await.context("read asset bytes") + /// Get all asset digests for reconciliation. + pub async fn get_hashes(&self) -> Result { + let req = HashListRequest {}; + let mut buf = Vec::with_capacity(req.encoded_len()); + req.encode(&mut buf)?; + + let resp_bytes = self.post_proto("/sync/hashes", buf).await?; + HashListResponse::decode(resp_bytes).context("decode HashListResponse") } - /// Get asset metadata by hash - pub async fn get_meta(&self, hash: &str) -> Result { - let resp = self - .client - .get(&format!("{}/asset/{}/meta", self.base_url, hash)) - .send() - .await - .context("get_meta request failed")?; - let status = resp.status(); - if !status.is_success() { - let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("CAN get_meta failed ({}): {}", status, text); - } - let api: ApiResponse = resp.json().await.context("parse meta response")?; - Ok(api.data) + /// Pull full assets by hash. + pub async fn pull(&self, hashes: Vec) -> Result { + let req = PullRequest { hashes }; + let mut buf = Vec::with_capacity(req.encoded_len()); + req.encode(&mut buf)?; + + let resp_bytes = self.post_proto("/sync/pull", buf).await?; + PullResponse::decode(resp_bytes).context("decode PullResponse") } - /// Ingest a file into CAN service via multipart upload - pub async fn ingest(&self, content: Bytes, meta: IngestMeta) -> Result { - let file_part = multipart::Part::bytes(content.to_vec()) - .file_name(meta.human_file_name.clone().unwrap_or_else(|| "file".to_string())) - .mime_str(meta.mime_type.as_deref().unwrap_or("application/octet-stream"))?; + /// Push a single asset bundle. 
+ pub async fn push(&self, bundle: AssetBundle) -> Result { + let req = PushRequest { + bundle: Some(bundle), + }; + let mut buf = Vec::with_capacity(req.encoded_len()); + req.encode(&mut buf)?; - let mut form = multipart::Form::new().part("file", file_part); - - if let Some(ref v) = meta.mime_type { - form = form.text("mime_type", v.clone()); - } - if let Some(ref v) = meta.human_file_name { - form = form.text("human_file_name", v.clone()); - } - if let Some(ref v) = meta.human_readable_path { - form = form.text("human_readable_path", v.clone()); - } - if let Some(ref v) = meta.application { - form = form.text("application", v.clone()); - } - if let Some(ref v) = meta.user { - form = form.text("user", v.clone()); - } - if let Some(ref v) = meta.tags { - form = form.text("tags", v.clone()); - } - if let Some(ref v) = meta.description { - form = form.text("description", v.clone()); - } - - let resp = self - .client - .post(&format!("{}/ingest", self.base_url)) - .multipart(form) - .send() - .await - .context("ingest request failed")?; - let status = resp.status(); - if !status.is_success() { - let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("CAN ingest failed ({}): {}", status, text); - } - let api: ApiResponse = resp.json().await.context("parse ingest response")?; - Ok(api.data) + let resp_bytes = self.post_proto("/sync/push", buf).await?; + PushResponse::decode(resp_bytes).context("decode PushResponse") } - /// Update asset metadata (tags, description) + /// Update metadata for an existing asset. 
pub async fn update_meta( &self, - hash: &str, - tags: Option>, + hash: String, description: Option, - ) -> Result<()> { - #[derive(Serialize)] - struct MetadataUpdate { - #[serde(skip_serializing_if = "Option::is_none")] - tags: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - description: Option, - } - let resp = self - .client - .patch(&format!("{}/asset/{}", self.base_url, hash)) - .json(&MetadataUpdate { tags, description }) - .send() - .await - .context("update_meta request failed")?; - let status = resp.status(); - if !status.is_success() { - let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("CAN update_meta failed ({}): {}", status, text); - } - Ok(()) + tags: Vec, + is_trashed: bool, + ) -> Result { + let req = MetaUpdateRequest { + hash, + description, + tags, + is_trashed, + }; + let mut buf = Vec::with_capacity(req.encoded_len()); + req.encode(&mut buf)?; + + let resp_bytes = self.post_proto("/sync/meta", buf).await?; + MetaUpdateResponse::decode(resp_bytes).context("decode MetaUpdateResponse") } - /// Check if CAN service is reachable - pub async fn health_check(&self) -> Result { - match self.list(1, 0, "desc", None).await { - Ok(_) => Ok(true), - Err(_) => Ok(false), - } + /// Health check: try to get hashes (will fail if sync API disabled). + pub async fn health_check(&self) -> bool { + self.get_hashes().await.is_ok() } } diff --git a/examples/can-sync/src/config.rs b/examples/can-sync/src/config.rs index d889d1b..5ac9a7a 100644 --- a/examples/can-sync/src/config.rs +++ b/examples/can-sync/src/config.rs @@ -1,78 +1,30 @@ -use anyhow::{Context, Result}; -use serde::{Deserialize, Serialize}; -use std::path::{Path, PathBuf}; +use serde::Deserialize; +use std::path::Path; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Deserialize)] pub struct SyncConfig { - /// Base URL for the CAN service API (e.g. "http://127.0.0.1:3210/api/v1/can/0") + /// Base URL of the local CAN service (e.g. 
"http://127.0.0.1:3210") pub can_service_url: String, - /// Address for the CAN Sync HTTP API (e.g. "127.0.0.1:3213") - pub listen_addr: String, + /// API key for CAN service's sync endpoints (must match config.sync_api_key) + pub sync_api_key: String, - /// Directory for persistent data (peer key, sync state DB) - pub data_dir: String, + /// Shared passphrase for peer discovery (all peers must use the same one) + pub sync_passphrase: String, - /// Optional custom relay URL; null uses iroh's public relay - pub relay_url: Option, - - /// Seconds between fast polls for new assets + /// Seconds between polls for new local assets #[serde(default = "default_poll_interval")] pub poll_interval_secs: u64, - - /// Seconds between full scans of all assets - #[serde(default = "default_full_scan_interval")] - pub full_scan_interval_secs: u64, } fn default_poll_interval() -> u64 { - 5 -} - -fn default_full_scan_interval() -> u64 { - 300 + 3 } impl SyncConfig { - /// Load config from a YAML file, falling back to defaults if not found - pub fn load(path: &Path) -> Result { - if path.exists() { - let contents = - std::fs::read_to_string(path).context("Failed to read config file")?; - let config: SyncConfig = - serde_yaml::from_str(&contents).context("Failed to parse config YAML")?; - Ok(config) - } else { - tracing::warn!("Config file not found at {}, using defaults", path.display()); - Ok(Self::default()) - } - } - - /// Resolved data directory path - pub fn data_path(&self) -> PathBuf { - PathBuf::from(&self.data_dir) - } - - /// Path to the peer keypair file - pub fn peer_key_path(&self) -> PathBuf { - self.data_path().join("peer_key") - } - - /// Path to the sync state SQLite database - pub fn db_path(&self) -> PathBuf { - self.data_path().join("can_sync.db") - } -} - -impl Default for SyncConfig { - fn default() -> Self { - Self { - can_service_url: "http://127.0.0.1:3210/api/v1/can/0".to_string(), - listen_addr: "127.0.0.1:3213".to_string(), - data_dir: 
"./can_sync_data".to_string(), - relay_url: None, - poll_interval_secs: default_poll_interval(), - full_scan_interval_secs: default_full_scan_interval(), - } + pub fn load(path: &Path) -> anyhow::Result { + let contents = std::fs::read_to_string(path)?; + let config: Self = serde_yaml::from_str(&contents)?; + Ok(config) } } diff --git a/examples/can-sync/src/discovery.rs b/examples/can-sync/src/discovery.rs new file mode 100644 index 0000000..b17d6de --- /dev/null +++ b/examples/can-sync/src/discovery.rs @@ -0,0 +1,109 @@ +//! Peer discovery via iroh-gossip using a shared passphrase. +//! +//! All CAN sync agents with the same `sync_passphrase` derive the same +//! BLAKE3 gossip topic and discover each other automatically. + +use std::collections::HashSet; + +use anyhow::Result; +use iroh::{Endpoint, EndpointId}; +use iroh_gossip::net::Gossip; +use iroh_gossip::proto::TopicId; +use n0_future::StreamExt; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; + +/// Derive a deterministic gossip TopicId from a shared passphrase. +pub fn derive_topic(passphrase: &str) -> TopicId { + let hash = blake3::hash(format!("can-sync-v1:{}", passphrase).as_bytes()); + TopicId::from_bytes(*hash.as_bytes()) +} + +/// Manages peer discovery via gossip announcements. +pub struct Discovery { + gossip: Gossip, + topic: TopicId, + endpoint: Endpoint, +} + +impl Discovery { + pub fn new(endpoint: Endpoint, gossip: Gossip, passphrase: &str) -> Self { + let topic = derive_topic(passphrase); + info!("Gossip topic: {}", hex::encode(topic.as_bytes())); + Self { + gossip, + topic, + endpoint, + } + } + + /// Subscribe to the gossip topic and yield newly discovered peer EndpointIds. + /// + /// Sends discovered EndpointIds on the channel. Runs forever. 
+ pub async fn run(self, tx: mpsc::Sender) -> Result<()> { + info!("Joining gossip topic for peer discovery..."); + + // Subscribe to the topic with no bootstrap peers (we discover via gossip) + let mut topic = self + .gossip + .subscribe(self.topic, vec![]) + .await + .map_err(|e| anyhow::anyhow!("gossip subscribe failed: {}", e))?; + + // Wait until we have at least one neighbor + info!("Waiting for gossip neighbors..."); + + // Broadcast our EndpointId periodically + let our_id = self.endpoint.id(); + let (sender, mut receiver) = topic.split(); + let sender_clone = sender.clone(); + tokio::spawn(async move { + let msg = our_id.as_bytes().to_vec(); + loop { + if let Err(e) = sender_clone.broadcast(msg.clone().into()).await { + warn!("Failed to broadcast discovery: {}", e); + } + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } + }); + + // Listen for peer announcements + let mut known_peers: HashSet = HashSet::new(); + + while let Some(event) = receiver.next().await { + match event { + Ok(iroh_gossip::api::Event::Received(msg)) => { + if msg.content.len() == 32 { + if let Ok(bytes) = <[u8; 32]>::try_from(msg.content.as_ref()) { + if let Ok(peer_id) = EndpointId::from_bytes(&bytes) { + if peer_id != our_id && known_peers.insert(peer_id) { + info!("Discovered new peer: {}", peer_id.fmt_short()); + let _ = tx.send(peer_id).await; + } + } + } + } + } + Ok(iroh_gossip::api::Event::NeighborUp(peer_id)) => { + if peer_id != our_id && known_peers.insert(peer_id) { + info!("Gossip neighbor up: {}", peer_id.fmt_short()); + let _ = tx.send(peer_id).await; + } + } + Ok(iroh_gossip::api::Event::NeighborDown(peer_id)) => { + info!("Gossip neighbor down: {}", peer_id.fmt_short()); + known_peers.remove(&peer_id); + } + Ok(iroh_gossip::api::Event::Lagged) => { + warn!("Gossip receiver lagged, may have missed messages"); + } + Err(e) => { + warn!("Gossip receive error: {}", e); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } + + Ok(()) + 
} +} diff --git a/examples/can-sync/src/fetcher.rs b/examples/can-sync/src/fetcher.rs deleted file mode 100644 index 84f1dc1..0000000 --- a/examples/can-sync/src/fetcher.rs +++ /dev/null @@ -1,352 +0,0 @@ -use std::sync::Arc; - -use anyhow::Result; -use futures_lite::StreamExt; -use sha2::{Digest, Sha256}; -use tokio::io::AsyncReadExt; -use tracing::{debug, error, info, warn}; - -use crate::can_client::{CanClient, IngestMeta}; -use crate::library::SyncState; -use crate::manifest::AssetSyncEntry; -use crate::node::SyncNode; - -/// The fetcher receives remote asset entries from iroh documents -/// and ingests them into the local CAN service. -pub struct Fetcher { - can: CanClient, - state: Arc, - node: Arc, -} - -impl Fetcher { - pub fn new(can: CanClient, state: Arc, node: Arc) -> Self { - Self { can, state, node } - } - - /// Run the fetcher — subscribes to library document events for real-time sync, - /// falls back to periodic polling for documents without active subscriptions - pub async fn run(self) { - info!("Fetcher started — watching for remote asset entries"); - - // Run two loops concurrently: - // 1. Subscription watcher — subscribes to active library docs - // 2. Periodic checker — catches anything missed - let poll_interval = tokio::time::interval(std::time::Duration::from_secs(10)); - let sub_interval = tokio::time::interval(std::time::Duration::from_secs(5)); - - tokio::pin!(poll_interval); - tokio::pin!(sub_interval); - - loop { - tokio::select! 
{ - _ = poll_interval.tick() => { - if let Err(e) = self.check_for_new_entries().await { - warn!("Fetcher poll error: {:#}", e); - } - } - _ = sub_interval.tick() => { - // Try to subscribe to any library docs that we haven't subscribed to yet - if let Err(e) = self.subscribe_to_libraries().await { - debug!("Fetcher subscription check: {:#}", e); - } - } - } - } - } - - /// Subscribe to document events for all libraries that have doc_ids - async fn subscribe_to_libraries(&self) -> Result<()> { - let libraries = self.state.list_libraries()?; - - for lib in &libraries { - if let Some(ref doc_id_hex) = lib.doc_id { - // Open the doc and subscribe to events - let doc = match self.node.open_doc(doc_id_hex).await { - Ok(d) => d, - Err(_) => continue, - }; - - let mut events = match doc.subscribe().await { - Ok(e) => e, - Err(_) => continue, - }; - - // Spawn a task to process events from this doc - let can = self.can.clone(); - let node_peer_id = self.node.peer_id(); - let node = self.node.clone(); - let lib_name = lib.name.clone(); - - tokio::spawn(async move { - while let Some(event) = events.next().await { - match event { - Ok(iroh_docs::engine::LiveEvent::InsertRemote { - entry, - content_status, - .. 
- }) => { - let key = entry.key().to_vec(); - let can_hash = String::from_utf8_lossy(&key).to_string(); - - if content_status == iroh_docs::ContentStatus::Complete { - // The entry value (our AssetSyncEntry) is available - // Read the entry content from the blob store - let content_hash = entry.content_hash(); - let mut reader = node.blobs.reader(content_hash); - let mut buf = Vec::new(); - if reader.read_to_end(&mut buf).await.is_ok() { - if let Ok(sync_entry) = AssetSyncEntry::from_bytes(&buf) { - if sync_entry.last_modified_by == node_peer_id { - continue; // Skip our own entries - } - info!( - "Received remote entry for {} in library '{}'", - &can_hash[..can_hash.len().min(12)], - lib_name - ); - if let Err(e) = process_remote_entry( - &can, - &node, - &node_peer_id, - &can_hash, - sync_entry, - ) - .await - { - error!( - "Error processing remote entry {}: {:#}", - &can_hash[..can_hash.len().min(12)], - e - ); - } - } - } - } - } - Ok(iroh_docs::engine::LiveEvent::NeighborUp(peer)) => { - info!("Peer connected: {}", peer.fmt_short()); - } - Ok(iroh_docs::engine::LiveEvent::NeighborDown(peer)) => { - info!("Peer disconnected: {}", peer.fmt_short()); - } - Ok(_) => {} // Ignore other events - Err(e) => { - warn!("Document event error: {:#}", e); - break; - } - } - } - }); - - // Only subscribe to one doc per tick to avoid overwhelming - return Ok(()); - } - } - Ok(()) - } - - /// Check all library documents for entries we don't have locally (polling fallback) - async fn check_for_new_entries(&self) -> Result<()> { - let libraries = self.state.list_libraries()?; - - for lib in &libraries { - if let Some(ref doc_id_hex) = lib.doc_id { - let doc = match self.node.open_doc(doc_id_hex).await { - Ok(d) => d, - Err(_) => continue, - }; - - // Query all entries (latest per key) - let query = iroh_docs::store::Query::single_latest_per_key().build(); - let entries = match doc.get_many(query).await { - Ok(e) => e, - Err(_) => continue, - }; - tokio::pin!(entries); - - while 
let Some(Ok(entry)) = entries.next().await { - let key = entry.key().to_vec(); - let can_hash = String::from_utf8_lossy(&key).to_string(); - - // Read the entry value (AssetSyncEntry) - let content_hash = entry.content_hash(); - let mut reader = self.node.blobs.reader(content_hash); - let mut buf = Vec::new(); - if reader.read_to_end(&mut buf).await.is_err() { - continue; - } - - let sync_entry = match AssetSyncEntry::from_bytes(&buf) { - Ok(e) => e, - Err(_) => continue, - }; - - // Skip our own entries - if sync_entry.last_modified_by == self.node.peer_id() { - continue; - } - - // Check if already processed - if self.state.is_announced(&lib.id, &can_hash).unwrap_or(false) { - continue; - } - - info!( - "Polling found remote entry for {} in library '{}'", - &can_hash[..can_hash.len().min(12)], - lib.name - ); - - if let Err(e) = process_remote_entry( - &self.can, - &self.node, - &self.node.peer_id(), - &can_hash, - sync_entry, - ) - .await - { - error!( - "Error processing remote entry {}: {:#}", - &can_hash[..can_hash.len().min(12)], - e - ); - } - - // Mark as processed - let _ = self.state.mark_announced(&lib.id, &can_hash, 1); - } - } - } - Ok(()) - } -} - -/// Process a remote asset entry — download blob and ingest into CAN service -async fn process_remote_entry( - can: &CanClient, - node: &SyncNode, - local_peer_id: &str, - can_hash: &str, - entry: AssetSyncEntry, -) -> Result<()> { - // Skip if this is our own entry - if entry.last_modified_by == local_peer_id { - return Ok(()); - } - - // Check if already in local CAN service - match can.get_meta(can_hash).await { - Ok(existing) => { - // Asset exists — check if metadata needs updating - if entry.tags != existing.tags - || entry.description != existing.description - || entry.is_trashed != existing.is_trashed - { - info!("Updating metadata for {} from remote peer", &can_hash[..12]); - can.update_meta( - can_hash, - Some(entry.tags.clone()), - entry.description.clone(), - ) - .await?; - } - return Ok(()); - 
} - Err(_) => { - // Asset not found locally — need to fetch and ingest - } - } - - info!( - "Fetching remote asset {} ({}B) from peer {}", - &can_hash[..12], - entry.size, - &entry.last_modified_by[..entry.last_modified_by.len().min(12)] - ); - - // Download blob via iroh - let content = download_blob(node, &entry).await?; - - if content.is_empty() { - warn!("Downloaded empty blob for {} — skipping", &can_hash[..12]); - return Ok(()); - } - - // Verify CAN hash: SHA256(timestamp_bytes + content) - if !verify_can_hash(can_hash, entry.timestamp, &content) { - error!( - "CAN hash verification failed for {} — rejecting", - &can_hash[..12] - ); - return Ok(()); - } - - // Ingest into local CAN service - let meta = IngestMeta { - mime_type: Some(entry.mime_type.clone()), - human_file_name: entry.human_filename.clone(), - human_readable_path: entry.human_path.clone(), - application: entry.application.clone(), - user: entry.user.clone(), - tags: if entry.tags.is_empty() { - None - } else { - Some(entry.tags.join(",")) - }, - description: entry.description.clone(), - }; - - match can.ingest(content.into(), meta).await { - Ok(result) => { - info!( - "Ingested remote asset: hash={}, filename={}", - &result.hash[..12], - result.filename - ); - } - Err(e) => { - error!("Failed to ingest remote asset {}: {:#}", &can_hash[..12], e); - } - } - - Ok(()) -} - -/// Download a blob via iroh using the blob hash from the sync entry -async fn download_blob(node: &SyncNode, entry: &AssetSyncEntry) -> Result> { - let blob_hash_str = match &entry.iroh_blob_hash { - Some(h) => h, - None => { - warn!("No iroh blob hash in sync entry — cannot download"); - return Ok(Vec::new()); - } - }; - - // Parse the BLAKE3 hash - let blob_hash: iroh_blobs::Hash = blob_hash_str - .parse() - .map_err(|_| anyhow::anyhow!("Invalid iroh blob hash: {}", &blob_hash_str[..12]))?; - - // Read from the local blob store (iroh-docs should have synced it) - let mut reader = node.blobs.reader(blob_hash); - let mut buf 
= Vec::with_capacity(entry.size as usize); - reader.read_to_end(&mut buf).await?; - - debug!( - "Downloaded blob {} ({} bytes)", - &blob_hash_str[..12], - buf.len() - ); - Ok(buf) -} - -/// Verify CAN hash: SHA256(timestamp_string + content) matches expected hash -fn verify_can_hash(expected_hash: &str, timestamp: i64, content: &[u8]) -> bool { - let mut hasher = Sha256::new(); - hasher.update(timestamp.to_string().as_bytes()); - hasher.update(content); - let computed = hex::encode(hasher.finalize()); - computed == expected_hash -} diff --git a/examples/can-sync/src/library.rs b/examples/can-sync/src/library.rs deleted file mode 100644 index 1c0f985..0000000 --- a/examples/can-sync/src/library.rs +++ /dev/null @@ -1,288 +0,0 @@ -use anyhow::{Context, Result}; -use rusqlite::Connection; -use serde::{Deserialize, Serialize}; - -use crate::can_client::AssetMeta; - -/// Filter criteria that determines which CAN assets belong to a library -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct LibraryFilter { - /// Match assets with this application tag - #[serde(skip_serializing_if = "Option::is_none")] - pub application: Option, - /// Match assets with any of these tags - #[serde(skip_serializing_if = "Option::is_none")] - pub tags: Option>, - /// Match assets from this user - #[serde(skip_serializing_if = "Option::is_none")] - pub user: Option, - /// Match assets with MIME type prefix (e.g. 
"image/") - #[serde(skip_serializing_if = "Option::is_none")] - pub mime_prefix: Option, - /// Manual list of specific hashes to include - #[serde(skip_serializing_if = "Option::is_none")] - pub hashes: Option>, -} - -impl LibraryFilter { - /// Check if an asset matches this filter - pub fn matches(&self, asset: &AssetMeta) -> bool { - // If hashes list is set, only match those exact hashes - if let Some(ref hashes) = self.hashes { - return hashes.contains(&asset.hash); - } - - // All set criteria must match (AND logic) - if let Some(ref app) = self.application { - if asset.application.as_deref() != Some(app.as_str()) { - return false; - } - } - - if let Some(ref required_tags) = self.tags { - // Asset must have at least one of the required tags - if !required_tags.iter().any(|t| asset.tags.contains(t)) { - return false; - } - } - - if let Some(ref user) = self.user { - if asset.user.as_deref() != Some(user.as_str()) { - return false; - } - } - - if let Some(ref prefix) = self.mime_prefix { - if !asset.mime_type.starts_with(prefix.as_str()) { - return false; - } - } - - true - } -} - -/// A library definition stored locally -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Library { - /// Unique library ID (UUID) - pub id: String, - /// Human-readable name - pub name: String, - /// Filter criteria - pub filter: LibraryFilter, - /// iroh document ID (namespace) — set after creation - pub doc_id: Option, - /// Whether this library was created locally or joined from remote - pub is_local: bool, - /// Creation timestamp - pub created_at: i64, -} - -/// Tracks which assets have been announced to which libraries. -/// Uses std::sync::Mutex because rusqlite::Connection is !Send, -/// so tokio::sync::RwLock won't work across .await points. 
-pub struct SyncState { - db: std::sync::Mutex, -} - -impl SyncState { - /// Open or create the sync state database - pub fn open(path: &std::path::Path) -> Result { - let db = Connection::open(path).context("open sync state DB")?; - db.execute_batch( - " - CREATE TABLE IF NOT EXISTS libraries ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL, - filter_json TEXT NOT NULL, - doc_id TEXT, - is_local INTEGER NOT NULL DEFAULT 1, - created_at INTEGER NOT NULL - ); - - CREATE TABLE IF NOT EXISTS announced_assets ( - library_id TEXT NOT NULL, - hash TEXT NOT NULL, - version INTEGER NOT NULL DEFAULT 1, - announced_at INTEGER NOT NULL, - PRIMARY KEY (library_id, hash), - FOREIGN KEY (library_id) REFERENCES libraries(id) ON DELETE CASCADE - ); - - CREATE TABLE IF NOT EXISTS sync_state ( - key TEXT PRIMARY KEY, - value TEXT NOT NULL - ); - ", - ) - .context("init sync state tables")?; - Ok(Self { - db: std::sync::Mutex::new(db), - }) - } - - fn lock_db(&self) -> std::sync::MutexGuard<'_, Connection> { - self.db.lock().expect("sync state DB lock poisoned") - } - - // ── Library CRUD ── - - pub fn save_library(&self, lib: &Library) -> Result<()> { - let db = self.lock_db(); - let filter_json = serde_json::to_string(&lib.filter)?; - db.execute( - "INSERT OR REPLACE INTO libraries (id, name, filter_json, doc_id, is_local, created_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6)", - rusqlite::params![ - lib.id, - lib.name, - filter_json, - lib.doc_id, - lib.is_local as i32, - lib.created_at, - ], - )?; - Ok(()) - } - - pub fn list_libraries(&self) -> Result> { - let db = self.lock_db(); - let mut stmt = - db.prepare("SELECT id, name, filter_json, doc_id, is_local, created_at FROM libraries")?; - let libs = stmt - .query_map([], |row| { - let filter_json: String = row.get(2)?; - Ok(Library { - id: row.get(0)?, - name: row.get(1)?, - filter: serde_json::from_str(&filter_json).unwrap_or(LibraryFilter { - application: None, - tags: None, - user: None, - mime_prefix: None, - hashes: None, - }), - 
doc_id: row.get(3)?, - is_local: row.get::<_, i32>(4)? != 0, - created_at: row.get(5)?, - }) - })? - .collect::, _>>()?; - Ok(libs) - } - - pub fn get_library(&self, id: &str) -> Result> { - let db = self.lock_db(); - let mut stmt = db.prepare( - "SELECT id, name, filter_json, doc_id, is_local, created_at FROM libraries WHERE id = ?1", - )?; - let mut rows = stmt.query_map([id], |row| { - let filter_json: String = row.get(2)?; - Ok(Library { - id: row.get(0)?, - name: row.get(1)?, - filter: serde_json::from_str(&filter_json).unwrap_or(LibraryFilter { - application: None, - tags: None, - user: None, - mime_prefix: None, - hashes: None, - }), - doc_id: row.get(3)?, - is_local: row.get::<_, i32>(4)? != 0, - created_at: row.get(5)?, - }) - })?; - match rows.next() { - Some(Ok(lib)) => Ok(Some(lib)), - Some(Err(e)) => Err(e.into()), - None => Ok(None), - } - } - - pub fn delete_library(&self, id: &str) -> Result<()> { - let db = self.lock_db(); - db.execute("DELETE FROM announced_assets WHERE library_id = ?1", [id])?; - db.execute("DELETE FROM libraries WHERE id = ?1", [id])?; - Ok(()) - } - - pub fn update_library_doc_id(&self, id: &str, doc_id: &str) -> Result<()> { - let db = self.lock_db(); - db.execute( - "UPDATE libraries SET doc_id = ?1 WHERE id = ?2", - [doc_id, id], - )?; - Ok(()) - } - - // ── Asset announcement tracking ── - - pub fn is_announced(&self, library_id: &str, hash: &str) -> Result { - let db = self.lock_db(); - let count: i64 = db.query_row( - "SELECT COUNT(*) FROM announced_assets WHERE library_id = ?1 AND hash = ?2", - [library_id, hash], - |row| row.get(0), - )?; - Ok(count > 0) - } - - pub fn get_announced_version(&self, library_id: &str, hash: &str) -> Result> { - let db = self.lock_db(); - let mut stmt = db.prepare( - "SELECT version FROM announced_assets WHERE library_id = ?1 AND hash = ?2", - )?; - let mut rows = stmt.query_map(rusqlite::params![library_id, hash], |row| { - row.get::<_, i64>(0) - })?; - match rows.next() { - Some(Ok(v)) => 
Ok(Some(v as u64)), - Some(Err(e)) => Err(e.into()), - None => Ok(None), - } - } - - pub fn mark_announced(&self, library_id: &str, hash: &str, version: u64) -> Result<()> { - let db = self.lock_db(); - let now = chrono::Utc::now().timestamp_millis(); - db.execute( - "INSERT OR REPLACE INTO announced_assets (library_id, hash, version, announced_at) - VALUES (?1, ?2, ?3, ?4)", - rusqlite::params![library_id, hash, version as i64, now], - )?; - Ok(()) - } - - pub fn remove_announced(&self, library_id: &str, hash: &str) -> Result<()> { - let db = self.lock_db(); - db.execute( - "DELETE FROM announced_assets WHERE library_id = ?1 AND hash = ?2", - [library_id, hash], - )?; - Ok(()) - } - - // ── General state ── - - pub fn get_state(&self, key: &str) -> Result> { - let db = self.lock_db(); - let mut stmt = db.prepare("SELECT value FROM sync_state WHERE key = ?1")?; - let mut rows = stmt.query_map([key], |row| row.get::<_, String>(0))?; - match rows.next() { - Some(Ok(v)) => Ok(Some(v)), - Some(Err(e)) => Err(e.into()), - None => Ok(None), - } - } - - pub fn set_state(&self, key: &str, value: &str) -> Result<()> { - let db = self.lock_db(); - db.execute( - "INSERT OR REPLACE INTO sync_state (key, value) VALUES (?1, ?2)", - [key, value], - )?; - Ok(()) - } -} diff --git a/examples/can-sync/src/main.rs b/examples/can-sync/src/main.rs index 4dc31ad..7b6c3d2 100644 --- a/examples/can-sync/src/main.rs +++ b/examples/can-sync/src/main.rs @@ -1,121 +1,155 @@ -#![allow(dead_code)] +//! CAN Sync — P2P full-mirror replication agent for CAN Service. +//! +//! Uses iroh for encrypted QUIC transport + NAT traversal, +//! and iroh-gossip for peer discovery via a shared passphrase. +//! +//! Each instance talks to its local CAN Service via the private +//! protobuf sync API (/sync/*), authenticated with an API key. 
-mod announcer; mod can_client; mod config; -mod fetcher; -mod library; -mod manifest; -mod node; -mod routes; - -use std::path::PathBuf; -use std::sync::Arc; +mod discovery; +mod peer; +mod protocol; use anyhow::{Context, Result}; -use tracing::info; +use iroh::endpoint::presets::N0; +use iroh::{Endpoint, EndpointId}; +use iroh_gossip::net::Gossip; +use tokio::sync::mpsc; +use tracing::{error, info, warn}; -use crate::announcer::Announcer; -use crate::can_client::CanClient; +use crate::can_client::CanSyncClient; use crate::config::SyncConfig; -use crate::fetcher::Fetcher; -use crate::library::SyncState; -use crate::node::SyncNode; -use crate::routes::AppState; +use crate::discovery::Discovery; + +/// ALPN protocol identifier for CAN sync peer connections. +const SYNC_ALPN: &[u8] = b"can-sync/1"; #[tokio::main] async fn main() -> Result<()> { - // Initialize tracing + // Initialize logging tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "can_sync=info,iroh=warn".parse().unwrap()), + .unwrap_or_else(|_| "can_sync=info,iroh=warn,iroh_gossip=warn".parse().unwrap()), ) .init(); // Load config let config_path = std::env::args() .nth(1) - .map(PathBuf::from) - .unwrap_or_else(|| PathBuf::from("config.yaml")); + .unwrap_or_else(|| "config.yaml".to_string()); + let config = SyncConfig::load(std::path::Path::new(&config_path)) + .with_context(|| format!("loading config from {}", config_path))?; - let config = SyncConfig::load(&config_path)?; - info!("CAN Sync starting..."); - info!(" CAN service: {}", config.can_service_url); - info!(" Listen addr: {}", config.listen_addr); - info!(" Data dir: {}", config.data_dir); + info!("CAN Sync v2 starting"); + info!("CAN service: {}", config.can_service_url); + info!("Poll interval: {}s", config.poll_interval_secs); - // Ensure data directory exists - std::fs::create_dir_all(config.data_path()) - .context("Failed to create data directory")?; + // Create HTTP 
client for local CAN service's sync API + let can = CanSyncClient::new(&config.can_service_url, &config.sync_api_key); - // Initialize CAN service client - let can = CanClient::new(&config.can_service_url); + // Verify CAN service is reachable + if can.health_check().await { + info!("CAN service sync API is healthy"); + } else { + warn!("CAN service sync API not reachable — will retry on sync"); + } - // Check CAN service health - match can.health_check().await { - Ok(true) => info!("CAN service is reachable"), - Ok(false) | Err(_) => { - tracing::warn!( - "CAN service at {} is not reachable — will retry on each poll", - config.can_service_url - ); + // Create iroh endpoint for QUIC transport with n0 defaults (relay + discovery) + let endpoint = Endpoint::builder() + .preset(N0) + .alpns(vec![SYNC_ALPN.to_vec()]) + .bind() + .await + .context("creating iroh endpoint")?; + + let node_id = endpoint.id(); + info!("Node ID: {}", node_id); + + let addrs = endpoint.bound_sockets(); + if let Some(addr) = addrs.first() { + info!("Listening on {}", addr); + } + + // Create gossip instance for peer discovery (not async — returns directly) + let gossip = Gossip::builder().spawn(endpoint.clone()); + + // Channel for discovered peers + let (peer_tx, mut peer_rx) = mpsc::channel::(32); + + // Spawn discovery + let disc = Discovery::new(endpoint.clone(), gossip.clone(), &config.sync_passphrase); + tokio::spawn(async move { + if let Err(e) = disc.run(peer_tx).await { + error!("Discovery failed: {:#}", e); } + }); + + // Spawn incoming connection handler + let endpoint_accept = endpoint.clone(); + let can_accept = can.clone(); + tokio::spawn(async move { + loop { + match endpoint_accept.accept().await { + Some(incoming) => { + let can_clone = can_accept.clone(); + tokio::spawn(async move { + match incoming.await { + Ok(conn) => { + peer::handle_incoming(conn, can_clone).await; + } + Err(e) => { + warn!("Failed to accept connection: {:#}", e); + } + } + }); + } + None => { + 
info!("Endpoint closed, stopping accept loop"); + break; + } + } + } + }); + + // Main loop: connect to discovered peers and sync + let poll_interval = std::time::Duration::from_secs(config.poll_interval_secs); + + info!("Waiting for peers..."); + + while let Some(peer_id) = peer_rx.recv().await { + let short = peer_id.fmt_short(); + info!("Connecting to discovered peer: {}", short); + + let endpoint_clone = endpoint.clone(); + let can_clone = can.clone(); + let poll_dur = poll_interval; + + tokio::spawn(async move { + // Connect to peer (EndpointId implements Into) + let conn = match endpoint_clone.connect(peer_id, SYNC_ALPN).await { + Ok(c) => c, + Err(e) => { + error!("Failed to connect to {}: {:#}", short, e); + return; + } + }; + + // Initial reconciliation + if let Err(e) = peer::run_sync_session(conn.clone(), can_clone.clone(), true).await { + error!("Initial sync with {} failed: {:#}", short, e); + return; + } + + // Live sync loop — keep pushing new assets + if let Err(e) = peer::live_sync_loop(conn, can_clone, poll_dur).await { + warn!("Live sync with {} ended: {:#}", short, e); + } + }); } - // Open sync state database - let state = SyncState::open(&config.db_path())?; - let state = Arc::new(state); - info!("Sync state DB opened at {}", config.db_path().display()); - - // Start iroh P2P node - let node = SyncNode::spawn(&config).await?; - let node = Arc::new(node); - info!("iroh node ID: {}", node.peer_id()); - - // Build shared app state - let app_state = Arc::new(AppState { - node: node.clone(), - state: state.clone(), - can: can.clone(), - }); - - // Start the announcer (polls CAN service for new assets) - let announcer = Announcer::new( - can.clone(), - state.clone(), - node.clone(), - config.poll_interval_secs, - config.full_scan_interval_secs, - ); - tokio::spawn(async move { - announcer.run().await; - }); - - // Start the fetcher (receives remote assets and ingests them) - let fetcher = Fetcher::new(can.clone(), state.clone(), node.clone()); - 
tokio::spawn(async move { - fetcher.run().await; - }); - - // Build HTTP router - let router = routes::build_router(app_state); - - // Start HTTP server - let listener = tokio::net::TcpListener::bind(&config.listen_addr) - .await - .context("Failed to bind HTTP listener")?; - info!("CAN Sync API listening on http://{}", config.listen_addr); - - // Open browser to status page - let status_url = format!("http://{}/status", config.listen_addr); - if open::that(&status_url).is_err() { - info!("Open {} in your browser to check status", status_url); - } - - axum::serve(listener, router) - .await - .context("HTTP server error")?; - + info!("CAN Sync shutting down"); Ok(()) } diff --git a/examples/can-sync/src/manifest.rs b/examples/can-sync/src/manifest.rs deleted file mode 100644 index b8210ed..0000000 --- a/examples/can-sync/src/manifest.rs +++ /dev/null @@ -1,75 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use crate::can_client::AssetMeta; - -/// Entry stored in iroh documents for each synced asset. 
-/// Key = CAN hash, Value = serialized AssetSyncEntry -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AssetSyncEntry { - /// CAN timestamp (milliseconds since epoch) - pub timestamp: i64, - /// MIME type - pub mime_type: String, - /// Application tag - pub application: Option, - /// User identity - pub user: Option, - /// Tags list - pub tags: Vec, - /// Description - pub description: Option, - /// Original human-readable filename - pub human_filename: Option, - /// Original human-readable path - pub human_path: Option, - /// File size in bytes - pub size: i64, - /// Whether the asset is trashed - pub is_trashed: bool, - /// iroh blob hash (BLAKE3) for downloading via iroh - pub iroh_blob_hash: Option, - /// Version counter for conflict resolution (higher wins) - pub version: u64, - /// Peer ID that last modified this entry - pub last_modified_by: String, -} - -impl AssetSyncEntry { - /// Create from CAN service asset metadata - pub fn from_asset_meta(meta: &AssetMeta, peer_id: &str) -> Self { - Self { - timestamp: meta.timestamp, - mime_type: meta.mime_type.clone(), - application: meta.application.clone(), - user: meta.user.clone(), - tags: meta.tags.clone(), - description: meta.description.clone(), - human_filename: meta.human_filename.clone(), - human_path: meta.human_path.clone(), - size: meta.size, - is_trashed: meta.is_trashed, - iroh_blob_hash: None, - version: 1, - last_modified_by: peer_id.to_string(), - } - } - - /// Serialize to bytes for storage in iroh document - pub fn to_bytes(&self) -> Vec { - postcard::to_allocvec(self).expect("serialize AssetSyncEntry") - } - - /// Deserialize from bytes - pub fn from_bytes(bytes: &[u8]) -> anyhow::Result { - Ok(postcard::from_bytes(bytes)?) 
- } - - /// Check if metadata differs from a CAN asset (indicates update needed) - pub fn metadata_differs(&self, meta: &AssetMeta) -> bool { - self.tags != meta.tags - || self.description != meta.description - || self.is_trashed != meta.is_trashed - || self.human_filename != meta.human_filename - || self.human_path != meta.human_path - } -} diff --git a/examples/can-sync/src/node.rs b/examples/can-sync/src/node.rs deleted file mode 100644 index 4a32d61..0000000 --- a/examples/can-sync/src/node.rs +++ /dev/null @@ -1,150 +0,0 @@ -use anyhow::{Context, Result}; -use iroh::protocol::Router; -use iroh::Endpoint; -use iroh_blobs::store::mem::MemStore; -use iroh_blobs::{BlobsProtocol, ALPN as BLOBS_ALPN}; -use iroh_docs::api::protocol::{AddrInfoOptions, ShareMode}; -use iroh_docs::protocol::Docs; -use iroh_docs::{AuthorId, DocTicket, NamespaceId, ALPN as DOCS_ALPN}; -use iroh_gossip::net::Gossip; -use iroh_gossip::ALPN as GOSSIP_ALPN; -use tokio::sync::OnceCell; - -use crate::config::SyncConfig; - -/// Holds all iroh subsystems for the P2P node -pub struct SyncNode { - pub endpoint: Endpoint, - pub blobs: BlobsProtocol, - pub docs: Docs, - pub gossip: Gossip, - pub router: Router, - /// Cached default author ID (created once on startup) - author_id: OnceCell, -} - -impl SyncNode { - /// Start the iroh node with all protocol handlers - pub async fn spawn(_config: &SyncConfig) -> Result { - // Build endpoint (Ed25519 keypair auto-generated and cached) - let endpoint = Endpoint::bind() - .await - .map_err(|e| anyhow::anyhow!("Failed to bind iroh endpoint: {}", e))?; - - tracing::info!( - "iroh node started — EndpointID: {}", - endpoint.id() - ); - - // Gossip for peer communication - let gossip = Gossip::builder().spawn(endpoint.clone()); - - // Blob store (in-memory — blobs are transient, CAN service is authoritative) - let mem_store = MemStore::default(); - let blobs_store: &iroh_blobs::api::Store = &mem_store; - let blobs = BlobsProtocol::new(blobs_store, None); - - // 
Document sync (CRDT-replicated key-value store) - let docs = Docs::memory() - .spawn(endpoint.clone(), blobs_store.clone(), gossip.clone()) - .await - .context("Failed to spawn iroh-docs")?; - - // Router accepts incoming connections and dispatches to handlers - let router = Router::builder(endpoint.clone()) - .accept(BLOBS_ALPN, blobs.clone()) - .accept(GOSSIP_ALPN, gossip.clone()) - .accept(DOCS_ALPN, docs.clone()) - .spawn(); - - Ok(Self { - endpoint, - blobs, - docs, - gossip, - router, - author_id: OnceCell::new(), - }) - } - - /// Get this node's peer ID as a string - pub fn peer_id(&self) -> String { - self.endpoint.id().to_string() - } - - /// Get the node's endpoint address info for sharing - pub fn endpoint_addr(&self) -> iroh::EndpointAddr { - self.endpoint.addr() - } - - /// Get or create the default author for writing to documents - pub async fn author(&self) -> Result { - self.author_id - .get_or_try_init(|| async { - self.docs.author_default().await - }) - .await - .copied() - } - - /// Create a new iroh document and return its NamespaceId as a hex string - pub async fn create_doc(&self) -> Result { - let doc = self.docs.create().await?; - let ns_id = doc.id(); - Ok(hex::encode(ns_id.to_bytes())) - } - - /// Open an existing document by its hex-encoded namespace ID - pub async fn open_doc(&self, doc_id_hex: &str) -> Result { - let ns_id = parse_namespace_id(doc_id_hex)?; - self.docs - .open(ns_id) - .await? 
- .ok_or_else(|| anyhow::anyhow!("Document {} not found", &doc_id_hex[..12])) - } - - /// Write a key-value entry to a document - pub async fn write_to_doc( - &self, - doc_id_hex: &str, - key: &[u8], - value: &[u8], - ) -> Result<()> { - let doc = self.open_doc(doc_id_hex).await?; - let author = self.author().await?; - doc.set_bytes(author, key.to_vec(), value.to_vec()).await?; - Ok(()) - } - - /// Generate a share ticket (DocTicket) for a document - pub async fn share_doc(&self, doc_id_hex: &str) -> Result { - let doc = self.open_doc(doc_id_hex).await?; - let ticket = doc - .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) - .await?; - Ok(ticket) - } - - /// Import a document from a DocTicket, returns the namespace ID as hex - pub async fn import_doc(&self, ticket: DocTicket) -> Result { - let doc = self.docs.import(ticket).await?; - let ns_id = doc.id(); - Ok(hex::encode(ns_id.to_bytes())) - } - - /// Graceful shutdown - pub async fn shutdown(self) -> Result<()> { - tracing::info!("Shutting down iroh node..."); - self.router.shutdown().await?; - Ok(()) - } -} - -/// Parse a hex-encoded NamespaceId -pub fn parse_namespace_id(hex_str: &str) -> Result { - let bytes: [u8; 32] = hex::decode(hex_str) - .context("Invalid hex in doc_id")? - .try_into() - .map_err(|_| anyhow::anyhow!("doc_id must be 32 bytes (64 hex chars)"))?; - Ok(NamespaceId::from(bytes)) -} diff --git a/examples/can-sync/src/peer.rs b/examples/can-sync/src/peer.rs new file mode 100644 index 0000000..d583a28 --- /dev/null +++ b/examples/can-sync/src/peer.rs @@ -0,0 +1,320 @@ +//! Per-peer sync: reconciliation and live bidirectional asset transfer. +//! +//! When two sync agents connect, they: +//! 1. Exchange hash lists (from their local CAN services) +//! 2. Compute the diff (what each side is missing) +//! 3. Send missing assets to each other +//! 4. 
Continue polling for new assets and pushing them + +use std::collections::{HashMap, HashSet}; + +use anyhow::{Context, Result}; +use iroh::endpoint::Connection; +use prost::Message; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tracing::{debug, error, info, warn}; + +use crate::can_client::CanSyncClient; +use crate::protocol::*; + +// Message type tags for QUIC stream framing +const MSG_HASH_SET: u8 = 0x01; +const MSG_ASSET_BUNDLE: u8 = 0x02; +const MSG_META_UPDATE: u8 = 0x03; +const MSG_DONE: u8 = 0x04; + +/// Frame a protobuf message with a type tag and length prefix. +fn encode_frame(msg_type: u8, payload: &[u8]) -> Vec { + let len = payload.len() as u32; + let mut frame = Vec::with_capacity(5 + payload.len()); + frame.push(msg_type); + frame.extend_from_slice(&len.to_be_bytes()); + frame.extend_from_slice(payload); + frame +} + +/// Read a single framed message from a QUIC recv stream. +/// Returns (msg_type, payload_bytes). +async fn read_frame(recv: &mut iroh::endpoint::RecvStream) -> Result<(u8, Vec)> { + let msg_type = recv.read_u8().await.context("reading message type")?; + let len = recv.read_u32().await.context("reading message length")?; + + if len > 256 * 1024 * 1024 { + anyhow::bail!("Message too large: {} bytes", len); + } + + let mut payload = vec![0u8; len as usize]; + recv.read_exact(&mut payload) + .await + .context("reading message payload")?; + Ok((msg_type, payload)) +} + +/// Run a full sync session with a connected peer. +/// +/// This handles both initial reconciliation and ongoing transfer. +/// Called for both outgoing connections (we initiated) and incoming connections. 
+pub async fn run_sync_session( + conn: Connection, + can: CanSyncClient, + is_initiator: bool, +) -> Result<()> { + let peer_id = conn.remote_id(); + let short_id = peer_id.fmt_short().to_string(); + info!("Starting sync session with {} (initiator={})", short_id, is_initiator); + + // Open a bi-directional stream for the sync protocol + let (mut send, mut recv) = conn.open_bi().await.context("opening bi stream")?; + + // Step 1: Get our local hash list from CAN service + let our_hashes = can.get_hashes().await.context("getting local hashes")?; + let our_hash_map: HashMap = our_hashes + .assets + .iter() + .map(|a| (a.hash.clone(), a)) + .collect(); + + info!( + "Local state: {} assets, sending to peer {}", + our_hashes.assets.len(), + short_id + ); + + // Step 2: Send our hash set to peer + let hash_set_msg = PeerHashSet { + assets: our_hashes.assets.clone(), + }; + let mut buf = Vec::with_capacity(hash_set_msg.encoded_len()); + hash_set_msg.encode(&mut buf)?; + let frame = encode_frame(MSG_HASH_SET, &buf); + send.write_all(&frame).await.context("sending hash set")?; + send.flush().await?; + + // Step 3: Receive peer's hash set + let (msg_type, payload) = read_frame(&mut recv).await.context("reading peer hash set")?; + if msg_type != MSG_HASH_SET { + anyhow::bail!("Expected hash set message, got type {}", msg_type); + } + let peer_hash_set = PeerHashSet::decode(payload.as_slice()).context("decoding peer hash set")?; + + let peer_hash_map: HashMap = peer_hash_set + .assets + .iter() + .map(|a| (a.hash.clone(), a)) + .collect(); + + info!( + "Peer {} has {} assets", + short_id, + peer_hash_set.assets.len() + ); + + // Step 4: Compute diffs + let our_hashes_set: HashSet<&String> = our_hash_map.keys().collect(); + let peer_hashes_set: HashSet<&String> = peer_hash_map.keys().collect(); + + let we_need: Vec = peer_hashes_set + .difference(&our_hashes_set) + .map(|h| (*h).clone()) + .collect(); + let they_need: Vec = our_hashes_set + .difference(&peer_hashes_set) + 
.map(|h| (*h).clone()) + .collect(); + + info!( + "Diff with {}: we need {}, they need {}", + short_id, + we_need.len(), + they_need.len() + ); + + // Step 5: Send assets the peer is missing + if !they_need.is_empty() { + send_assets(&can, &mut send, &they_need, &short_id).await?; + } + + // Send DONE marker + let done_frame = encode_frame(MSG_DONE, &[]); + send.write_all(&done_frame).await?; + send.flush().await?; + + // Step 6: Receive assets we're missing + receive_assets(&can, &mut recv, &short_id).await?; + + info!("Sync session with {} complete", short_id); + Ok(()) +} + +/// Pull assets from local CAN service and send them to the peer. +async fn send_assets( + can: &CanSyncClient, + send: &mut iroh::endpoint::SendStream, + hashes: &[String], + peer_short: &str, +) -> Result<()> { + // Pull assets in batches to avoid huge single requests + for chunk in hashes.chunks(10) { + let pull_resp = can + .pull(chunk.to_vec()) + .await + .context("pulling assets from CAN")?; + + for bundle in pull_resp.bundles { + let hash_short = &bundle.hash[..bundle.hash.len().min(12)]; + info!("Sending asset {} to peer {}", hash_short, peer_short); + + let mut buf = Vec::with_capacity(bundle.encoded_len()); + bundle.encode(&mut buf)?; + let frame = encode_frame(MSG_ASSET_BUNDLE, &buf); + send.write_all(&frame).await?; + send.flush().await?; + } + } + Ok(()) +} + +/// Receive assets from peer and push them to local CAN service. 
+async fn receive_assets( + can: &CanSyncClient, + recv: &mut iroh::endpoint::RecvStream, + peer_short: &str, +) -> Result<()> { + loop { + let (msg_type, payload) = read_frame(recv).await.context("reading asset from peer")?; + + match msg_type { + MSG_DONE => { + debug!("Peer {} finished sending assets", peer_short); + break; + } + MSG_ASSET_BUNDLE => { + let bundle = + AssetBundle::decode(payload.as_slice()).context("decoding asset bundle")?; + let hash_short = bundle.hash[..bundle.hash.len().min(12)].to_string(); + info!("Received asset {} from peer {}", hash_short, peer_short); + + match can.push(bundle).await { + Ok(resp) => { + if resp.already_existed { + debug!("Asset {} already existed locally", hash_short); + } else { + info!("Ingested asset {} from peer {}", resp.hash, peer_short); + } + } + Err(e) => { + error!("Failed to push asset {} to CAN: {:#}", hash_short, e); + } + } + } + MSG_META_UPDATE => { + let meta = MetaUpdateRequest::decode(payload.as_slice()) + .context("decoding meta update")?; + let hash_short = meta.hash[..meta.hash.len().min(12)].to_string(); + debug!( + "Received meta update for {} from peer {}", + hash_short, peer_short + ); + + if let Err(e) = can + .update_meta( + meta.hash.clone(), + meta.description.clone(), + meta.tags.clone(), + meta.is_trashed, + ) + .await + { + error!("Failed to update meta for {}: {:#}", hash_short, e); + } + } + other => { + warn!("Unknown message type {} from peer {}", other, peer_short); + } + } + } + Ok(()) +} + +/// Handle an incoming connection from a peer who connected to us. +pub async fn handle_incoming(conn: Connection, can: CanSyncClient) { + let peer_id = conn.remote_id(); + let short_id = peer_id.fmt_short().to_string(); + info!("Incoming sync connection from {}", short_id); + + if let Err(e) = run_sync_session(conn, can, false).await { + error!("Sync session with {} failed: {:#}", short_id, e); + } +} + +/// Run the live sync loop: poll for new local assets and push to peer. 
+/// +/// This runs after initial reconciliation and keeps peers in sync. +pub async fn live_sync_loop( + conn: Connection, + can: CanSyncClient, + poll_interval: std::time::Duration, +) -> Result<()> { + let peer_id = conn.remote_id(); + let short_id = peer_id.fmt_short().to_string(); + info!("Starting live sync loop with {}", short_id); + + // Track what we've already synced + let mut known_hashes: HashSet = { + let resp = can.get_hashes().await?; + resp.assets.into_iter().map(|a| a.hash).collect() + }; + + let mut interval = tokio::time::interval(poll_interval); + + loop { + interval.tick().await; + + // Poll for new assets + let resp = match can.get_hashes().await { + Ok(r) => r, + Err(e) => { + warn!("Failed to poll CAN service: {:#}", e); + continue; + } + }; + + let current_hashes: HashSet = + resp.assets.iter().map(|a| a.hash.clone()).collect(); + let new_hashes: Vec = current_hashes + .difference(&known_hashes) + .cloned() + .collect(); + + if !new_hashes.is_empty() { + info!( + "Detected {} new local assets, pushing to {}", + new_hashes.len(), + short_id + ); + + // Open a new stream for this batch + match conn.open_bi().await { + Ok((mut send, _recv)) => { + if let Err(e) = send_assets(&can, &mut send, &new_hashes, &short_id).await { + error!("Failed to push new assets to {}: {:#}", short_id, e); + } + // Send done marker + let done_frame = encode_frame(MSG_DONE, &[]); + let _ = send.write_all(&done_frame).await; + let _ = send.flush().await; + let _ = send.finish(); + } + Err(e) => { + warn!("Failed to open stream to {}: {:#}", short_id, e); + break; // Connection probably dead + } + } + } + + // Update our known set + known_hashes = current_hashes; + } + + Ok(()) +} diff --git a/examples/can-sync/src/protocol.rs b/examples/can-sync/src/protocol.rs new file mode 100644 index 0000000..d96645f --- /dev/null +++ b/examples/can-sync/src/protocol.rs @@ -0,0 +1,123 @@ +//! Protobuf message types for CAN sync API + peer-to-peer protocol. +//! +//! 
These match the types in CAN service's routes/sync.rs exactly.

use prost::Message;

// ── CAN Sync API messages (protobuf, same as CAN service) ───────────────
//
// Field tags are part of the wire format shared with the CAN service —
// never renumber or retype an existing tag; only append new ones.

/// Request for the full hash list; carries no fields.
#[derive(Clone, PartialEq, Message)]
pub struct HashListRequest {}

/// Response listing one digest per asset the service knows about.
#[derive(Clone, PartialEq, Message)]
pub struct HashListResponse {
    #[prost(message, repeated, tag = "1")]
    pub assets: Vec<AssetDigest>,
}

/// Lightweight per-asset summary used for reconciliation diffs.
#[derive(Clone, PartialEq, Message)]
pub struct AssetDigest {
    #[prost(string, tag = "1")]
    pub hash: String,
    #[prost(int64, tag = "2")]
    pub timestamp: i64,
    #[prost(int64, tag = "3")]
    pub size: i64,
    #[prost(bool, tag = "4")]
    pub is_trashed: bool,
}

/// Request to fetch the full bundles for a set of hashes.
#[derive(Clone, PartialEq, Message)]
pub struct PullRequest {
    #[prost(string, repeated, tag = "1")]
    pub hashes: Vec<String>,
}

/// Response carrying the requested asset bundles.
#[derive(Clone, PartialEq, Message)]
pub struct PullResponse {
    #[prost(message, repeated, tag = "1")]
    pub bundles: Vec<AssetBundle>,
}

/// A complete asset: metadata plus raw content bytes.
#[derive(Clone, PartialEq, Message)]
pub struct AssetBundle {
    #[prost(string, tag = "1")]
    pub hash: String,
    #[prost(int64, tag = "2")]
    pub timestamp: i64,
    #[prost(string, tag = "3")]
    pub mime_type: String,
    #[prost(string, optional, tag = "4")]
    pub application: Option<String>,
    #[prost(string, optional, tag = "5")]
    pub user_identity: Option<String>,
    #[prost(string, optional, tag = "6")]
    pub description: Option<String>,
    #[prost(string, optional, tag = "7")]
    pub human_filename: Option<String>,
    #[prost(string, optional, tag = "8")]
    pub human_path: Option<String>,
    #[prost(bool, tag = "9")]
    pub is_trashed: bool,
    #[prost(int64, tag = "10")]
    pub size: i64,
    #[prost(string, repeated, tag = "11")]
    pub tags: Vec<String>,
    #[prost(bytes = "vec", tag = "12")]
    pub content: Vec<u8>,
}

/// Request to ingest one asset bundle into the local CAN service.
#[derive(Clone, PartialEq, Message)]
pub struct PushRequest {
    #[prost(message, optional, tag = "1")]
    pub bundle: Option<AssetBundle>,
}

/// Result of a push: the canonical hash and whether it was a duplicate.
#[derive(Clone, PartialEq, Message)]
pub struct PushResponse {
    #[prost(string, tag = "1")]
    pub hash: String,
    #[prost(bool, tag = "2")]
    pub already_existed: bool,
}

+#[derive(Clone, PartialEq, Message)] +pub struct MetaUpdateRequest { + #[prost(string, tag = "1")] + pub hash: String, + #[prost(string, optional, tag = "2")] + pub description: Option, + #[prost(string, repeated, tag = "3")] + pub tags: Vec, + #[prost(bool, tag = "4")] + pub is_trashed: bool, +} + +#[derive(Clone, PartialEq, Message)] +pub struct MetaUpdateResponse { + #[prost(bool, tag = "1")] + pub success: bool, +} + +// ── Peer-to-peer messages (sent over QUIC streams between sync agents) ── + +/// Sent between peers during reconciliation: "here are all the hashes I have" +#[derive(Clone, PartialEq, Message)] +pub struct PeerHashSet { + #[prost(message, repeated, tag = "1")] + pub assets: Vec, +} + +/// Request from peer: "please send me these assets" +#[derive(Clone, PartialEq, Message)] +pub struct PeerPullRequest { + #[prost(string, repeated, tag = "1")] + pub hashes: Vec, +} + +/// A single asset being sent from one peer to another +#[derive(Clone, PartialEq, Message)] +pub struct PeerAssetTransfer { + #[prost(message, optional, tag = "1")] + pub bundle: Option, +} diff --git a/examples/can-sync/src/routes.rs b/examples/can-sync/src/routes.rs deleted file mode 100644 index 3175b16..0000000 --- a/examples/can-sync/src/routes.rs +++ /dev/null @@ -1,430 +0,0 @@ -use std::sync::Arc; - -use axum::{ - extract::{Path, State}, - http::StatusCode, - response::IntoResponse, - routing::{get, post}, - Json, Router, -}; -use serde::{Deserialize, Serialize}; - -use crate::can_client::CanClient; -use crate::library::{Library, LibraryFilter, SyncState}; -use crate::node::SyncNode; - -/// Shared application state for route handlers -pub struct AppState { - pub node: Arc, - pub state: Arc, - pub can: CanClient, -} - -// ── Request/Response types ── - -#[derive(Serialize)] -struct StatusResponse { - peer_id: String, - can_service_healthy: bool, - library_count: usize, -} - -#[derive(Serialize)] -struct PeerInfo { - peer_id: String, -} - -#[derive(Deserialize)] -pub struct 
CreateLibraryRequest { - pub name: String, - pub filter: LibraryFilter, -} - -#[derive(Serialize)] -struct LibraryResponse { - id: String, - name: String, - filter: LibraryFilter, - doc_id: Option, - is_local: bool, - created_at: i64, -} - -#[derive(Serialize)] -struct InviteResponse { - ticket: String, -} - -#[derive(Deserialize)] -pub struct JoinRequest { - pub ticket: String, -} - -#[derive(Serialize)] -struct JoinResponse { - library_id: String, - message: String, -} - -#[derive(Serialize)] -struct ApiResp { - status: String, - data: T, -} - -#[derive(Serialize)] -struct ApiErr { - status: String, - error: String, -} - -fn ok_json(data: T) -> Json> { - Json(ApiResp { - status: "success".to_string(), - data, - }) -} - -fn err_resp(status: StatusCode, msg: &str) -> (StatusCode, Json) { - ( - status, - Json(ApiErr { - status: "error".to_string(), - error: msg.to_string(), - }), - ) -} - -// ── Routes ── - -pub fn build_router(app_state: Arc) -> Router { - Router::new() - .route("/status", get(get_status)) - .route("/peers", get(get_peers)) - .route("/libraries", post(create_library).get(list_libraries)) - .route( - "/libraries/{id}", - get(get_library).delete(delete_library), - ) - .route("/libraries/{id}/invite", post(create_invite)) - .route("/join", post(join_library)) - .with_state(app_state) -} - -// ── Handlers ── - -async fn get_status(State(app): State>) -> impl IntoResponse { - let can_healthy = app.can.health_check().await.unwrap_or(false); - let lib_count = app.state.list_libraries().unwrap_or_default().len(); - - ok_json(StatusResponse { - peer_id: app.node.peer_id(), - can_service_healthy: can_healthy, - library_count: lib_count, - }) - .into_response() -} - -async fn get_peers(State(app): State>) -> impl IntoResponse { - let peers: Vec = vec![PeerInfo { - peer_id: app.node.peer_id(), - }]; - ok_json(peers).into_response() -} - -async fn create_library( - State(app): State>, - Json(req): Json, -) -> impl IntoResponse { - // Create an iroh document for 
this library - let doc_id = match app.node.create_doc().await { - Ok(id) => Some(id), - Err(e) => { - tracing::warn!("Failed to create iroh document for library: {:#}", e); - None - } - }; - - let lib = Library { - id: uuid::Uuid::new_v4().to_string(), - name: req.name, - filter: req.filter, - doc_id, - is_local: true, - created_at: chrono::Utc::now().timestamp_millis(), - }; - - if let Err(e) = app.state.save_library(&lib) { - return err_resp( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("save failed: {}", e), - ) - .into_response(); - } - - tracing::info!( - "Created library '{}' (id={}, doc_id={:?})", - lib.name, - &lib.id[..8], - lib.doc_id.as_deref().map(|d| &d[..12.min(d.len())]) - ); - - ok_json(LibraryResponse { - id: lib.id, - name: lib.name, - filter: lib.filter, - doc_id: lib.doc_id, - is_local: lib.is_local, - created_at: lib.created_at, - }) - .into_response() -} - -async fn list_libraries(State(app): State>) -> impl IntoResponse { - match app.state.list_libraries() { - Ok(libs) => { - let responses: Vec = libs - .into_iter() - .map(|lib| LibraryResponse { - id: lib.id, - name: lib.name, - filter: lib.filter, - doc_id: lib.doc_id, - is_local: lib.is_local, - created_at: lib.created_at, - }) - .collect(); - ok_json(responses).into_response() - } - Err(e) => { - err_resp(StatusCode::INTERNAL_SERVER_ERROR, &format!("{}", e)).into_response() - } - } -} - -async fn get_library( - State(app): State>, - Path(id): Path, -) -> impl IntoResponse { - match app.state.get_library(&id) { - Ok(Some(lib)) => ok_json(LibraryResponse { - id: lib.id, - name: lib.name, - filter: lib.filter, - doc_id: lib.doc_id, - is_local: lib.is_local, - created_at: lib.created_at, - }) - .into_response(), - Ok(None) => err_resp(StatusCode::NOT_FOUND, "Library not found").into_response(), - Err(e) => { - err_resp(StatusCode::INTERNAL_SERVER_ERROR, &format!("{}", e)).into_response() - } - } -} - -async fn delete_library( - State(app): State>, - Path(id): Path, -) -> impl IntoResponse 
{ - match app.state.delete_library(&id) { - Ok(()) => ok_json("deleted").into_response(), - Err(e) => { - err_resp(StatusCode::INTERNAL_SERVER_ERROR, &format!("{}", e)).into_response() - } - } -} - -async fn create_invite( - State(app): State>, - Path(id): Path, -) -> impl IntoResponse { - match app.state.get_library(&id) { - Ok(Some(lib)) => { - let doc_id = match &lib.doc_id { - Some(d) => d, - None => { - return err_resp( - StatusCode::BAD_REQUEST, - "Library has no iroh document — cannot create invite", - ) - .into_response() - } - }; - - // Generate a real DocTicket via iroh - match app.node.share_doc(doc_id).await { - Ok(ticket) => { - // DocTicket implements Display via iroh's Ticket trait (base32 serialization) - let ticket_str = ticket.to_string(); - - // Wrap with library metadata so the joiner knows the name and filter - let invite_data = serde_json::json!({ - "ticket": ticket_str, - "library_name": lib.name, - "filter": lib.filter, - }); - let invite_b64 = base64_encode( - &serde_json::to_vec(&invite_data).unwrap(), - ); - - ok_json(InviteResponse { ticket: invite_b64 }).into_response() - } - Err(e) => err_resp( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("Failed to create invite: {}", e), - ) - .into_response(), - } - } - Ok(None) => err_resp(StatusCode::NOT_FOUND, "Library not found").into_response(), - Err(e) => { - err_resp(StatusCode::INTERNAL_SERVER_ERROR, &format!("{}", e)).into_response() - } - } -} - -async fn join_library( - State(app): State>, - Json(req): Json, -) -> impl IntoResponse { - // Decode our envelope - let ticket_bytes = match base64_decode(&req.ticket) { - Ok(b) => b, - Err(_) => { - return err_resp(StatusCode::BAD_REQUEST, "Invalid ticket encoding").into_response() - } - }; - - let ticket_data: serde_json::Value = match serde_json::from_slice(&ticket_bytes) { - Ok(v) => v, - Err(_) => { - return err_resp(StatusCode::BAD_REQUEST, "Invalid ticket data").into_response() - } - }; - - // Extract the real DocTicket string - let 
ticket_str = match ticket_data["ticket"].as_str() { - Some(s) => s, - None => { - return err_resp(StatusCode::BAD_REQUEST, "Missing 'ticket' field in invite") - .into_response() - } - }; - - // Parse DocTicket from the serialized string - let doc_ticket: iroh_docs::DocTicket = match ticket_str.parse() { - Ok(t) => t, - Err(e) => { - return err_resp( - StatusCode::BAD_REQUEST, - &format!("Invalid DocTicket: {}", e), - ) - .into_response() - } - }; - - // Import the document via iroh (starts sync with remote peers) - let doc_id_hex = match app.node.import_doc(doc_ticket).await { - Ok(id) => id, - Err(e) => { - return err_resp( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("Failed to join document: {}", e), - ) - .into_response() - } - }; - - let name = ticket_data["library_name"] - .as_str() - .unwrap_or("remote library") - .to_string(); - - let filter: LibraryFilter = serde_json::from_value(ticket_data["filter"].clone()) - .unwrap_or(LibraryFilter { - application: None, - tags: None, - user: None, - mime_prefix: None, - hashes: None, - }); - - let lib = Library { - id: uuid::Uuid::new_v4().to_string(), - name: name.clone(), - filter, - doc_id: Some(doc_id_hex), - is_local: false, - created_at: chrono::Utc::now().timestamp_millis(), - }; - - if let Err(e) = app.state.save_library(&lib) { - return err_resp( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("save failed: {}", e), - ) - .into_response(); - } - - tracing::info!( - "Joined library '{}' (id={}, doc_id={:?})", - name, - &lib.id[..8], - lib.doc_id.as_deref().map(|d| &d[..12.min(d.len())]) - ); - - ok_json(JoinResponse { - library_id: lib.id, - message: "Joined library successfully".to_string(), - }) - .into_response() -} - -// ── Base64 helpers ── - -fn base64_encode(data: &[u8]) -> String { - const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - let mut result = Vec::new(); - for chunk in data.chunks(3) { - let b0 = chunk[0] as u32; - let b1 = if chunk.len() > 1 { 
chunk[1] as u32 } else { 0 }; - let b2 = if chunk.len() > 2 { chunk[2] as u32 } else { 0 }; - let triple = (b0 << 16) | (b1 << 8) | b2; - result.push(CHARS[((triple >> 18) & 0x3F) as usize]); - result.push(CHARS[((triple >> 12) & 0x3F) as usize]); - if chunk.len() > 1 { - result.push(CHARS[((triple >> 6) & 0x3F) as usize]); - } else { - result.push(b'='); - } - if chunk.len() > 2 { - result.push(CHARS[(triple & 0x3F) as usize]); - } else { - result.push(b'='); - } - } - String::from_utf8(result).unwrap() -} - -fn base64_decode(input: &str) -> Result, &'static str> { - const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - let input = input.trim_end_matches('='); - let bytes: Vec = input - .bytes() - .filter_map(|b| CHARS.iter().position(|&c| c == b).map(|p| p as u8)) - .collect(); - let mut buf = Vec::new(); - for chunk in bytes.chunks(4) { - if chunk.len() >= 2 { - buf.push((chunk[0] << 2) | (chunk[1] >> 4)); - } - if chunk.len() >= 3 { - buf.push((chunk[1] << 4) | (chunk[2] >> 2)); - } - if chunk.len() >= 4 { - buf.push((chunk[2] << 6) | chunk[3]); - } - } - Ok(buf) -}