diff --git a/Cargo.lock b/Cargo.lock index 81e5a5b..aa4e667 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,6 +172,7 @@ dependencies = [ "mime", "mime_guess", "notify", + "prost", "reqwest", "rusqlite", "serde", @@ -300,6 +301,12 @@ dependencies = [ "syn", ] +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -913,6 +920,15 @@ dependencies = [ "serde", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.17" @@ -1310,6 +1326,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pxfm" version = "0.1.28" diff --git a/Cargo.toml b/Cargo.toml index 261423f..dd8194f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,9 @@ mime = "0.3" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +# Protobuf (sync API) +prost = "0.13" + # Utilities chrono = { version = "0.4", features = ["serde"] } anyhow = "1" diff --git a/src/config.rs b/src/config.rs index 8feef56..3a08322 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,6 +12,10 @@ pub struct Config { pub rebuild_error_threshold: u32, #[serde(default = "default_verify_interval")] pub verify_interval_hours: u64, + /// Optional API key for the private sync endpoints (/sync/*). + /// If not set, sync endpoints are disabled (return 404). + #[serde(default)] + pub sync_api_key: Option, } fn default_admin_token() -> String { diff --git a/src/db.rs b/src/db.rs index 97fb192..08227d9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -428,6 +428,35 @@ pub fn search_assets( Ok((assets, total)) } +/// Get ALL asset records including trashed (for sync reconciliation). +pub fn get_all_assets(conn: &Connection) -> rusqlite::Result> { + let mut stmt = conn.prepare( + "SELECT id, timestamp, hash, mime_type, application, user_identity, description, + actual_filename, human_filename, human_path, is_trashed, is_corrupted, size + FROM assets", + )?; + let assets = stmt + .query_map([], |row| { + Ok(Asset { + id: row.get(0)?, + timestamp: row.get(1)?, + hash: row.get(2)?, + mime_type: row.get(3)?, + application: row.get(4)?, + user_identity: row.get(5)?, + description: row.get(6)?, + actual_filename: row.get(7)?, + human_filename: row.get(8)?, + human_path: row.get(9)?, + is_trashed: row.get(10)?, + is_corrupted: row.get(11)?, + size: row.get(12)?, + }) + })? + .collect::>>()?; + Ok(assets) +} + /// Get all non-trashed asset records (for verifier startup scan). pub fn get_all_active_assets(conn: &Connection) -> rusqlite::Result> { let mut stmt = conn.prepare( diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 0a784fe..8d3ebd4 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -4,6 +4,7 @@ pub mod meta; pub mod list; pub mod search; pub mod thumb; +pub mod sync; use axum::Router; use crate::AppState; @@ -16,4 +17,5 @@ pub fn router() -> Router { .merge(list::router()) .merge(search::router()) .merge(thumb::router()) + .merge(sync::router()) } diff --git a/src/routes/sync.rs b/src/routes/sync.rs new file mode 100644 index 0000000..c306fd9 --- /dev/null +++ b/src/routes/sync.rs @@ -0,0 +1,381 @@ +//! Private sync API endpoints (protobuf-encoded). +//! +//! All endpoints require `X-Sync-Key` header matching `config.sync_api_key`. +//! If `sync_api_key` is not configured, all endpoints return 404. + +use axum::body::Bytes; +use axum::extract::State; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::IntoResponse; +use axum::routing::post; +use axum::Router; +use prost::Message; + +use crate::models::{Asset, FileAttributes}; +use crate::{db, hash, storage, xattr, AppState}; + +// ── Protobuf message types (hand-written, no protoc needed) ───────────── + +#[derive(Clone, PartialEq, Message)] +pub struct HashListRequest {} + +#[derive(Clone, PartialEq, Message)] +pub struct HashListResponse { + #[prost(message, repeated, tag = "1")] + pub assets: Vec, +} + +#[derive(Clone, PartialEq, Message)] +pub struct AssetDigest { + #[prost(string, tag = "1")] + pub hash: String, + #[prost(int64, tag = "2")] + pub timestamp: i64, + #[prost(int64, tag = "3")] + pub size: i64, + #[prost(bool, tag = "4")] + pub is_trashed: bool, +} + +#[derive(Clone, PartialEq, Message)] +pub struct PullRequest { + #[prost(string, repeated, tag = "1")] + pub hashes: Vec, +} + +#[derive(Clone, PartialEq, Message)] +pub struct PullResponse { + #[prost(message, repeated, tag = "1")] + pub bundles: Vec, +} + +#[derive(Clone, PartialEq, Message)] +pub struct AssetBundle { + #[prost(string, tag = "1")] + pub hash: String, + #[prost(int64, tag = "2")] + pub timestamp: i64, + #[prost(string, tag = "3")] + pub mime_type: String, + #[prost(string, optional, tag = "4")] + pub application: Option, + #[prost(string, optional, tag = "5")] + pub user_identity: Option, + #[prost(string, optional, tag = "6")] + pub description: Option, + #[prost(string, optional, tag = "7")] + pub human_filename: Option, + #[prost(string, optional, tag = "8")] + pub human_path: Option, + #[prost(bool, tag = "9")] + pub is_trashed: bool, + #[prost(int64, tag = "10")] + pub size: i64, + #[prost(string, repeated, tag = "11")] + pub tags: Vec, + #[prost(bytes = "vec", tag = "12")] + pub content: Vec, +} + +#[derive(Clone, PartialEq, Message)] +pub struct PushRequest { + #[prost(message, optional, tag = "1")] + pub bundle: Option, +} + +#[derive(Clone, PartialEq, Message)] +pub struct PushResponse { + #[prost(string, tag = "1")] + pub hash: String, + #[prost(bool, tag = "2")] + pub already_existed: bool, +} + +#[derive(Clone, PartialEq, Message)] +pub struct MetaUpdateRequest { + #[prost(string, tag = "1")] + pub hash: String, + #[prost(string, optional, tag = "2")] + pub description: Option, + #[prost(string, repeated, tag = "3")] + pub tags: Vec, + #[prost(bool, tag = "4")] + pub is_trashed: bool, +} + +#[derive(Clone, PartialEq, Message)] +pub struct MetaUpdateResponse { + #[prost(bool, tag = "1")] + pub success: bool, +} + +// ── Router ────────────────────────────────────────────────────────────── + +pub fn router() -> Router { + Router::new() + .route("/sync/hashes", post(sync_hashes)) + .route("/sync/pull", post(sync_pull)) + .route("/sync/push", post(sync_push)) + .route("/sync/meta", post(sync_meta)) +} + +// ── Auth ──────────────────────────────────────────────────────────────── + +fn check_sync_key(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, String)> { + let expected = match &state.config.sync_api_key { + Some(key) if !key.is_empty() => key, + _ => return Err((StatusCode::NOT_FOUND, "Sync API not enabled".into())), + }; + + let provided = headers + .get("X-Sync-Key") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + if provided != expected { + return Err((StatusCode::UNAUTHORIZED, "Invalid sync key".into())); + } + + Ok(()) +} + +// ── Helpers ───────────────────────────────────────────────────────────── + +fn encode_proto(msg: &M) -> Result, (StatusCode, String)> { + let mut buf = Vec::with_capacity(msg.encoded_len()); + msg.encode(&mut buf) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Encode error: {}", e)))?; + Ok(buf) +} + +fn proto_response(buf: Vec) -> (StatusCode, [(&'static str, &'static str); 1], Vec) { + (StatusCode::OK, [("content-type", "application/x-protobuf")], buf) +} + +// ── POST /sync/hashes ─────────────────────────────────────────────────── + +async fn sync_hashes( + State(state): State, + headers: HeaderMap, + _body: Bytes, +) -> Result { + check_sync_key(&state, &headers)?; + + let assets = { + let conn = state.db.lock().unwrap(); + db::get_all_assets(&conn) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))? + }; + + let resp = HashListResponse { + assets: assets + .iter() + .map(|a| AssetDigest { + hash: a.hash.clone(), + timestamp: a.timestamp, + size: a.size, + is_trashed: a.is_trashed, + }) + .collect(), + }; + + Ok(proto_response(encode_proto(&resp)?)) +} + +// ── POST /sync/pull ───────────────────────────────────────────────────── + +async fn sync_pull( + State(state): State, + headers: HeaderMap, + body: Bytes, +) -> Result { + check_sync_key(&state, &headers)?; + + let req = PullRequest::decode(body) + .map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?; + + let mut bundles = Vec::new(); + + for hash_str in &req.hashes { + let (asset, tags) = { + let conn = state.db.lock().unwrap(); + let asset = match db::get_asset_by_hash(&conn, hash_str) { + Ok(Some(a)) => a, + _ => continue, + }; + let tags = db::get_asset_tags(&conn, asset.id).unwrap_or_default(); + (asset, tags) + }; + + let content = + match storage::read_asset(&state.config.storage_root, &asset.actual_filename) { + Ok(c) => c, + Err(e) => { + tracing::warn!("Failed to read {}: {}", &asset.actual_filename, e); + continue; + } + }; + + bundles.push(AssetBundle { + hash: asset.hash, + timestamp: asset.timestamp, + mime_type: asset.mime_type, + application: asset.application, + user_identity: asset.user_identity, + description: asset.description, + human_filename: asset.human_filename, + human_path: asset.human_path, + is_trashed: asset.is_trashed, + size: asset.size, + tags, + content, + }); + } + + Ok(proto_response(encode_proto(&PullResponse { bundles })?)) +} + +// ── POST /sync/push ───────────────────────────────────────────────────── + +async fn sync_push( + State(state): State, + headers: HeaderMap, + body: Bytes, +) -> Result { + check_sync_key(&state, &headers)?; + + let req = PushRequest::decode(body) + .map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?; + + let bundle = req + .bundle + .ok_or_else(|| (StatusCode::BAD_REQUEST, "Missing bundle".into()))?; + + // 1. Verify hash + let computed = hash::compute_hash(bundle.timestamp, &bundle.content); + if computed != bundle.hash { + return Err(( + StatusCode::BAD_REQUEST, + format!( + "Hash mismatch: computed {} vs provided {}", + &computed[..12], + &bundle.hash[..12.min(bundle.hash.len())] + ), + )); + } + + // 2. Check if already exists + { + let conn = state.db.lock().unwrap(); + if let Ok(Some(_)) = db::get_asset_by_hash(&conn, &bundle.hash) { + return Ok(proto_response(encode_proto(&PushResponse { + hash: bundle.hash, + already_existed: true, + })?)); + } + } + + // 3. Write file + let actual_filename = + storage::build_filename(bundle.timestamp, &bundle.hash, &bundle.tags, &bundle.mime_type); + + let file_path = + storage::write_asset(&state.config.storage_root, &actual_filename, &bundle.content) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Write error: {}", e)))?; + + // 4. OS attributes (best-effort) + let attrs = FileAttributes { + mime_type: Some(bundle.mime_type.clone()), + application: bundle.application.clone(), + user: bundle.user_identity.clone(), + tags: if bundle.tags.is_empty() { + None + } else { + Some(bundle.tags.join(",")) + }, + description: bundle.description.clone(), + human_filename: bundle.human_filename.clone(), + human_path: bundle.human_path.clone(), + }; + if let Err(e) = xattr::write_attributes(&file_path, &attrs) { + tracing::warn!("Failed to write OS attributes: {}", e); + } + + // 5. DB insert + let asset = Asset { + id: 0, + timestamp: bundle.timestamp, + hash: bundle.hash.clone(), + mime_type: bundle.mime_type, + application: bundle.application, + user_identity: bundle.user_identity, + description: bundle.description, + actual_filename, + human_filename: bundle.human_filename, + human_path: bundle.human_path, + is_trashed: false, + is_corrupted: false, + size: bundle.content.len() as i64, + }; + + { + let conn = state.db.lock().unwrap(); + let asset_id = db::insert_asset(&conn, &asset) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))?; + if !bundle.tags.is_empty() { + db::set_asset_tags(&conn, asset_id, &bundle.tags) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Tag error: {}", e)))?; + } + if bundle.is_trashed { + let _ = db::trash_asset(&conn, &bundle.hash); + } + } + + tracing::info!("Sync push: ingested {} ({}B)", &bundle.hash[..12], bundle.content.len()); + + Ok(proto_response(encode_proto(&PushResponse { + hash: bundle.hash, + already_existed: false, + })?)) +} + +// ── POST /sync/meta ───────────────────────────────────────────────────── + +async fn sync_meta( + State(state): State, + headers: HeaderMap, + body: Bytes, +) -> Result { + check_sync_key(&state, &headers)?; + + let req = MetaUpdateRequest::decode(body) + .map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?; + + let conn = state.db.lock().unwrap(); + + let asset = db::get_asset_by_hash(&conn, &req.hash) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))? + .ok_or_else(|| (StatusCode::NOT_FOUND, "Asset not found".into()))?; + + if let Some(ref desc) = req.description { + db::update_asset_metadata(&conn, &req.hash, Some(desc), None) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Update error: {}", e)))?; + } + + if !req.tags.is_empty() { + db::set_asset_tags(&conn, asset.id, &req.tags) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Tag error: {}", e)))?; + } + + if req.is_trashed && !asset.is_trashed { + db::trash_asset(&conn, &req.hash) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Trash error: {}", e)))?; + let _ = storage::trash_asset_file(&state.config.storage_root, &asset.actual_filename); + } + + tracing::info!("Sync meta update for {}", &req.hash[..12.min(req.hash.len())]); + + Ok(proto_response(encode_proto(&MetaUpdateResponse { + success: true, + })?)) +}