Add private protobuf sync API to CAN service
New /sync/* endpoints for peer-to-peer replication: - POST /sync/hashes - list all asset digests for reconciliation - POST /sync/pull - pull full assets (metadata + content) by hash - POST /sync/push - push asset with explicit timestamp for deterministic hashing - POST /sync/meta - update metadata (tags, description, trash state) All endpoints use protobuf encoding (prost) and require X-Sync-Key header matching config.sync_api_key. Sync API is disabled when no key is configured. Also adds db::get_all_assets() for sync reconciliation (includes trashed). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
360ecbdad0
commit
1b8187a484
39
Cargo.lock
generated
39
Cargo.lock
generated
@ -172,6 +172,7 @@ dependencies = [
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"notify",
|
||||
"prost",
|
||||
"reqwest",
|
||||
"rusqlite",
|
||||
"serde",
|
||||
@ -300,6 +301,12 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
|
||||
|
||||
[[package]]
|
||||
name = "encoding_rs"
|
||||
version = "0.8.35"
|
||||
@ -913,6 +920,15 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itertools"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
|
||||
dependencies = [
|
||||
"either",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.17"
|
||||
@ -1310,6 +1326,29 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pxfm"
|
||||
version = "0.1.28"
|
||||
|
||||
@ -37,6 +37,9 @@ mime = "0.3"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
# Protobuf (sync API)
|
||||
prost = "0.13"
|
||||
|
||||
# Utilities
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
anyhow = "1"
|
||||
|
||||
@ -12,6 +12,10 @@ pub struct Config {
|
||||
pub rebuild_error_threshold: u32,
|
||||
#[serde(default = "default_verify_interval")]
|
||||
pub verify_interval_hours: u64,
|
||||
/// Optional API key for the private sync endpoints (/sync/*).
|
||||
/// If not set, sync endpoints are disabled (return 404).
|
||||
#[serde(default)]
|
||||
pub sync_api_key: Option<String>,
|
||||
}
|
||||
|
||||
fn default_admin_token() -> String {
|
||||
|
||||
29
src/db.rs
29
src/db.rs
@ -428,6 +428,35 @@ pub fn search_assets(
|
||||
Ok((assets, total))
|
||||
}
|
||||
|
||||
/// Get ALL asset records including trashed (for sync reconciliation).
|
||||
pub fn get_all_assets(conn: &Connection) -> rusqlite::Result<Vec<Asset>> {
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, timestamp, hash, mime_type, application, user_identity, description,
|
||||
actual_filename, human_filename, human_path, is_trashed, is_corrupted, size
|
||||
FROM assets",
|
||||
)?;
|
||||
let assets = stmt
|
||||
.query_map([], |row| {
|
||||
Ok(Asset {
|
||||
id: row.get(0)?,
|
||||
timestamp: row.get(1)?,
|
||||
hash: row.get(2)?,
|
||||
mime_type: row.get(3)?,
|
||||
application: row.get(4)?,
|
||||
user_identity: row.get(5)?,
|
||||
description: row.get(6)?,
|
||||
actual_filename: row.get(7)?,
|
||||
human_filename: row.get(8)?,
|
||||
human_path: row.get(9)?,
|
||||
is_trashed: row.get(10)?,
|
||||
is_corrupted: row.get(11)?,
|
||||
size: row.get(12)?,
|
||||
})
|
||||
})?
|
||||
.collect::<rusqlite::Result<Vec<_>>>()?;
|
||||
Ok(assets)
|
||||
}
|
||||
|
||||
/// Get all non-trashed asset records (for verifier startup scan).
|
||||
pub fn get_all_active_assets(conn: &Connection) -> rusqlite::Result<Vec<Asset>> {
|
||||
let mut stmt = conn.prepare(
|
||||
|
||||
@ -4,6 +4,7 @@ pub mod meta;
|
||||
pub mod list;
|
||||
pub mod search;
|
||||
pub mod thumb;
|
||||
pub mod sync;
|
||||
|
||||
use axum::Router;
|
||||
use crate::AppState;
|
||||
@ -16,4 +17,5 @@ pub fn router() -> Router<AppState> {
|
||||
.merge(list::router())
|
||||
.merge(search::router())
|
||||
.merge(thumb::router())
|
||||
.merge(sync::router())
|
||||
}
|
||||
|
||||
381
src/routes/sync.rs
Normal file
381
src/routes/sync.rs
Normal file
@ -0,0 +1,381 @@
|
||||
//! Private sync API endpoints (protobuf-encoded).
|
||||
//!
|
||||
//! All endpoints require `X-Sync-Key` header matching `config.sync_api_key`.
|
||||
//! If `sync_api_key` is not configured, all endpoints return 404.
|
||||
|
||||
use axum::body::Bytes;
|
||||
use axum::extract::State;
|
||||
use axum::http::{HeaderMap, StatusCode};
|
||||
use axum::response::IntoResponse;
|
||||
use axum::routing::post;
|
||||
use axum::Router;
|
||||
use prost::Message;
|
||||
|
||||
use crate::models::{Asset, FileAttributes};
|
||||
use crate::{db, hash, storage, xattr, AppState};
|
||||
|
||||
// ── Protobuf message types (hand-written, no protoc needed) ─────────────
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct HashListRequest {}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct HashListResponse {
|
||||
#[prost(message, repeated, tag = "1")]
|
||||
pub assets: Vec<AssetDigest>,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct AssetDigest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub hash: String,
|
||||
#[prost(int64, tag = "2")]
|
||||
pub timestamp: i64,
|
||||
#[prost(int64, tag = "3")]
|
||||
pub size: i64,
|
||||
#[prost(bool, tag = "4")]
|
||||
pub is_trashed: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct PullRequest {
|
||||
#[prost(string, repeated, tag = "1")]
|
||||
pub hashes: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct PullResponse {
|
||||
#[prost(message, repeated, tag = "1")]
|
||||
pub bundles: Vec<AssetBundle>,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct AssetBundle {
|
||||
#[prost(string, tag = "1")]
|
||||
pub hash: String,
|
||||
#[prost(int64, tag = "2")]
|
||||
pub timestamp: i64,
|
||||
#[prost(string, tag = "3")]
|
||||
pub mime_type: String,
|
||||
#[prost(string, optional, tag = "4")]
|
||||
pub application: Option<String>,
|
||||
#[prost(string, optional, tag = "5")]
|
||||
pub user_identity: Option<String>,
|
||||
#[prost(string, optional, tag = "6")]
|
||||
pub description: Option<String>,
|
||||
#[prost(string, optional, tag = "7")]
|
||||
pub human_filename: Option<String>,
|
||||
#[prost(string, optional, tag = "8")]
|
||||
pub human_path: Option<String>,
|
||||
#[prost(bool, tag = "9")]
|
||||
pub is_trashed: bool,
|
||||
#[prost(int64, tag = "10")]
|
||||
pub size: i64,
|
||||
#[prost(string, repeated, tag = "11")]
|
||||
pub tags: Vec<String>,
|
||||
#[prost(bytes = "vec", tag = "12")]
|
||||
pub content: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct PushRequest {
|
||||
#[prost(message, optional, tag = "1")]
|
||||
pub bundle: Option<AssetBundle>,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct PushResponse {
|
||||
#[prost(string, tag = "1")]
|
||||
pub hash: String,
|
||||
#[prost(bool, tag = "2")]
|
||||
pub already_existed: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct MetaUpdateRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub hash: String,
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub description: Option<String>,
|
||||
#[prost(string, repeated, tag = "3")]
|
||||
pub tags: Vec<String>,
|
||||
#[prost(bool, tag = "4")]
|
||||
pub is_trashed: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct MetaUpdateResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
pub success: bool,
|
||||
}
|
||||
|
||||
// ── Router ──────────────────────────────────────────────────────────────
|
||||
|
||||
pub fn router() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/sync/hashes", post(sync_hashes))
|
||||
.route("/sync/pull", post(sync_pull))
|
||||
.route("/sync/push", post(sync_push))
|
||||
.route("/sync/meta", post(sync_meta))
|
||||
}
|
||||
|
||||
// ── Auth ────────────────────────────────────────────────────────────────
|
||||
|
||||
fn check_sync_key(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, String)> {
|
||||
let expected = match &state.config.sync_api_key {
|
||||
Some(key) if !key.is_empty() => key,
|
||||
_ => return Err((StatusCode::NOT_FOUND, "Sync API not enabled".into())),
|
||||
};
|
||||
|
||||
let provided = headers
|
||||
.get("X-Sync-Key")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
|
||||
if provided != expected {
|
||||
return Err((StatusCode::UNAUTHORIZED, "Invalid sync key".into()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Helpers ─────────────────────────────────────────────────────────────
|
||||
|
||||
fn encode_proto<M: Message>(msg: &M) -> Result<Vec<u8>, (StatusCode, String)> {
|
||||
let mut buf = Vec::with_capacity(msg.encoded_len());
|
||||
msg.encode(&mut buf)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Encode error: {}", e)))?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
fn proto_response(buf: Vec<u8>) -> (StatusCode, [(&'static str, &'static str); 1], Vec<u8>) {
|
||||
(StatusCode::OK, [("content-type", "application/x-protobuf")], buf)
|
||||
}
|
||||
|
||||
// ── POST /sync/hashes ───────────────────────────────────────────────────
|
||||
|
||||
async fn sync_hashes(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
_body: Bytes,
|
||||
) -> Result<impl IntoResponse, (StatusCode, String)> {
|
||||
check_sync_key(&state, &headers)?;
|
||||
|
||||
let assets = {
|
||||
let conn = state.db.lock().unwrap();
|
||||
db::get_all_assets(&conn)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))?
|
||||
};
|
||||
|
||||
let resp = HashListResponse {
|
||||
assets: assets
|
||||
.iter()
|
||||
.map(|a| AssetDigest {
|
||||
hash: a.hash.clone(),
|
||||
timestamp: a.timestamp,
|
||||
size: a.size,
|
||||
is_trashed: a.is_trashed,
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
Ok(proto_response(encode_proto(&resp)?))
|
||||
}
|
||||
|
||||
// ── POST /sync/pull ─────────────────────────────────────────────────────
|
||||
|
||||
async fn sync_pull(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> Result<impl IntoResponse, (StatusCode, String)> {
|
||||
check_sync_key(&state, &headers)?;
|
||||
|
||||
let req = PullRequest::decode(body)
|
||||
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?;
|
||||
|
||||
let mut bundles = Vec::new();
|
||||
|
||||
for hash_str in &req.hashes {
|
||||
let (asset, tags) = {
|
||||
let conn = state.db.lock().unwrap();
|
||||
let asset = match db::get_asset_by_hash(&conn, hash_str) {
|
||||
Ok(Some(a)) => a,
|
||||
_ => continue,
|
||||
};
|
||||
let tags = db::get_asset_tags(&conn, asset.id).unwrap_or_default();
|
||||
(asset, tags)
|
||||
};
|
||||
|
||||
let content =
|
||||
match storage::read_asset(&state.config.storage_root, &asset.actual_filename) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to read {}: {}", &asset.actual_filename, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
bundles.push(AssetBundle {
|
||||
hash: asset.hash,
|
||||
timestamp: asset.timestamp,
|
||||
mime_type: asset.mime_type,
|
||||
application: asset.application,
|
||||
user_identity: asset.user_identity,
|
||||
description: asset.description,
|
||||
human_filename: asset.human_filename,
|
||||
human_path: asset.human_path,
|
||||
is_trashed: asset.is_trashed,
|
||||
size: asset.size,
|
||||
tags,
|
||||
content,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(proto_response(encode_proto(&PullResponse { bundles })?))
|
||||
}
|
||||
|
||||
// ── POST /sync/push ─────────────────────────────────────────────────────
|
||||
|
||||
async fn sync_push(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> Result<impl IntoResponse, (StatusCode, String)> {
|
||||
check_sync_key(&state, &headers)?;
|
||||
|
||||
let req = PushRequest::decode(body)
|
||||
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?;
|
||||
|
||||
let bundle = req
|
||||
.bundle
|
||||
.ok_or_else(|| (StatusCode::BAD_REQUEST, "Missing bundle".into()))?;
|
||||
|
||||
// 1. Verify hash
|
||||
let computed = hash::compute_hash(bundle.timestamp, &bundle.content);
|
||||
if computed != bundle.hash {
|
||||
return Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!(
|
||||
"Hash mismatch: computed {} vs provided {}",
|
||||
&computed[..12],
|
||||
&bundle.hash[..12.min(bundle.hash.len())]
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
// 2. Check if already exists
|
||||
{
|
||||
let conn = state.db.lock().unwrap();
|
||||
if let Ok(Some(_)) = db::get_asset_by_hash(&conn, &bundle.hash) {
|
||||
return Ok(proto_response(encode_proto(&PushResponse {
|
||||
hash: bundle.hash,
|
||||
already_existed: true,
|
||||
})?));
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Write file
|
||||
let actual_filename =
|
||||
storage::build_filename(bundle.timestamp, &bundle.hash, &bundle.tags, &bundle.mime_type);
|
||||
|
||||
let file_path =
|
||||
storage::write_asset(&state.config.storage_root, &actual_filename, &bundle.content)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Write error: {}", e)))?;
|
||||
|
||||
// 4. OS attributes (best-effort)
|
||||
let attrs = FileAttributes {
|
||||
mime_type: Some(bundle.mime_type.clone()),
|
||||
application: bundle.application.clone(),
|
||||
user: bundle.user_identity.clone(),
|
||||
tags: if bundle.tags.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(bundle.tags.join(","))
|
||||
},
|
||||
description: bundle.description.clone(),
|
||||
human_filename: bundle.human_filename.clone(),
|
||||
human_path: bundle.human_path.clone(),
|
||||
};
|
||||
if let Err(e) = xattr::write_attributes(&file_path, &attrs) {
|
||||
tracing::warn!("Failed to write OS attributes: {}", e);
|
||||
}
|
||||
|
||||
// 5. DB insert
|
||||
let asset = Asset {
|
||||
id: 0,
|
||||
timestamp: bundle.timestamp,
|
||||
hash: bundle.hash.clone(),
|
||||
mime_type: bundle.mime_type,
|
||||
application: bundle.application,
|
||||
user_identity: bundle.user_identity,
|
||||
description: bundle.description,
|
||||
actual_filename,
|
||||
human_filename: bundle.human_filename,
|
||||
human_path: bundle.human_path,
|
||||
is_trashed: false,
|
||||
is_corrupted: false,
|
||||
size: bundle.content.len() as i64,
|
||||
};
|
||||
|
||||
{
|
||||
let conn = state.db.lock().unwrap();
|
||||
let asset_id = db::insert_asset(&conn, &asset)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))?;
|
||||
if !bundle.tags.is_empty() {
|
||||
db::set_asset_tags(&conn, asset_id, &bundle.tags)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Tag error: {}", e)))?;
|
||||
}
|
||||
if bundle.is_trashed {
|
||||
let _ = db::trash_asset(&conn, &bundle.hash);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Sync push: ingested {} ({}B)", &bundle.hash[..12], bundle.content.len());
|
||||
|
||||
Ok(proto_response(encode_proto(&PushResponse {
|
||||
hash: bundle.hash,
|
||||
already_existed: false,
|
||||
})?))
|
||||
}
|
||||
|
||||
// ── POST /sync/meta ─────────────────────────────────────────────────────
|
||||
|
||||
async fn sync_meta(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> Result<impl IntoResponse, (StatusCode, String)> {
|
||||
check_sync_key(&state, &headers)?;
|
||||
|
||||
let req = MetaUpdateRequest::decode(body)
|
||||
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Decode error: {}", e)))?;
|
||||
|
||||
let conn = state.db.lock().unwrap();
|
||||
|
||||
let asset = db::get_asset_by_hash(&conn, &req.hash)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)))?
|
||||
.ok_or_else(|| (StatusCode::NOT_FOUND, "Asset not found".into()))?;
|
||||
|
||||
if let Some(ref desc) = req.description {
|
||||
db::update_asset_metadata(&conn, &req.hash, Some(desc), None)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Update error: {}", e)))?;
|
||||
}
|
||||
|
||||
if !req.tags.is_empty() {
|
||||
db::set_asset_tags(&conn, asset.id, &req.tags)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Tag error: {}", e)))?;
|
||||
}
|
||||
|
||||
if req.is_trashed && !asset.is_trashed {
|
||||
db::trash_asset(&conn, &req.hash)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Trash error: {}", e)))?;
|
||||
let _ = storage::trash_asset_file(&state.config.storage_root, &asset.actual_filename);
|
||||
}
|
||||
|
||||
tracing::info!("Sync meta update for {}", &req.hash[..12.min(req.hash.len())]);
|
||||
|
||||
Ok(proto_response(encode_proto(&MetaUpdateResponse {
|
||||
success: true,
|
||||
})?))
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user