//! Private sync API endpoints (protobuf-encoded). //! //! All endpoints require `X-Sync-Key` header matching `config.sync_api_key`. //! If `sync_api_key` is not configured, all endpoints return 404. //! //! Includes an SSE endpoint (`GET /sync/events`) that streams real-time //! notifications when new assets are ingested. use std::convert::Infallible; use axum::body::Bytes; use axum::extract::{Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::sse::{Event, Sse}; use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::Router; use prost::Message; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; use crate::models::{Asset, FileAttributes}; use crate::{db, hash, storage, xattr, AppState}; // ── Protobuf message types ─────────────────────────────────────────────── // These structs are serialized/deserialized as protobuf using the `prost` crate. // They define the wire format for peer-to-peer sync communication. #[derive(Clone, PartialEq, Message)] pub struct HashListRequest {} #[derive(Clone, PartialEq, Message)] pub struct HashListResponse { #[prost(message, repeated, tag = "1")] pub assets: Vec, } #[derive(Clone, PartialEq, Message)] pub struct AssetDigest { #[prost(string, tag = "1")] pub hash: String, #[prost(int64, tag = "2")] pub timestamp: i64, #[prost(int64, tag = "3")] pub size: i64, #[prost(bool, tag = "4")] pub is_trashed: bool, } #[derive(Clone, PartialEq, Message)] pub struct PullRequest { #[prost(string, repeated, tag = "1")] pub hashes: Vec, } #[derive(Clone, PartialEq, Message)] pub struct PullResponse { #[prost(message, repeated, tag = "1")] pub bundles: Vec, } #[derive(Clone, PartialEq, Message)] pub struct AssetBundle { #[prost(string, tag = "1")] pub hash: String, #[prost(int64, tag = "2")] pub timestamp: i64, #[prost(string, tag = "3")] pub mime_type: String, #[prost(string, optional, tag = "4")] pub application: Option, #[prost(string, optional, tag = "5")] pub user_identity: Option, #[prost(string, optional, tag = "6")] pub description: Option, #[prost(string, optional, tag = "7")] pub human_filename: Option, #[prost(string, optional, tag = "8")] pub human_path: Option, #[prost(bool, tag = "9")] pub is_trashed: bool, #[prost(int64, tag = "10")] pub size: i64, #[prost(string, repeated, tag = "11")] pub tags: Vec, #[prost(bytes = "vec", tag = "12")] pub content: Vec, } #[derive(Clone, PartialEq, Message)] pub struct PushRequest { #[prost(message, optional, tag = "1")] pub bundle: Option, } #[derive(Clone, PartialEq, Message)] pub struct PushResponse { #[prost(string, tag = "1")] pub hash: String, #[prost(bool, tag = "2")] pub already_existed: bool, } #[derive(Clone, PartialEq, Message)] pub struct MetaUpdateRequest { #[prost(string, tag = "1")] pub hash: String, #[prost(string, optional, tag = "2")] pub description: Option, #[prost(string, repeated, tag = "3")] pub tags: Vec, #[prost(bool, tag = "4")] pub is_trashed: bool, } #[derive(Clone, PartialEq, Message)] pub struct MetaUpdateResponse { #[prost(bool, tag = "1")] pub success: bool, } // ── Router ────────────────────────────────────────────────────────────── pub fn router() -> Router { Router::new() .route("/sync/hashes", post(sync_hashes)) .route("/sync/pull", post(sync_pull)) .route("/sync/push", post(sync_push)) .route("/sync/meta", post(sync_meta)) .route("/sync/events", get(sync_events)) } /// Query params for /sync/hashes (optional `since` timestamp for incremental queries). #[derive(serde::Deserialize, Default)] struct HashesQuery { /// Only return assets with `timestamp > since`. Omit or 0 for full list. since: Option, } // ── Auth ──────────────────────────────────────────────────────────────── /// Verify the X-Sync-Key header matches the configured API key. /// Returns 404 if sync is not configured, 401 if the key is wrong. fn check_sync_key(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, String)> { let expected = match &state.config.sync_api_key { Some(key) if !key.is_empty() => key, _ => return Err((StatusCode::NOT_FOUND, "Sync API not enabled".into())), }; let provided = headers .get("X-Sync-Key") .and_then(|v| v.to_str().ok()) .unwrap_or(""); if provided != expected { return Err((StatusCode::UNAUTHORIZED, "Invalid sync key".into())); } Ok(()) } // ── Helpers ───────────────────────────────────────────────────────────── /// Serialize a protobuf message into bytes. fn encode_proto(msg: &M) -> Result, (StatusCode, String)> { let mut buf = Vec::with_capacity(msg.encoded_len()); msg.encode(&mut buf) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Encode error: {}", e)))?; Ok(buf) } /// Wrap protobuf bytes into an HTTP 200 response with the right content type. fn proto_response(buf: Vec) -> (StatusCode, [(&'static str, &'static str); 1], Vec) { (StatusCode::OK, [("content-type", "application/x-protobuf")], buf) } // ── POST /sync/hashes ─────────────────────────────────────────────────── /// Return a compact list of all known asset hashes + timestamps. /// A remote peer calls this first to figure out which assets it's missing. /// Supports `?since=` for incremental queries. async fn sync_hashes( State(state): State, headers: HeaderMap, query: Query, _body: Bytes, ) -> Result { check_sync_key(&state, &headers)?; let since = query.since.unwrap_or(0); let assets = { let conn = state.db.lock().unwrap(); if since > 0 { db::get_assets_since(&conn, since) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))? } else { db::get_all_assets(&conn) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))? } }; let resp = HashListResponse { assets: assets .iter() .map(|a| AssetDigest { hash: a.hash.clone(), timestamp: a.timestamp, size: a.size, is_trashed: a.is_trashed, }) .collect(), }; Ok(proto_response(encode_proto(&resp)?)) } // ── POST /sync/pull ───────────────────────────────────────────────────── /// Download full asset bundles (metadata + file content) for a list of hashes. /// A remote peer calls this to fetch assets it doesn't have yet. async fn sync_pull( State(state): State, headers: HeaderMap, body: Bytes, ) -> Result { check_sync_key(&state, &headers)?; let req = PullRequest::decode(body) .map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?; let mut bundles = Vec::new(); for hash_str in &req.hashes { let (asset, tags) = { let conn = state.db.lock().unwrap(); let asset = match db::get_asset_by_hash(&conn, hash_str) { Ok(Some(a)) => a, _ => continue, }; let tags = db::get_asset_tags(&conn, asset.id).unwrap_or_default(); (asset, tags) }; let content = match storage::read_asset(&state.config.storage_root, &asset.actual_filename) { Ok(c) => c, Err(e) => { tracing::warn!("Failed to read {}: {}", &asset.actual_filename, e); continue; } }; bundles.push(AssetBundle { hash: asset.hash, timestamp: asset.timestamp, mime_type: asset.mime_type, application: asset.application, user_identity: asset.user_identity, description: asset.description, human_filename: asset.human_filename, human_path: asset.human_path, is_trashed: asset.is_trashed, size: asset.size, tags, content, }); } Ok(proto_response(encode_proto(&PullResponse { bundles })?)) } // ── POST /sync/push ───────────────────────────────────────────────────── /// Receive and store a new asset pushed from a remote peer. /// Verifies the hash, writes the file, and inserts the DB record. /// Returns early if the asset already exists locally. async fn sync_push( State(state): State, headers: HeaderMap, body: Bytes, ) -> Result { check_sync_key(&state, &headers)?; let req = PushRequest::decode(body) .map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?; let bundle = req .bundle .ok_or_else(|| (StatusCode::BAD_REQUEST, "Missing bundle".into()))?; // 1. Verify hash let computed = hash::compute_hash(bundle.timestamp, &bundle.content); if computed != bundle.hash { return Err(( StatusCode::BAD_REQUEST, format!( "Hash mismatch: computed {} vs provided {}", &computed[..12], &bundle.hash[..12.min(bundle.hash.len())] ), )); } // 2. Check if already exists { let conn = state.db.lock().unwrap(); if let Ok(Some(_)) = db::get_asset_by_hash(&conn, &bundle.hash) { return Ok(proto_response(encode_proto(&PushResponse { hash: bundle.hash, already_existed: true, })?)); } } // 3. Write file let actual_filename = storage::build_filename(bundle.timestamp, &bundle.hash, &bundle.tags, &bundle.mime_type); let file_path = storage::write_asset(&state.config.storage_root, &actual_filename, &bundle.content) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Write error: {}", e)))?; // 4. OS attributes (best-effort) let attrs = FileAttributes { mime_type: Some(bundle.mime_type.clone()), application: bundle.application.clone(), user: bundle.user_identity.clone(), tags: if bundle.tags.is_empty() { None } else { Some(bundle.tags.join(",")) }, description: bundle.description.clone(), human_filename: bundle.human_filename.clone(), human_path: bundle.human_path.clone(), }; if let Err(e) = xattr::write_attributes(&file_path, &attrs) { tracing::warn!("Failed to write OS attributes: {}", e); } // 5. DB insert let asset = Asset { id: 0, timestamp: bundle.timestamp, hash: bundle.hash.clone(), mime_type: bundle.mime_type, application: bundle.application, user_identity: bundle.user_identity, description: bundle.description, actual_filename, human_filename: bundle.human_filename, human_path: bundle.human_path, is_trashed: false, is_corrupted: false, size: bundle.content.len() as i64, }; { let conn = state.db.lock().unwrap(); let asset_id = db::insert_asset(&conn, &asset) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))?; if !bundle.tags.is_empty() { db::set_asset_tags(&conn, asset_id, &bundle.tags) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Tag error: {}", e)))?; } if bundle.is_trashed { let _ = db::trash_asset(&conn, &bundle.hash); } } tracing::info!("Sync push: ingested {} ({}B)", &bundle.hash[..12], bundle.content.len()); // Notify SSE subscribers about the new asset let event_data = format!( r#"{{"hash":"{}","timestamp":{}}}"#, bundle.hash, bundle.timestamp ); let _ = state.sync_events.send(event_data); Ok(proto_response(encode_proto(&PushResponse { hash: bundle.hash, already_existed: false, })?)) } // ── POST /sync/meta ───────────────────────────────────────────────────── /// Receive a metadata update from a remote peer (description, tags, trash status). async fn sync_meta( State(state): State, headers: HeaderMap, body: Bytes, ) -> Result { check_sync_key(&state, &headers)?; let req = MetaUpdateRequest::decode(body) .map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?; let conn = state.db.lock().unwrap(); let asset = db::get_asset_by_hash(&conn, &req.hash) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))? .ok_or_else(|| (StatusCode::NOT_FOUND, "Asset not found".into()))?; if let Some(ref desc) = req.description { db::update_asset_metadata(&conn, &req.hash, Some(desc), None) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Update error: {}", e)))?; } if !req.tags.is_empty() { db::set_asset_tags(&conn, asset.id, &req.tags) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Tag error: {}", e)))?; } if req.is_trashed && !asset.is_trashed { db::trash_asset(&conn, &req.hash) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Trash error: {}", e)))?; let _ = storage::trash_asset_file(&state.config.storage_root, &asset.actual_filename); } tracing::info!("Sync meta update for {}", &req.hash[..12.min(req.hash.len())]); Ok(proto_response(encode_proto(&MetaUpdateResponse { success: true, })?)) } // ── GET /sync/events (SSE) ──────────────────────────────────────────── /// Server-Sent Events endpoint. Streams `new_asset` events whenever a file is /// ingested (via public API or sync push). Requires `X-Sync-Key` as a query /// param (`?key=...`) since SSE/EventSource doesn't support custom headers. /// /// Each event is: /// ```text /// event: new_asset /// data: {"hash":"abc...","timestamp":1710000000000} /// ``` async fn sync_events( State(state): State, headers: HeaderMap, query: Query, ) -> Result>>, (StatusCode, String)> { // SSE clients (EventSource) can't set custom headers, so accept key from query param too let key_ok = check_sync_key(&state, &headers).is_ok() || query .key .as_deref() .map(|k| { state .config .sync_api_key .as_deref() .map(|expected| k == expected) .unwrap_or(false) }) .unwrap_or(false); if !key_ok { return Err((StatusCode::UNAUTHORIZED, "Invalid sync key".into())); } let rx = state.sync_events.subscribe(); let stream = BroadcastStream::new(rx).filter_map(|result| match result { Ok(data) => Some(Ok(Event::default().event("new_asset").data(data))), Err(_) => None, // lagged — skip missed events, client will reconcile }); Ok(Sse::new(stream).keep_alive( axum::response::sse::KeepAlive::new() .interval(std::time::Duration::from_secs(15)) .text("ping"), )) } #[derive(serde::Deserialize, Default)] struct SseQuery { key: Option, }