//! SQLite-backed sigchain store. One table, one row per event. //! //! Concurrency: a single `tokio::sync::Mutex` serializes all //! writes. This is fine at any realistic single-instance scale — sigchain //! writes are rare events (one per identity change) and read paths can be //! served from the same lock without contention worth optimizing. use std::path::Path; use std::sync::Arc; use kez_core::{Identity, Sigchain, SignedSigchainEvent}; use rusqlite::{Connection, OptionalExtension, params}; use tokio::sync::Mutex; use crate::error::ApiError; /// Shared store handle. Cheap to clone — wraps an `Arc`. #[derive(Clone)] pub struct Store { inner: Arc>, } impl Store { pub fn open(path: &Path) -> Result { let conn = Connection::open(path)?; init_schema(&conn)?; Ok(Self { inner: Arc::new(Mutex::new(conn)), }) } pub fn open_in_memory() -> Result { let conn = Connection::open_in_memory()?; init_schema(&conn)?; Ok(Self { inner: Arc::new(Mutex::new(conn)), }) } /// Load all events for `primary` and return them as a validated `Sigchain`. /// Returns an empty `Sigchain` if no events exist for this primary. pub async fn load_chain(&self, primary: &Identity) -> Result { let conn = self.inner.lock().await; let mut stmt = conn.prepare( "SELECT envelope_json FROM sigchain_events WHERE primary_scheme = ?1 AND primary_id = ?2 ORDER BY seq ASC", )?; let rows = stmt .query_map(params![primary.scheme(), primary.value()], |row| { row.get::<_, String>(0) })? .collect::, _>>()?; let mut chain = Sigchain::new(primary.clone()); for json in rows { let event: SignedSigchainEvent = serde_json::from_str(&json)?; chain.append(event)?; } Ok(chain) } /// Append a pre-validated event. Caller must have already passed it /// through `Sigchain::append`. We re-do the write transactionally to /// guard against a racing writer (INSERT OR ABORT on the (primary, seq) /// PK provides this). pub async fn append( &self, primary: &Identity, event: &SignedSigchainEvent, ) -> Result<(), ApiError> { let envelope_json = serde_json::to_string(event)?; let envelope_hash = event .hash() .map_err(|e| ApiError::Internal(format!("hash: {e}")))?; let seq = event.payload.seq as i64; let created_at = event.payload.created_at.to_rfc3339(); let conn = self.inner.lock().await; conn.execute( "INSERT INTO sigchain_events (primary_scheme, primary_id, seq, envelope_json, envelope_hash, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", params![ primary.scheme(), primary.value(), seq, envelope_json, envelope_hash, created_at, ], ) .map_err(|e| match e { rusqlite::Error::SqliteFailure(err, _) if err.code == rusqlite::ErrorCode::ConstraintViolation => { ApiError::Conflict(format!("seq {} already exists for this primary", seq)) } other => ApiError::Internal(format!("db: {other}")), })?; Ok(()) } /// Just the head event, if any. pub async fn head(&self, primary: &Identity) -> Result, ApiError> { let conn = self.inner.lock().await; let row = conn .query_row( "SELECT envelope_json FROM sigchain_events WHERE primary_scheme = ?1 AND primary_id = ?2 ORDER BY seq DESC LIMIT 1", params![primary.scheme(), primary.value()], |row| row.get::<_, String>(0), ) .optional()?; match row { None => Ok(None), Some(json) => Ok(Some(serde_json::from_str(&json)?)), } } } fn init_schema(conn: &Connection) -> Result<(), rusqlite::Error> { conn.execute_batch( "CREATE TABLE IF NOT EXISTS sigchain_events ( primary_scheme TEXT NOT NULL, primary_id TEXT NOT NULL, seq INTEGER NOT NULL, envelope_json TEXT NOT NULL, envelope_hash TEXT NOT NULL, created_at TEXT NOT NULL, PRIMARY KEY (primary_scheme, primary_id, seq) ); CREATE INDEX IF NOT EXISTS idx_primary ON sigchain_events (primary_scheme, primary_id);", ) }