Compare commits

3 Commits

| Author | SHA1 | Date |
|---|---|---|
| | `689d14202b` | |
| | `e7def4b819` | |
| | `620966872e` | |
162
README.md
Normal file
@ -0,0 +1,162 @@
# CAN Service

**Containerized Asset Network** -- A self-healing local storage daemon with HTTP REST and Protobuf APIs for ingesting, managing, and retrieving files.

CAN stores any file you throw at it, tags it with metadata, verifies integrity in the background, and syncs between machines over encrypted P2P connections. Think of it as a personal S3 that runs on your laptop and replicates to your other devices automatically.

---
## Quick Start
```bash
# Build and run (listens on port 3210)
cargo run
```
The service reads `config.yaml` from the current directory:
```yaml
storage_root: "./can_data"
admin_token: "super_secret_rebuild"
enable_thumbnail_cache: true
verify_interval_hours: 12
sync_api_key: "can-sync-default-key"  # enables P2P sync endpoints
```

Override the port with `CAN_PORT=8080 cargo run`.
### Upload a file
```bash
curl -X POST http://localhost:3210/api/v1/can/0/ingest \
  -F "file=@photo.jpg" \
  -F "tags=vacation,summer" \
  -F "application=my-app"
```
### Upload JSON data (agent-friendly)
```bash
curl -X POST http://localhost:3210/api/v1/can/0/ingest/data \
  -H "Content-Type: application/json" \
  -d '{"data": {"key": "value"}, "tags": "config,backup"}'
```
### Download a file
```bash
curl http://localhost:3210/api/v1/can/0/asset/{hash} -o file.jpg
```

---
## How It Works
```
              +-----------+
  Upload ---->|           |----> SQLite index (millisecond queries)
              |    CAN    |
Download <----|  Service  |----> Flat file storage (one file per asset)
              |           |
  Search ---->| port 3210 |----> OS file attributes (disaster recovery)
              |           |
     SSE <----|           |----> Background verifier (integrity checks)
              +-----------+
                    |
                    |  P2P Sync (protobuf over QUIC)
                    |
              +-----------+
              |    CAN    |
              |  Service  |  (another machine)
              | port 3210 |
              +-----------+
```
Each asset is saved as `{timestamp}_{sha256hash}_{tags}.{ext}` in a flat directory. Metadata lives in SQLite for fast queries and is redundantly written to OS-level file attributes (xattr on macOS/Linux, NTFS ADS on Windows) so you can recover even if the database is lost.
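The naming scheme can be unpacked with ordinary string splitting. A minimal sketch of the idea -- the `parse_asset_name` helper and its field names are illustrative, not CAN's actual API (the real parsing lives in `storage.rs`):

```rust
/// Fields recoverable from a `{timestamp}_{sha256hash}_{tags}.{ext}` asset
/// filename. Illustrative sketch only, not CAN's real parser.
#[derive(Debug, PartialEq)]
struct AssetName {
    timestamp_ms: u64,
    hash: String,
    tags: Vec<String>,
    ext: String,
}

fn parse_asset_name(name: &str) -> Option<AssetName> {
    // Split off the extension first, then split the stem into its three parts.
    let (stem, ext) = name.rsplit_once('.')?;
    let mut parts = stem.splitn(3, '_');
    let timestamp_ms = parts.next()?.parse().ok()?;
    let hash = parts.next()?.to_string();
    let tags = parts
        .next()
        .map(|t| t.split(',').map(str::to_string).collect())
        .unwrap_or_default();
    Some(AssetName { timestamp_ms, hash, tags, ext: ext.to_string() })
}

fn main() {
    let parsed = parse_asset_name("1710000000000_abc123_vacation,summer.jpg").unwrap();
    assert_eq!(parsed.timestamp_ms, 1710000000000);
    assert_eq!(parsed.hash, "abc123");
    assert_eq!(parsed.tags, vec!["vacation", "summer"]);
    println!("{:?}", parsed);
}
```

This is why the database can be rebuilt from the directory alone: the filename plus the OS-level attributes carry enough metadata to re-index every asset.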

A background verifier re-hashes every file periodically and flags corruption. It also watches for filesystem changes in real time.

---
## API

All endpoints live under `/api/v1/can/0/`. See [API.md](API.md) for the full specification.

| Method | Path | Description |
|--------|------|-------------|
| `POST` | `/ingest` | Upload a file (multipart form) |
| `POST` | `/ingest/data` | Upload JSON data (no multipart needed) |
| `GET` | `/asset/{hash}` | Download an asset by its SHA-256 hash |
| `GET` | `/asset/{hash}/meta` | Get metadata as JSON |
| `PATCH` | `/asset/{hash}` | Update tags and/or description |
| `GET` | `/asset/{hash}/thumb/{w}/{h}` | Get a resized thumbnail (images only) |
| `GET` | `/list` | Paginated listing with filters |
| `GET` | `/search` | Search by hash prefix, time range, MIME type, tags, etc. |
| `GET` | `/events` | SSE stream of new-asset notifications |

Private sync endpoints (`/sync/*`) use protobuf and require the `X-Sync-Key` header.

---
## Examples

Four example apps show what you can build on top of CAN:

| Example | Port | Description | README |
|---------|------|-------------|--------|
| **[Paste](examples/paste/)** | 3211 | Pastebin -- type text or paste images; auto-tags with #hashtags | [README](examples/paste/README.md) |
| **[File Manager](examples/filemanager/)** | 3212 | Web file browser with grid/list views, search, and filters | [README](examples/filemanager/README.md) |
| **[CAN Sync](examples/can-sync/)** | -- | P2P replication agent -- encrypted sync via shared passphrase | [README](examples/can-sync/README.md) |
| **[CanFS](examples/canfs/)** | -- | Mount assets as a read-only Windows drive (WinFSP) | [README](examples/canfs/README.md) |

### Run everything at once

```powershell
# Windows
.\go_example_1.ps1
```

```bash
# macOS / Linux
./go_example_1.sh
```

This starts CAN Service, the Sync Agent, and Paste, builds everything, and cleans up on Ctrl+C.

---
## Configuration

| Field | Default | Description |
|-------|---------|-------------|
| `storage_root` | (required) | Directory where assets and the database are stored |
| `admin_token` | `"changeme"` | Bearer token for admin endpoints |
| `enable_thumbnail_cache` | `true` | Cache resized thumbnails in `.thumbs/` |
| `rebuild_error_threshold` | `50` | Maximum errors before a full rebuild is triggered |
| `verify_interval_hours` | `12` | Hours between full integrity scans |
| `sync_api_key` | (none) | API key for sync endpoints; omit to disable sync |

---
## Project Structure

```
src/
  main.rs       Entry point: config, DB, verifier, HTTP server
  config.rs     YAML config loading
  db.rs         SQLite CRUD (assets, tags, search)
  hash.rs       SHA-256 content hashing
  storage.rs    File I/O (write, read, trash, filename parsing)
  verifier.rs   Background integrity checker + file watcher
  xattr.rs      OS-level file attributes (xattr / NTFS ADS)
  routes/       HTTP API handlers (ingest, asset, list, search, thumb, sync, events)
examples/
  paste/        Pastebin web app
  filemanager/  File browser web app
  can-sync/     P2P sync agent (iroh + gossip + pkarr)
  canfs/        Windows virtual filesystem (WinFSP)
```
## Requirements

- **Rust** 1.75+
- **SQLite** -- bundled, no system install needed
- **WinFSP** -- only for the canfs example (Windows only)
@ -1,263 +1,91 @@
# CAN Sync

P2P full-mirror replication for [CAN Service](../../), built on [iroh](https://iroh.computer/) for encrypted peer-to-peer networking with NAT traversal. Two machines with the same passphrase automatically discover each other and sync all assets over encrypted connections. No port forwarding or static IPs needed.
```
┌─────────────┐   protobuf    ┌─────────────┐  iroh (QUIC)  ┌─────────────┐   protobuf    ┌─────────────┐
│ CAN Service │◄─────────────►│  CAN Sync   │◄─────────────►│  CAN Sync   │◄─────────────►│ CAN Service │
│  Machine A  │   sync API    │   Agent A   │   encrypted   │   Agent B   │   sync API    │  Machine B  │
│  port 3210  │               │             │               │             │               │  port 3210  │
└─────────────┘               └─────────────┘               └─────────────┘               └─────────────┘
```

CAN Sync communicates with CAN Service **only** via its public HTTP API -- no code changes to CAN Service are required.
## Quick Start

1. **Start CAN Service** on each machine (default port 3210):

   ```bash
   cargo run
   ```

2. **Configure the sync agent** -- edit `config.yaml` (the defaults work out of the box):

   ```yaml
   can_service_url: "http://127.0.0.1:3210"
   sync_api_key: "can-sync-default-key"
   sync_passphrase: "my-secret-phrase"  # must be the same on all machines
   poll_interval_secs: 30
   ```

3. **Start the sync agent** on each machine:

   ```bash
   cd examples/can-sync
   cargo run -- config.yaml
   ```

That's it. Any file uploaded to either CAN Service will appear on the other within seconds.
## How It Works

### Peer Discovery

Peers find each other through two mechanisms, both running simultaneously:

- **Gossip** -- [iroh-gossip](https://docs.rs/iroh-gossip) uses a topic derived from the shared passphrase. Peers on the same local network, or connected to the same relay, discover each other by broadcasting their node IDs.
- **Internet rendezvous** -- Each agent publishes its node ID to [pkarr](https://pkarr.org) relay servers using deterministic DNS-like "slots" derived from the passphrase. All agents scan these slots periodically to find peers worldwide.
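Both mechanisms rely on every machine deriving the same identifier from the shared passphrase. A minimal sketch of that idea, using the standard library's `DefaultHasher` purely as a stand-in -- the real `derive_topic` in `discovery.rs` presumably uses a proper cryptographic derivation:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Derive a deterministic topic id from the shared passphrase.
// Stand-in sketch: real code would use a cryptographic KDF, not DefaultHasher.
fn derive_topic(passphrase: &str) -> u64 {
    let mut h = DefaultHasher::new();
    "can-sync/topic/".hash(&mut h); // domain separation for this use
    passphrase.hash(&mut h);
    h.finish()
}

fn main() {
    // Two machines with the same passphrase land on the same topic...
    assert_eq!(derive_topic("my-secret-phrase"), derive_topic("my-secret-phrase"));
    // ...while different passphrases (almost certainly) do not.
    assert_ne!(derive_topic("my-secret-phrase"), derive_topic("other"));
    println!("topic: {:x}", derive_topic("my-secret-phrase"));
}
```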
### Sync Protocol

Once two peers connect over iroh's encrypted QUIC transport:

1. **Hash exchange** -- Both sides send their full list of asset hashes
2. **Diff** -- Each side computes what the other is missing
3. **Transfer** -- Missing assets are sent concurrently in both directions (metadata and file content bundled together as protobuf)
4. **Live sync** -- After the initial reconciliation, each agent subscribes to SSE events from its local CAN Service. When a new asset is ingested locally, it is pushed to the connected peer instantly.

Live sync uses SSE events rather than polling, so propagation is instant; a fallback incremental poll runs every 30 seconds as a safety net.
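Steps 1 and 2 amount to a symmetric set difference over the two hash lists. A sketch of that computation (the `diff_hashes` helper is illustrative, not the agent's actual API):

```rust
use std::collections::HashSet;

/// Given both sides' asset hashes, return (what we should send, what we
/// expect to receive). Illustrative sketch of the reconciliation diff.
fn diff_hashes(ours: &HashSet<String>, theirs: &HashSet<String>) -> (Vec<String>, Vec<String>) {
    let mut to_send: Vec<String> = ours.difference(theirs).cloned().collect();
    let mut to_receive: Vec<String> = theirs.difference(ours).cloned().collect();
    to_send.sort();
    to_receive.sort();
    (to_send, to_receive)
}

fn main() {
    let ours: HashSet<String> = ["a1", "b2", "c3"].iter().map(|s| s.to_string()).collect();
    let theirs: HashSet<String> = ["b2", "c3", "d4"].iter().map(|s| s.to_string()).collect();
    let (send, recv) = diff_hashes(&ours, &theirs);
    assert_eq!(send, vec!["a1"]); // only we have a1
    assert_eq!(recv, vec!["d4"]); // only they have d4
}
```

Because each side computes its own half of the diff, the transfers in step 3 can run concurrently in both directions without coordination.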
### Echo Prevention

When peer A sends an asset to peer B, B's CAN Service emits an SSE event for the new ingest. Without protection, B would push that asset straight back to A. The sync agent therefore tracks which hashes were received from each peer and filters them out of the push loop.
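The bookkeeping behind this is a per-peer set of received hashes. A sketch with illustrative names (the real `peer.rs` state is more involved):

```rust
use std::collections::{HashMap, HashSet};

/// Tracks which asset hashes arrived from which peer, so the push loop can
/// skip echoing them back. Illustrative sketch only.
#[derive(Default)]
struct EchoFilter {
    received_from: HashMap<String, HashSet<String>>, // peer id -> hashes
}

impl EchoFilter {
    // Remember that `hash` was received from `peer`.
    fn record_received(&mut self, peer: &str, hash: &str) {
        self.received_from
            .entry(peer.to_string())
            .or_default()
            .insert(hash.to_string());
    }

    /// Should a locally observed ingest of `hash` be pushed to `peer`?
    fn should_push(&self, peer: &str, hash: &str) -> bool {
        !self.received_from.get(peer).map_or(false, |h| h.contains(hash))
    }
}

fn main() {
    let mut filter = EchoFilter::default();
    filter.record_received("peer-a", "abc123");
    assert!(!filter.should_push("peer-a", "abc123")); // came from A: don't echo
    assert!(filter.should_push("peer-b", "abc123"));  // B never sent it: push
}
```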
## Configuration

`config.yaml`:

| Field | Default | Description |
|-------|---------|-------------|
| `can_service_url` | (required) | URL of the local CAN Service |
| `sync_api_key` | (required) | Must match `sync_api_key` in CAN Service's config |
| `sync_passphrase` | (required) | Shared secret for peer discovery (must match on all peers) |
| `poll_interval_secs` | `3` | Fallback poll interval for catching missed events |
| `ticket_file` | (none) | Write this node's address to a file (for direct connection in tests) |
| `connect_ticket_file` | (none) | Read a peer's address from a file (for direct connection in tests) |

A fuller annotated example:

```yaml
# URL of the local CAN Service API
can_service_url: "http://127.0.0.1:3210/api/v1/can/0"

# Address for the CAN Sync HTTP API
listen_addr: "127.0.0.1:3213"

# Directory for persistent data (peer key, sync state DB)
data_dir: "./can_sync_data"

# Custom relay server URL (null = iroh's public relay)
relay_url: null

# Seconds between fast polls for new assets
poll_interval_secs: 5

# Seconds between full scans of all assets
full_scan_interval_secs: 300
```
## Concepts

### Libraries

A **library** is a shared collection of CAN assets that syncs between peers. Each library has a **filter** that determines which assets belong to it.

Filter options (combined with AND logic):

- `application` -- match assets with this application tag (e.g. `"paste"`)
- `tags` -- match assets with any of these tags (e.g. `["photos", "backup"]`)
- `user` -- match assets from this user identity
- `mime_prefix` -- match assets whose MIME type starts with this prefix (e.g. `"image/"`)
- `hashes` -- a manual list of specific asset hashes to include
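The AND-of-optional-criteria semantics can be sketched directly: an unset field always matches, a set field must match. The structs and field names below are illustrative, not the agent's real types:

```rust
/// A library filter; all set fields must match (AND logic).
/// Illustrative sketch with simplified field and asset types.
#[derive(Default)]
struct Filter {
    application: Option<String>,
    tags: Vec<String>, // matches if the asset has ANY of these
    mime_prefix: Option<String>,
}

struct Asset {
    application: String,
    tags: Vec<String>,
    mime: String,
}

fn matches(f: &Filter, a: &Asset) -> bool {
    // An unset criterion matches everything; a set one must hold.
    f.application.as_ref().map_or(true, |app| app == &a.application)
        && (f.tags.is_empty() || f.tags.iter().any(|t| a.tags.contains(t)))
        && f.mime_prefix.as_ref().map_or(true, |p| a.mime.starts_with(p))
}

fn main() {
    let photos = Filter { mime_prefix: Some("image/".into()), ..Default::default() };
    let asset = Asset {
        application: "paste".into(),
        tags: vec!["vacation".into()],
        mime: "image/jpeg".into(),
    };
    assert!(matches(&photos, &asset));
    let docs = Filter { mime_prefix: Some("text/".into()), ..Default::default() };
    assert!(!matches(&docs, &asset));
}
```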
### Sync Flow

**Outbound** (local → remote):

1. The announcer polls CAN Service for new or changed assets
2. Assets matching a library's filter are announced to the library's iroh document
3. iroh replicates the entry to all subscribed peers
4. The remote peer's fetcher downloads the blob and ingests it into its local CAN Service

**Inbound** (remote → local):

1. The iroh document receives a new entry from a remote peer
2. The fetcher downloads the blob via iroh's encrypted QUIC transport
3. The fetcher independently verifies the CAN hash (SHA-256)
4. The fetcher ingests the file into the local CAN Service with all metadata preserved
## API

All endpoints return JSON with `{ "status": "success", "data": ... }` or `{ "status": "error", "error": "..." }`.

### Status & Peers

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/status` | Node status, CAN Service health, library count |
| GET | `/peers` | List of connected peers |

### Libraries

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/libraries` | Create a library |
| GET | `/libraries` | List all libraries |
| GET | `/libraries/{id}` | Get library details |
| DELETE | `/libraries/{id}` | Remove a library |

### Sharing

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/libraries/{id}/invite` | Generate a share ticket |
| POST | `/join` | Join a library from a ticket |
### Examples

**Create a library** that syncs all assets with `application=paste`:

```bash
curl -X POST http://127.0.0.1:3213/libraries \
  -H "Content-Type: application/json" \
  -d '{"name": "my-pastes", "filter": {"application": "paste"}}'
```

**Create a library** that syncs all images:

```bash
curl -X POST http://127.0.0.1:3213/libraries \
  -H "Content-Type: application/json" \
  -d '{"name": "images", "filter": {"mime_prefix": "image/"}}'
```

**Generate an invite ticket** to share with another machine:

```bash
curl -X POST http://127.0.0.1:3213/libraries/{id}/invite
```

**Join a library** on another machine using the ticket:

```bash
curl -X POST http://127.0.0.1:3213/join \
  -H "Content-Type: application/json" \
  -d '{"ticket": "eyJsaWJyYXJ5X25hbWUiOi..."}'
```

**List all libraries**:

```bash
curl http://127.0.0.1:3213/libraries
```

**Check status**:

```bash
curl http://127.0.0.1:3213/status
```
## Two-Machine Setup

### Machine A (the host)

**1. Start CAN Service** (default port 3210):

```bash
cd /path/to/CanService
cargo run
```

**2. Start CAN Sync** with the default config (port 3213):

```bash
cd examples/can-sync
cargo run
```

**3. Create a library** (e.g. sync all images):

```bash
curl -X POST http://127.0.0.1:3213/libraries \
  -H "Content-Type: application/json" \
  -d '{"name": "shared-images", "filter": {"mime_prefix": "image/"}}'
```

Save the `id` from the response (e.g. `"id": "a1b2c3d4-..."`).

**4. Generate an invite ticket:**

```bash
curl -X POST http://127.0.0.1:3213/libraries/a1b2c3d4-.../invite
```

Copy the `ticket` string from the response -- this is what Machine B needs.

### Machine B (the joiner)

**1. Start CAN Service** on a different port:

```bash
cd /path/to/CanService
CAN_PORT=3220 cargo run
```

**2. Create a config file** for CAN Sync pointing at Machine B's CAN Service:

```yaml
# machine-b-config.yaml
can_service_url: "http://127.0.0.1:3220/api/v1/can/0"
listen_addr: "127.0.0.1:3223"
data_dir: "./can_sync_data_b"
```

**3. Start CAN Sync** with that config:

```bash
cd examples/can-sync
cargo run -- machine-b-config.yaml
```

**4. Join the library** using Machine A's ticket:

```bash
curl -X POST http://127.0.0.1:3223/join \
  -H "Content-Type: application/json" \
  -d '{"ticket": "eyJsaWJyYXJ5X25hbWUiOi..."}'
```

### Verify it works

**Ingest a file on Machine A:**

```bash
curl -X POST http://127.0.0.1:3210/api/v1/can/0/ingest \
  -F "file=@photo.jpg" \
  -F "mime_type=image/jpeg"
```

**Check Machine B** -- the file should appear within a few seconds:

```bash
curl "http://127.0.0.1:3220/api/v1/can/0/list?limit=5"
```

The same image, with matching hash and metadata, will be in Machine B's CAN Service, synced over iroh's encrypted P2P connection.
## Architecture

```
src/
├── main.rs        Entry point: config, iroh node, announcer, fetcher, HTTP server
├── config.rs      YAML config loading
├── can_client.rs  HTTP client for the CAN Service API (list, search, ingest, meta, etc.)
├── node.rs        iroh endpoint + blobs + docs + gossip + router
├── library.rs     Library/filter definitions + SQLite state tracking
├── manifest.rs    AssetSyncEntry serialized into iroh document entries
├── announcer.rs   Polls CAN Service, announces matching assets to libraries
├── fetcher.rs     Receives remote entries, downloads blobs, ingests into CAN Service
└── routes.rs      Axum HTTP API handlers
```

CAN Service must have `sync_api_key` set in its `config.yaml` for the sync endpoints to be enabled.
## Security

- **Transport** -- All peer-to-peer traffic is encrypted with QUIC + TLS 1.3 (mandatory in iroh)
- **Identity** -- Each node generates an Ed25519 keypair on first run
- **Discovery** -- Only peers with the same passphrase can find each other
- **Access control** -- Library access is granted via cryptographic capability tickets; only peers with a valid ticket can read or write
- **NAT traversal** -- iroh's built-in relay servers and hole punching
- **Hash verification** -- Every received asset is re-hashed and compared against its CAN SHA-256 hash before being stored
## Current Status

The service compiles and runs, with the following fully implemented:

- iroh P2P node startup with all protocol handlers (blobs, docs, gossip)
- CAN Service HTTP client with full API coverage
- Library management with SQLite persistence
- Announcer polling loop (fast + full scan) with real iroh-docs writes
- Fetcher with iroh document event subscription for real-time sync
- Fetcher blob download via iroh, with CAN hash verification before ingestion
- Real DocTicket-based invite/join with cryptographic capability tokens
- HTTP API for library CRUD, invite, and join

## Project Structure

```
src/
  main.rs        Entry point: config, iroh endpoint, discovery, peer connections
  config.rs      YAML config loading
  can_client.rs  HTTP client for CAN Service's sync API (protobuf + SSE)
  protocol.rs    Protobuf message types (shared with CAN Service)
  discovery.rs   Peer discovery via iroh-gossip
  rendezvous.rs  Internet peer discovery via pkarr relay
  peer.rs        Per-peer sync: reconciliation, live push/receive, echo prevention
```
@ -27,6 +27,7 @@ pub struct CanSyncClient {
}

impl CanSyncClient {
    /// Create a new client pointed at the given CAN service URL, authenticated with the sync API key.
    pub fn new(base_url: &str, sync_key: &str) -> Self {
        Self {
            http: reqwest::Client::new(),

@ -182,7 +183,8 @@ impl CanSyncClient {
        rx
    }

    /// Internal: connect to SSE and forward events until the stream ends or errors.
    // Connect to the SSE endpoint and forward parsed events to the channel
    // until the stream ends or an error occurs.
    async fn run_sse_stream(
        http: &reqwest::Client,
        url: &str,

@ -1,6 +1,7 @@
use serde::Deserialize;
use std::path::Path;

/// All settings needed to run the sync agent, loaded from a YAML file.
#[derive(Debug, Clone, Deserialize)]
pub struct SyncConfig {
    /// Base URL of the local CAN service (e.g. "http://127.0.0.1:3210")

@ -32,6 +33,7 @@ fn default_poll_interval() -> u64 {
}

impl SyncConfig {
    /// Read a YAML config file from disk and parse it into a SyncConfig.
    pub fn load(path: &Path) -> anyhow::Result<Self> {
        let contents = std::fs::read_to_string(path)?;
        let config: Self = serde_yaml::from_str(&contents)?;

@ -27,6 +27,7 @@ pub struct Discovery {
}

impl Discovery {
    /// Create a new Discovery that listens on a gossip topic derived from the shared passphrase.
    pub fn new(endpoint: Endpoint, gossip: Gossip, passphrase: &str) -> Self {
        let topic = derive_topic(passphrase);
        info!("Gossip topic: {}", hex::encode(topic.as_bytes()));

@ -30,6 +30,8 @@ use crate::rendezvous::Rendezvous;
/// ALPN protocol identifier for CAN sync peer connections.
const SYNC_ALPN: &[u8] = b"can-sync/1";

/// Entry point: loads config, connects to the local CAN service, sets up
/// encrypted P2P networking (iroh), and discovers + syncs with peers.
#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging

@ -40,6 +40,7 @@ pub struct Rendezvous {
}

impl Rendezvous {
    /// Create a new Rendezvous by deriving keypairs for all slots from the passphrase.
    pub fn new(passphrase: &str, our_id: EndpointId) -> Result<Self> {
        let slots: Vec<Keypair> = (0..NUM_SLOTS)
            .map(|i| derive_slot_keypair(passphrase, i))

@ -91,6 +92,7 @@ impl Rendezvous {
        }
    }

    // Read every slot and report any newly discovered peer IDs.
    async fn scan_all_slots(
        &self,
        known_peers: &mut HashSet<EndpointId>,

@ -111,6 +113,8 @@ impl Rendezvous {
        }
    }

    // Pick an available slot for this peer: reuse our old slot, take an empty one,
    // or fall back to a deterministic slot based on our ID.
    async fn claim_slot(&self, our_id_hex: &str) -> usize {
        // Check if we already own a slot (from a previous run)
        for i in 0..NUM_SLOTS {

@ -143,6 +147,7 @@ impl Rendezvous {
        slot
    }

    // Write our EndpointId into the given slot's DNS TXT record via the pkarr relay.
    async fn publish_slot(&self, slot: usize, our_id_hex: &str) -> Result<()> {
        let keypair = &self.slots[slot];
        let packet = SignedPacket::builder()

@ -163,6 +168,7 @@ impl Rendezvous {
        Ok(())
    }

    // Look up a slot's DNS TXT record and parse the EndpointId stored there, if any.
    async fn read_slot(&self, slot: usize) -> Option<EndpointId> {
        let public_key = self.slots[slot].public_key();
        let packet = self.client.resolve(&public_key).await?;
90
examples/canfs/README.md
Normal file
@ -0,0 +1,90 @@
# CanFS

Mount [CAN Service](../../) assets as a read-only Windows drive using [WinFSP](https://winfsp.dev). Browse your assets in Windows Explorer like regular files.

## Features

- **Drive-letter mount** -- assets appear as files under a drive like `X:\`
- **Virtual folder structure** -- files organized into `CAN\`, `APPLICATION\`, `DATES\`, and `TAGS\` directories
- **Lazy file loading** -- file content is fetched from CAN Service only when you actually open or read a file
- **Background refresh** -- the file tree updates periodically to pick up new assets

## Requirements

- **Windows** (this example uses WinFSP, which is Windows-only)
- **[WinFSP](https://winfsp.dev/rel/)** must be installed (the filesystem driver)
## Running

Make sure CAN Service is running on port 3210 first:

```bash
# From the repo root
cargo run
```

Then mount the filesystem:

```bash
cd examples/canfs
cargo run
```

By default it mounts on `X:`. Customize with flags:

```bash
cargo run -- --mount Z: --can-url http://127.0.0.1:3210/api/v1/can/0 --refresh-secs 30
```

Press Ctrl+C to unmount.
## Folder Structure

When mounted, the drive shows these virtual directories:

```
X:\
  CAN\              All assets by timestamp and hash
    1710000000000_abc123.pdf
    1710000005000_def456.jpg

  APPLICATION\      Grouped by the "application" field
    paste\
      readme.txt
    my-app\
      report.pdf

  DATES\            Grouped by year and month
    2025\
      01\
        photo.jpg
      03\
        report.pdf

  TAGS\             One folder per tag
    vacation\
      photo.jpg
    work\
      report.pdf
```
Files with a `human_filename` show their friendly name in `APPLICATION/`, `DATES/`, and `TAGS/` folders. The `CAN/` folder always shows the raw `{timestamp}_{hash}.{ext}` format.
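That naming rule is a small fallback: show the friendly name where the folder allows it, and the raw canonical name otherwise. A sketch, with hypothetical function and field names (the real logic lives in `tree.rs`):

```rust
// Pick the name a virtual folder shows for an asset: the friendly name where
// allowed, the raw {timestamp}_{hash}.{ext} form in CAN\ or when no friendly
// name exists. Illustrative sketch only.
fn display_name(
    folder: &str,
    human_filename: Option<&str>,
    timestamp_ms: u64,
    hash: &str,
    ext: &str,
) -> String {
    match (folder, human_filename) {
        // CAN\ always shows the canonical raw name; so do assets without one.
        ("CAN", _) | (_, None) => format!("{}_{}.{}", timestamp_ms, hash, ext),
        (_, Some(name)) => name.to_string(),
    }
}

fn main() {
    assert_eq!(
        display_name("CAN", Some("photo.jpg"), 1710000000000, "abc123", "jpg"),
        "1710000000000_abc123.jpg"
    );
    assert_eq!(
        display_name("TAGS", Some("photo.jpg"), 1710000000000, "abc123", "jpg"),
        "photo.jpg"
    );
}
```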
## CLI Options

| Flag | Default | Description |
|------|---------|-------------|
| `-m, --mount` | `X:` | Drive letter or directory to mount on |
| `--can-url` | `http://127.0.0.1:3210/api/v1/can/0` | CAN Service API base URL |
| `--refresh-secs` | `60` | Seconds between cache refreshes |

## Project Structure

```
src/
  main.rs   Entry point: CLI args, WinFSP host setup, background refresh
  api.rs    Blocking HTTP client for CAN Service (list, fetch)
  fs.rs     WinFSP filesystem implementation (open, read, readdir, etc.)
  tree.rs   Virtual directory tree builder (turns a flat asset list into folders)
  util.rs   Helpers: MIME-to-extension, timestamp conversion, path sanitization
```
@ -49,6 +49,7 @@ pub struct CanClient {
}

impl CanClient {
    /// Create a new client pointed at the given CAN service base URL.
    pub fn new(base_url: &str) -> Self {
        Self {
            client: reqwest::blocking::Client::new(),

@ -26,6 +26,7 @@ const FILE_ATTRIBUTE_DIRECTORY: u32 = 0x10;
const FILE_ATTRIBUTE_READONLY: u32 = 0x01;
const FILE_ATTRIBUTE_ARCHIVE: u32 = 0x20;

// Wrap a raw NTSTATUS error code into WinFSP's error type.
fn ntstatus(code: i32) -> FspError {
    FspError::NTSTATUS(code)
}

@ -54,6 +55,7 @@ pub struct CanFileContext {
impl FileSystemContext for CanFs {
    type FileContext = CanFileContext;

    /// Called by Windows to check if a file/folder exists and get its basic attributes before opening it.
    fn get_security_by_name(
        &self,
        file_name: &U16CStr,

@ -83,6 +85,7 @@ impl FileSystemContext for CanFs {
        })
    }

    /// Called when a file or directory is opened; returns a context handle and fills in size/timestamps.
    fn open(
        &self,
        file_name: &U16CStr,

@ -142,8 +145,10 @@ impl FileSystemContext for CanFs {
        })
    }

    /// Called when a handle is closed; nothing to clean up since content is dropped automatically.
    fn close(&self, _context: Self::FileContext) {}

    /// Returns up-to-date size and attribute info for an already-opened file or directory.
    fn get_file_info(
        &self,
        context: &Self::FileContext,

@ -194,6 +199,7 @@ impl FileSystemContext for CanFs {
        Ok(())
    }

    /// Reads file bytes at the given offset; downloads the asset from the CAN service on first access.
    fn read(
        &self,
        context: &Self::FileContext,

@ -233,6 +239,7 @@ impl FileSystemContext for CanFs {
        Ok(count as u32)
    }

    /// Lists the contents of a directory, including "." and ".." entries, for Windows Explorer and dir commands.
    fn read_directory(
        &self,
        context: &Self::FileContext,

@ -308,6 +315,7 @@ impl FileSystemContext for CanFs {
        Ok(context.dir_buffer.read(marker, buffer))
    }

    /// Reports the virtual drive's total and free space (shows as a 1 GB read-only volume).
    fn get_volume_info(&self, out_volume_info: &mut VolumeInfo) -> winfsp::Result<()> {
        out_volume_info.total_size = 1024 * 1024 * 1024; // 1 GB
        out_volume_info.free_size = 0;

@ -17,6 +17,7 @@ use crate::api::CanClient;
use crate::fs::{CacheState, CanFs};
use crate::tree::VirtualTree;

/// Command-line arguments for mounting CAN service assets as a virtual Windows drive using WinFSP.
#[derive(Parser)]
#[command(name = "canfs", about = "Mount CAN service assets as a virtual drive")]
struct Args {

@ -33,6 +34,7 @@ struct Args {
    refresh_secs: u64,
}

/// Entry point: connects to the CAN service, builds a virtual file tree, and mounts it as a read-only Windows drive.
fn main() {
    tracing_subscriber::fmt()
        .with_env_filter(

@ -131,6 +131,7 @@ struct TreeBuilder {
}

impl TreeBuilder {
    // Create a new tree builder with an empty root directory node.
    fn new() -> Self {
        let root = VNode {
            name: String::new(),
52
examples/filemanager/README.md
Normal file
@ -0,0 +1,52 @@
# File Manager

A web-based file browser for [CAN Service](../../) assets. Grid and list views, search, filters, and a detail modal with previews.

## Features

- **Grid and list views** -- toggle between thumbnail cards and a compact file list
- **Virtual folder tree** -- assets organized into `CAN/`, `APPLICATION/`, `DATES/`, `TAGS/`, and `TYPE/` folders
- **Search** -- filter by filename, description, or hash prefix
- **Filters** -- narrow by application, MIME type, tag, or date range
- **Detail modal** -- click any file to see full metadata, preview images, and download
## Running

Make sure CAN Service is running on port 3210 first:

```bash
# From the repo root
cargo run
```

Then start the File Manager:

```bash
cd examples/filemanager
cargo run
```

It opens automatically at [http://127.0.0.1:3212](http://127.0.0.1:3212).
## How It Works
|
||||
|
||||
The Rust backend serves a single-page app and proxies all data requests to CAN Service:
|
||||
|
||||
| File Manager Route | Proxies To | Purpose |
|
||||
|--------------------|-----------|---------|
|
||||
| `GET /` | -- | Serve the HTML/JS/CSS frontend |
|
||||
| `GET /fm/list` | `GET /api/v1/can/0/list` | Paginated asset listing |
|
||||
| `GET /fm/search` | `GET /api/v1/can/0/search` | Search with filters |
|
||||
| `GET /fm/asset/{hash}` | `GET /api/v1/can/0/asset/{hash}` | Download/preview a file |
|
||||
| `GET /fm/asset/{hash}/meta` | `GET /api/v1/can/0/asset/{hash}/meta` | Asset metadata |
|
||||
| `GET /fm/thumb/{hash}` | `GET /api/v1/can/0/asset/{hash}/thumb/200/200` | Thumbnail |
|
||||
|
||||
The virtual folder tree is built entirely in the browser from the flat asset list -- no folder structure exists on disk.
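As a sketch of that grouping idea (in Rust rather than the frontend's JavaScript; the struct, field names, and `virtual_paths` helper are illustrative, not actual frontend code): each asset appears under several virtual folders at once, derived purely from its metadata.

```rust
// One entry from the flat asset list (fields simplified for illustration).
struct AssetEntry {
    application: String,
    mime_type: String,
    tags: Vec<String>,
    date: String, // e.g. "2026-03-13"
}

/// Derive every virtual folder this asset should appear under.
/// Nothing like these paths exists on disk.
fn virtual_paths(a: &AssetEntry) -> Vec<String> {
    let mut paths = vec![
        format!("APPLICATION/{}", a.application),
        format!("TYPE/{}", a.mime_type),
        format!("DATES/{}", a.date),
    ];
    for tag in &a.tags {
        paths.push(format!("TAGS/{}", tag));
    }
    paths
}
```

A single asset with one tag thus lands in four folders, and the tree is rebuilt from scratch on every listing.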
## Project Structure

```
src/
  main.rs   HTTP server: proxy handlers and query forwarding
  html.rs   Single-page frontend (HTML + CSS + JS, embedded as a string)
```
@@ -9,11 +9,13 @@ use std::collections::HashMap;

const CAN_API: &str = "http://127.0.0.1:3210/api/v1/can/0";

// Shared state passed to every request handler; holds a reusable HTTP client.
#[derive(Clone)]
struct AppState {
    client: reqwest::Client,
}

/// Web-based file manager UI that proxies requests to the CAN service API.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()

@@ -47,6 +49,7 @@ async fn main() {
    axum::serve(listener, app).await.unwrap();
}

// Return the single-page HTML UI for the file manager.
async fn serve_index() -> Html<&'static str> {
    Html(html::INDEX_HTML)
}

@@ -86,6 +89,7 @@ fn build_qs(params: &HashMap<String, String>) -> String {
    format!("?{}", qs.join("&"))
}

// Percent-encode a string for use in URL query parameters.
fn urlencoding(s: &str) -> String {
    s.chars()
        .map(|c| match c {
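The `urlencoding` helper is cut off by the diff view; a std-only sketch of what such a percent-encoder typically looks like follows (the body and the exact pass-through character set are guesses, not the actual implementation):

```rust
// Percent-encode a string for use in URL query parameters.
// Unreserved characters (per RFC 3986) pass through; everything else is
// encoded byte-by-byte as %XX using the character's UTF-8 bytes.
fn urlencode(s: &str) -> String {
    s.chars()
        .map(|c| match c {
            'A'..='Z' | 'a'..='z' | '0'..='9' | '-' | '_' | '.' | '~' => c.to_string(),
            _ => c
                .to_string()
                .into_bytes()
                .iter()
                .map(|b| format!("%{:02X}", b))
                .collect(),
        })
        .collect()
}
```

For example, `urlencode("a b")` yields `"a%20b"`, and multi-byte UTF-8 characters expand to one `%XX` triplet per byte.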
51
examples/paste/README.md
Normal file
@@ -0,0 +1,51 @@
# Paste

A minimal pastebin web app built on [CAN Service](../../). Type text and press Enter, or paste an image from your clipboard. Everything gets stored as a CAN asset.

## Features

- **Text paste** -- type and hit Enter to store a text snippet
- **Image paste** -- Ctrl+V an image from your clipboard, or click the paperclip to attach a file
- **Auto-tagging** -- use `#hashtags` in your text and they're extracted as CAN tags
- **Live refresh** -- new pastes appear instantly via Server-Sent Events (including content arriving from P2P sync on another machine)

## Running

Make sure CAN Service is running on port 3210 first:

```bash
# From the repo root
cargo run
```

Then start Paste:

```bash
cd examples/paste
cargo run
```

Opens automatically at [http://127.0.0.1:3211](http://127.0.0.1:3211).

## How It Works

Paste is a thin proxy layer. The Rust backend serves a single-page HTML/JS frontend and forwards requests to the CAN Service API:

| Paste Route | Proxies To | Purpose |
|-------------|-----------|---------|
| `POST /paste/text` | `POST /api/v1/can/0/ingest` | Store text as a `.txt` asset |
| `POST /paste/file` | `POST /api/v1/can/0/ingest` | Store an uploaded file |
| `GET /paste/list` | `GET /api/v1/can/0/list?application=paste` | List paste assets |
| `GET /paste/asset/{hash}` | `GET /api/v1/can/0/asset/{hash}` | Download an asset |
| `GET /paste/thumb/{hash}` | `GET /api/v1/can/0/asset/{hash}/thumb/200/200` | Image thumbnail |
| `GET /paste/events` | `GET /api/v1/can/0/events` | SSE stream for live updates |

All pastes are tagged with `application=paste` so they're scoped separately from other CAN content.
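The auto-tagging feature pulls `#hashtags` out of the pasted text so they can be forwarded as CAN tags. A minimal sketch of that extraction (the helper name and exact trimming rules are assumptions, not the actual implementation):

```rust
// Extract words beginning with '#' from pasted text, stripping the '#'
// and any trailing punctuation, so they can be sent as CAN tags.
fn extract_hashtags(text: &str) -> Vec<String> {
    text.split_whitespace()
        .filter_map(|w| w.strip_prefix('#'))
        .map(|t| t.trim_matches(|c: char| !c.is_alphanumeric()).to_string())
        .filter(|t| !t.is_empty())
        .collect()
}
```

So a paste like `lunch notes #food #todo!` would be stored with tags `food` and `todo`.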
## Project Structure

```
src/
  main.rs   HTTP server: proxy handlers, tag extraction, SSE relay
  html.rs   Single-page frontend (HTML + CSS + JS, embedded as a string)
```
@@ -12,11 +12,13 @@ use std::net::SocketAddr;

const CAN_API: &str = "http://127.0.0.1:3210/api/v1/can/0";

/// Shared HTTP client for talking to the CAN service backend.
#[derive(Clone)]
struct AppState {
    client: reqwest::Client,
}

/// JSON body for the text paste endpoint.
#[derive(Deserialize)]
struct PasteTextRequest {
    text: String,

@@ -62,6 +64,7 @@ async fn forward(resp: Result<reqwest::Response, reqwest::Error>) -> Response {

// ── Handlers ─────────────────────────────────────────────────────────────

/// Serve the single-page HTML frontend.
async fn serve_index() -> Html<&'static str> {
    Html(html::INDEX_HTML)
}

@@ -227,7 +230,8 @@ async fn proxy_thumb(
    forward(resp).await
}

/// Proxy SSE events from CAN service so the frontend gets live updates.
/// Proxy SSE (Server-Sent Events) from the CAN service to the browser so
/// the frontend auto-refreshes when new pastes arrive.
async fn paste_events(
    State(state): State<AppState>,
) -> Sse<impl futures_util::Stream<Item = Result<Event, Infallible>>> {

@@ -289,6 +293,7 @@ async fn paste_events(

// ── Main ─────────────────────────────────────────────────────────────────

/// Start the Paste web app: a simple pastebin that stores text and images in CAN service.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
@@ -1,6 +1,7 @@
use serde::Deserialize;
use std::path::{Path, PathBuf};

/// Application settings loaded from config.yaml at startup.
#[derive(Debug, Clone, Deserialize)]
pub struct Config {
    pub storage_root: PathBuf,

@@ -18,6 +19,7 @@ pub struct Config {
    pub sync_api_key: Option<String>,
}

// Default values used when a field is missing from config.yaml.
fn default_admin_token() -> String {
    "changeme".to_string()
}

@@ -32,24 +34,29 @@ fn default_verify_interval() -> u64 {
}

impl Config {
    /// Read and parse the YAML config file from disk.
    pub fn load(path: &Path) -> anyhow::Result<Self> {
        let contents = std::fs::read_to_string(path)?;
        let config: Config = serde_yaml::from_str(&contents)?;
        Ok(config)
    }

    /// Returns the path to the SQLite database file inside storage_root.
    pub fn db_path(&self) -> PathBuf {
        self.storage_root.join(".can.db")
    }

    /// Returns the path to the trash folder for soft-deleted files.
    pub fn trash_dir(&self) -> PathBuf {
        self.storage_root.join(".trash")
    }

    /// Returns the path to the cached thumbnail images folder.
    pub fn thumbs_dir(&self) -> PathBuf {
        self.storage_root.join(".thumbs")
    }

    /// Create the storage, trash, and thumbnail directories if they don't exist yet.
    pub fn ensure_dirs(&self) -> anyhow::Result<()> {
        std::fs::create_dir_all(&self.storage_root)?;
        std::fs::create_dir_all(self.trash_dir())?;
43
src/db.rs
@@ -4,8 +4,11 @@ use std::sync::{Arc, Mutex};

use crate::models::{Asset, AssetMeta, ListParams, SearchParams};

/// Thread-safe handle to the SQLite database (wrapped in Arc<Mutex> so multiple
/// threads can share it safely).
pub type Db = Arc<Mutex<Connection>>;

/// Open (or create) the SQLite database file and set up tables.
pub fn open(path: &Path) -> anyhow::Result<Db> {
    let conn = Connection::open(path)?;
    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;

@@ -13,6 +16,7 @@ pub fn open(path: &Path) -> anyhow::Result<Db> {
    Ok(Arc::new(Mutex::new(conn)))
}

/// Open a temporary in-memory database (used for tests).
pub fn open_in_memory() -> anyhow::Result<Db> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch("PRAGMA foreign_keys=ON;")?;

@@ -20,6 +24,8 @@ pub fn open_in_memory() -> anyhow::Result<Db> {
    Ok(Arc::new(Mutex::new(conn)))
}

/// Create the assets, tags, and asset_tags tables if they don't already exist,
/// and run any pending migrations.
fn init_schema(conn: &Connection) -> rusqlite::Result<()> {
    conn.execute_batch(
        "

@@ -66,7 +72,7 @@ fn init_schema(conn: &Connection) -> rusqlite::Result<()> {
    Ok(())
}

/// Insert a new asset. Returns the row id.
/// Save a new asset record to the database. Returns the auto-generated row id.
pub fn insert_asset(conn: &Connection, asset: &Asset) -> rusqlite::Result<i64> {
    conn.execute(
        "INSERT INTO assets (timestamp, hash, mime_type, application, user_identity, description, actual_filename, human_filename, human_path, size)

@@ -87,7 +93,7 @@ pub fn insert_asset(conn: &Connection, asset: &Asset) -> rusqlite::Result<i64> {
    Ok(conn.last_insert_rowid())
}

/// Look up an asset by its hash.
/// Find an asset by its unique SHA-256 hash. Returns None if not found.
pub fn get_asset_by_hash(conn: &Connection, hash: &str) -> rusqlite::Result<Option<Asset>> {
    conn.query_row(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description,

@@ -115,7 +121,7 @@ pub fn get_asset_by_hash(conn: &Connection, hash: &str) -> rusqlite::Result<Opti
    .optional()
}

/// Get tags for an asset.
/// Get the list of tag names attached to an asset.
pub fn get_asset_tags(conn: &Connection, asset_id: i64) -> rusqlite::Result<Vec<String>> {
    let mut stmt = conn.prepare(
        "SELECT t.name FROM tags t

@@ -127,7 +133,7 @@ pub fn get_asset_tags(conn: &Connection, asset_id: i64) -> rusqlite::Result<Vec<
    tags.collect()
}

/// Upsert a tag and return its id.
/// Insert a tag if it doesn't exist yet, then return its id.
pub fn upsert_tag(conn: &Connection, name: &str) -> rusqlite::Result<i64> {
    conn.execute(
        "INSERT OR IGNORE INTO tags (name) VALUES (?1)",

@@ -138,7 +144,7 @@ pub fn upsert_tag(conn: &Connection, name: &str) -> rusqlite::Result<i64> {
    })
}

/// Replace all tags for an asset within a transaction.
/// Remove all existing tags for an asset and assign the new ones.
pub fn set_asset_tags(conn: &Connection, asset_id: i64, tags: &[String]) -> rusqlite::Result<()> {
    conn.execute(
        "DELETE FROM asset_tags WHERE asset_id = ?1",

@@ -154,7 +160,8 @@ pub fn set_asset_tags(conn: &Connection, asset_id: i64, tags: &[String]) -> rusq
    Ok(())
}

/// Build an AssetMeta from an Asset row + tags.
/// Convert an internal Asset database row into the API-friendly AssetMeta format
/// (includes tags fetched from the join table).
pub fn asset_to_meta(conn: &Connection, asset: &Asset) -> rusqlite::Result<AssetMeta> {
    let tags = get_asset_tags(conn, asset.id)?;
    Ok(AssetMeta {

@@ -173,7 +180,7 @@ pub fn asset_to_meta(conn: &Connection, asset: &Asset) -> rusqlite::Result<Asset
    })
}

/// Update description and/or tags for an asset.
/// Update an asset's description and/or tags (only changes the fields you provide).
pub fn update_asset_metadata(
    conn: &Connection,
    hash: &str,

@@ -195,7 +202,7 @@ pub fn update_asset_metadata(
    Ok(())
}

/// Flag an asset as corrupted.
/// Mark or unmark an asset as corrupted (set by the background verifier).
pub fn flag_corrupted(conn: &Connection, hash: &str, corrupted: bool) -> rusqlite::Result<()> {
    conn.execute(
        "UPDATE assets SET is_corrupted = ?1 WHERE hash = ?2",

@@ -204,7 +211,8 @@ pub fn flag_corrupted(conn: &Connection, hash: &str, corrupted: bool) -> rusqlit
    Ok(())
}

/// Update file size for an asset (used by verifier to backfill).
/// Store the file size in bytes for an asset (used by the verifier to fill in
/// sizes for assets that were created before the size column existed).
pub fn update_asset_size(conn: &Connection, hash: &str, size: i64) -> rusqlite::Result<()> {
    conn.execute(
        "UPDATE assets SET size = ?1 WHERE hash = ?2",

@@ -213,7 +221,7 @@ pub fn update_asset_size(conn: &Connection, hash: &str, size: i64) -> rusqlite::
    Ok(())
}

/// Soft-delete: mark as trashed.
/// Soft-delete an asset by marking it as trashed (the file is moved to .trash/).
pub fn trash_asset(conn: &Connection, hash: &str) -> rusqlite::Result<()> {
    conn.execute(
        "UPDATE assets SET is_trashed = 1 WHERE hash = ?1",

@@ -222,7 +230,8 @@ pub fn trash_asset(conn: &Connection, hash: &str) -> rusqlite::Result<()> {
    Ok(())
}

/// List assets with pagination and filtering.
/// Fetch a page of assets with optional filters (application, trashed, etc.).
/// Returns the matching assets and the total count for pagination.
pub fn list_assets(conn: &Connection, params: &ListParams) -> rusqlite::Result<(Vec<Asset>, i64)> {
    let limit = params.limit.unwrap_or(50);
    let offset = params.offset.unwrap_or(0);

@@ -301,7 +310,8 @@ pub fn list_assets(conn: &Connection, params: &ListParams) -> rusqlite::Result<(
    Ok((assets, total))
}

/// Search assets with various filters.
/// Search assets with multiple filters (hash prefix, time range, MIME type, tags, etc.).
/// Returns matching assets and total count for pagination.
pub fn search_assets(
    conn: &Connection,
    params: &SearchParams,

@@ -428,7 +438,8 @@ pub fn search_assets(
    Ok((assets, total))
}

/// Get ALL asset records including trashed (for sync reconciliation).
/// Get every asset record in the database, including trashed ones.
/// Used by the sync system to compare what two peers have.
pub fn get_all_assets(conn: &Connection) -> rusqlite::Result<Vec<Asset>> {
    let mut stmt = conn.prepare(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description,

@@ -457,7 +468,8 @@ pub fn get_all_assets(conn: &Connection) -> rusqlite::Result<Vec<Asset>> {
    Ok(assets)
}

/// Get assets with `timestamp > since` (for incremental sync queries).
/// Get only assets added after a given timestamp (for incremental sync --
/// "what's new since last time I checked?").
pub fn get_assets_since(conn: &Connection, since: i64) -> rusqlite::Result<Vec<Asset>> {
    let mut stmt = conn.prepare(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description,

@@ -487,7 +499,8 @@ pub fn get_assets_since(conn: &Connection, since: i64) -> rusqlite::Result<Vec<A
    Ok(assets)
}

/// Get all non-trashed asset records (for verifier startup scan).
/// Get all non-trashed assets (used by the background verifier to check
/// file integrity on startup).
pub fn get_all_active_assets(conn: &Connection) -> rusqlite::Result<Vec<Asset>> {
    let mut stmt = conn.prepare(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description,
@@ -2,6 +2,7 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use crate::models::ErrorResponse;

/// All the error types the API can return. Each variant maps to an HTTP status code.
#[derive(Debug, thiserror::Error)]
pub enum AppError {
    #[error("Not found: {0}")]

@@ -23,6 +24,7 @@ pub enum AppError {
    Internal(String),
}

/// Converts an AppError into an HTTP response with the right status code and a JSON body.
impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, message) = match &self {
19
src/lib.rs
@@ -1,12 +1,12 @@
pub mod config;
pub mod db;
pub mod error;
pub mod hash;
pub mod models;
pub mod routes;
pub mod storage;
pub mod verifier;
pub mod xattr;
pub mod config; // Configuration loading from YAML
pub mod db; // SQLite database access (CRUD for assets and tags)
pub mod error; // Centralized error types and HTTP error responses
pub mod hash; // SHA-256 content hashing
pub mod models; // Data structures shared across the codebase
pub mod routes; // HTTP API route handlers
pub mod storage; // File I/O: reading, writing, and trashing asset files
pub mod verifier; // Background integrity checker and file-attribute syncer
pub mod xattr; // OS-level file metadata (xattr on Unix, NTFS ADS on Windows)

use std::sync::Arc;

@@ -17,6 +17,7 @@ use crate::db::Db;
/// Each message is `"hash:timestamp"` (e.g. `"abc123def456:1710000000000"`).
pub type SyncEventSender = tokio::sync::broadcast::Sender<String>;

/// Shared application state passed to every HTTP handler.
#[derive(Clone)]
pub struct AppState {
    pub config: Arc<Config>,
@@ -10,6 +10,8 @@ use tower_http::trace::TraceLayer;
use can_service::config::Config;
use can_service::{db, routes, verifier, AppState};

/// Entry point: loads config, opens the database, starts background services,
/// and launches the HTTP server.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize tracing
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};

/// Database representation of an asset.
/// Internal database row for a stored file. Contains all metadata fields
/// that are persisted in SQLite.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Asset {
    pub id: i64,

@@ -18,7 +19,8 @@ pub struct Asset {
    pub size: i64,
}

/// API-facing asset metadata response.
/// The public-facing version of an asset's metadata, returned by the API.
/// Includes resolved tags and omits internal fields like `id` and `actual_filename`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssetMeta {
    pub hash: String,

@@ -35,7 +37,7 @@ pub struct AssetMeta {
    pub size: i64,
}

/// Standard API response wrapper.
/// Wraps every successful API response in `{ "status": "success", "data": ... }`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ApiResponse<T: Serialize> {
    pub status: String,

@@ -43,6 +45,7 @@ pub struct ApiResponse<T: Serialize> {
}

impl<T: Serialize> ApiResponse<T> {
    /// Create a success response wrapping the given data.
    pub fn success(data: T) -> Self {
        Self {
            status: "success".to_string(),

@@ -51,7 +54,7 @@ impl<T: Serialize> ApiResponse<T> {
    }
}

/// Error response body.
/// JSON body for error responses: `{ "status": "error", "error": "..." }`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
    pub status: String,

@@ -67,7 +70,7 @@ impl ErrorResponse {
    }
}

/// Ingest success response data.
/// Returned after a successful file upload: the timestamp, hash, and on-disk filename.
#[derive(Debug, Serialize, Deserialize)]
pub struct IngestResult {
    pub timestamp: i64,

@@ -97,7 +100,9 @@ pub struct MetadataUpdate {
    pub description: Option<String>,
}

/// OS-level file attribute metadata (for xattr / NTFS ADS).
/// Metadata stored directly on the file via OS-level attributes
/// (xattr on macOS/Linux, NTFS Alternate Data Streams on Windows).
/// This lets external tools read CAN metadata without hitting the database.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FileAttributes {
    pub mime_type: Option<String>,
@@ -17,7 +17,8 @@ pub fn router() -> Router<AppState> {
    .route("/api/v1/can/0/asset/{hash}", patch(patch_asset))
}

/// GET /api/v1/can/0/asset/{hash} - Stream the physical file.
/// Download an asset's file by its hash. Streams the raw bytes back to the
/// client with the correct MIME type and a suggested filename.
async fn get_asset(
    State(state): State<AppState>,
    Path(hash): Path<String>,

@@ -59,7 +60,8 @@ async fn get_asset(
    .into_response())
}

/// PATCH /api/v1/can/0/asset/{hash} - Update metadata (tags, description).
/// Update an asset's tags and/or description. Saves changes to both the
/// database and the OS-level file attributes.
async fn patch_asset(
    State(state): State<AppState>,
    Path(hash): Path<String>,
@@ -7,6 +7,9 @@ use crate::error::AppError;
use crate::models::{ApiResponse, Asset, DataIngestRequest, FileAttributes, IngestResult};
use crate::{db, hash, storage, xattr, AppState};

/// Register the two upload endpoints:
/// - POST /ingest (multipart file upload)
/// - POST /ingest/data (JSON body upload, agent-friendly)
pub fn router() -> Router<AppState> {
    Router::new()
        .route("/api/v1/can/0/ingest", post(ingest_multipart))

@@ -27,7 +30,9 @@ struct IngestInput {
    description: Option<String>,
}

/// Common pipeline: timestamp → hash → write file → xattr → DB insert.
/// Core ingest pipeline shared by both upload endpoints.
/// Steps: generate timestamp -> hash content -> write file to disk ->
/// save OS-level metadata -> insert into database -> notify SSE subscribers.
fn do_ingest(state: &AppState, input: IngestInput) -> Result<IngestResult, AppError> {
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)

@@ -99,7 +104,7 @@ fn do_ingest(state: &AppState, input: IngestInput) -> Result<IngestResult, AppEr
    })
}

/// Parse a comma-separated tag string into a clean Vec.
/// Split a comma-separated tag string like "photo,vacation" into a clean list.
fn parse_tags(raw: Option<&str>) -> Vec<String> {
    raw.unwrap_or("")
        .split(',')

@@ -110,6 +115,8 @@ fn parse_tags(raw: Option<&str>) -> Vec<String> {

// ── POST /api/v1/can/0/ingest (multipart — file uploads) ──────────────

/// Handle multipart file upload. Reads the "file" field plus optional metadata
/// fields (tags, application, user, etc.) and runs the ingest pipeline.
async fn ingest_multipart(
    State(state): State<AppState>,
    mut multipart: Multipart,
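The `parse_tags` body is truncated by the diff view; a filled-in sketch of what its documented behavior implies (split on commas, trim whitespace, drop empties; the real body may differ in detail):

```rust
// Split a comma-separated tag string like "photo, vacation" into a clean list.
// Whitespace around each tag is trimmed and empty entries are discarded.
fn parse_tags(raw: Option<&str>) -> Vec<String> {
    raw.unwrap_or("")
        .split(',')
        .map(|t| t.trim().to_string())
        .filter(|t| !t.is_empty())
        .collect()
}
```

With this behavior, a form field like `tags=photo, vacation ,,` yields `["photo", "vacation"]`, and a missing field yields an empty list.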
@@ -10,6 +10,8 @@ pub fn router() -> Router<AppState> {
    Router::new().route("/api/v1/can/0/list", get(list_assets))
}

/// GET /api/v1/can/0/list - Return a paginated list of assets with their metadata.
/// Supports query params: limit, offset, order (asc/desc), application filter.
async fn list_assets(
    State(state): State<AppState>,
    Query(params): Query<ListParams>,
@@ -10,6 +10,9 @@ pub fn router() -> Router<AppState> {
    Router::new().route("/api/v1/can/0/asset/{hash}/meta", get(get_meta))
}

/// GET /api/v1/can/0/asset/{hash}/meta - Return an asset's metadata as JSON
/// (hash, MIME type, tags, description, timestamps, etc.) without downloading
/// the actual file.
async fn get_meta(
    State(state): State<AppState>,
    Path(hash): Path<String>,
@@ -1,15 +1,16 @@
pub mod ingest;
pub mod asset;
pub mod meta;
pub mod list;
pub mod search;
pub mod thumb;
pub mod sync;
pub mod events;
pub mod ingest; // POST endpoints for uploading files and JSON data
pub mod asset; // GET/PATCH endpoints for downloading files and updating metadata
pub mod meta; // GET endpoint for reading asset metadata as JSON
pub mod list; // GET endpoint for paginated asset listing
pub mod search; // GET endpoint for searching/filtering assets
pub mod thumb; // GET endpoint for generating resized thumbnail images
pub mod sync; // Private P2P sync endpoints (protobuf, requires API key)
pub mod events; // Public SSE endpoint for real-time "new asset" notifications

use axum::Router;
use crate::AppState;

/// Combine all route modules into one router. Called once at startup.
pub fn router() -> Router<AppState> {
    Router::new()
        .merge(ingest::router())
@@ -10,6 +10,8 @@ pub fn router() -> Router<AppState> {
    Router::new().route("/api/v1/can/0/search", get(search_assets))
}

/// GET /api/v1/can/0/search - Search assets by hash prefix, time range,
/// MIME type, user, application, or tags. Returns paginated results.
async fn search_assets(
    State(state): State<AppState>,
    Query(params): Query<SearchParams>,
@@ -22,7 +22,9 @@ use tokio_stream::StreamExt;
use crate::models::{Asset, FileAttributes};
use crate::{db, hash, storage, xattr, AppState};

// ── Protobuf message types (hand-written, no protoc needed) ─────────────
// ── Protobuf message types ───────────────────────────────────────────────
// These structs are serialized/deserialized as protobuf using the `prost` crate.
// They define the wire format for peer-to-peer sync communication.

#[derive(Clone, PartialEq, Message)]
pub struct HashListRequest {}
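The hash-list exchange is what makes reconciliation cheap: each peer diffs the other's hash list against its own to decide which assets to pull and which to push. A minimal sketch of that set difference, using plain strings in place of the protobuf types (`reconcile` is a hypothetical helper, not a function in this codebase):

```rust
use std::collections::HashSet;

// Given the local and remote hash lists (as /sync/hashes would return),
// compute which assets to pull from the peer and which to push to it.
fn reconcile(local: &[String], remote: &[String]) -> (Vec<String>, Vec<String>) {
    let local_set: HashSet<&String> = local.iter().collect();
    let remote_set: HashSet<&String> = remote.iter().collect();
    // Pull anything the peer has that we don't; push the reverse.
    let to_pull = remote.iter().filter(|h| !local_set.contains(h)).cloned().collect();
    let to_push = local.iter().filter(|h| !remote_set.contains(h)).cloned().collect();
    (to_pull, to_push)
}
```

A sync round would then call `/sync/pull` with `to_pull` and `/sync/push` for each entry in `to_push`; incremental rounds shrink both lists by passing `?since=<timestamp>`.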
@@ -137,6 +139,8 @@ struct HashesQuery {

// ── Auth ────────────────────────────────────────────────────────────────

/// Verify the X-Sync-Key header matches the configured API key.
/// Returns 404 if sync is not configured, 401 if the key is wrong.
fn check_sync_key(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, String)> {
    let expected = match &state.config.sync_api_key {
        Some(key) if !key.is_empty() => key,

@@ -157,6 +161,7 @@ fn check_sync_key(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCo

// ── Helpers ─────────────────────────────────────────────────────────────

/// Serialize a protobuf message into bytes.
fn encode_proto<M: Message>(msg: &M) -> Result<Vec<u8>, (StatusCode, String)> {
    let mut buf = Vec::with_capacity(msg.encoded_len());
    msg.encode(&mut buf)

@@ -164,12 +169,16 @@ fn encode_proto<M: Message>(msg: &M) -> Result<Vec<u8>, (StatusCode, String)> {
    Ok(buf)
}

/// Wrap protobuf bytes into an HTTP 200 response with the right content type.
fn proto_response(buf: Vec<u8>) -> (StatusCode, [(&'static str, &'static str); 1], Vec<u8>) {
    (StatusCode::OK, [("content-type", "application/x-protobuf")], buf)
}

// ── POST /sync/hashes ───────────────────────────────────────────────────

/// Return a compact list of all known asset hashes + timestamps.
/// A remote peer calls this first to figure out which assets it's missing.
/// Supports `?since=<timestamp>` for incremental queries.
async fn sync_hashes(
    State(state): State<AppState>,
    headers: HeaderMap,

@@ -208,6 +217,8 @@ async fn sync_hashes(

// ── POST /sync/pull ─────────────────────────────────────────────────────

/// Download full asset bundles (metadata + file content) for a list of hashes.
/// A remote peer calls this to fetch assets it doesn't have yet.
async fn sync_pull(
    State(state): State<AppState>,
    headers: HeaderMap,

@@ -261,6 +272,9 @@ async fn sync_pull(

// ── POST /sync/push ─────────────────────────────────────────────────────

/// Receive and store a new asset pushed from a remote peer.
/// Verifies the hash, writes the file, and inserts the DB record.
/// Returns early if the asset already exists locally.
async fn sync_push(
    State(state): State<AppState>,
    headers: HeaderMap,

@@ -372,6 +386,7 @@ async fn sync_push(

// ── POST /sync/meta ─────────────────────────────────────────────────────

/// Receive a metadata update from a remote peer (description, tags, trash status).
async fn sync_meta(
    State(state): State<AppState>,
    headers: HeaderMap,
@@ -18,12 +18,15 @@ pub fn router() -> Router<AppState> {
    )
}

/// Static fallback SVG icon for non-image assets.
/// A simple "?" placeholder icon returned when the asset isn't a resizable image.
const FALLBACK_SVG: &str = r##"<svg xmlns="http://www.w3.org/2000/svg" width="128" height="128" viewBox="0 0 128 128">
<rect width="128" height="128" rx="8" fill="#e0e0e0"/>
<text x="64" y="72" text-anchor="middle" font-family="sans-serif" font-size="40" fill="#888">?</text>
</svg>"##;

/// GET /api/v1/can/0/asset/{hash}/thumb/{width}/{height}
/// Generate (or serve from cache) a resized JPEG thumbnail for image assets.
/// Non-image assets get a placeholder SVG icon instead.
async fn get_thumb(
    State(state): State<AppState>,
    Path((hash, max_width, max_height)): Path<(String, u32, u32)>,
259
src/storage.rs
@ -1,59 +1,49 @@
 use std::path::{Path, PathBuf};
 
-use chrono::{DateTime, Utc};
-
-/// Classify a MIME type into a storage subdirectory.
-pub fn mime_to_type_dir(mime: &str) -> &str {
-    if mime.starts_with("image/") {
-        "images"
-    } else if mime == "application/pdf" {
-        "pdf"
-    } else if mime.starts_with("video/") {
-        "video"
-    } else if mime.starts_with("audio/") {
-        "audio"
-    } else if mime.starts_with("text/")
-        || mime == "application/json"
-        || mime == "application/xml"
-        || mime == "application/msword"
-        || mime == "application/rtf"
-        || mime.starts_with("application/vnd.openxmlformats")
-        || mime.starts_with("application/vnd.ms-")
-        || mime == "application/vnd.oasis.opendocument.text"
-        || mime == "application/vnd.oasis.opendocument.spreadsheet"
-    {
-        "documents"
-    } else {
-        "others"
-    }
-}
-
-/// Build the physical filename (including type subdirectory) per the spec:
-/// `{type_dir}/{YYYY-MM-DD_HH-MM}_{hash8}.{extension}`
-///
-/// Example: `images/2026-03-13_14-30_a3b2c4d5.jpg`
+/// Build the on-disk filename for a new asset.
+/// Format: `{timestamp}_{sha256hash}_{tags}.{extension}`
+/// Tags are sanitized (alphanumeric only) and truncated to fit filesystem limits.
 pub fn build_filename(
     timestamp: i64,
     hash: &str,
-    _tags: &[String],
+    tags: &[String],
     mime_type: &str,
 ) -> String {
     let extension = mime_to_extension(mime_type);
-    let type_dir = mime_to_type_dir(mime_type);
-
-    // Convert timestamp_ms to human-readable YYYY-MM-DD_HH-MM
-    let dt = DateTime::<Utc>::from_timestamp_millis(timestamp)
-        .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).unwrap());
-    let time_part = dt.format("%Y-%m-%d_%H-%M").to_string();
-
-    // Use first 8 chars of hash for short identifier
-    let short_hash = if hash.len() >= 8 { &hash[..8] } else { hash };
-
-    format!("{}/{}_{}.{}", type_dir, time_part, short_hash, extension)
+    let base = format!("{}_{}", timestamp, hash);
+
+    if tags.is_empty() {
+        return format!("{}.{}", base, extension);
+    }
+
+    // Sanitize tags: strip non-alphanumeric, join with underscore
+    let sanitized_tags: Vec<String> = tags
+        .iter()
+        .map(|t| t.chars().filter(|c| c.is_alphanumeric()).collect::<String>())
+        .filter(|t| !t.is_empty())
+        .collect();
+
+    if sanitized_tags.is_empty() {
+        return format!("{}.{}", base, extension);
+    }
+
+    let tag_part = sanitized_tags.join("_");
+
+    // Truncate to keep total filename under ~200 chars (safely under 255)
+    let max_tag_len = 200usize.saturating_sub(base.len() + extension.len() + 2); // 2 for _ and .
+    let truncated = if tag_part.len() > max_tag_len {
+        &tag_part[..max_tag_len]
+    } else {
+        &tag_part
+    };
+
+    format!("{}_{}.{}", base, truncated, extension)
 }
 
-/// Derive file extension from MIME type.
+/// Convert a MIME type string (like "image/png") into a file extension (like "png").
+/// Falls back to "bin" for unknown types.
 pub fn mime_to_extension(mime: &str) -> &str {
     match mime {
         "application/pdf" => "pdf",
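The restored flat naming scheme (sanitize tags to alphanumerics, join with underscores, append the extension) can be exercised in isolation. This is a stand-alone sketch; `sketch_filename` is a hypothetical helper, not the crate's own function, and the truncation step is omitted for brevity.

```rust
// Stand-alone sketch of the flat filename scheme: {timestamp}_{hash}_{tags}.{ext}
// Tags are reduced to alphanumeric characters and joined with underscores.
fn sketch_filename(timestamp: i64, hash: &str, tags: &[&str], ext: &str) -> String {
    let base = format!("{}_{}", timestamp, hash);
    let sanitized: Vec<String> = tags
        .iter()
        .map(|t| t.chars().filter(|c| c.is_alphanumeric()).collect())
        .filter(|t: &String| !t.is_empty())
        .collect();
    if sanitized.is_empty() {
        return format!("{}.{}", base, ext);
    }
    format!("{}_{}.{}", base, sanitized.join("_"), ext)
}

fn main() {
    // Special characters are stripped before joining.
    println!("{}", sketch_filename(100, "abc", &["hello world!", "test@123"], "txt"));
}
```

This mirrors the behaviour the tests below assert: `100_abc_helloworld_test123.txt`.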
@@ -87,92 +77,49 @@ pub fn mime_to_extension(mime: &str) -> &str {
     }
 }
 
-/// Write asset bytes to the storage root. Creates the type subdirectory if needed.
-/// `filename` may include a subdirectory prefix (e.g. "images/2026-01-01_12-00_abcd1234.jpg").
+/// Save a file's raw bytes to the storage directory. Returns the full path on disk.
 pub fn write_asset(root: &Path, filename: &str, data: &[u8]) -> std::io::Result<PathBuf> {
     let path = root.join(filename);
-    // Ensure parent directory exists (handles type subdirectories)
-    if let Some(parent) = path.parent() {
-        std::fs::create_dir_all(parent)?;
-    }
     std::fs::write(&path, data)?;
     Ok(path)
 }
 
-/// Read asset bytes from the storage root.
-/// `filename` may include a subdirectory prefix.
+/// Load the raw bytes of a stored file from the storage directory.
 pub fn read_asset(root: &Path, filename: &str) -> std::io::Result<Vec<u8>> {
     let path = root.join(filename);
     std::fs::read(path)
 }
 
-/// Move an asset file to the .trash directory.
-/// Handles filenames with subdirectory prefixes (e.g. "images/file.jpg").
+/// Move a file from the storage directory into the .trash/ folder (soft delete).
 pub fn trash_asset_file(root: &Path, filename: &str) -> std::io::Result<()> {
     let src = root.join(filename);
     let trash_dir = root.join(".trash");
     std::fs::create_dir_all(&trash_dir)?;
-    // Use just the file basename in trash (flatten subdirectory structure)
-    let basename = Path::new(filename)
-        .file_name()
-        .unwrap_or_else(|| std::ffi::OsStr::new(filename));
-    let dst = trash_dir.join(basename);
+    let dst = trash_dir.join(filename);
    std::fs::rename(src, dst)?;
     Ok(())
 }
 
-/// Parse a physical filename to extract the hash component.
-///
-/// New format: `{type_dir}/{YYYY-MM-DD_HH-MM}_{hash8}.{ext}`
-/// Legacy format: `{timestamp}_{sha256_64}_{tags}.{ext}`
-///
-/// Returns the hash portion (8 chars for new format, 64 chars for legacy).
+/// Extract the SHA-256 hash from a CAN filename.
+/// Expects format: `{timestamp}_{sha256hash}_{tags}.{ext}`
+/// Returns None if the filename doesn't match the expected pattern.
 pub fn parse_hash_from_filename(filename: &str) -> Option<String> {
-    // Strip any directory prefix
-    let basename = filename.rsplit('/').next().unwrap_or(filename);
-    let basename = basename.rsplit('\\').next().unwrap_or(basename);
-
-    // Remove extension
-    let stem = basename.rsplit_once('.')?.0;
+    let stem = filename.rsplit_once('.')?.0;
+    // Split by underscore: first part is timestamp, second is hash (64 hex chars)
     let parts: Vec<&str> = stem.splitn(3, '_').collect();
-
-    // New format: YYYY-MM-DD_HH-MM_hash8
-    // After splitn(3, '_'): ["YYYY-MM-DD", "HH-MM", "hash8"]
-    if parts.len() >= 3 && parts[0].len() == 10 && parts[0].contains('-') {
-        // New format: third part is the short hash
-        return Some(parts[2].to_string());
-    }
-
-    // Legacy format: {timestamp}_{sha256_64}_{tags}
     if parts.len() >= 2 && parts[1].len() == 64 {
-        return Some(parts[1].to_string());
-    }
-
-    None
+        Some(parts[1].to_string())
+    } else {
+        None
+    }
 }
 
-/// Parse a physical filename to extract the timestamp component.
-///
-/// New format: `{type_dir}/{YYYY-MM-DD_HH-MM}_{hash8}.{ext}` → parses date to epoch ms
-/// Legacy format: `{timestamp}_{sha256}_{tags}.{ext}` → raw epoch ms
+/// Extract the millisecond timestamp from a CAN filename.
+/// Returns None if the filename doesn't match the expected pattern.
 pub fn parse_timestamp_from_filename(filename: &str) -> Option<i64> {
-    // Strip any directory prefix
-    let basename = filename.rsplit('/').next().unwrap_or(filename);
-    let basename = basename.rsplit('\\').next().unwrap_or(basename);
-
-    let stem = basename.rsplit_once('.')?.0;
-    let parts: Vec<&str> = stem.splitn(3, '_').collect();
-
-    // New format: YYYY-MM-DD_HH-MM_hash8
-    if parts.len() >= 2 && parts[0].len() == 10 && parts[0].contains('-') {
-        let date_str = format!("{}_{}", parts[0], parts[1]);
-        let dt = chrono::NaiveDateTime::parse_from_str(&date_str, "%Y-%m-%d_%H-%M").ok()?;
-        let utc = dt.and_utc();
-        return Some(utc.timestamp_millis());
-    }
-
-    // Legacy format: first part is raw epoch ms
-    let ts_str = parts.first()?;
+    let stem = filename.rsplit_once('.')?.0;
+    let ts_str = stem.split('_').next()?;
     ts_str.parse().ok()
 }
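Both parsers split the stem on underscores and rely on the 64-character SHA-256 hex hash as an anchor. The round trip can be sketched stand-alone; `sketch_parse` is a hypothetical helper combining the two parse functions, not code from the crate.

```rust
// Stand-alone sketch: recover (timestamp, hash) from
// "{timestamp}_{sha256hash}_{tags}.{ext}".
fn sketch_parse(filename: &str) -> Option<(i64, String)> {
    // Drop the extension, then split into at most 3 pieces:
    // timestamp, 64-char hash, and the (optional) tag remainder.
    let stem = filename.rsplit_once('.')?.0;
    let parts: Vec<&str> = stem.splitn(3, '_').collect();
    if parts.len() < 2 || parts[1].len() != 64 {
        return None;
    }
    let ts: i64 = parts[0].parse().ok()?;
    Some((ts, parts[1].to_string()))
}

fn main() {
    let hash = "a".repeat(64);
    let name = format!("1773014400123_{}_photo_vacation.jpg", hash);
    let (ts, h) = sketch_parse(&name).unwrap();
    println!("{} {}", ts, &h[..8]);
}
```

Because the hash is fixed-width hex, tags containing underscores cannot be confused with it.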
@@ -182,66 +129,23 @@ mod tests {
     use tempfile::TempDir;
 
     #[test]
-    fn test_mime_to_type_dir() {
-        assert_eq!(mime_to_type_dir("image/jpeg"), "images");
-        assert_eq!(mime_to_type_dir("image/png"), "images");
-        assert_eq!(mime_to_type_dir("application/pdf"), "pdf");
-        assert_eq!(mime_to_type_dir("text/plain"), "documents");
-        assert_eq!(mime_to_type_dir("application/json"), "documents");
-        assert_eq!(mime_to_type_dir("video/mp4"), "video");
-        assert_eq!(mime_to_type_dir("audio/mpeg"), "audio");
-        assert_eq!(mime_to_type_dir("application/zip"), "others");
-        assert_eq!(mime_to_type_dir("application/octet-stream"), "others");
+    fn test_build_filename_no_tags() {
+        let name = build_filename(1773014400123, "a3b2c4d5e6f7", &[], "application/pdf");
+        assert_eq!(name, "1773014400123_a3b2c4d5e6f7.pdf");
     }
 
     #[test]
-    fn test_build_filename_image() {
-        // 2026-03-13 14:30:00 UTC in ms
-        let ts = 1773412200000i64;
-        let hash = "a3b2c4d5e6f7a8b9".to_string();
-        let name = build_filename(ts, &hash, &[], "image/jpeg");
-        assert_eq!(name, "images/2026-03-13_14-30_a3b2c4d5.jpg");
-    }
-
-    #[test]
-    fn test_build_filename_pdf() {
-        let ts = 1773412200000i64;
-        let hash = "deadbeef12345678".to_string();
-        let name = build_filename(ts, &hash, &[], "application/pdf");
-        assert_eq!(name, "pdf/2026-03-13_14-30_deadbeef.pdf");
-    }
-
-    #[test]
-    fn test_build_filename_text() {
-        let ts = 1773412200000i64;
-        let hash = "abcdef0123456789".to_string();
-        let name = build_filename(ts, &hash, &["ignored".to_string()], "text/plain");
-        // Tags are ignored in new format
-        assert_eq!(name, "documents/2026-03-13_14-30_abcdef01.txt");
-    }
-
-    #[test]
-    fn test_build_filename_video() {
-        let ts = 1773412200000i64;
-        let hash = "ff00ff00ff00ff00".to_string();
-        let name = build_filename(ts, &hash, &[], "video/mp4");
-        assert_eq!(name, "video/2026-03-13_14-30_ff00ff00.mp4");
-    }
-
-    #[test]
-    fn test_build_filename_audio() {
-        let ts = 1773412200000i64;
-        let hash = "aa11bb22cc33dd44".to_string();
-        let name = build_filename(ts, &hash, &[], "audio/mpeg");
-        assert_eq!(name, "audio/2026-03-13_14-30_aa11bb22.mp3");
-    }
-
-    #[test]
-    fn test_build_filename_others() {
-        let ts = 1773412200000i64;
-        let hash = "1234567890abcdef".to_string();
-        let name = build_filename(ts, &hash, &[], "application/zip");
-        assert_eq!(name, "others/2026-03-13_14-30_12345678.zip");
+    fn test_build_filename_with_tags() {
+        let tags = vec!["photo".to_string(), "vacation".to_string()];
+        let name = build_filename(1773014400123, "a3b2c4d5e6f7", &tags, "image/jpeg");
+        assert_eq!(name, "1773014400123_a3b2c4d5e6f7_photo_vacation.jpg");
+    }
+
+    #[test]
+    fn test_build_filename_strips_special_chars_from_tags() {
+        let tags = vec!["hello world!".to_string(), "test@123".to_string()];
+        let name = build_filename(100, "abc", &tags, "text/plain");
+        assert_eq!(name, "100_abc_helloworld_test123.txt");
     }
 
     #[test]
@@ -253,55 +157,38 @@ mod tests {
     }
 
     #[test]
-    fn test_write_and_read_asset_with_subdir() {
+    fn test_write_and_read_asset() {
         let dir = TempDir::new().unwrap();
         let data = b"hello world";
-        let filename = "images/2026-01-01_12-00_abcd1234.jpg";
-        let path = write_asset(dir.path(), filename, data).unwrap();
+        let path = write_asset(dir.path(), "test_file.txt", data).unwrap();
         assert!(path.exists());
-        assert!(dir.path().join("images").is_dir());
-
-        let read_back = read_asset(dir.path(), filename).unwrap();
+        let read_back = read_asset(dir.path(), "test_file.txt").unwrap();
         assert_eq!(read_back, data);
     }
 
     #[test]
-    fn test_trash_asset_file_with_subdir() {
+    fn test_trash_asset_file() {
         let dir = TempDir::new().unwrap();
-        let filename = "images/2026-01-01_12-00_abcd1234.jpg";
-        write_asset(dir.path(), filename, b"bye").unwrap();
-
-        trash_asset_file(dir.path(), filename).unwrap();
-        assert!(!dir.path().join(filename).exists());
-        assert!(dir.path().join(".trash").join("2026-01-01_12-00_abcd1234.jpg").exists());
+        write_asset(dir.path(), "to_trash.txt", b"bye").unwrap();
+        trash_asset_file(dir.path(), "to_trash.txt").unwrap();
+        assert!(!dir.path().join("to_trash.txt").exists());
+        assert!(dir.path().join(".trash").join("to_trash.txt").exists());
     }
 
     #[test]
-    fn test_parse_hash_from_new_filename() {
-        assert_eq!(
-            parse_hash_from_filename("images/2026-03-13_14-30_a3b2c4d5.jpg"),
-            Some("a3b2c4d5".to_string())
-        );
-    }
-
-    #[test]
-    fn test_parse_hash_from_legacy_filename() {
+    fn test_parse_hash_from_filename() {
         let hash_64 = "a".repeat(64);
         let filename = format!("1773014400123_{}.pdf", hash_64);
-        assert_eq!(parse_hash_from_filename(&filename), Some(hash_64));
+        assert_eq!(parse_hash_from_filename(&filename), Some(hash_64.clone()));
+
+        let filename_tags = format!("1773014400123_{}_photo_vacation.jpg", hash_64);
+        assert_eq!(parse_hash_from_filename(&filename_tags), Some(hash_64));
     }
 
     #[test]
-    fn test_parse_timestamp_from_new_filename() {
-        let ts = parse_timestamp_from_filename("images/2026-03-13_14-30_a3b2c4d5.jpg");
-        assert!(ts.is_some());
-        let ts = ts.unwrap();
-        // Should be 2026-03-13 14:30 UTC in millis
-        assert_eq!(ts, 1773412200000);
-    }
-
-    #[test]
-    fn test_parse_timestamp_from_legacy_filename() {
+    fn test_parse_timestamp_from_filename() {
         let hash_64 = "b".repeat(64);
         let filename = format!("1773014400123_{}.pdf", hash_64);
         assert_eq!(parse_timestamp_from_filename(&filename), Some(1773014400123));
@@ -11,10 +11,10 @@ use crate::models::FileAttributes;
 use crate::storage::{parse_hash_from_filename, parse_timestamp_from_filename};
 use crate::xattr;
 
-/// Start the background verifier subsystem.
-/// - Runs an initial full scrub
-/// - Watches for filesystem changes
-/// - Runs periodic scrubs
+/// Launch the background integrity checker. It does three things:
+/// 1. Immediately scans all files to detect corruption or missing data.
+/// 2. Watches the storage folder for file changes and re-checks them in real time.
+/// 3. Re-runs the full scan on a timer (configurable in config.yaml).
 pub fn start(config: Config, db: Db) {
     let config2 = config.clone();
     let db2 = db.clone();
@@ -58,6 +58,7 @@ fn config3_for_watcher(config: Config) -> Config {
     config
 }
 
+/// Watch the storage directory for file changes and verify each changed file.
 async fn run_watcher(config: Config, db: Db) -> anyhow::Result<()> {
     let (tx, mut rx) = mpsc::channel::<PathBuf>(100);
     let storage_root = config.storage_root.clone();
@@ -114,7 +115,9 @@ async fn run_watcher(config: Config, db: Db) -> anyhow::Result<()> {
     Ok(())
 }
 
-/// Run a full scrub: verify every active asset's hash.
+/// Full integrity scan: re-hashes every active file on disk and compares it
+/// to the expected hash in the database. Also syncs OS-level file attributes
+/// and backfills missing file sizes.
 async fn run_scrub(config: &Config, db: &Db) -> anyhow::Result<()> {
     let assets = {
         let conn = db.lock().unwrap();
@@ -276,7 +279,8 @@ async fn run_scrub(config: &Config, db: &Db) -> anyhow::Result<()> {
     Ok(())
 }
 
-/// Verify a single file by its physical filename.
+/// Re-hash a single file and flag it as corrupted if the hash doesn't match.
+/// Called when the filesystem watcher detects a change.
 async fn verify_single_file(
     config: &Config,
     db: &Db,
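The scrub's core loop is: for each asset in the index, re-hash the bytes on disk and flag mismatches or missing files. The sketch below models that with in-memory maps and uses `std`'s `DefaultHasher` purely as a dependency-free stand-in for SHA-256; `scrub` and `hash_bytes` are hypothetical names, not the crate's API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Stand-in for the content hash (the real service uses SHA-256).
fn hash_bytes(data: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    data.hash(&mut h);
    h.finish()
}

// Re-hash every indexed "file" and collect the ones that are missing
// or whose bytes no longer match the expected hash.
fn scrub(index: &HashMap<&str, u64>, files: &HashMap<&str, Vec<u8>>) -> Vec<String> {
    let mut corrupted = Vec::new();
    for (name, expected) in index {
        match files.get(name) {
            Some(data) if hash_bytes(data) == *expected => {} // intact
            _ => corrupted.push(name.to_string()),            // missing or modified
        }
    }
    corrupted
}

fn main() {
    let mut files = HashMap::new();
    files.insert("a.txt", b"hello".to_vec());
    let mut index = HashMap::new();
    index.insert("a.txt", hash_bytes(b"hello"));
    index.insert("b.txt", 42); // indexed but absent on "disk"
    println!("{:?}", scrub(&index, &files));
}
```

The real `run_scrub` additionally updates OS-level attributes and file sizes, which the sketch omits.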
@@ -27,7 +27,8 @@ pub fn read_attributes(path: &Path) -> std::io::Result<FileAttributes> {
     }
 }
 
-// ── Unix implementation using xattr crate ──
+// ── Unix implementation ──
+// Stores each metadata field as an extended attribute (e.g. "user.can.mime_type").
 
 #[cfg(unix)]
 fn write_xattr(path: &Path, attrs: &FileAttributes) -> std::io::Result<()> {
@@ -58,6 +59,7 @@ fn write_xattr(path: &Path, attrs: &FileAttributes) -> std::io::Result<()> {
     Ok(())
 }
 
+/// Read all CAN metadata from Unix extended attributes on a file.
 #[cfg(unix)]
 fn read_xattr(path: &Path) -> std::io::Result<FileAttributes> {
     use xattr::FileExt;
@@ -81,8 +83,10 @@ fn read_xattr(path: &Path) -> std::io::Result<FileAttributes> {
     })
 }
 
-// ── Windows implementation using NTFS Alternate Data Streams ──
+// ── Windows implementation ──
+// Stores each metadata field as an NTFS Alternate Data Stream (e.g. "file.txt:can.mime_type").
 
+/// Write CAN metadata fields as NTFS Alternate Data Streams on a file.
 #[cfg(windows)]
 fn write_ntfs_ads(path: &Path, attrs: &FileAttributes) -> std::io::Result<()> {
     let base = path.to_string_lossy();
@@ -111,6 +115,7 @@ fn write_ntfs_ads(path: &Path, attrs: &FileAttributes) -> std::io::Result<()> {
     Ok(())
 }
 
+/// Read all CAN metadata from NTFS Alternate Data Streams on a file.
 #[cfg(windows)]
 fn read_ntfs_ads(path: &Path) -> std::io::Result<FileAttributes> {
     let base = path.to_string_lossy();
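The two backends store the same fields under platform-specific names, as the comments above describe: `user.can.<field>` extended attributes on Unix, `<file>:can.<field>` Alternate Data Streams on NTFS. The sketch below only derives those names from a field list; `attr_names` is a hypothetical helper for illustration, and the exact field set is an assumption.

```rust
// Sketch: derive the per-platform attribute names for a set of metadata
// fields, following the naming shown in the comments above.
fn attr_names(file: &str, fields: &[&str]) -> (Vec<String>, Vec<String>) {
    // Unix: extended attributes in the "user" namespace.
    let unix = fields.iter().map(|f| format!("user.can.{}", f)).collect();
    // Windows: NTFS Alternate Data Streams appended to the file path.
    let windows = fields.iter().map(|f| format!("{}:can.{}", file, f)).collect();
    (unix, windows)
}

fn main() {
    let (unix, win) = attr_names("file.txt", &["mime_type", "tags"]);
    println!("{:?}", unix);
    println!("{:?}", win);
}
```

Keeping one field list and generating both name forms keeps the Unix and Windows code paths from drifting apart.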