Compare commits

...

3 Commits

689d14202b Add README for main project and each example
Main README covers quick start, API overview, and links to example READMEs.
Each example (paste, filemanager, can-sync, canfs) gets its own README
with setup instructions, architecture, and configuration details.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 14:45:20 -06:00
e7def4b819 Merge branch 'sync-v2' into master
Adds P2P sync (protobuf API, iroh QUIC transport, gossip + pkarr
discovery), SSE live refresh, plain-English code comments across
all source files and examples.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 14:36:29 -06:00
620966872e Add plain-English comments to all functions across src/ and examples/
Comments help non-Rust users understand what each function, struct, and
module does. Covers the core service (18 source files) and all four
example projects (can-sync, canfs, filemanager, paste).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 14:35:24 -06:00
33 changed files with 660 additions and 482 deletions

README.md · Normal file · 162 lines
View File

@@ -0,0 +1,162 @@
# CAN Service
**Containerized Asset Network** -- A self-healing local storage daemon with HTTP REST and Protobuf APIs for ingesting, managing, and retrieving files.
CAN stores any file you throw at it, tags it with metadata, verifies integrity in the background, and syncs between machines over encrypted P2P connections. Think of it as a personal S3 that runs on your laptop and replicates to your other devices automatically.
---
## Quick Start
```bash
# Build and run (listens on port 3210)
cargo run
```
The service reads `config.yaml` from the current directory:
```yaml
storage_root: "./can_data"
admin_token: "super_secret_rebuild"
enable_thumbnail_cache: true
verify_interval_hours: 12
sync_api_key: "can-sync-default-key" # enables P2P sync endpoints
```
Override the port with `CAN_PORT=8080 cargo run`.
### Upload a file
```bash
curl -X POST http://localhost:3210/api/v1/can/0/ingest \
-F "file=@photo.jpg" \
-F "tags=vacation,summer" \
-F "application=my-app"
```
### Upload JSON data (agent-friendly)
```bash
curl -X POST http://localhost:3210/api/v1/can/0/ingest/data \
-H "Content-Type: application/json" \
-d '{"data": {"key": "value"}, "tags": "config,backup"}'
```
### Download a file
```bash
curl http://localhost:3210/api/v1/can/0/asset/{hash} -o file.jpg
```
---
## How It Works
```
+-----------+
Upload ---->| |----> SQLite index (millisecond queries)
| CAN |
Download <---| Service |----> Flat file storage (one file per asset)
| |
Search ---->| port 3210 |----> OS file attributes (disaster recovery)
| |
SSE <----| |----> Background verifier (integrity checks)
+-----------+
|
P2P Sync (protobuf over QUIC)
|
+-----------+
| CAN |
| Service | (another machine)
| port 3210 |
+-----------+
```
Each asset is saved as `{timestamp}_{sha256hash}_{tags}.{ext}` in a flat directory. Metadata lives in SQLite for fast queries and is redundantly written to OS-level file attributes (xattr on macOS/Linux, NTFS ADS on Windows) so you can recover even if the database is lost.
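As a rough sketch of that naming scheme (illustrative only -- the real parser lives in `storage.rs`, and the exact tag separator is an assumption here):
```rust
/// Hypothetical helper: split "1710000000000_abc123_vacation+summer.jpg"
/// into (timestamp, hash, tags, extension). Not the actual storage.rs code.
fn parse_asset_filename(name: &str) -> Option<(i64, &str, &str, &str)> {
    let (stem, ext) = name.rsplit_once('.')?;
    let mut parts = stem.splitn(3, '_');
    let timestamp: i64 = parts.next()?.parse().ok()?;
    let hash = parts.next()?;
    let tags = parts.next().unwrap_or(""); // tag segment may be absent
    Some((timestamp, hash, tags, ext))
}
```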
A background verifier re-hashes every file periodically and flags corruption. It also watches for filesystem changes in real time.
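One verification pass boils down to hash-and-compare; a minimal sketch (assumes the `sha2` and `hex` crates; the real loop in `verifier.rs` also handles corruption flagging and size backfill):
```rust
use sha2::{Digest, Sha256};
use std::{fs, path::Path};

/// Sketch: re-hash a stored file and check it against the SHA-256
/// recorded at ingest time. `true` means the file is intact.
fn verify_asset(path: &Path, expected_hash: &str) -> std::io::Result<bool> {
    let bytes = fs::read(path)?;
    let actual = hex::encode(Sha256::digest(&bytes));
    Ok(actual == expected_hash)
}
```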
---
## API
All endpoints live under `/api/v1/can/0/`. See [API.md](API.md) for the full specification.
| Method | Path | Description |
|--------|------|-------------|
| `POST` | `/ingest` | Upload a file (multipart form) |
| `POST` | `/ingest/data` | Upload JSON data (no multipart needed) |
| `GET` | `/asset/{hash}` | Download an asset by its SHA-256 hash |
| `GET` | `/asset/{hash}/meta` | Get metadata as JSON |
| `PATCH` | `/asset/{hash}` | Update tags and/or description |
| `GET` | `/asset/{hash}/thumb/{w}/{h}` | Get a resized thumbnail (images only) |
| `GET` | `/list` | Paginated listing with filters |
| `GET` | `/search` | Search by hash prefix, time range, MIME, tags, etc. |
| `GET` | `/events` | SSE stream of new asset notifications |
Private sync endpoints (`/sync/*`) use protobuf and require the `X-Sync-Key` header.
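Any HTTP client works against these endpoints; a hedged Rust sketch using `reqwest` with its `json` feature (the crate choice is illustrative, not part of the API):
```rust
use serde_json::Value;

/// Sketch: fetch an asset's metadata from GET /asset/{hash}/meta.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let hash = "abc123"; // placeholder; use a real hash from an ingest response
    let url = format!("http://localhost:3210/api/v1/can/0/asset/{hash}/meta");
    let meta: Value = reqwest::get(&url).await?.json().await?;
    println!("{meta:#}");
    Ok(())
}
```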
---
## Examples
Four example apps show what you can build on top of CAN:
| Example | Port | Description | README |
|---------|------|-------------|--------|
| **[Paste](examples/paste/)** | 3211 | Pastebin -- type text or paste images, auto-tags with #hashtags | [README](examples/paste/README.md) |
| **[File Manager](examples/filemanager/)** | 3212 | Web file browser with grid/list views, search, and filters | [README](examples/filemanager/README.md) |
| **[CAN Sync](examples/can-sync/)** | -- | P2P replication agent -- encrypted sync via shared passphrase | [README](examples/can-sync/README.md) |
| **[CanFS](examples/canfs/)** | -- | Mount assets as a read-only Windows drive (WinFSP) | [README](examples/canfs/README.md) |
### Run everything at once
```powershell
# Windows
.\go_example_1.ps1
# macOS / Linux
./go_example_1.sh
```
The script builds everything, starts CAN Service + Sync Agent + Paste, and cleans up on Ctrl+C.
---
## Configuration
| Field | Default | Description |
|-------|---------|-------------|
| `storage_root` | (required) | Directory where assets and the database are stored |
| `admin_token` | `"changeme"` | Bearer token for admin endpoints |
| `enable_thumbnail_cache` | `true` | Cache resized thumbnails in `.thumbs/` |
| `rebuild_error_threshold` | `50` | Max errors before triggering a full rebuild |
| `verify_interval_hours` | `12` | Hours between full integrity scans |
| `sync_api_key` | (none) | API key for sync endpoints; omit to disable sync |
---
## Project Structure
```
src/
main.rs Entry point: config, DB, verifier, HTTP server
config.rs YAML config loading
db.rs SQLite CRUD (assets, tags, search)
hash.rs SHA-256 content hashing
storage.rs File I/O (write, read, trash, filename parsing)
verifier.rs Background integrity checker + file watcher
xattr.rs OS-level file attributes (xattr / NTFS ADS)
routes/ HTTP API handlers (ingest, asset, list, search, thumb, sync, events)
examples/
paste/ Pastebin web app
filemanager/ File browser web app
can-sync/ P2P sync agent (iroh + gossip + pkarr)
canfs/ Windows virtual filesystem (WinFSP)
```
## Requirements
- **Rust** 1.75+
- **SQLite** bundled (no system install needed)
- **WinFSP** for the canfs example (Windows only)

View File

examples/can-sync/README.md
@@ -1,263 +1,91 @@
# CAN Sync
P2P full-mirror replication for [CAN Service](../../). Two machines with the same passphrase automatically discover each other and sync all assets over encrypted connections. No port forwarding or static IPs needed.
```
┌─────────────┐  protobuf   ┌─────────────┐  iroh (QUIC)  ┌─────────────┐  protobuf   ┌─────────────┐
│ CAN Service │◄───────────►│  CAN Sync   │◄─────────────►│  CAN Sync   │◄───────────►│ CAN Service │
│  Machine A  │  sync API   │   Agent A   │   encrypted   │   Agent B   │  sync API   │  Machine B  │
│  port 3210  │             │             │               │             │             │  port 3210  │
└─────────────┘             └─────────────┘               └─────────────┘             └─────────────┘
```
## Quick Start
1. **Start CAN Service** on each machine (port 3210):
```bash
cargo run
```
2. **Configure the sync agent** -- edit `config.yaml`:
```yaml
can_service_url: "http://127.0.0.1:3210"
sync_api_key: "can-sync-default-key"
sync_passphrase: "my-secret-phrase" # must be the same on all machines
poll_interval_secs: 30
```
3. **Start the sync agent** on each machine:
```bash
cd examples/can-sync
cargo run -- config.yaml
```
That's it. Any file uploaded to either CAN Service will appear on the other within seconds.
## How It Works
### Peer Discovery
Peers find each other through two mechanisms (both run simultaneously):
- **Gossip** -- [iroh-gossip](https://docs.rs/iroh-gossip) uses a topic derived from the shared passphrase. Peers on the same local network or connected to the same relay discover each other by broadcasting their node IDs.
- **Internet rendezvous** -- Each agent publishes its node ID to [pkarr](https://pkarr.org) relay servers using deterministic DNS-like "slots" derived from the passphrase. All agents scan these slots periodically to find peers worldwide.
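A rough sketch of the topic derivation (the real logic lives in `discovery.rs`; the hash and domain separator below are assumptions): every agent hashes the passphrase the same way, so all peers with the same secret subscribe to the same gossip topic.
```rust
use sha2::{Digest, Sha256};

// Hypothetical sketch (not the actual discovery.rs code): derive a 32-byte
// gossip topic from the shared passphrase.
fn derive_topic(passphrase: &str) -> [u8; 32] {
    let mut hasher = Sha256::new();
    hasher.update(b"can-sync/gossip-topic/v1"); // assumed domain separator
    hasher.update(passphrase.as_bytes());
    hasher.finalize().into()
}
```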
### Sync Protocol
Once two peers connect over iroh's encrypted QUIC transport:
1. **Hash exchange** -- Both sides send their full list of asset hashes
2. **Diff** -- Each side computes what the other is missing
3. **Transfer** -- Missing assets are sent concurrently in both directions (metadata + file content bundled together as protobuf)
4. **Live sync** -- After the initial reconciliation, each agent subscribes to SSE events from its local CAN Service. When a new asset is ingested locally, it's pushed to the connected peer instantly.
Live sync rides on SSE events rather than polling, so propagation is immediate; an incremental poll (every `poll_interval_secs` seconds) runs as a fallback safety net.
### Echo Prevention
When peer A sends an asset to peer B, B's CAN Service emits an SSE event for the new ingest. Without protection, B would try to push that asset right back to A. The sync agent tracks which hashes were received from each peer and filters them out of the push loop.
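The diff step is plain set arithmetic; a minimal sketch (hash sets stand in for the real protobuf exchange):
```rust
use std::collections::HashSet;

/// Sketch of step 2: given our asset hashes and the hashes the peer
/// reported, the set difference is exactly what we should send them.
fn assets_to_send(ours: &HashSet<String>, theirs: &HashSet<String>) -> Vec<String> {
    ours.difference(theirs).cloned().collect()
}
```
Run in both directions, this yields the two transfer lists of step 3.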
## Configuration
| Field | Default | Description |
|-------|---------|-------------|
| `can_service_url` | (required) | URL of the local CAN Service |
| `sync_api_key` | (required) | Must match `sync_api_key` in CAN Service's config |
| `sync_passphrase` | (required) | Shared secret for peer discovery (all peers must match) |
| `poll_interval_secs` | `3` | Fallback poll interval for catching missed events |
| `ticket_file` | (none) | Write this node's address to a file (for direct connection in tests) |
| `connect_ticket_file` | (none) | Read a peer's address from a file (for direct connection in tests) |
CAN Service must have `sync_api_key` set in its `config.yaml` for the sync endpoints to be enabled.
## Security
- **Transport** -- All peer traffic is encrypted with QUIC + TLS 1.3 (mandatory in iroh)
- **Identity** -- Each node gets an Ed25519 keypair on first run
- **Discovery** -- Only peers with the same passphrase can find each other
- **Hash verification** -- Every received asset is re-hashed and compared before being stored
## Project Structure
```
src/
  main.rs         Entry point: config, iroh endpoint, discovery, peer connections
  config.rs       YAML config loading
  can_client.rs   HTTP client for CAN Service's sync API (protobuf + SSE)
  protocol.rs     Protobuf message types (shared with CAN Service)
  discovery.rs    Peer discovery via iroh-gossip
  rendezvous.rs   Internet peer discovery via pkarr relay
  peer.rs         Per-peer sync: reconciliation, live push/receive, echo prevention
```

View File

examples/can-sync/src/can_client.rs
@@ -27,6 +27,7 @@ pub struct CanSyncClient {
}

impl CanSyncClient {
    /// Create a new client pointed at the given CAN service URL, authenticated with the sync API key.
    pub fn new(base_url: &str, sync_key: &str) -> Self {
        Self {
            http: reqwest::Client::new(),
@@ -182,7 +183,8 @@ impl CanSyncClient {
        rx
    }

    // Connect to the SSE endpoint and forward parsed events to the channel
    // until the stream ends or an error occurs.
    async fn run_sse_stream(
        http: &reqwest::Client,
        url: &str,

View File

examples/can-sync/src/config.rs
@@ -1,6 +1,7 @@
use serde::Deserialize;
use std::path::Path;

/// All settings needed to run the sync agent, loaded from a YAML file.
#[derive(Debug, Clone, Deserialize)]
pub struct SyncConfig {
    /// Base URL of the local CAN service (e.g. "http://127.0.0.1:3210")
@@ -32,6 +33,7 @@ fn default_poll_interval() -> u64 {
}

impl SyncConfig {
    /// Read a YAML config file from disk and parse it into a SyncConfig.
    pub fn load(path: &Path) -> anyhow::Result<Self> {
        let contents = std::fs::read_to_string(path)?;
        let config: Self = serde_yaml::from_str(&contents)?;

View File

examples/can-sync/src/discovery.rs
@@ -27,6 +27,7 @@ pub struct Discovery {
}

impl Discovery {
    /// Create a new Discovery that listens on a gossip topic derived from the shared passphrase.
    pub fn new(endpoint: Endpoint, gossip: Gossip, passphrase: &str) -> Self {
        let topic = derive_topic(passphrase);
        info!("Gossip topic: {}", hex::encode(topic.as_bytes()));

View File

examples/can-sync/src/main.rs
@@ -30,6 +30,8 @@ use crate::rendezvous::Rendezvous;
/// ALPN protocol identifier for CAN sync peer connections.
const SYNC_ALPN: &[u8] = b"can-sync/1";

/// Entry point: loads config, connects to the local CAN service, sets up
/// encrypted P2P networking (iroh), and discovers + syncs with peers.
#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging

View File

examples/can-sync/src/rendezvous.rs
@@ -40,6 +40,7 @@ pub struct Rendezvous {
}

impl Rendezvous {
    /// Create a new Rendezvous by deriving keypairs for all slots from the passphrase.
    pub fn new(passphrase: &str, our_id: EndpointId) -> Result<Self> {
        let slots: Vec<Keypair> = (0..NUM_SLOTS)
            .map(|i| derive_slot_keypair(passphrase, i))
@@ -91,6 +92,7 @@
        }
    }

    // Read every slot and report any newly discovered peer IDs.
    async fn scan_all_slots(
        &self,
        known_peers: &mut HashSet<EndpointId>,
@@ -111,6 +113,8 @@
        }
    }

    // Pick an available slot for this peer: reuse our old slot, take an empty one,
    // or fall back to a deterministic slot based on our ID.
    async fn claim_slot(&self, our_id_hex: &str) -> usize {
        // Check if we already own a slot (from a previous run)
        for i in 0..NUM_SLOTS {
@@ -143,6 +147,7 @@
        slot
    }

    // Write our EndpointId into the given slot's DNS TXT record via the pkarr relay.
    async fn publish_slot(&self, slot: usize, our_id_hex: &str) -> Result<()> {
        let keypair = &self.slots[slot];
        let packet = SignedPacket::builder()
@@ -163,6 +168,7 @@
        Ok(())
    }

    // Look up a slot's DNS TXT record and parse the EndpointId stored there, if any.
    async fn read_slot(&self, slot: usize) -> Option<EndpointId> {
        let public_key = self.slots[slot].public_key();
        let packet = self.client.resolve(&public_key).await?;

examples/canfs/README.md · Normal file · 90 lines
View File

@@ -0,0 +1,90 @@
# CanFS
Mount [CAN Service](../../) assets as a read-only Windows drive using [WinFSP](https://winfsp.dev). Browse your assets in Windows Explorer like regular files.
## Features
- **Drive letter mount** -- assets appear as files under a drive like `X:\`
- **Virtual folder structure** -- files organized into `CAN\`, `APPLICATION\`, `DATES\`, and `TAGS\` directories
- **Lazy file loading** -- file content is fetched from CAN Service only when you actually open/read a file
- **Background refresh** -- the file tree updates periodically to pick up new assets
## Requirements
- **Windows** (this example uses WinFSP, which is Windows-only)
- **[WinFSP](https://winfsp.dev/rel/)** must be installed (the filesystem driver)
## Running
Make sure CAN Service is running on port 3210 first:
```bash
# From the repo root
cargo run
```
Then mount the filesystem:
```bash
cd examples/canfs
cargo run
```
By default, it mounts on `X:`. Customize with flags:
```bash
cargo run -- --mount Z: --can-url http://127.0.0.1:3210/api/v1/can/0 --refresh-secs 30
```
Press Ctrl+C to unmount.
## Folder Structure
When mounted, the drive shows these virtual directories:
```
X:\
CAN\ All assets by timestamp and hash
1710000000000_abc123.pdf
1710000005000_def456.jpg
APPLICATION\ Grouped by the "application" field
paste\
readme.txt
my-app\
report.pdf
DATES\ Grouped by year and month
2025\
01\
photo.jpg
03\
report.pdf
TAGS\ One folder per tag
vacation\
photo.jpg
work\
report.pdf
```
Files with a `human_filename` show their friendly name in `APPLICATION/`, `DATES/`, and `TAGS/` folders. The `CAN/` folder always shows the raw `{timestamp}_{hash}.{ext}` format.
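To make the grouping concrete, here is a sketch of how a flat asset list could become the `TAGS\` branch (the real builder is `tree.rs`; the `AssetMeta` shape below is simplified):
```rust
use std::collections::BTreeMap;

/// Simplified stand-in for an asset record (see api.rs for the real type).
struct AssetMeta {
    tags: Vec<String>,
    display_name: String,
}

/// Sketch: one folder per tag, and an asset with several tags shows up
/// under each of them -- just like the mounted TAGS\ directory.
fn build_tags_tree(assets: &[AssetMeta]) -> BTreeMap<String, Vec<String>> {
    let mut tree: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for asset in assets {
        for tag in &asset.tags {
            tree.entry(tag.clone())
                .or_default()
                .push(asset.display_name.clone());
        }
    }
    tree
}
```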
## CLI Options
| Flag | Default | Description |
|------|---------|-------------|
| `-m, --mount` | `X:` | Drive letter or directory to mount on |
| `--can-url` | `http://127.0.0.1:3210/api/v1/can/0` | CAN Service API base URL |
| `--refresh-secs` | `60` | Seconds between cache refreshes |
## Project Structure
```
src/
main.rs Entry point: CLI args, WinFSP host setup, background refresh
api.rs Blocking HTTP client for CAN Service (list, fetch)
fs.rs WinFSP filesystem implementation (open, read, readdir, etc.)
tree.rs Virtual directory tree builder (turns flat asset list into folders)
util.rs Helpers: MIME-to-extension, timestamp conversion, path sanitization
```

View File

examples/canfs/src/api.rs
@@ -49,6 +49,7 @@ pub struct CanClient {
}

impl CanClient {
    /// Create a new client pointed at the given CAN service base URL.
    pub fn new(base_url: &str) -> Self {
        Self {
            client: reqwest::blocking::Client::new(),

View File

examples/canfs/src/fs.rs
@@ -26,6 +26,7 @@ const FILE_ATTRIBUTE_DIRECTORY: u32 = 0x10;
const FILE_ATTRIBUTE_READONLY: u32 = 0x01;
const FILE_ATTRIBUTE_ARCHIVE: u32 = 0x20;

// Wrap a raw NTSTATUS error code into WinFSP's error type.
fn ntstatus(code: i32) -> FspError {
    FspError::NTSTATUS(code)
}
@@ -54,6 +55,7 @@ pub struct CanFileContext {
impl FileSystemContext for CanFs {
    type FileContext = CanFileContext;

    /// Called by Windows to check if a file/folder exists and get its basic attributes before opening it.
    fn get_security_by_name(
        &self,
        file_name: &U16CStr,
@@ -83,6 +85,7 @@
        })
    }

    /// Called when a file or directory is opened; returns a context handle and fills in size/timestamps.
    fn open(
        &self,
        file_name: &U16CStr,
@@ -142,8 +145,10 @@
        })
    }

    /// Called when a handle is closed; nothing to clean up since content is dropped automatically.
    fn close(&self, _context: Self::FileContext) {}

    /// Returns up-to-date size and attribute info for an already-opened file or directory.
    fn get_file_info(
        &self,
        context: &Self::FileContext,
@@ -194,6 +199,7 @@
        Ok(())
    }

    /// Reads file bytes at the given offset; downloads the asset from the CAN service on first access.
    fn read(
        &self,
        context: &Self::FileContext,
@@ -233,6 +239,7 @@
        Ok(count as u32)
    }

    /// Lists the contents of a directory, including "." and ".." entries, for Windows Explorer and dir commands.
    fn read_directory(
        &self,
        context: &Self::FileContext,
@@ -308,6 +315,7 @@
        Ok(context.dir_buffer.read(marker, buffer))
    }

    /// Reports the virtual drive's total and free space (shows as a 1 GB read-only volume).
    fn get_volume_info(&self, out_volume_info: &mut VolumeInfo) -> winfsp::Result<()> {
        out_volume_info.total_size = 1024 * 1024 * 1024; // 1 GB
        out_volume_info.free_size = 0;

View File

examples/canfs/src/main.rs
@@ -17,6 +17,7 @@ use crate::api::CanClient;
use crate::fs::{CacheState, CanFs};
use crate::tree::VirtualTree;

/// Command-line arguments for mounting CAN service assets as a virtual Windows drive using WinFSP.
#[derive(Parser)]
#[command(name = "canfs", about = "Mount CAN service assets as a virtual drive")]
struct Args {
@@ -33,6 +34,7 @@ struct Args {
    refresh_secs: u64,
}

/// Entry point: connects to the CAN service, builds a virtual file tree, and mounts it as a read-only Windows drive.
fn main() {
    tracing_subscriber::fmt()
        .with_env_filter(

View File

examples/canfs/src/tree.rs
@@ -131,6 +131,7 @@ struct TreeBuilder {
}

impl TreeBuilder {
    // Create a new tree builder with an empty root directory node.
    fn new() -> Self {
        let root = VNode {
            name: String::new(),

View File

examples/filemanager/README.md · Normal file · 52 lines
@@ -0,0 +1,52 @@
# File Manager
A web-based file browser for [CAN Service](../../) assets. Grid and list views, search, filters, and a detail modal with previews.
## Features
- **Grid and list views** -- toggle between thumbnail cards and a compact file list
- **Virtual folder tree** -- assets organized into `CAN/`, `APPLICATION/`, `DATES/`, `TAGS/`, and `TYPE/` folders
- **Search** -- filter by filename, description, or hash prefix
- **Filters** -- narrow by application, MIME type, tag, or date range
- **Detail modal** -- click any file to see full metadata, preview images, and download
## Running
Make sure CAN Service is running on port 3210 first:
```bash
# From the repo root
cargo run
```
Then start the File Manager:
```bash
cd examples/filemanager
cargo run
```
Opens automatically at [http://127.0.0.1:3212](http://127.0.0.1:3212).
## How It Works
The Rust backend serves a single-page app and proxies all data requests to CAN Service:
| File Manager Route | Proxies To | Purpose |
|--------------------|-----------|---------|
| `GET /` | -- | Serve the HTML/JS/CSS frontend |
| `GET /fm/list` | `GET /api/v1/can/0/list` | Paginated asset listing |
| `GET /fm/search` | `GET /api/v1/can/0/search` | Search with filters |
| `GET /fm/asset/{hash}` | `GET /api/v1/can/0/asset/{hash}` | Download/preview a file |
| `GET /fm/asset/{hash}/meta` | `GET /api/v1/can/0/asset/{hash}/meta` | Asset metadata |
| `GET /fm/thumb/{hash}` | `GET /api/v1/can/0/asset/{hash}/thumb/200/200` | Thumbnail |
The virtual folder tree is built entirely in the browser from the flat asset list -- no folder structure exists on disk.
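The proxy handlers themselves are thin; a hedged sketch of one (simplified from `main.rs` -- the state type and error handling here are reduced for illustration):
```rust
use axum::{extract::{Path, State}, http::StatusCode, response::IntoResponse};

/// Simplified stand-in for the shared state in main.rs.
#[derive(Clone)]
struct AppState {
    client: reqwest::Client,
}

/// Sketch: relay GET /fm/asset/{hash} to the CAN Service API and hand the
/// bytes back to the browser; any upstream failure becomes a 502.
async fn proxy_asset(
    State(state): State<AppState>,
    Path(hash): Path<String>,
) -> impl IntoResponse {
    let url = format!("http://127.0.0.1:3210/api/v1/can/0/asset/{hash}");
    match state.client.get(&url).send().await {
        Ok(resp) => (StatusCode::OK, resp.bytes().await.unwrap_or_default()).into_response(),
        Err(_) => StatusCode::BAD_GATEWAY.into_response(),
    }
}
```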
## Project Structure
```
src/
main.rs HTTP server: proxy handlers and query forwarding
html.rs Single-page frontend (HTML + CSS + JS, embedded as a string)
```

View File

examples/filemanager/src/main.rs
@@ -9,11 +9,13 @@ use std::collections::HashMap;
const CAN_API: &str = "http://127.0.0.1:3210/api/v1/can/0";

// Shared state passed to every request handler; holds a reusable HTTP client.
#[derive(Clone)]
struct AppState {
    client: reqwest::Client,
}

/// Web-based file manager UI that proxies requests to the CAN service API.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
@@ -47,6 +49,7 @@ async fn main() {
    axum::serve(listener, app).await.unwrap();
}

// Return the single-page HTML UI for the file manager.
async fn serve_index() -> Html<&'static str> {
    Html(html::INDEX_HTML)
}
@@ -86,6 +89,7 @@ fn build_qs(params: &HashMap<String, String>) -> String {
    format!("?{}", qs.join("&"))
}

// Percent-encode a string for use in URL query parameters.
fn urlencoding(s: &str) -> String {
    s.chars()
        .map(|c| match c {

examples/paste/README.md · Normal file · 51 lines
View File

@@ -0,0 +1,51 @@
# Paste
A minimal pastebin web app built on [CAN Service](../../). Type text and press Enter, or paste an image from your clipboard. Everything gets stored as a CAN asset.
## Features
- **Text paste** -- type and hit Enter to store a text snippet
- **Image paste** -- Ctrl+V an image from your clipboard, or click the paperclip to attach a file
- **Auto-tagging** -- use `#hashtags` in your text and they're extracted as CAN tags
- **Live refresh** -- new pastes appear instantly via Server-Sent Events (including content arriving from P2P sync on another machine)
## Running
Make sure CAN Service is running on port 3210 first:
```bash
# From the repo root
cargo run
```
Then start Paste:
```bash
cd examples/paste
cargo run
```
Opens automatically at [http://127.0.0.1:3211](http://127.0.0.1:3211).
## How It Works
Paste is a thin proxy layer. The Rust backend serves a single-page HTML/JS frontend and forwards requests to the CAN Service API:
| Paste Route | Proxies To | Purpose |
|-------------|-----------|---------|
| `POST /paste/text` | `POST /api/v1/can/0/ingest` | Store text as a `.txt` asset |
| `POST /paste/file` | `POST /api/v1/can/0/ingest` | Store an uploaded file |
| `GET /paste/list` | `GET /api/v1/can/0/list?application=paste` | List paste assets |
| `GET /paste/asset/{hash}` | `GET /api/v1/can/0/asset/{hash}` | Download an asset |
| `GET /paste/thumb/{hash}` | `GET /api/v1/can/0/asset/{hash}/thumb/200/200` | Image thumbnail |
| `GET /paste/events` | `GET /api/v1/can/0/events` | SSE stream for live updates |
All pastes are tagged with `application=paste` so they're scoped separately from other CAN content.
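The `#hashtag` auto-tagging amounts to scanning pasted text for `#` tokens; a sketch (the real extraction lives in `main.rs` and may differ on punctuation rules):
```rust
/// Sketch: "ship it #work #todo!" -> ["work", "todo"]. Assumes tags are
/// alphanumeric/underscore runs; trailing punctuation is trimmed.
fn extract_hashtags(text: &str) -> Vec<String> {
    text.split_whitespace()
        .filter_map(|word| word.strip_prefix('#'))
        .map(|tag| tag.trim_end_matches(|c: char| !c.is_alphanumeric() && c != '_'))
        .filter(|tag| !tag.is_empty())
        .map(str::to_string)
        .collect()
}
```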
## Project Structure
```
src/
main.rs HTTP server: proxy handlers, tag extraction, SSE relay
html.rs Single-page frontend (HTML + CSS + JS, embedded as a string)
```

View File

examples/paste/src/main.rs
@@ -12,11 +12,13 @@ use std::net::SocketAddr;
const CAN_API: &str = "http://127.0.0.1:3210/api/v1/can/0";

/// Shared HTTP client for talking to the CAN service backend.
#[derive(Clone)]
struct AppState {
    client: reqwest::Client,
}

/// JSON body for the text paste endpoint.
#[derive(Deserialize)]
struct PasteTextRequest {
    text: String,
@@ -62,6 +64,7 @@ async fn forward(resp: Result<reqwest::Response, reqwest::Error>) -> Response {
// ── Handlers ─────────────────────────────────────────────────────────────

/// Serve the single-page HTML frontend.
async fn serve_index() -> Html<&'static str> {
    Html(html::INDEX_HTML)
}
@@ -227,7 +230,8 @@ async fn proxy_thumb(
    forward(resp).await
}

/// Proxy SSE (Server-Sent Events) from the CAN service to the browser so
/// the frontend auto-refreshes when new pastes arrive.
async fn paste_events(
    State(state): State<AppState>,
) -> Sse<impl futures_util::Stream<Item = Result<Event, Infallible>>> {
@@ -289,6 +293,7 @@
// ── Main ─────────────────────────────────────────────────────────────────

/// Start the Paste web app: a simple pastebin that stores text and images in CAN service.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()

View File

src/config.rs
@@ -1,6 +1,7 @@
use serde::Deserialize;
use std::path::{Path, PathBuf};

/// Application settings loaded from config.yaml at startup.
#[derive(Debug, Clone, Deserialize)]
pub struct Config {
    pub storage_root: PathBuf,
@@ -18,6 +19,7 @@ pub struct Config {
    pub sync_api_key: Option<String>,
}

// Default values used when a field is missing from config.yaml.
fn default_admin_token() -> String {
    "changeme".to_string()
}
@@ -32,24 +34,29 @@
}

impl Config {
    /// Read and parse the YAML config file from disk.
    pub fn load(path: &Path) -> anyhow::Result<Self> {
        let contents = std::fs::read_to_string(path)?;
        let config: Config = serde_yaml::from_str(&contents)?;
        Ok(config)
    }

    /// Returns the path to the SQLite database file inside storage_root.
    pub fn db_path(&self) -> PathBuf {
        self.storage_root.join(".can.db")
    }

    /// Returns the path to the trash folder for soft-deleted files.
    pub fn trash_dir(&self) -> PathBuf {
        self.storage_root.join(".trash")
    }

    /// Returns the path to the cached thumbnail images folder.
    pub fn thumbs_dir(&self) -> PathBuf {
        self.storage_root.join(".thumbs")
    }

    /// Create the storage, trash, and thumbnail directories if they don't exist yet.
    pub fn ensure_dirs(&self) -> anyhow::Result<()> {
        std::fs::create_dir_all(&self.storage_root)?;
        std::fs::create_dir_all(self.trash_dir())?;

View File

src/db.rs
@@ -4,8 +4,11 @@ use std::sync::{Arc, Mutex};
use crate::models::{Asset, AssetMeta, ListParams, SearchParams};

/// Thread-safe handle to the SQLite database (wrapped in Arc<Mutex> so multiple
/// threads can share it safely).
pub type Db = Arc<Mutex<Connection>>;

/// Open (or create) the SQLite database file and set up tables.
pub fn open(path: &Path) -> anyhow::Result<Db> {
    let conn = Connection::open(path)?;
    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
@@ -13,6 +16,7 @@ pub fn open(path: &Path) -> anyhow::Result<Db> {
    Ok(Arc::new(Mutex::new(conn)))
}

/// Open a temporary in-memory database (used for tests).
pub fn open_in_memory() -> anyhow::Result<Db> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch("PRAGMA foreign_keys=ON;")?;
@@ -20,6 +24,8 @@ pub fn open_in_memory() -> anyhow::Result<Db> {
    Ok(Arc::new(Mutex::new(conn)))
}

/// Create the assets, tags, and asset_tags tables if they don't already exist,
/// and run any pending migrations.
fn init_schema(conn: &Connection) -> rusqlite::Result<()> {
    conn.execute_batch(
        "
@@ -66,7 +72,7 @@ fn init_schema(conn: &Connection) -> rusqlite::Result<()> {
    Ok(())
}

/// Save a new asset record to the database. Returns the auto-generated row id.
pub fn insert_asset(conn: &Connection, asset: &Asset) -> rusqlite::Result<i64> {
    conn.execute(
        "INSERT INTO assets (timestamp, hash, mime_type, application, user_identity, description, actual_filename, human_filename, human_path, size)
@@ -87,7 +93,7 @@ pub fn insert_asset(conn: &Connection, asset: &Asset) -> rusqlite::Result<i64> {
    Ok(conn.last_insert_rowid())
}

/// Find an asset by its unique SHA-256 hash. Returns None if not found.
pub fn get_asset_by_hash(conn: &Connection, hash: &str) -> rusqlite::Result<Option<Asset>> {
    conn.query_row(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description,
@@ -115,7 +121,7 @@ pub fn get_asset_by_hash(conn: &Connection, hash: &str) -> rusqlite::Result<Option<Asset>> {
    .optional()
}

/// Get the list of tag names attached to an asset.
pub fn get_asset_tags(conn: &Connection, asset_id: i64) -> rusqlite::Result<Vec<String>> {
    let mut stmt = conn.prepare(
        "SELECT t.name FROM tags t
@@ -127,7 +133,7 @@ pub fn get_asset_tags(conn: &Connection, asset_id: i64) -> rusqlite::Result<Vec<String>> {
    tags.collect()
}

/// Insert a tag if it doesn't exist yet, then return its id.
pub fn upsert_tag(conn: &Connection, name: &str) -> rusqlite::Result<i64> {
    conn.execute(
        "INSERT OR IGNORE INTO tags (name) VALUES (?1)",
@@ -138,7 +144,7 @@ pub fn upsert_tag(conn: &Connection, name: &str) -> rusqlite::Result<i64> {
    })
}

/// Remove all existing tags for an asset and assign the new ones.
pub fn set_asset_tags(conn: &Connection, asset_id: i64, tags: &[String]) -> rusqlite::Result<()> {
    conn.execute(
        "DELETE FROM asset_tags WHERE asset_id = ?1",
@@ -154,7 +160,8 @@ pub fn set_asset_tags(conn: &Connection, asset_id: i64, tags: &[String]) -> rusqlite::Result<()> {
    Ok(())
}

/// Convert an internal Asset database row into the API-friendly AssetMeta format
/// (includes tags fetched from the join table).
pub fn asset_to_meta(conn: &Connection, asset: &Asset) -> rusqlite::Result<AssetMeta> {
    let tags = get_asset_tags(conn, asset.id)?;
    Ok(AssetMeta {
@@ -173,7 +180,7 @@ pub fn asset_to_meta(conn: &Connection, asset: &Asset) -> rusqlite::Result<AssetMeta> {
    })
}

/// Update an asset's description and/or tags (only changes the fields you provide).
pub fn update_asset_metadata(
    conn: &Connection,
    hash: &str,
@@ -195,7 +202,7 @@ pub fn update_asset_metadata(
    Ok(())
}

/// Mark or unmark an asset as corrupted (set by the background verifier).
pub fn flag_corrupted(conn: &Connection, hash: &str, corrupted: bool) -> rusqlite::Result<()> {
    conn.execute(
        "UPDATE assets SET is_corrupted = ?1 WHERE hash = ?2",
@@ -204,7 +211,8 @@ pub fn flag_corrupted(conn: &Connection, hash: &str, corrupted: bool) -> rusqlite::Result<()> {
    Ok(())
}

/// Store the file size in bytes for an asset (used by the verifier to fill in
/// sizes for assets that were created before the size column existed).
pub fn update_asset_size(conn: &Connection, hash: &str, size: i64) -> rusqlite::Result<()> {
    conn.execute(
        "UPDATE assets SET size = ?1 WHERE hash = ?2",
@@ -213,7 +221,7 @@ pub fn update_asset_size(conn: &Connection, hash: &str, size: i64) -> rusqlite::Result<()> {
    Ok(())
}

/// Soft-delete an asset by marking it as trashed (the file is moved to .trash/).
pub fn trash_asset(conn: &Connection, hash: &str) -> rusqlite::Result<()> {
    conn.execute(
        "UPDATE assets SET is_trashed = 1 WHERE hash = ?1",
@@ -222,7 +230,8 @@ pub fn trash_asset(conn: &Connection, hash: &str) -> rusqlite::Result<()> {
    Ok(())
}

/// Fetch a page of assets with optional filters (application, trashed, etc.).
/// Returns the matching assets and the total count for pagination.
pub fn list_assets(conn: &Connection, params: &ListParams) -> rusqlite::Result<(Vec<Asset>, i64)> {
    let limit = params.limit.unwrap_or(50);
    let offset = params.offset.unwrap_or(0);
@@ -301,7 +310,8 @@ pub fn list_assets(conn: &Connection, params: &ListParams) -> rusqlite::Result<(Vec<Asset>, i64)> {
    Ok((assets, total))
}

/// Search assets with multiple filters (hash prefix, time range, MIME type, tags, etc.).
/// Returns matching assets and total count for pagination.
pub fn search_assets(
    conn: &Connection,
    params: &SearchParams,
@@ -428,7 +438,8 @@ pub fn search_assets(
    Ok((assets, total))
}

/// Get every asset record in the database, including trashed ones.
/// Used by the sync system to compare what two peers have.
pub fn get_all_assets(conn: &Connection) -> rusqlite::Result<Vec<Asset>> {
    let mut stmt = conn.prepare(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description,
@@ -457,7 +468,8 @@ pub fn get_all_assets(conn: &Connection) -> rusqlite::Result<Vec<Asset>> {
    Ok(assets)
}

/// Get only assets added after a given timestamp (for incremental sync --
/// "what's new since last time I checked?").
pub fn get_assets_since(conn: &Connection, since: i64) -> rusqlite::Result<Vec<Asset>> {
    let mut stmt = conn.prepare(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description,
@@ -487,7 +499,8 @@ pub fn get_assets_since(conn: &Connection, since: i64) -> rusqlite::Result<Vec<Asset>> {
    Ok(assets)
}

/// Get all non-trashed assets (used by the background verifier to check
/// file integrity on startup).
pub fn get_all_active_assets(conn: &Connection) -> rusqlite::Result<Vec<Asset>> {
    let mut stmt = conn.prepare(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description,

View File

src/error.rs
@@ -2,6 +2,7 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use crate::models::ErrorResponse;

/// All the error types the API can return. Each variant maps to an HTTP status code.
#[derive(Debug, thiserror::Error)]
pub enum AppError {
    #[error("Not found: {0}")]
@@ -23,6 +24,7 @@ pub enum AppError {
    Internal(String),
}

/// Converts an AppError into an HTTP response with the right status code and a JSON body.
impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, message) = match &self {

View File

src/lib.rs
@@ -1,12 +1,12 @@
pub mod config;   // Configuration loading from YAML
pub mod db;       // SQLite database access (CRUD for assets and tags)
pub mod error;    // Centralized error types and HTTP error responses
pub mod hash;     // SHA-256 content hashing
pub mod models;   // Data structures shared across the codebase
pub mod routes;   // HTTP API route handlers
pub mod storage;  // File I/O: reading, writing, and trashing asset files
pub mod verifier; // Background integrity checker and file-attribute syncer
pub mod xattr;    // OS-level file metadata (xattr on Unix, NTFS ADS on Windows)

use std::sync::Arc;
@@ -17,6 +17,7 @@ use crate::db::Db;
/// Each message is `"hash:timestamp"` (e.g. `"abc123def456:1710000000000"`).
pub type SyncEventSender = tokio::sync::broadcast::Sender<String>;

/// Shared application state passed to every HTTP handler.
#[derive(Clone)]
pub struct AppState {
    pub config: Arc<Config>,

View File

src/main.rs
@@ -10,6 +10,8 @@ use tower_http::trace::TraceLayer;
use can_service::config::Config;
use can_service::{db, routes, verifier, AppState};

/// Entry point: loads config, opens the database, starts background services,
/// and launches the HTTP server.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize tracing

View File

src/models.rs
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};

/// Internal database row for a stored file. Contains all metadata fields
/// that are persisted in SQLite.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Asset {
    pub id: i64,
@@ -18,7 +19,8 @@ pub struct Asset {
    pub size: i64,
}

/// The public-facing version of an asset's metadata, returned by the API.
/// Includes resolved tags and omits internal fields like `id` and `actual_filename`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssetMeta {
    pub hash: String,
@@ -35,7 +37,7 @@ pub struct AssetMeta {
    pub size: i64,
}

/// Wraps every successful API response in `{ "status": "success", "data": ... }`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ApiResponse<T: Serialize> {
    pub status: String,
@@ -43,6 +45,7 @@
}

impl<T: Serialize> ApiResponse<T> {
    /// Create a success response wrapping the given data.
    pub fn success(data: T) -> Self {
        Self {
            status: "success".to_string(),
@@ -51,7 +54,7 @@ impl<T: Serialize> ApiResponse<T> {
    }
}

/// JSON body for error responses: `{ "status": "error", "error": "..." }`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
    pub status: String,
@@ -67,7 +70,7 @@ impl ErrorResponse {
    }
}

/// Returned after a successful file upload: the timestamp, hash, and on-disk filename.
#[derive(Debug, Serialize, Deserialize)]
pub struct IngestResult {
    pub timestamp: i64,
@@ -97,7 +100,9 @@ pub struct MetadataUpdate {
    pub description: Option<String>,
}

/// Metadata stored directly on the file via OS-level attributes
/// (xattr on macOS/Linux, NTFS Alternate Data Streams on Windows).
/// This lets external tools read CAN metadata without hitting the database.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FileAttributes {
    pub mime_type: Option<String>,

View File

src/routes/asset.rs
@@ -17,7 +17,8 @@ pub fn router() -> Router<AppState> {
        .route("/api/v1/can/0/asset/{hash}", patch(patch_asset))
}

/// Download an asset's file by its hash. Streams the raw bytes back to the
/// client with the correct MIME type and a suggested filename.
async fn get_asset(
    State(state): State<AppState>,
    Path(hash): Path<String>,
@@ -59,7 +60,8 @@ async fn get_asset(
        .into_response())
}

/// Update an asset's tags and/or description. Saves changes to both the
/// database and the OS-level file attributes.
async fn patch_asset(
    State(state): State<AppState>,
    Path(hash): Path<String>,

View File

@@ -7,6 +7,9 @@ use crate::error::AppError;
 use crate::models::{ApiResponse, Asset, DataIngestRequest, FileAttributes, IngestResult};
 use crate::{db, hash, storage, xattr, AppState};
+/// Register the two upload endpoints:
+/// - POST /ingest (multipart file upload)
+/// - POST /ingest/data (JSON body upload, agent-friendly)
 pub fn router() -> Router<AppState> {
     Router::new()
         .route("/api/v1/can/0/ingest", post(ingest_multipart))
@@ -27,7 +30,9 @@ struct IngestInput {
     description: Option<String>,
 }
-/// Common pipeline: timestamp → hash → write file → xattr → DB insert.
+/// Core ingest pipeline shared by both upload endpoints.
+/// Steps: generate timestamp -> hash content -> write file to disk ->
+/// save OS-level metadata -> insert into database -> notify SSE subscribers.
 fn do_ingest(state: &AppState, input: IngestInput) -> Result<IngestResult, AppError> {
     let timestamp = SystemTime::now()
         .duration_since(UNIX_EPOCH)
@@ -99,7 +104,7 @@ fn do_ingest(state: &AppState, input: IngestInput) -> Result<IngestResult, AppEr
     })
 }
-/// Parse a comma-separated tag string into a clean Vec.
+/// Split a comma-separated tag string like "photo,vacation" into a clean list.
 fn parse_tags(raw: Option<&str>) -> Vec<String> {
     raw.unwrap_or("")
         .split(',')
@@ -110,6 +115,8 @@ fn parse_tags(raw: Option<&str>) -> Vec<String> {
 // ── POST /api/v1/can/0/ingest (multipart — file uploads) ──────────────
+/// Handle multipart file upload. Reads the "file" field plus optional metadata
+/// fields (tags, application, user, etc.) and runs the ingest pipeline.
 async fn ingest_multipart(
     State(state): State<AppState>,
     mut multipart: Multipart,
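
The pipeline the `do_ingest` comment describes compresses to a few lines of straight-line code. A condensed, illustrative reduction — `sha2` stands in for the crate's own hash helper, and the xattr/DB/SSE steps are elided:

```rust
use sha2::{Digest, Sha256};
use std::time::{SystemTime, UNIX_EPOCH};

// Sketch of the ingest steps: timestamp -> hash -> write -> (xattr, DB, SSE).
fn ingest_sketch(root: &std::path::Path, data: &[u8]) -> std::io::Result<String> {
    // Millisecond timestamp for both the filename and the DB row.
    let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
    // Content address: SHA-256 of the raw bytes, hex-encoded.
    let hash: String = Sha256::digest(data).iter().map(|b| format!("{b:02x}")).collect();
    // The real build_filename also folds in sanitized tags and the MIME extension.
    let filename = format!("{ts}_{hash}.bin");
    std::fs::write(root.join(&filename), data)?;
    // ...write OS-level attributes, insert the DB row, notify SSE subscribers...
    Ok(filename)
}
```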

View File

@@ -10,6 +10,8 @@ pub fn router() -> Router<AppState> {
     Router::new().route("/api/v1/can/0/list", get(list_assets))
 }
+/// GET /api/v1/can/0/list - Return a paginated list of assets with their metadata.
+/// Supports query params: limit, offset, order (asc/desc), application filter.
 async fn list_assets(
     State(state): State<AppState>,
     Query(params): Query<ListParams>,
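
The four parameters named in that comment compose into an ordinary query string; the values below are illustrative:

```rust
fn main() {
    // Items 40-59, newest first, restricted to one application.
    // Parameter names come from the doc comment above; values are made up.
    let url = "http://localhost:3210/api/v1/can/0/list?limit=20&offset=40&order=desc&application=my-app";
    println!("{url}");
}
```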

View File

@@ -10,6 +10,9 @@ pub fn router() -> Router<AppState> {
     Router::new().route("/api/v1/can/0/asset/{hash}/meta", get(get_meta))
 }
+/// GET /api/v1/can/0/asset/{hash}/meta - Return an asset's metadata as JSON
+/// (hash, MIME type, tags, description, timestamps, etc.) without downloading
+/// the actual file.
 async fn get_meta(
     State(state): State<AppState>,
     Path(hash): Path<String>,

View File

@@ -1,15 +1,16 @@
-pub mod ingest;
-pub mod asset;
-pub mod meta;
-pub mod list;
-pub mod search;
-pub mod thumb;
-pub mod sync;
-pub mod events;
+pub mod ingest; // POST endpoints for uploading files and JSON data
+pub mod asset;  // GET/PATCH endpoints for downloading files and updating metadata
+pub mod meta;   // GET endpoint for reading asset metadata as JSON
+pub mod list;   // GET endpoint for paginated asset listing
+pub mod search; // GET endpoint for searching/filtering assets
+pub mod thumb;  // GET endpoint for generating resized thumbnail images
+pub mod sync;   // Private P2P sync endpoints (protobuf, requires API key)
+pub mod events; // Public SSE endpoint for real-time "new asset" notifications
 use axum::Router;
 use crate::AppState;
+/// Combine all route modules into one router. Called once at startup.
 pub fn router() -> Router<AppState> {
     Router::new()
         .merge(ingest::router())
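
For orientation, this is roughly how a merged router like this gets mounted — a sketch assuming axum 0.7+'s `serve` API; the crate's real `main` also loads config and spawns the verifier and sync tasks:

```rust
// Illustrative startup: build the combined router, attach shared state,
// and serve it on the default port from the README.
async fn serve(state: AppState) -> anyhow::Result<()> {
    let app = router().with_state(state);
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3210").await?;
    axum::serve(listener, app).await?;
    Ok(())
}
```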

View File

@@ -10,6 +10,8 @@ pub fn router() -> Router<AppState> {
     Router::new().route("/api/v1/can/0/search", get(search_assets))
 }
+/// GET /api/v1/can/0/search - Search assets by hash prefix, time range,
+/// MIME type, user, application, or tags. Returns paginated results.
 async fn search_assets(
     State(state): State<AppState>,
     Query(params): Query<SearchParams>,

View File

@@ -22,7 +22,9 @@ use tokio_stream::StreamExt;
 use crate::models::{Asset, FileAttributes};
 use crate::{db, hash, storage, xattr, AppState};
-// ── Protobuf message types (hand-written, no protoc needed) ─────────────
+// ── Protobuf message types ───────────────────────────────────────────────
+// These structs are serialized/deserialized as protobuf using the `prost` crate.
+// They define the wire format for peer-to-peer sync communication.
 #[derive(Clone, PartialEq, Message)]
 pub struct HashListRequest {}
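
Hand-writing the messages works because `prost`'s derive macro takes the field types and tag numbers directly on the struct, so no `.proto` file or `protoc` run is involved. A self-contained round-trip sketch — the field layout here is illustrative, not CAN's actual wire format:

```rust
use prost::Message;

// Hypothetical message: the #[prost(...)] attributes supply what a
// .proto file would normally declare.
#[derive(Clone, PartialEq, Message)]
pub struct HashEntry {
    #[prost(string, tag = "1")]
    pub hash: String,
    #[prost(int64, tag = "2")]
    pub timestamp: i64,
}

fn main() -> Result<(), prost::DecodeError> {
    let entry = HashEntry { hash: "a3b2c4d5".into(), timestamp: 1_773_014_400_123 };
    let bytes = entry.encode_to_vec();            // serialize to protobuf wire format
    let decoded = HashEntry::decode(&bytes[..])?; // and back again
    assert_eq!(entry, decoded);
    Ok(())
}
```
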
@@ -137,6 +139,8 @@ struct HashesQuery {
 // ── Auth ────────────────────────────────────────────────────────────────
+/// Verify the X-Sync-Key header matches the configured API key.
+/// Returns 404 if sync is not configured, 401 if the key is wrong.
 fn check_sync_key(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, String)> {
     let expected = match &state.config.sync_api_key {
         Some(key) if !key.is_empty() => key,
@@ -157,6 +161,7 @@ fn check_sync_key(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCo
 // ── Helpers ─────────────────────────────────────────────────────────────
+/// Serialize a protobuf message into bytes.
 fn encode_proto<M: Message>(msg: &M) -> Result<Vec<u8>, (StatusCode, String)> {
     let mut buf = Vec::with_capacity(msg.encoded_len());
     msg.encode(&mut buf)
@@ -164,12 +169,16 @@ fn encode_proto<M: Message>(msg: &M) -> Result<Vec<u8>, (StatusCode, String)> {
     Ok(buf)
 }
+/// Wrap protobuf bytes into an HTTP 200 response with the right content type.
 fn proto_response(buf: Vec<u8>) -> (StatusCode, [(&'static str, &'static str); 1], Vec<u8>) {
     (StatusCode::OK, [("content-type", "application/x-protobuf")], buf)
 }
 // ── POST /sync/hashes ───────────────────────────────────────────────────
+/// Return a compact list of all known asset hashes + timestamps.
+/// A remote peer calls this first to figure out which assets it's missing.
+/// Supports `?since=<timestamp>` for incremental queries.
 async fn sync_hashes(
     State(state): State<AppState>,
     headers: HeaderMap,
@@ -208,6 +217,8 @@ async fn sync_hashes(
 // ── POST /sync/pull ─────────────────────────────────────────────────────
+/// Download full asset bundles (metadata + file content) for a list of hashes.
+/// A remote peer calls this to fetch assets it doesn't have yet.
 async fn sync_pull(
     State(state): State<AppState>,
     headers: HeaderMap,
@@ -261,6 +272,9 @@ async fn sync_pull(
 // ── POST /sync/push ─────────────────────────────────────────────────────
+/// Receive and store a new asset pushed from a remote peer.
+/// Verifies the hash, writes the file, and inserts the DB record.
+/// Returns early if the asset already exists locally.
 async fn sync_push(
     State(state): State<AppState>,
     headers: HeaderMap,
@@ -372,6 +386,7 @@ async fn sync_push(
 // ── POST /sync/meta ─────────────────────────────────────────────────────
+/// Receive a metadata update from a remote peer (description, tags, trash status).
 async fn sync_meta(
     State(state): State<AppState>,
     headers: HeaderMap,
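
Taken together, these endpoints give a peer a simple pull cycle: list, diff, fetch. A sketch of the client side, under stated assumptions — `reqwest` for HTTP, the sync routes living under the usual `/api/v1/can/0/` prefix, and `HashList`/`PullRequest` as stand-ins for the real prost message types:

```rust
use prost::Message;

// Illustrative stand-ins for the actual sync messages.
#[derive(Clone, PartialEq, Message)]
struct HashEntry {
    #[prost(string, tag = "1")] hash: String,
    #[prost(int64, tag = "2")] timestamp: i64,
}
#[derive(Clone, PartialEq, Message)]
struct HashList {
    #[prost(message, repeated, tag = "1")] entries: Vec<HashEntry>,
}
#[derive(Clone, PartialEq, Message)]
struct PullRequest {
    #[prost(string, repeated, tag = "1")] hashes: Vec<String>,
}

// One pull cycle against a peer: "which hashes do you have?" -> pull the gap.
async fn pull_from_peer(
    peer: &str,                               // e.g. "http://other-box:3210"
    key: &str,                                // must match the peer's sync_api_key
    have: &std::collections::HashSet<String>, // hashes we already store
) -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    let body = client
        .post(format!("{peer}/api/v1/can/0/sync/hashes"))
        .header("X-Sync-Key", key)
        .send().await?.error_for_status()?
        .bytes().await?;
    let remote = HashList::decode(&body[..])?;

    // Anything the peer has that we don't is a candidate to fetch.
    let missing: Vec<String> = remote.entries.into_iter()
        .map(|e| e.hash)
        .filter(|h| !have.contains(h))
        .collect();
    if missing.is_empty() {
        return Ok(());
    }

    let _bundles = client
        .post(format!("{peer}/api/v1/can/0/sync/pull"))
        .header("X-Sync-Key", key)
        .body(PullRequest { hashes: missing }.encode_to_vec())
        .send().await?.error_for_status()?
        .bytes().await?;
    // ...decode the asset bundles and run them through the normal ingest path.
    Ok(())
}
```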

View File

@@ -18,12 +18,15 @@ pub fn router() -> Router<AppState> {
     )
 }
-/// Static fallback SVG icon for non-image assets.
+/// A simple "?" placeholder icon returned when the asset isn't a resizable image.
 const FALLBACK_SVG: &str = r##"<svg xmlns="http://www.w3.org/2000/svg" width="128" height="128" viewBox="0 0 128 128">
   <rect width="128" height="128" rx="8" fill="#e0e0e0"/>
   <text x="64" y="72" text-anchor="middle" font-family="sans-serif" font-size="40" fill="#888">?</text>
 </svg>"##;
+/// GET /api/v1/can/0/asset/{hash}/thumb/{width}/{height}
+/// Generate (or serve from cache) a resized JPEG thumbnail for image assets.
+/// Non-image assets get a placeholder SVG icon instead.
 async fn get_thumb(
     State(state): State<AppState>,
     Path((hash, max_width, max_height)): Path<(String, u32, u32)>,
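
Fetching a thumbnail is a plain GET on the route above. A client-side sketch, again assuming `reqwest` (not a dependency of this crate):

```rust
// Hypothetical helper: fetch a thumbnail that fits within 128x128.
// Image assets come back as JPEG; anything else returns the placeholder SVG.
async fn fetch_thumb(base: &str, hash: &str) -> anyhow::Result<Vec<u8>> {
    let url = format!("{base}/api/v1/can/0/asset/{hash}/thumb/128/128");
    Ok(reqwest::get(&url).await?.error_for_status()?.bytes().await?.to_vec())
}
```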

View File

@@ -1,59 +1,49 @@
 use std::path::{Path, PathBuf};
-use chrono::{DateTime, Utc};
-/// Classify a MIME type into a storage subdirectory.
-pub fn mime_to_type_dir(mime: &str) -> &str {
-    if mime.starts_with("image/") {
-        "images"
-    } else if mime == "application/pdf" {
-        "pdf"
-    } else if mime.starts_with("video/") {
-        "video"
-    } else if mime.starts_with("audio/") {
-        "audio"
-    } else if mime.starts_with("text/")
-        || mime == "application/json"
-        || mime == "application/xml"
-        || mime == "application/msword"
-        || mime == "application/rtf"
-        || mime.starts_with("application/vnd.openxmlformats")
-        || mime.starts_with("application/vnd.ms-")
-        || mime == "application/vnd.oasis.opendocument.text"
-        || mime == "application/vnd.oasis.opendocument.spreadsheet"
-    {
-        "documents"
-    } else {
-        "others"
-    }
-}
-/// Build the physical filename (including type subdirectory) per the spec:
-/// `{type_dir}/{YYYY-MM-DD_HH-MM}_{hash8}.{extension}`
-///
-/// Example: `images/2026-03-13_14-30_a3b2c4d5.jpg`
+/// Build the on-disk filename for a new asset.
+/// Format: `{timestamp}_{sha256hash}_{tags}.{extension}`
+/// Tags are sanitized (alphanumeric only) and truncated to fit filesystem limits.
 pub fn build_filename(
     timestamp: i64,
     hash: &str,
-    _tags: &[String],
+    tags: &[String],
     mime_type: &str,
 ) -> String {
     let extension = mime_to_extension(mime_type);
-    let type_dir = mime_to_type_dir(mime_type);
-    // Convert timestamp_ms to human-readable YYYY-MM-DD_HH-MM
-    let dt = DateTime::<Utc>::from_timestamp_millis(timestamp)
-        .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).unwrap());
-    let time_part = dt.format("%Y-%m-%d_%H-%M").to_string();
-    // Use first 8 chars of hash for short identifier
-    let short_hash = if hash.len() >= 8 { &hash[..8] } else { hash };
-    format!("{}/{}_{}. {}", type_dir, time_part, short_hash, extension)
+    let base = format!("{}_{}", timestamp, hash);
+    if tags.is_empty() {
+        return format!("{}.{}", base, extension);
+    }
+    // Sanitize tags: strip non-alphanumeric, join with underscore
+    let sanitized_tags: Vec<String> = tags
+        .iter()
+        .map(|t| t.chars().filter(|c| c.is_alphanumeric()).collect::<String>())
+        .filter(|t| !t.is_empty())
+        .collect();
+    if sanitized_tags.is_empty() {
+        return format!("{}.{}", base, extension);
+    }
+    let tag_part = sanitized_tags.join("_");
+    // Truncate to keep total filename under ~200 chars (safely under 255)
+    let max_tag_len = 200usize.saturating_sub(base.len() + extension.len() + 2); // 2 for _ and .
+    let truncated = if tag_part.len() > max_tag_len {
+        &tag_part[..max_tag_len]
+    } else {
+        &tag_part
+    };
+    format!("{}_{}. {}", base, truncated, extension)
         .replace(". ", ".")
 }
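
One consequence of that truncation rule which the rewritten tests below don't exercise: very long tag lists get cut so the whole name lands at 200 characters. A sketch in the same test style, with numbers that follow directly from the code above:

```rust
#[test]
fn test_build_filename_truncates_long_tags() {
    // base = "100_abc" (7 chars), ext = "txt" (3 chars), so tags are capped
    // at 200 - (7 + 3 + 2) = 188 chars and the full name is exactly 200.
    let tags = vec!["a".repeat(300)];
    let name = build_filename(100, "abc", &tags, "text/plain");
    assert_eq!(name.len(), 200);
    assert!(name.starts_with("100_abc_"));
    assert!(name.ends_with(".txt"));
}
```
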
-/// Derive file extension from MIME type.
+/// Convert a MIME type string (like "image/png") into a file extension (like "png").
+/// Falls back to "bin" for unknown types.
 pub fn mime_to_extension(mime: &str) -> &str {
     match mime {
         "application/pdf" => "pdf",
@@ -87,92 +77,49 @@ pub fn mime_to_extension(mime: &str) -> &str {
     }
 }
-/// Write asset bytes to the storage root. Creates the type subdirectory if needed.
-/// `filename` may include a subdirectory prefix (e.g. "images/2026-01-01_12-00_abcd1234.jpg").
+/// Save a file's raw bytes to the storage directory. Returns the full path on disk.
 pub fn write_asset(root: &Path, filename: &str, data: &[u8]) -> std::io::Result<PathBuf> {
     let path = root.join(filename);
-    // Ensure parent directory exists (handles type subdirectories)
-    if let Some(parent) = path.parent() {
-        std::fs::create_dir_all(parent)?;
-    }
     std::fs::write(&path, data)?;
     Ok(path)
 }
-/// Read asset bytes from the storage root.
-/// `filename` may include a subdirectory prefix.
+/// Load the raw bytes of a stored file from the storage directory.
 pub fn read_asset(root: &Path, filename: &str) -> std::io::Result<Vec<u8>> {
     let path = root.join(filename);
     std::fs::read(path)
 }
-/// Move an asset file to the .trash directory.
-/// Handles filenames with subdirectory prefixes (e.g. "images/file.jpg").
+/// Move a file from the storage directory into the .trash/ folder (soft delete).
 pub fn trash_asset_file(root: &Path, filename: &str) -> std::io::Result<()> {
     let src = root.join(filename);
     let trash_dir = root.join(".trash");
     std::fs::create_dir_all(&trash_dir)?;
-    // Use just the file basename in trash (flatten subdirectory structure)
-    let basename = Path::new(filename)
-        .file_name()
-        .unwrap_or_else(|| std::ffi::OsStr::new(filename));
-    let dst = trash_dir.join(basename);
+    let dst = trash_dir.join(filename);
     std::fs::rename(src, dst)?;
     Ok(())
 }
-/// Parse a physical filename to extract the hash component.
-///
-/// New format: `{type_dir}/{YYYY-MM-DD_HH-MM}_{hash8}.{ext}`
-/// Legacy format: `{timestamp}_{sha256_64}_{tags}.{ext}`
-///
-/// Returns the hash portion (8 chars for new format, 64 chars for legacy).
+/// Extract the SHA-256 hash from a CAN filename.
+/// Expects format: `{timestamp}_{sha256hash}_{tags}.{ext}`
+/// Returns None if the filename doesn't match the expected pattern.
 pub fn parse_hash_from_filename(filename: &str) -> Option<String> {
-    // Strip any directory prefix
-    let basename = filename.rsplit('/').next().unwrap_or(filename);
-    let basename = basename.rsplit('\\').next().unwrap_or(basename);
     // Remove extension
-    let stem = basename.rsplit_once('.')?.0;
+    let stem = filename.rsplit_once('.')?.0;
+    // Split by underscore: first part is timestamp, second is hash (64 hex chars)
     let parts: Vec<&str> = stem.splitn(3, '_').collect();
-    // New format: YYYY-MM-DD_HH-MM_hash8
-    // After splitn(3, '_'): ["YYYY-MM-DD", "HH-MM", "hash8"]
-    if parts.len() >= 3 && parts[0].len() == 10 && parts[0].contains('-') {
-        // New format: third part is the short hash
-        return Some(parts[2].to_string());
-    }
-    // Legacy format: {timestamp}_{sha256_64}_{tags}
     if parts.len() >= 2 && parts[1].len() == 64 {
-        return Some(parts[1].to_string());
+        Some(parts[1].to_string())
+    } else {
+        None
     }
-    None
 }
-/// Parse a physical filename to extract the timestamp component.
-///
-/// New format: `{type_dir}/{YYYY-MM-DD_HH-MM}_{hash8}.{ext}` → parses date to epoch ms
-/// Legacy format: `{timestamp}_{sha256}_{tags}.{ext}` → raw epoch ms
+/// Extract the millisecond timestamp from a CAN filename.
+/// Returns None if the filename doesn't match the expected pattern.
 pub fn parse_timestamp_from_filename(filename: &str) -> Option<i64> {
-    // Strip any directory prefix
-    let basename = filename.rsplit('/').next().unwrap_or(filename);
-    let basename = basename.rsplit('\\').next().unwrap_or(basename);
-    let stem = basename.rsplit_once('.')?.0;
-    let parts: Vec<&str> = stem.splitn(3, '_').collect();
-    // New format: YYYY-MM-DD_HH-MM_hash8
-    if parts.len() >= 2 && parts[0].len() == 10 && parts[0].contains('-') {
-        let date_str = format!("{}_{}", parts[0], parts[1]);
-        let dt = chrono::NaiveDateTime::parse_from_str(&date_str, "%Y-%m-%d_%H-%M").ok()?;
-        let utc = dt.and_utc();
-        return Some(utc.timestamp_millis());
-    }
-    // Legacy format: first part is raw epoch ms
-    let ts_str = parts.first()?;
+    let stem = filename.rsplit_once('.')?.0;
+    let ts_str = stem.split('_').next()?;
     ts_str.parse().ok()
 }
@@ -182,66 +129,23 @@ mod tests {
     use tempfile::TempDir;
     #[test]
-    fn test_mime_to_type_dir() {
-        assert_eq!(mime_to_type_dir("image/jpeg"), "images");
-        assert_eq!(mime_to_type_dir("image/png"), "images");
-        assert_eq!(mime_to_type_dir("application/pdf"), "pdf");
-        assert_eq!(mime_to_type_dir("text/plain"), "documents");
-        assert_eq!(mime_to_type_dir("application/json"), "documents");
-        assert_eq!(mime_to_type_dir("video/mp4"), "video");
-        assert_eq!(mime_to_type_dir("audio/mpeg"), "audio");
-        assert_eq!(mime_to_type_dir("application/zip"), "others");
-        assert_eq!(mime_to_type_dir("application/octet-stream"), "others");
+    fn test_build_filename_no_tags() {
+        let name = build_filename(1773014400123, "a3b2c4d5e6f7", &[], "application/pdf");
+        assert_eq!(name, "1773014400123_a3b2c4d5e6f7.pdf");
     }
     #[test]
-    fn test_build_filename_image() {
-        // 2026-03-13 14:30:00 UTC in ms
-        let ts = 1773412200000i64;
-        let hash = "a3b2c4d5e6f7a8b9".to_string();
-        let name = build_filename(ts, &hash, &[], "image/jpeg");
-        assert_eq!(name, "images/2026-03-13_14-30_a3b2c4d5.jpg");
+    fn test_build_filename_with_tags() {
+        let tags = vec!["photo".to_string(), "vacation".to_string()];
+        let name = build_filename(1773014400123, "a3b2c4d5e6f7", &tags, "image/jpeg");
+        assert_eq!(name, "1773014400123_a3b2c4d5e6f7_photo_vacation.jpg");
     }
     #[test]
-    fn test_build_filename_pdf() {
-        let ts = 1773412200000i64;
-        let hash = "deadbeef12345678".to_string();
-        let name = build_filename(ts, &hash, &[], "application/pdf");
-        assert_eq!(name, "pdf/2026-03-13_14-30_deadbeef.pdf");
-    }
-    #[test]
-    fn test_build_filename_text() {
-        let ts = 1773412200000i64;
-        let hash = "abcdef0123456789".to_string();
-        let name = build_filename(ts, &hash, &["ignored".to_string()], "text/plain");
-        // Tags are ignored in new format
-        assert_eq!(name, "documents/2026-03-13_14-30_abcdef01.txt");
-    }
-    #[test]
-    fn test_build_filename_video() {
-        let ts = 1773412200000i64;
-        let hash = "ff00ff00ff00ff00".to_string();
-        let name = build_filename(ts, &hash, &[], "video/mp4");
-        assert_eq!(name, "video/2026-03-13_14-30_ff00ff00.mp4");
-    }
-    #[test]
-    fn test_build_filename_audio() {
-        let ts = 1773412200000i64;
-        let hash = "aa11bb22cc33dd44".to_string();
-        let name = build_filename(ts, &hash, &[], "audio/mpeg");
-        assert_eq!(name, "audio/2026-03-13_14-30_aa11bb22.mp3");
-    }
-    #[test]
-    fn test_build_filename_others() {
-        let ts = 1773412200000i64;
-        let hash = "1234567890abcdef".to_string();
-        let name = build_filename(ts, &hash, &[], "application/zip");
-        assert_eq!(name, "others/2026-03-13_14-30_12345678.zip");
+    fn test_build_filename_strips_special_chars_from_tags() {
+        let tags = vec!["hello world!".to_string(), "test@123".to_string()];
+        let name = build_filename(100, "abc", &tags, "text/plain");
+        assert_eq!(name, "100_abc_helloworld_test123.txt");
     }
     #[test]
@@ -253,55 +157,38 @@ mod tests {
     }
     #[test]
-    fn test_write_and_read_asset_with_subdir() {
+    fn test_write_and_read_asset() {
         let dir = TempDir::new().unwrap();
         let data = b"hello world";
-        let filename = "images/2026-01-01_12-00_abcd1234.jpg";
-        let path = write_asset(dir.path(), filename, data).unwrap();
+        let path = write_asset(dir.path(), "test_file.txt", data).unwrap();
         assert!(path.exists());
-        assert!(dir.path().join("images").is_dir());
-        let read_back = read_asset(dir.path(), filename).unwrap();
+        let read_back = read_asset(dir.path(), "test_file.txt").unwrap();
         assert_eq!(read_back, data);
     }
     #[test]
-    fn test_trash_asset_file_with_subdir() {
+    fn test_trash_asset_file() {
         let dir = TempDir::new().unwrap();
-        let filename = "images/2026-01-01_12-00_abcd1234.jpg";
-        write_asset(dir.path(), filename, b"bye").unwrap();
-        trash_asset_file(dir.path(), filename).unwrap();
-        assert!(!dir.path().join(filename).exists());
-        assert!(dir.path().join(".trash").join("2026-01-01_12-00_abcd1234.jpg").exists());
+        write_asset(dir.path(), "to_trash.txt", b"bye").unwrap();
+        trash_asset_file(dir.path(), "to_trash.txt").unwrap();
+        assert!(!dir.path().join("to_trash.txt").exists());
+        assert!(dir.path().join(".trash").join("to_trash.txt").exists());
     }
     #[test]
-    fn test_parse_hash_from_new_filename() {
-        assert_eq!(
-            parse_hash_from_filename("images/2026-03-13_14-30_a3b2c4d5.jpg"),
-            Some("a3b2c4d5".to_string())
-        );
-    }
-    #[test]
-    fn test_parse_hash_from_legacy_filename() {
+    fn test_parse_hash_from_filename() {
         let hash_64 = "a".repeat(64);
         let filename = format!("1773014400123_{}.pdf", hash_64);
-        assert_eq!(parse_hash_from_filename(&filename), Some(hash_64));
+        assert_eq!(parse_hash_from_filename(&filename), Some(hash_64.clone()));
+        let filename_tags = format!("1773014400123_{}_photo_vacation.jpg", hash_64);
+        assert_eq!(parse_hash_from_filename(&filename_tags), Some(hash_64));
     }
     #[test]
-    fn test_parse_timestamp_from_new_filename() {
-        let ts = parse_timestamp_from_filename("images/2026-03-13_14-30_a3b2c4d5.jpg");
-        assert!(ts.is_some());
-        let ts = ts.unwrap();
-        // Should be 2026-03-13 14:30 UTC in millis
-        assert_eq!(ts, 1773412200000);
-    }
-    #[test]
-    fn test_parse_timestamp_from_legacy_filename() {
+    fn test_parse_timestamp_from_filename() {
         let hash_64 = "b".repeat(64);
         let filename = format!("1773014400123_{}.pdf", hash_64);
         assert_eq!(parse_timestamp_from_filename(&filename), Some(1773014400123));

View File

@@ -11,10 +11,10 @@ use crate::models::FileAttributes;
 use crate::storage::{parse_hash_from_filename, parse_timestamp_from_filename};
 use crate::xattr;
-/// Start the background verifier subsystem.
-/// - Runs an initial full scrub
-/// - Watches for filesystem changes
-/// - Runs periodic scrubs
+/// Launch the background integrity checker. It does three things:
+/// 1. Immediately scans all files to detect corruption or missing data.
+/// 2. Watches the storage folder for file changes and re-checks them in real time.
+/// 3. Re-runs the full scan on a timer (configurable in config.yaml).
 pub fn start(config: Config, db: Db) {
     let config2 = config.clone();
     let db2 = db.clone();
@@ -58,6 +58,7 @@ fn config3_for_watcher(config: Config) -> Config {
     config
 }
+/// Watch the storage directory for file changes and verify each changed file.
 async fn run_watcher(config: Config, db: Db) -> anyhow::Result<()> {
     let (tx, mut rx) = mpsc::channel::<PathBuf>(100);
     let storage_root = config.storage_root.clone();
@@ -114,7 +115,9 @@ async fn run_watcher(config: Config, db: Db) -> anyhow::Result<()> {
     Ok(())
 }
-/// Run a full scrub: verify every active asset's hash.
+/// Full integrity scan: re-hashes every active file on disk and compares it
+/// to the expected hash in the database. Also syncs OS-level file attributes
+/// and backfills missing file sizes.
 async fn run_scrub(config: &Config, db: &Db) -> anyhow::Result<()> {
     let assets = {
         let conn = db.lock().unwrap();
@@ -276,7 +279,8 @@ async fn run_scrub(config: &Config, db: &Db) -> anyhow::Result<()> {
     Ok(())
 }
-/// Verify a single file by its physical filename.
+/// Re-hash a single file and flag it as corrupted if the hash doesn't match.
+/// Called when the filesystem watcher detects a change.
 async fn verify_single_file(
     config: &Config,
     db: &Db,
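
The periodic half of that design reduces to a timer loop around `run_scrub`. A sketch, assuming tokio's `interval`; the real `start` also fires the initial scrub and the watcher task:

```rust
// Illustrative periodic scrub driver, wired to verify_interval_hours
// from config.yaml.
async fn scrub_loop(config: Config, db: Db, every_hours: u64) {
    let period = std::time::Duration::from_secs(every_hours * 3600);
    let mut ticker = tokio::time::interval(period);
    loop {
        ticker.tick().await; // the first tick completes immediately
        if let Err(err) = run_scrub(&config, &db).await {
            eprintln!("scrub failed: {err}");
        }
    }
}
```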

View File

@@ -27,7 +27,8 @@ pub fn read_attributes(path: &Path) -> std::io::Result<FileAttributes> {
     }
 }
-// ── Unix implementation using xattr crate ──
+// ── Unix implementation ──
+// Stores each metadata field as an extended attribute (e.g. "user.can.mime_type").
 #[cfg(unix)]
 fn write_xattr(path: &Path, attrs: &FileAttributes) -> std::io::Result<()> {
@@ -58,6 +59,7 @@ fn write_xattr(path: &Path, attrs: &FileAttributes) -> std::io::Result<()> {
     Ok(())
 }
+/// Read all CAN metadata from Unix extended attributes on a file.
 #[cfg(unix)]
 fn read_xattr(path: &Path) -> std::io::Result<FileAttributes> {
     use xattr::FileExt;
@@ -81,8 +83,10 @@ fn read_xattr(path: &Path) -> std::io::Result<FileAttributes> {
     })
 }
-// ── Windows implementation using NTFS Alternate Data Streams ──
+// ── Windows implementation ──
+// Stores each metadata field as an NTFS Alternate Data Stream (e.g. "file.txt:can.mime_type").
+/// Write CAN metadata fields as NTFS Alternate Data Streams on a file.
 #[cfg(windows)]
 fn write_ntfs_ads(path: &Path, attrs: &FileAttributes) -> std::io::Result<()> {
     let base = path.to_string_lossy();
@@ -111,6 +115,7 @@ fn write_ntfs_ads(path: &Path, attrs: &FileAttributes) -> std::io::Result<()> {
     Ok(())
 }
+/// Read all CAN metadata from NTFS Alternate Data Streams on a file.
 #[cfg(windows)]
 fn read_ntfs_ads(path: &Path) -> std::io::Result<FileAttributes> {
     let base = path.to_string_lossy();