diff --git a/kez-chat/Cargo.lock b/kez-chat/Cargo.lock index 9d33bf9..13bb6d2 100644 --- a/kez-chat/Cargo.lock +++ b/kez-chat/Cargo.lock @@ -470,6 +470,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -477,6 +492,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -485,6 +501,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -503,8 +547,13 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -908,6 +957,7 @@ dependencies = [ "axum", "chrono", "clap", + "futures", "hex", "kez-core", "reqwest", @@ -918,6 +968,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "tower-http", "tracing", "tracing-subscriber", @@ -1769,6 +1820,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-util" version = "0.7.18" diff --git a/kez-chat/Cargo.toml b/kez-chat/Cargo.toml index 33e32a2..f945751 100644 --- a/kez-chat/Cargo.toml +++ b/kez-chat/Cargo.toml @@ -17,6 +17,8 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "2" tokio = { version = "1.48", features = ["macros", "rt-multi-thread", "sync", "signal"] } +tokio-stream = { version = "0.1", features = ["sync"] } +futures = "0.3" tower-http = { version = "0.6", features = ["trace", "cors", "fs"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/kez-chat/src/api.rs b/kez-chat/src/api.rs index dd07bb4..06b5062 100644 --- a/kez-chat/src/api.rs +++ b/kez-chat/src/api.rs @@ -27,6 +27,7 @@ use crate::store::{HandleRecord, Store}; pub struct AppState { pub store: Store, pub config: Config, + pub broker: crate::broker::Broker, } pub fn router(state: AppState) -> axum::Router { @@ -42,6 +43,7 @@ pub fn router(state: AppState) -> axum::Router { .route("/v1/register", post(register)) .route("/v1/messages", post(crate::messages::send_message)) .route("/v1/inbox/:handle", get(crate::messages::inbox)) + .route("/v1/inbox/:handle/stream", get(crate::messages::stream_inbox)) .route("/.well-known/webfinger", get(webfinger)) .route("/internal/nats/auth", post(nats_auth_callout)); diff --git a/kez-chat/src/broker.rs b/kez-chat/src/broker.rs new file mode 100644 index 0000000..4c1db4f --- /dev/null +++ b/kez-chat/src/broker.rs @@ -0,0 +1,127 @@ +//! In-process pub/sub for "a new envelope just landed in 's +//! inbox". Used by the SSE endpoint to push messages to subscribed +//! clients without waiting for the next poll tick. +//! +//! Implementation: one `tokio::sync::broadcast` channel per recipient. +//! A subscriber gets a Receiver; when POST /v1/messages stores an +//! envelope, the handler calls `publish(recipient, msg)` which sends +//! into the per-recipient sender (creating it if absent). +//! +//! Bounds: this is single-instance. With N chat-servers we'd need to +//! gossip publishes — that's what NATS in docker-compose is for, and +//! the v0.2 migration is "swap out `Broker` for `nats.publish/subscribe` +//! against `kez.chat.inbox.`". Subscribers don't notice. +//! +//! Channel capacity: 64 buffered messages per recipient. If a +//! subscriber lags past that, they get `RecvError::Lagged(n)` — they +//! reconnect and resync via the polling fallback, which the browser +//! does anyway. + +use std::collections::HashMap; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{Mutex, broadcast}; + +/// One push event over the SSE stream. Payload is the same shape as +/// what GET /v1/inbox returns per message — re-using the type keeps +/// the SPA's decode path identical for both transports. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PushedMessage { + pub seq: i64, + pub envelope: serde_json::Value, + pub created_at: String, +} + +/// Capacity of each per-recipient broadcast channel. Tradeoff: too low +/// and active conversations drop messages on bursty senders; too high +/// and idle subscribers pin memory. 64 covers worst-case interactive +/// chat; the polling fallback catches anything that overflows. +const CHANNEL_CAPACITY: usize = 64; + +#[derive(Clone, Default)] +pub struct Broker { + inner: Arc>>>, +} + +impl Broker { + pub fn new() -> Self { + Self::default() + } + + /// Subscribe to push events for `handle`. Returns a Receiver that + /// yields `PushedMessage`s until dropped or the channel lags. + /// Creates the per-recipient sender on first subscribe. + pub async fn subscribe(&self, handle: &str) -> broadcast::Receiver { + let mut map = self.inner.lock().await; + let tx = map + .entry(handle.to_owned()) + .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0); + tx.subscribe() + } + + /// Publish `msg` to any live subscribers for `handle`. Silently + /// drops if no one is subscribed (the message is already persisted; + /// recipients pick it up on next poll/reconnect). + pub async fn publish(&self, handle: &str, msg: PushedMessage) { + let map = self.inner.lock().await; + if let Some(tx) = map.get(handle) { + // `send` returns Err if there are zero receivers — fine, + // means we raced a disconnect. Caller doesn't care. + let _ = tx.send(msg); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn fake_msg(seq: i64) -> PushedMessage { + PushedMessage { + seq, + envelope: json!({"v": 1, "ciphertext": format!("hi {seq}")}), + created_at: "2026-01-01T00:00:00Z".into(), + } + } + + #[tokio::test] + async fn subscribe_then_publish_delivers() { + let broker = Broker::new(); + let mut rx = broker.subscribe("alice").await; + broker.publish("alice", fake_msg(1)).await; + let got = rx.recv().await.unwrap(); + assert_eq!(got.seq, 1); + } + + #[tokio::test] + async fn publish_without_subscribers_is_noop() { + let broker = Broker::new(); + // Nobody listening — must not panic, must not allocate a channel. + broker.publish("alice", fake_msg(1)).await; + assert!(broker.inner.lock().await.is_empty()); + } + + #[tokio::test] + async fn multiple_subscribers_each_get_a_copy() { + let broker = Broker::new(); + let mut rx1 = broker.subscribe("alice").await; + let mut rx2 = broker.subscribe("alice").await; + broker.publish("alice", fake_msg(42)).await; + assert_eq!(rx1.recv().await.unwrap().seq, 42); + assert_eq!(rx2.recv().await.unwrap().seq, 42); + } + + #[tokio::test] + async fn channels_are_per_handle() { + let broker = Broker::new(); + let mut alice_rx = broker.subscribe("alice").await; + let _bob_rx = broker.subscribe("bob").await; + broker.publish("alice", fake_msg(1)).await; + // alice's subscriber got it; bob's didn't (would block). + assert_eq!(alice_rx.recv().await.unwrap().seq, 1); + // sanity: bob would have to time out, so skip that assertion + // (we'd rather not slow the test suite for it). + } +} diff --git a/kez-chat/src/lib.rs b/kez-chat/src/lib.rs index dff878a..56b8538 100644 --- a/kez-chat/src/lib.rs +++ b/kez-chat/src/lib.rs @@ -2,6 +2,7 @@ //! binary serves. pub mod api; +pub mod broker; pub mod config; pub mod error; pub mod handles; diff --git a/kez-chat/src/main.rs b/kez-chat/src/main.rs index 72df337..e12a72b 100644 --- a/kez-chat/src/main.rs +++ b/kez-chat/src/main.rs @@ -26,7 +26,11 @@ async fn main() -> Result<()> { ); let store = Store::open(&config.db)?; - let state = AppState { store, config: config.clone() }; + let state = AppState { + store, + config: config.clone(), + broker: kez_chat_server::broker::Broker::new(), + }; let app = router(state) .layer(TraceLayer::new_for_http()) diff --git a/kez-chat/src/messages.rs b/kez-chat/src/messages.rs index ce24d3d..67b74a0 100644 --- a/kez-chat/src/messages.rs +++ b/kez-chat/src/messages.rs @@ -19,15 +19,24 @@ //! anyone's mailbox. Mitigations land in v0.2 (rate-limit by source IP + //! optional sender-handle proof). +use std::convert::Infallible; +use std::time::Duration; + use axum::Json; use axum::extract::{Path, Query, State}; use axum::http::HeaderMap; +use axum::response::Sse; +use axum::response::sse::{Event, KeepAlive}; use chrono::Utc; +use futures::stream::Stream; use kez_core::verify_ed25519_hex; use serde::{Deserialize, Serialize}; use serde_json::Value; +use tokio_stream::StreamExt; +use tokio_stream::wrappers::BroadcastStream; use crate::api::AppState; +use crate::broker::PushedMessage; use crate::error::ApiError; use crate::handles::validate_handle; @@ -88,6 +97,22 @@ pub async fn send_message( .store .store_message(&recipient.handle, &envelope_str) .await?; + + // Push to any live SSE subscribers for this recipient — instant + // delivery without waiting for the next poll. The message is + // already persisted; if no one's listening, this is a no-op. + state + .broker + .publish( + &recipient.handle, + PushedMessage { + seq, + envelope: req.envelope, + created_at: Utc::now().to_rfc3339(), + }, + ) + .await; + Ok(Json(SendMessageResponse { seq })) } @@ -200,6 +225,100 @@ pub fn canonical_inbox_message(handle: &str, since: i64, ts: i64) -> String { format!("GET\n/v1/inbox/{handle}\nsince={since}\n{ts}") } +// ───────────────────────────────────────────────────────────────────────────── +// GET /v1/inbox/:handle/stream — SSE push channel +// ───────────────────────────────────────────────────────────────────────────── +// +// EventSource can't set headers, so this endpoint accepts the auth as a +// query param: `?auth=:`. The signed message is the +// stream URL line (handle baked in via path, `since` not relevant here). +// Same 60s clock-skew tolerance — log leakage past that window is moot. + +#[derive(Debug, Deserialize)] +pub struct StreamQuery { + pub auth: String, +} + +pub async fn stream_inbox( + State(state): State, + Path(handle): Path, + Query(q): Query, +) -> Result>>, ApiError> { + validate_handle(&handle) + .map_err(|e| ApiError::BadRequest(format!("invalid handle: {e}")))?; + + let record = state + .store + .lookup(&handle) + .await? + .ok_or(ApiError::NotFound)?; + + // We reuse the inbox signing scheme but with `since = -1` (a value + // the polling path never produces) so a stream sig can't be replayed + // as a polling header and vice versa. Encoded in the canonical msg. + verify_stream_auth(&q.auth, &handle, record.primary.value(), Utc::now().timestamp())?; + + let rx = state.broker.subscribe(&handle).await; + let stream = BroadcastStream::new(rx) + // Drop lag errors — subscriber should reconnect + resync via poll. + .filter_map(|res| res.ok()) + .map(|msg| { + // Serialize the PushedMessage as JSON inside the SSE `data:`. + // `Event::default().json_data` serializes for us and base64- + // safes long lines correctly. + let event = Event::default() + .event("message") + .json_data(msg) + .unwrap_or_else(|e| { + Event::default() + .event("error") + .data(format!("serialize failed: {e}")) + }); + Ok::<_, Infallible>(event) + }); + + // Keep-alive pings every 15s so intermediaries (Cloudflare, NAT + // boxes) don't close the idle connection. + let sse = Sse::new(stream).keep_alive( + KeepAlive::new() + .interval(Duration::from_secs(15)) + .text("ping"), + ); + Ok(sse) +} + +/// Canonical message that the stream auth signs over. Distinct from the +/// inbox-poll message (different first line) so a sig captured from one +/// endpoint can't be replayed against the other. +pub fn canonical_stream_message(handle: &str, ts: i64) -> String { + format!("GET\n/v1/inbox/{handle}/stream\n{ts}") +} + +fn verify_stream_auth( + auth: &str, + handle: &str, + pubkey_hex: &str, + now_ts: i64, +) -> Result<(), ApiError> { + let (ts_str, sig_hex) = auth + .split_once(':') + .ok_or_else(|| ApiError::Unauthorized("?auth must be :".into()))?; + let ts: i64 = ts_str + .parse() + .map_err(|_| ApiError::Unauthorized("?auth ts must be a unix timestamp".into()))?; + if (now_ts - ts).abs() > AUTH_MAX_AGE_SECS { + return Err(ApiError::Unauthorized(format!( + "auth is {}s stale (max {}s)", + (now_ts - ts).abs(), + AUTH_MAX_AGE_SECS + ))); + } + let message = canonical_stream_message(handle, ts); + verify_ed25519_hex(pubkey_hex, message.as_bytes(), sig_hex) + .map_err(|_| ApiError::Unauthorized("signature did not verify".into()))?; + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -271,4 +390,49 @@ mod tests { let err = verify_inbox_auth("not-a-header", "tudisco", 0, &pk, 1).unwrap_err(); assert!(matches!(err, ApiError::Unauthorized(_))); } + + // ─── stream auth ───────────────────────────────────────────────────────── + + fn stream_header_for(sk: &Ed25519Secret, handle: &str, ts: i64) -> String { + let msg = canonical_stream_message(handle, ts); + let sig = sk.sign(msg.as_bytes()); + format!("{ts}:{}", hex::encode(sig)) + } + + #[test] + fn canonical_stream_message_is_distinct_from_inbox_message() { + // Critical: an attacker who captures an inbox auth header must + // NOT be able to use it as a stream auth (and vice versa). + let inbox = canonical_inbox_message("alice", 0, 1_700_000_000); + let stream = canonical_stream_message("alice", 1_700_000_000); + assert_ne!(inbox, stream); + } + + #[test] + fn verify_stream_auth_accepts_fresh_valid_signature() { + let (sk, pk) = signer_for_test(); + let now = 1_700_000_000; + let auth = stream_header_for(&sk, "tudisco", now); + verify_stream_auth(&auth, "tudisco", &pk, now).unwrap(); + } + + #[test] + fn verify_stream_auth_rejects_inbox_signature_replay() { + // Sign for the inbox endpoint, try to use against the stream. + let (sk, pk) = signer_for_test(); + let now = 1_700_000_000; + let inbox_header = header_for(&sk, "tudisco", 0, now); + let err = verify_stream_auth(&inbox_header, "tudisco", &pk, now).unwrap_err(); + assert!(matches!(err, ApiError::Unauthorized(_))); + } + + #[test] + fn verify_stream_auth_rejects_stale_timestamp() { + let (sk, pk) = signer_for_test(); + let signed_at = 1_700_000_000; + let now = signed_at + 120; + let auth = stream_header_for(&sk, "tudisco", signed_at); + let err = verify_stream_auth(&auth, "tudisco", &pk, now).unwrap_err(); + assert!(matches!(err, ApiError::Unauthorized(_))); + } } diff --git a/kez-chat/tests/http.rs b/kez-chat/tests/http.rs index f9d7eb4..c4dcca6 100644 --- a/kez-chat/tests/http.rs +++ b/kez-chat/tests/http.rs @@ -39,7 +39,11 @@ fn default_config() -> Config { async fn spawn_server_with_config(config: Config) -> TestServer { let store = Store::open_in_memory().unwrap(); - let state = AppState { store, config }; + let state = AppState { + store, + config, + broker: kez_chat_server::broker::Broker::new(), + }; let app = router(state); let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) .await diff --git a/kez-chat/web/src/lib/messages.ts b/kez-chat/web/src/lib/messages.ts index 5aa9be2..b6bb9fd 100644 --- a/kez-chat/web/src/lib/messages.ts +++ b/kez-chat/web/src/lib/messages.ts @@ -25,6 +25,12 @@ function canonicalInboxMessage(handle: string, since: number, ts: number): strin return `GET\n/v1/inbox/${handle}\nsince=${since}\n${ts}`; } +/** Canonical bytes the SSE stream auth signs over. Distinct from the poll + * message so an inbox sig can't be replayed as a stream sig and vice versa. */ +function canonicalStreamMessage(handle: string, ts: number): string { + return `GET\n/v1/inbox/${handle}/stream\n${ts}`; +} + /** * Build the X-KEZ-Auth header value. The server's 60s clock-skew window * means we can sign each request just-in-time; no token cache. @@ -40,6 +46,17 @@ function authHeader(opts: { return `${ts}:${bytesToHex(sig)}`; } +/** EventSource can't set headers, so the stream takes auth as a query param. */ +function streamAuthQueryParam(opts: { + handle: string; + seed: Uint8Array; +}): string { + const ts = Math.floor(Date.now() / 1000); + const msg = canonicalStreamMessage(opts.handle, ts); + const sig = ed25519.sign(new TextEncoder().encode(msg), opts.seed); + return `${ts}:${bytesToHex(sig)}`; +} + // ───────────────────────────────────────────────────────────────────────────── // Send // ───────────────────────────────────────────────────────────────────────────── @@ -123,4 +140,77 @@ export async function decrypt( return openMessage({ envelope, myHandle, mySeed }); } +// ───────────────────────────────────────────────────────────────────────────── +// Server-Sent Events push (real-time new-message stream) +// ───────────────────────────────────────────────────────────────────────────── + +export interface StreamHandle { + /** Close the connection — call from onDestroy. */ + close(): void; + /** Live status flag the UI can use to show "live" vs "polling fallback". */ + readonly readyState: number; +} + +/** + * Open a long-lived SSE connection to /v1/inbox//stream. The + * server pushes a `message` event each time a new envelope lands in + * this handle's mailbox; we call `onMessage` for each. + * + * Auto-reconnects: EventSource does this natively, but if the auth + * timestamp ages out (>60s since connection drop, then reconnect) the + * server will 401 the reconnect. We mint a fresh `?auth=` on every + * open by tearing down + reopening on `error`. The caller's onMessage + * handler is preserved across reconnects. + */ +export function streamInbox(opts: { + handle: string; + seed: Uint8Array; + onMessage: (msg: InboxMessage) => void; + onStatus?: (status: "connecting" | "live" | "reconnecting") => void; +}): StreamHandle { + let es: EventSource | null = null; + let closed = false; + let reconnectTimer: ReturnType | null = null; + + function open() { + if (closed) return; + opts.onStatus?.(es ? "reconnecting" : "connecting"); + const auth = streamAuthQueryParam({ handle: opts.handle, seed: opts.seed }); + const url = `${base()}/v1/inbox/${opts.handle}/stream?auth=${encodeURIComponent(auth)}`; + es = new EventSource(url); + es.addEventListener("open", () => opts.onStatus?.("live")); + es.addEventListener("message", (ev) => { + try { + const msg = JSON.parse(ev.data) as InboxMessage; + opts.onMessage(msg); + } catch (e) { + console.error("SSE: bad message payload", e); + } + }); + es.addEventListener("error", () => { + // Tear down and re-open with a fresh auth ts after a short delay + // (avoid hot-loop if the server is rejecting). EventSource also + // auto-reconnects but with the same (now possibly stale) URL. + if (closed) return; + es?.close(); + es = null; + if (reconnectTimer) clearTimeout(reconnectTimer); + reconnectTimer = setTimeout(open, 3_000); + }); + } + + open(); + + return { + close() { + closed = true; + if (reconnectTimer) clearTimeout(reconnectTimer); + es?.close(); + }, + get readyState() { + return es?.readyState ?? EventSource.CLOSED; + }, + }; +} + export type { SealedEnvelope, MessagePlaintext }; diff --git a/kez-chat/web/src/routes/Messages.svelte b/kez-chat/web/src/routes/Messages.svelte index 08b910a..cb55b22 100644 --- a/kez-chat/web/src/routes/Messages.svelte +++ b/kez-chat/web/src/routes/Messages.svelte @@ -2,7 +2,14 @@ import { onMount, onDestroy } from "svelte"; import { push } from "svelte-spa-router"; import { session } from "../lib/store.svelte.js"; - import { decrypt, pollInbox, sendMessage } from "../lib/messages.js"; + import { + decrypt, + pollInbox, + sendMessage, + streamInbox, + type InboxMessage, + type StreamHandle, + } from "../lib/messages.js"; import { lookup, lookupByPrimary, ApiError } from "../lib/api.js"; import { appendInbound, @@ -27,6 +34,8 @@ let pollError = $state(null); let lastPolledAt = $state(null); let pollTimer: ReturnType | null = null; + let streamHandle: StreamHandle | null = null; + let streamStatus = $state<"connecting" | "live" | "reconnecting">("connecting"); // "Start chat with" lookup state. let newPeerInput = $state(""); @@ -36,7 +45,9 @@ // Toast for the share-link copy action. let copied = $state(false); - const POLL_INTERVAL_MS = 5_000; + // Background heartbeat — SSE handles real-time delivery; this just + // catches anything missed during a reconnect window. + const POLL_INTERVAL_MS = 30_000; onMount(async () => { if (!session.unlocked) { @@ -44,14 +55,65 @@ return; } await refresh(); - await pollOnce(); // first tick immediately so the UI isn't blank + await pollOnce(); // catch-up before SSE goes live + + // Real-time push via SSE — server fires per-message as soon as + // POST /v1/messages hands off the row to the broker. Sub-second + // latency in the common case. + streamHandle = streamInbox({ + handle: session.unlocked.handle, + seed: session.unlocked.seed, + onMessage: handlePushedMessage, + onStatus: (s) => (streamStatus = s), + }); + + // Background heartbeat in case SSE drops + reconnects between events. pollTimer = setInterval(pollOnce, POLL_INTERVAL_MS); }); onDestroy(() => { if (pollTimer) clearInterval(pollTimer); + streamHandle?.close(); }); + /** Decrypt + cache one incoming envelope from SSE or polling. */ + async function ingest(m: InboxMessage) { + if (!session.unlocked) return; + try { + const pt = await decrypt( + m.envelope, + session.unlocked.handle, + session.unlocked.seed, + ); + let handle = ""; + const existing = await getConversation(pt.from); + if (existing?.peer_handle) { + handle = existing.peer_handle; + } else { + try { + const record = await lookupByPrimary(pt.from); + handle = record.fqhn; + } catch { + // unknown to this server — leave blank, UI will show short key + } + } + await appendInbound({ + peer_primary: pt.from, + peer_handle: handle, + seq: m.seq, + body: pt.body, + ts: pt.sent_at, + }); + } catch (e) { + console.error(`seq ${m.seq}: decrypt failed`, e); + } + } + + async function handlePushedMessage(m: InboxMessage) { + await ingest(m); + await refresh(); + } + async function refresh() { conversations = await listConversations(); } @@ -65,37 +127,7 @@ seed: session.unlocked.seed, since, }); - for (const m of messages) { - try { - const pt = await decrypt( - m.envelope, - session.unlocked.handle, - session.unlocked.seed, - ); - // Resolve the sender's handle for display. Cache miss → ask server. - let handle = ""; - const existing = await getConversation(pt.from); - if (existing?.peer_handle) { - handle = existing.peer_handle; - } else { - try { - const record = await lookupByPrimary(pt.from); - handle = record.fqhn; - } catch { - // Unknown to this server (cross-server v0.2). Show truncated key. - } - } - await appendInbound({ - peer_primary: pt.from, - peer_handle: handle, - seq: m.seq, - body: pt.body, - ts: pt.sent_at, - }); - } catch (e) { - console.error(`seq ${m.seq}: decrypt failed`, e); - } - } + for (const m of messages) await ingest(m); if (messages.length > 0) await refresh(); pollError = null; lastPolledAt = new Date().toISOString(); @@ -291,11 +323,18 @@ -
+
+

+ {#if streamStatus === "live"} + ● live + {:else if streamStatus === "reconnecting"} + ● reconnecting… + {:else} + ○ connecting… + {/if} +

{#if pollError}

⚠ {pollError}

- {:else if lastPolledAt} - Polled {formatTime(lastPolledAt)} {/if}