use rusqlite::{params, Connection, OptionalExtension}; use std::path::Path; use std::sync::{Arc, Mutex}; use crate::models::{Asset, AssetMeta, ListParams, SearchParams}; pub type Db = Arc>; pub fn open(path: &Path) -> anyhow::Result { let conn = Connection::open(path)?; conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?; init_schema(&conn)?; Ok(Arc::new(Mutex::new(conn))) } pub fn open_in_memory() -> anyhow::Result { let conn = Connection::open_in_memory()?; conn.execute_batch("PRAGMA foreign_keys=ON;")?; init_schema(&conn)?; Ok(Arc::new(Mutex::new(conn))) } fn init_schema(conn: &Connection) -> rusqlite::Result<()> { conn.execute_batch( " CREATE TABLE IF NOT EXISTS assets ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp INTEGER NOT NULL, hash TEXT NOT NULL UNIQUE, mime_type TEXT NOT NULL, application TEXT, user_identity TEXT, description TEXT, actual_filename TEXT NOT NULL, human_filename TEXT, human_path TEXT, is_trashed BOOLEAN NOT NULL DEFAULT 0, is_corrupted BOOLEAN NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE ); CREATE TABLE IF NOT EXISTS asset_tags ( asset_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (asset_id, tag_id), FOREIGN KEY (asset_id) REFERENCES assets(id) ON DELETE CASCADE, FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_hash ON assets(hash); CREATE INDEX IF NOT EXISTS idx_timestamp ON assets(timestamp); CREATE INDEX IF NOT EXISTS idx_application ON assets(application); CREATE INDEX IF NOT EXISTS idx_user ON assets(user_identity); CREATE INDEX IF NOT EXISTS idx_trashed ON assets(is_trashed); CREATE INDEX IF NOT EXISTS idx_tag_name ON tags(name); ", )?; // Migration: add size column (ignore error if column already exists) let _ = conn.execute("ALTER TABLE assets ADD COLUMN size INTEGER NOT NULL DEFAULT 0", []); Ok(()) } /// Insert a new asset. Returns the row id. 
///
/// `asset.id`, `is_trashed` and `is_corrupted` are not written: SQLite
/// assigns the id and the two flags take their column defaults.
pub fn insert_asset(conn: &Connection, asset: &Asset) -> rusqlite::Result<i64> {
    const INSERT_SQL: &str = "INSERT INTO assets (timestamp, hash, mime_type, application, user_identity, description, actual_filename, human_filename, human_path, size) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)";
    conn.execute(
        INSERT_SQL,
        params![
            asset.timestamp,
            asset.hash,
            asset.mime_type,
            asset.application,
            asset.user_identity,
            asset.description,
            asset.actual_filename,
            asset.human_filename,
            asset.human_path,
            asset.size,
        ],
    )
    .map(|_rows_changed| conn.last_insert_rowid())
}

/// Fetch the asset whose `hash` column equals `hash` exactly.
///
/// Returns `Ok(None)` when no row matches. Trashed and corrupted assets are
/// returned like any other.
pub fn get_asset_by_hash(conn: &Connection, hash: &str) -> rusqlite::Result<Option<Asset>> {
    // Column indices follow the SELECT list below.
    fn read_asset(row: &rusqlite::Row<'_>) -> rusqlite::Result<Asset> {
        Ok(Asset {
            id: row.get(0)?,
            timestamp: row.get(1)?,
            hash: row.get(2)?,
            mime_type: row.get(3)?,
            application: row.get(4)?,
            user_identity: row.get(5)?,
            description: row.get(6)?,
            actual_filename: row.get(7)?,
            human_filename: row.get(8)?,
            human_path: row.get(9)?,
            is_trashed: row.get(10)?,
            is_corrupted: row.get(11)?,
            size: row.get(12)?,
        })
    }

    conn.query_row(
        "SELECT id, timestamp, hash, mime_type, application, user_identity, description, actual_filename, human_filename, human_path, is_trashed, is_corrupted, size FROM assets WHERE hash = ?1",
        params![hash],
        read_asset,
    )
    .optional()
}

/// Return the names of every tag linked to `asset_id`, sorted by name.
pub fn get_asset_tags(conn: &Connection, asset_id: i64) -> rusqlite::Result<Vec<String>> {
    let mut stmt = conn.prepare(
        "SELECT t.name FROM tags t JOIN asset_tags at ON at.tag_id = t.id WHERE at.asset_id = ?1 ORDER BY t.name",
    )?;
    let mut names = Vec::new();
    for name in stmt.query_map(params![asset_id], |row| row.get::<_, String>(0))? {
        names.push(name?);
    }
    Ok(names)
}

/// Ensure a tag called `name` exists and return its id.
///
/// `INSERT OR IGNORE` leaves an existing row untouched (`tags.name` is
/// UNIQUE), so the follow-up SELECT yields either the new or the existing id.
pub fn upsert_tag(conn: &Connection, name: &str) -> rusqlite::Result<i64> {
    conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?1)", params![name])
        .and_then(|_| {
            conn.query_row("SELECT id FROM tags WHERE name = ?1", params![name], |row| {
                row.get(0)
            })
        })
}

/// Replace all tags for an asset.
///
/// The old links are deleted and the new ones inserted inside a single SQLite
/// SAVEPOINT, so either every change is applied or none is. Savepoints nest,
/// so this is safe to call while a transaction is already open. Tags that do
/// not exist yet are created via [`upsert_tag`]; a name repeated in `tags` is
/// linked only once.
pub fn set_asset_tags(conn: &Connection, asset_id: i64, tags: &[String]) -> rusqlite::Result<()> {
    with_savepoint(conn, "set_asset_tags", || {
        conn.execute(
            "DELETE FROM asset_tags WHERE asset_id = ?1",
            params![asset_id],
        )?;
        for tag in tags {
            let tag_id = upsert_tag(conn, tag)?;
            // OR IGNORE: a repeated tag would otherwise hit the
            // (asset_id, tag_id) primary key.
            conn.execute(
                "INSERT OR IGNORE INTO asset_tags (asset_id, tag_id) VALUES (?1, ?2)",
                params![asset_id, tag_id],
            )?;
        }
        Ok(())
    })
}

/// Run `body` inside a SQLite `SAVEPOINT` called `name`.
///
/// On success the savepoint is released, which commits it if it was the
/// outermost one. If `body` or the release fails, the savepoint is rolled back
/// and released, and that error is returned. `name` is spliced into the SQL
/// text, so it must be a fixed identifier, never user input.
fn with_savepoint<T>(
    conn: &Connection,
    name: &str,
    body: impl FnOnce() -> rusqlite::Result<T>,
) -> rusqlite::Result<T> {
    conn.execute_batch(&format!("SAVEPOINT {}", name))?;
    let rollback = format!("ROLLBACK TO {0}; RELEASE {0}", name);
    let outcome = body().and_then(|value| {
        conn.execute_batch(&format!("RELEASE {}", name))?;
        Ok(value)
    });
    if outcome.is_err() {
        // Best effort: some errors may already have made SQLite roll back the
        // whole transaction, in which case ROLLBACK TO fails with "no such
        // savepoint". The caller needs the original error, not that one.
        let _ = conn.execute_batch(&rollback);
    }
    outcome
}

/// Build the API-facing [`AssetMeta`] for `asset`, loading its tags from the
/// database. The `actual_filename` and `id` fields are not part of the result.
pub fn asset_to_meta(conn: &Connection, asset: &Asset) -> rusqlite::Result<AssetMeta> {
    let tags = get_asset_tags(conn, asset.id)?;
    Ok(AssetMeta {
        hash: asset.hash.clone(),
        mime_type: asset.mime_type.clone(),
        application: asset.application.clone(),
        user: asset.user_identity.clone(),
        tags,
        description: asset.description.clone(),
        human_filename: asset.human_filename.clone(),
        human_path: asset.human_path.clone(),
        timestamp: asset.timestamp,
        is_trashed: asset.is_trashed,
        is_corrupted: asset.is_corrupted,
        size: asset.size,
    })
}

/// Update the description and/or tags of the asset identified by `hash`.
///
/// A `None` argument leaves that field untouched. The lookup and both updates
/// run inside one SAVEPOINT, so a failure while replacing tags also undoes the
/// description change.
///
/// # Errors
/// Returns [`rusqlite::Error::QueryReturnedNoRows`] when no asset has `hash`,
/// or any error raised by the underlying statements.
pub fn update_asset_metadata(
    conn: &Connection,
    hash: &str,
    description: Option<&str>,
    tags: Option<&[String]>,
) -> rusqlite::Result<()> {
    with_savepoint(conn, "update_asset_metadata", || {
        let asset = get_asset_by_hash(conn, hash)?
            .ok_or(rusqlite::Error::QueryReturnedNoRows)?;
        if let Some(desc) = description {
            conn.execute(
                "UPDATE assets SET description = ?1 WHERE id = ?2",
                params![desc, asset.id],
            )?;
        }
        if let Some(tags) = tags {
            set_asset_tags(conn, asset.id, tags)?;
        }
        Ok(())
    })
}

/// Set the asset's `is_corrupted` flag to `corrupted`.
/// An unknown `hash` updates no rows and is not an error.
pub fn flag_corrupted(conn: &Connection, hash: &str, corrupted: bool) -> rusqlite::Result<()> {
    conn.execute(
        "UPDATE assets SET is_corrupted = ?1 WHERE hash = ?2",
        params![corrupted, hash],
    )?;
    Ok(())
}

/// Update file size for an asset (used by verifier to backfill).
/// An unknown `hash` updates no rows and is not an error.
pub fn update_asset_size(conn: &Connection, hash: &str, size: i64) -> rusqlite::Result<()> {
    conn.execute(
        "UPDATE assets SET size = ?1 WHERE hash = ?2",
        params![size, hash],
    )?;
    Ok(())
}

/// Soft-delete: mark as trashed.
pub fn trash_asset(conn: &Connection, hash: &str) -> rusqlite::Result<()> { conn.execute( "UPDATE assets SET is_trashed = 1 WHERE hash = ?1", params![hash], )?; Ok(()) } /// List assets with pagination and filtering. pub fn list_assets(conn: &Connection, params: &ListParams) -> rusqlite::Result<(Vec, i64)> { let limit = params.limit.unwrap_or(50); let offset = params.offset.unwrap_or(0); let order = match params.order.as_deref() { Some("asc") => "ASC", _ => "DESC", }; let include_trashed = params.include_trashed.unwrap_or(false); let include_corrupted = params.include_corrupted.unwrap_or(false); let mut conditions = Vec::new(); let mut bind_values: Vec> = Vec::new(); if !include_trashed { conditions.push("is_trashed = 0"); } if !include_corrupted { conditions.push("is_corrupted = 0"); } if let Some(ref app) = params.application { conditions.push("application = ?"); bind_values.push(Box::new(app.clone())); } if let Some(offset_time) = params.offset_time { if order == "DESC" { conditions.push("timestamp < ?"); } else { conditions.push("timestamp > ?"); } bind_values.push(Box::new(offset_time)); } let where_clause = if conditions.is_empty() { String::new() } else { format!("WHERE {}", conditions.join(" AND ")) }; let count_sql = format!("SELECT COUNT(*) FROM assets {}", where_clause); let refs: Vec<&dyn rusqlite::types::ToSql> = bind_values.iter().map(|b| b.as_ref()).collect(); let total: i64 = conn.query_row(&count_sql, refs.as_slice(), |row| row.get(0))?; let query_sql = format!( "SELECT id, timestamp, hash, mime_type, application, user_identity, description, actual_filename, human_filename, human_path, is_trashed, is_corrupted, size FROM assets {} ORDER BY timestamp {} LIMIT ? 
OFFSET ?", where_clause, order ); let mut all_binds: Vec> = bind_values; all_binds.push(Box::new(limit)); all_binds.push(Box::new(offset)); let refs2: Vec<&dyn rusqlite::types::ToSql> = all_binds.iter().map(|b| b.as_ref()).collect(); let mut stmt = conn.prepare(&query_sql)?; let assets = stmt .query_map(refs2.as_slice(), |row| { Ok(Asset { id: row.get(0)?, timestamp: row.get(1)?, hash: row.get(2)?, mime_type: row.get(3)?, application: row.get(4)?, user_identity: row.get(5)?, description: row.get(6)?, actual_filename: row.get(7)?, human_filename: row.get(8)?, human_path: row.get(9)?, is_trashed: row.get(10)?, is_corrupted: row.get(11)?, size: row.get(12)?, }) })? .collect::>>()?; Ok((assets, total)) } /// Search assets with various filters. pub fn search_assets( conn: &Connection, params: &SearchParams, ) -> rusqlite::Result<(Vec, i64)> { let limit = params.limit.unwrap_or(50); let offset = params.offset.unwrap_or(0); let order = match params.order.as_deref() { Some("asc") => "ASC", _ => "DESC", }; let include_trashed = params.include_trashed.unwrap_or(false); let include_corrupted = params.include_corrupted.unwrap_or(false); let mut conditions = Vec::new(); let mut bind_values: Vec> = Vec::new(); let mut needs_tag_join = false; if !include_trashed { conditions.push("a.is_trashed = 0".to_string()); } if !include_corrupted { conditions.push("a.is_corrupted = 0".to_string()); } if let Some(ref hash) = params.hash { conditions.push("a.hash LIKE ?".to_string()); bind_values.push(Box::new(format!("{}%", hash))); } if let Some(start) = params.start_time { conditions.push("a.timestamp >= ?".to_string()); bind_values.push(Box::new(start)); } if let Some(end) = params.end_time { conditions.push("a.timestamp <= ?".to_string()); bind_values.push(Box::new(end)); } if let Some(ref mime) = params.mime_type { conditions.push("a.mime_type = ?".to_string()); bind_values.push(Box::new(mime.clone())); } if let Some(ref user) = params.user { conditions.push("a.user_identity = 
?".to_string()); bind_values.push(Box::new(user.clone())); } if let Some(ref app) = params.application { conditions.push("a.application = ?".to_string()); bind_values.push(Box::new(app.clone())); } // Tag filtering: AND logic - asset must have ALL specified tags let tag_names: Vec = params .tags .as_deref() .unwrap_or("") .split(',') .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect(); if !tag_names.is_empty() { needs_tag_join = true; let placeholders: Vec = tag_names.iter().map(|_| "?".to_string()).collect(); conditions.push(format!( "a.id IN ( SELECT at.asset_id FROM asset_tags at JOIN tags t ON t.id = at.tag_id WHERE t.name IN ({}) GROUP BY at.asset_id HAVING COUNT(DISTINCT t.id) = ? )", placeholders.join(", ") )); for tag in &tag_names { bind_values.push(Box::new(tag.clone())); } bind_values.push(Box::new(tag_names.len() as i64)); } let _ = needs_tag_join; // subquery handles it let where_clause = if conditions.is_empty() { String::new() } else { format!("WHERE {}", conditions.join(" AND ")) }; let count_sql = format!("SELECT COUNT(*) FROM assets a {}", where_clause); let refs: Vec<&dyn rusqlite::types::ToSql> = bind_values.iter().map(|b| b.as_ref()).collect(); let total: i64 = conn.query_row(&count_sql, refs.as_slice(), |row| row.get(0))?; let query_sql = format!( "SELECT a.id, a.timestamp, a.hash, a.mime_type, a.application, a.user_identity, a.description, a.actual_filename, a.human_filename, a.human_path, a.is_trashed, a.is_corrupted, a.size FROM assets a {} ORDER BY a.timestamp {} LIMIT ? 
OFFSET ?", where_clause, order ); let mut all_binds = bind_values; all_binds.push(Box::new(limit)); all_binds.push(Box::new(offset)); let refs2: Vec<&dyn rusqlite::types::ToSql> = all_binds.iter().map(|b| b.as_ref()).collect(); let mut stmt = conn.prepare(&query_sql)?; let assets = stmt .query_map(refs2.as_slice(), |row| { Ok(Asset { id: row.get(0)?, timestamp: row.get(1)?, hash: row.get(2)?, mime_type: row.get(3)?, application: row.get(4)?, user_identity: row.get(5)?, description: row.get(6)?, actual_filename: row.get(7)?, human_filename: row.get(8)?, human_path: row.get(9)?, is_trashed: row.get(10)?, is_corrupted: row.get(11)?, size: row.get(12)?, }) })? .collect::>>()?; Ok((assets, total)) } /// Get ALL asset records including trashed (for sync reconciliation). pub fn get_all_assets(conn: &Connection) -> rusqlite::Result> { let mut stmt = conn.prepare( "SELECT id, timestamp, hash, mime_type, application, user_identity, description, actual_filename, human_filename, human_path, is_trashed, is_corrupted, size FROM assets", )?; let assets = stmt .query_map([], |row| { Ok(Asset { id: row.get(0)?, timestamp: row.get(1)?, hash: row.get(2)?, mime_type: row.get(3)?, application: row.get(4)?, user_identity: row.get(5)?, description: row.get(6)?, actual_filename: row.get(7)?, human_filename: row.get(8)?, human_path: row.get(9)?, is_trashed: row.get(10)?, is_corrupted: row.get(11)?, size: row.get(12)?, }) })? .collect::>>()?; Ok(assets) } /// Get assets with `timestamp > since` (for incremental sync queries). 
pub fn get_assets_since(conn: &Connection, since: i64) -> rusqlite::Result> { let mut stmt = conn.prepare( "SELECT id, timestamp, hash, mime_type, application, user_identity, description, actual_filename, human_filename, human_path, is_trashed, is_corrupted, size FROM assets WHERE timestamp > ?1 ORDER BY timestamp ASC", )?; let assets = stmt .query_map([since], |row| { Ok(Asset { id: row.get(0)?, timestamp: row.get(1)?, hash: row.get(2)?, mime_type: row.get(3)?, application: row.get(4)?, user_identity: row.get(5)?, description: row.get(6)?, actual_filename: row.get(7)?, human_filename: row.get(8)?, human_path: row.get(9)?, is_trashed: row.get(10)?, is_corrupted: row.get(11)?, size: row.get(12)?, }) })? .collect::>>()?; Ok(assets) } /// Get all non-trashed asset records (for verifier startup scan). pub fn get_all_active_assets(conn: &Connection) -> rusqlite::Result> { let mut stmt = conn.prepare( "SELECT id, timestamp, hash, mime_type, application, user_identity, description, actual_filename, human_filename, human_path, is_trashed, is_corrupted, size FROM assets WHERE is_trashed = 0", )?; let assets = stmt .query_map([], |row| { Ok(Asset { id: row.get(0)?, timestamp: row.get(1)?, hash: row.get(2)?, mime_type: row.get(3)?, application: row.get(4)?, user_identity: row.get(5)?, description: row.get(6)?, actual_filename: row.get(7)?, human_filename: row.get(8)?, human_path: row.get(9)?, is_trashed: row.get(10)?, is_corrupted: row.get(11)?, size: row.get(12)?, }) })? 
.collect::>>()?; Ok(assets) } #[cfg(test)] mod tests { use super::*; fn make_test_asset(ts: i64, hash: &str) -> Asset { Asset { id: 0, timestamp: ts, hash: hash.to_string(), mime_type: "text/plain".to_string(), application: Some("test_app".to_string()), user_identity: Some("test_user".to_string()), description: Some("test desc".to_string()), actual_filename: format!("{}_{}.txt", ts, hash), human_filename: Some("readme.txt".to_string()), human_path: Some("/docs/".to_string()), is_trashed: false, is_corrupted: false, size: 0, } } #[test] fn test_insert_and_get_asset() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let asset = make_test_asset(1000, "abc123"); let id = insert_asset(&conn, &asset).unwrap(); assert!(id > 0); let found = get_asset_by_hash(&conn, "abc123").unwrap().unwrap(); assert_eq!(found.hash, "abc123"); assert_eq!(found.timestamp, 1000); assert_eq!(found.mime_type, "text/plain"); } #[test] fn test_get_nonexistent_asset() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let found = get_asset_by_hash(&conn, "nonexistent").unwrap(); assert!(found.is_none()); } #[test] fn test_tags() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let asset = make_test_asset(2000, "def456"); let id = insert_asset(&conn, &asset).unwrap(); let tags = vec!["photo".to_string(), "vacation".to_string()]; set_asset_tags(&conn, id, &tags).unwrap(); let fetched = get_asset_tags(&conn, id).unwrap(); assert_eq!(fetched, vec!["photo", "vacation"]); // Replace tags let new_tags = vec!["work".to_string()]; set_asset_tags(&conn, id, &new_tags).unwrap(); let fetched2 = get_asset_tags(&conn, id).unwrap(); assert_eq!(fetched2, vec!["work"]); } #[test] fn test_update_metadata() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let asset = make_test_asset(3000, "ghi789"); insert_asset(&conn, &asset).unwrap(); let new_tags = vec!["updated".to_string()]; update_asset_metadata(&conn, "ghi789", Some("new desc"), 
Some(&new_tags)).unwrap(); let found = get_asset_by_hash(&conn, "ghi789").unwrap().unwrap(); assert_eq!(found.description, Some("new desc".to_string())); let tags = get_asset_tags(&conn, found.id).unwrap(); assert_eq!(tags, vec!["updated"]); } #[test] fn test_flag_corrupted() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let asset = make_test_asset(4000, "corrupt1"); insert_asset(&conn, &asset).unwrap(); flag_corrupted(&conn, "corrupt1", true).unwrap(); let found = get_asset_by_hash(&conn, "corrupt1").unwrap().unwrap(); assert!(found.is_corrupted); flag_corrupted(&conn, "corrupt1", false).unwrap(); let found2 = get_asset_by_hash(&conn, "corrupt1").unwrap().unwrap(); assert!(!found2.is_corrupted); } #[test] fn test_trash_asset() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let asset = make_test_asset(5000, "trash1"); insert_asset(&conn, &asset).unwrap(); trash_asset(&conn, "trash1").unwrap(); let found = get_asset_by_hash(&conn, "trash1").unwrap().unwrap(); assert!(found.is_trashed); } #[test] fn test_list_assets_basic() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); for i in 0..5 { let asset = make_test_asset(1000 + i, &format!("hash_{}", i)); insert_asset(&conn, &asset).unwrap(); } let params = ListParams { limit: Some(3), offset: Some(0), offset_time: None, order: Some("desc".to_string()), application: None, include_trashed: None, include_corrupted: None, }; let (assets, total) = list_assets(&conn, ¶ms).unwrap(); assert_eq!(total, 5); assert_eq!(assets.len(), 3); // DESC order: highest timestamp first assert!(assets[0].timestamp > assets[1].timestamp); } #[test] fn test_list_excludes_trashed_by_default() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let a1 = make_test_asset(100, "visible1"); insert_asset(&conn, &a1).unwrap(); let a2 = make_test_asset(200, "trashed1"); insert_asset(&conn, &a2).unwrap(); trash_asset(&conn, "trashed1").unwrap(); let params = ListParams { 
limit: None, offset: None, offset_time: None, order: None, application: None, include_trashed: None, include_corrupted: None, }; let (assets, total) = list_assets(&conn, ¶ms).unwrap(); assert_eq!(total, 1); assert_eq!(assets[0].hash, "visible1"); } #[test] fn test_search_by_hash_prefix() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let a1 = make_test_asset(100, "abcdef123"); let a2 = make_test_asset(200, "abcxyz789"); let a3 = make_test_asset(300, "zzz000111"); insert_asset(&conn, &a1).unwrap(); insert_asset(&conn, &a2).unwrap(); insert_asset(&conn, &a3).unwrap(); let params = SearchParams { hash: Some("abc".to_string()), start_time: None, end_time: None, tags: None, mime_type: None, user: None, application: None, limit: None, offset: None, order: None, include_trashed: None, include_corrupted: None, }; let (assets, total) = search_assets(&conn, ¶ms).unwrap(); assert_eq!(total, 2); assert!(assets.iter().all(|a| a.hash.starts_with("abc"))); } #[test] fn test_search_by_tags() { let db = open_in_memory().unwrap(); let conn = db.lock().unwrap(); let a1 = make_test_asset(100, "tagged1"); let id1 = insert_asset(&conn, &a1).unwrap(); set_asset_tags(&conn, id1, &["red".to_string(), "blue".to_string()]).unwrap(); let a2 = make_test_asset(200, "tagged2"); let id2 = insert_asset(&conn, &a2).unwrap(); set_asset_tags(&conn, id2, &["red".to_string()]).unwrap(); // Search for both red AND blue -> only tagged1 let params = SearchParams { hash: None, start_time: None, end_time: None, tags: Some("red,blue".to_string()), mime_type: None, user: None, application: None, limit: None, offset: None, order: None, include_trashed: None, include_corrupted: None, }; let (assets, total) = search_assets(&conn, ¶ms).unwrap(); assert_eq!(total, 1); assert_eq!(assets[0].hash, "tagged1"); } }