//! SQLite-backed handle registry. use std::path::Path; use std::sync::Arc; use chrono::{DateTime, Utc}; use kez_core::Identity; use rusqlite::{Connection, OptionalExtension, params}; use tokio::sync::Mutex; use crate::error::ApiError; #[derive(Debug, Clone)] pub struct HandleRecord { pub handle: String, pub primary: Identity, pub registered_at: DateTime, /// JSON array of claim subjects the user has published to their profile /// (e.g. ["github:alice","dns:alice.com"]). Discovery only — peers /// independently verify each against the channel. None until set. pub proofs: Option, } #[derive(Clone)] pub struct Store { inner: Arc>, } impl Store { pub fn open(path: &Path) -> Result { let conn = Connection::open(path)?; init_schema(&conn)?; Ok(Self { inner: Arc::new(Mutex::new(conn)), }) } pub fn open_in_memory() -> Result { let conn = Connection::open_in_memory()?; init_schema(&conn)?; Ok(Self { inner: Arc::new(Mutex::new(conn)), }) } /// Reserve a handle for a primary key. Fails with Conflict if the /// handle is already taken, or if this primary key has already /// registered a (different) handle. pub async fn register(&self, record: &HandleRecord) -> Result<(), ApiError> { let conn = self.inner.lock().await; conn.execute( "INSERT INTO handles (handle, primary_id, registered_at) VALUES (?1, ?2, ?3)", params![ record.handle, record.primary.to_string(), record.registered_at.to_rfc3339(), ], ) .map_err(|e| match e { rusqlite::Error::SqliteFailure(err, _) if err.code == rusqlite::ErrorCode::ConstraintViolation => { ApiError::Conflict("handle is already taken".into()) } other => ApiError::Internal(format!("db: {other}")), })?; Ok(()) } /// Look up the record for `handle`. Returns None if not registered. pub async fn lookup(&self, handle: &str) -> Result, ApiError> { let conn = self.inner.lock().await; let row = conn .query_row( "SELECT handle, primary_id, registered_at, proofs FROM handles WHERE handle = ?1", params![handle], row_to_record_parts, ) .optional()?; row.map(build_record).transpose() } /// Replace the published proof-subject list for `handle`. pub async fn set_proofs(&self, handle: &str, proofs_json: &str) -> Result<(), ApiError> { let conn = self.inner.lock().await; let n = conn.execute( "UPDATE handles SET proofs = ?1 WHERE handle = ?2", params![proofs_json, handle], )?; if n == 0 { return Err(ApiError::NotFound); } Ok(()) } /// Look up the record for a primary key — used by the NATS auth /// callout: NATS sends us a connecting client's nkey, we figure out /// which handle (if any) owns it. pub async fn lookup_by_primary( &self, primary: &Identity, ) -> Result, ApiError> { let conn = self.inner.lock().await; let row = conn .query_row( "SELECT handle, primary_id, registered_at, proofs FROM handles WHERE primary_id = ?1", params![primary.to_string()], row_to_record_parts, ) .optional()?; row.map(build_record).transpose() } } type RecordParts = (String, String, String, Option); fn row_to_record_parts(row: &rusqlite::Row) -> rusqlite::Result { Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)) } fn build_record(parts: RecordParts) -> Result { let (handle, primary_id, registered_at, proofs) = parts; let primary = Identity::parse(primary_id) .map_err(|e| ApiError::Internal(format!("stored primary not parseable: {e}")))?; let registered_at = DateTime::parse_from_rfc3339(®istered_at) .map_err(|e| ApiError::Internal(format!("stored timestamp not parseable: {e}")))? .with_timezone(&Utc); Ok(HandleRecord { handle, primary, registered_at, proofs, }) } fn init_schema(conn: &Connection) -> Result<(), rusqlite::Error> { // New column for existing databases — ignore "duplicate column" so // re-running on an already-migrated DB is a no-op. let _ = conn.execute("ALTER TABLE handles ADD COLUMN proofs TEXT", []); conn.execute_batch( "CREATE TABLE IF NOT EXISTS handles ( handle TEXT NOT NULL PRIMARY KEY, primary_id TEXT NOT NULL UNIQUE, registered_at TEXT NOT NULL, proofs TEXT ); CREATE INDEX IF NOT EXISTS idx_handles_primary ON handles (primary_id); -- Opaque encrypted-envelope mailbox. The server can't read these; -- it's a dumb relay that stores blobs addressed to a handle and -- hands them back when the handle's owner authenticates a poll. CREATE TABLE IF NOT EXISTS messages ( seq INTEGER PRIMARY KEY AUTOINCREMENT, recipient_handle TEXT NOT NULL, envelope TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_messages_recipient ON messages (recipient_handle, seq);", ) } // ───────────────────────────────────────────────────────────────────────────── // Messages (opaque encrypted envelopes, server-side dumb relay) // ───────────────────────────────────────────────────────────────────────────── /// One stored envelope. The server doesn't introspect the envelope JSON — /// recipients decrypt it client-side. We surface only what's needed for /// addressing + ordering: `seq`, when we received it, and the raw blob. #[derive(Debug, Clone)] pub struct StoredMessage { pub seq: i64, pub envelope: String, pub created_at: DateTime, } impl Store { /// Append an envelope to `recipient`'s mailbox. Returns the assigned seq. pub async fn store_message( &self, recipient: &str, envelope: &str, ) -> Result { let now = Utc::now().to_rfc3339(); let conn = self.inner.lock().await; conn.execute( "INSERT INTO messages (recipient_handle, envelope, created_at) VALUES (?1, ?2, ?3)", params![recipient, envelope, now], )?; Ok(conn.last_insert_rowid()) } /// Pull envelopes for `recipient` with `seq > since`, oldest first. /// `limit` caps the response so a long-offline user can't blow up a /// single poll — the client paginates by re-calling with the new max seq. pub async fn inbox( &self, recipient: &str, since: i64, limit: i64, ) -> Result, ApiError> { let conn = self.inner.lock().await; let mut stmt = conn.prepare( "SELECT seq, envelope, created_at FROM messages WHERE recipient_handle = ?1 AND seq > ?2 ORDER BY seq ASC LIMIT ?3", )?; let rows = stmt .query_map(params![recipient, since, limit], |row| { let seq: i64 = row.get(0)?; let envelope: String = row.get(1)?; let created_at: String = row.get(2)?; Ok((seq, envelope, created_at)) })? .collect::, _>>()?; let mut out = Vec::with_capacity(rows.len()); for (seq, envelope, created_at) in rows { let created_at = DateTime::parse_from_rfc3339(&created_at) .map_err(|e| { ApiError::Internal(format!("stored message ts not parseable: {e}")) })? .with_timezone(&Utc); out.push(StoredMessage { seq, envelope, created_at, }); } Ok(out) } }