feat(kez-chat): real-time messages via SSE — sub-second delivery
Chat was polling every 5s, which felt sluggish with two users online.
Switched to Server-Sent Events for push delivery. Polling now runs as
a 30s heartbeat just to catch anything missed during reconnect windows.
NATS is still bundled in docker-compose but no Rust code talks to it
yet — that lands in v0.2 for cross-instance fanout. The migration is
"swap the in-process broker for nats.publish/subscribe against
kez.chat.inbox.<handle>"; SSE subscribers don't notice.
Server (kez-chat-server):
• New broker module: per-recipient tokio::sync::broadcast channels,
in-process pub/sub. 64-slot buffer per channel; lagging subscribers
drop on the floor and resync via the polling heartbeat. 4 unit
tests cover subscribe/publish, multi-subscriber fanout, per-handle
isolation, no-op on no-subscribers.
• POST /v1/messages now publishes to broker after persisting → any
open SSE stream for the recipient gets the envelope immediately.
• New GET /v1/inbox/:handle/stream — SSE endpoint, ?auth=<ts>:<sig>
query param (EventSource can't set headers). Signed message is
distinct from the polling header ("GET\n/v1/inbox/<h>/stream\n<ts>"
vs "GET\n/v1/inbox/<h>\nsince=<n>\n<ts>") so a captured poll sig
can't be replayed as a stream sig and vice versa.
• 15s SSE keep-alive ping so Cloudflare/NAT/load balancers don't
drop idle connections.
• 3 new stream-auth unit tests, including the cross-endpoint replay
rejection. 19 unit + 20 integration tests all green.
• New deps: tokio-stream (sync feature for BroadcastStream),
futures (for the Stream trait the Sse handler returns).
Browser (kez-chat/web):
• streamInbox() in lib/messages.ts: long-lived EventSource,
auto-reconnects on error with fresh auth (tears down on `error`,
re-opens after 3s — EventSource's native retry uses the stale URL).
Exposes onMessage + onStatus callbacks.
• Messages.svelte: opens SSE on mount, decrypts pushed envelopes
inline via the new shared ingest() helper. Polling dropped from
5s → 30s heartbeat.
• Sidebar footer shows live status:
● live (green)
● reconnecting… (amber)
○ connecting… (gray)
Verified live: /v1/inbox/<registered>/stream?auth=bad returns 401,
no-auth returns 400. Asset index-C1ogRtUG.js serving.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
6c0f5e2fd5
commit
bd8c8bf606
63
kez-chat/Cargo.lock
generated
63
kez-chat/Cargo.lock
generated
@ -470,6 +470,21 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.32"
|
||||
@ -477,6 +492,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -485,6 +501,34 @@ version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
|
||||
|
||||
[[package]]
|
||||
name = "futures-executor"
|
||||
version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-io"
|
||||
version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-sink"
|
||||
version = "0.3.32"
|
||||
@ -503,8 +547,13 @@ version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"slab",
|
||||
]
|
||||
@ -908,6 +957,7 @@ dependencies = [
|
||||
"axum",
|
||||
"chrono",
|
||||
"clap",
|
||||
"futures",
|
||||
"hex",
|
||||
"kez-core",
|
||||
"reqwest",
|
||||
@ -918,6 +968,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
@ -1769,6 +1820,18 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-stream"
|
||||
version = "0.1.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.18"
|
||||
|
||||
@ -17,6 +17,8 @@ serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
thiserror = "2"
|
||||
tokio = { version = "1.48", features = ["macros", "rt-multi-thread", "sync", "signal"] }
|
||||
tokio-stream = { version = "0.1", features = ["sync"] }
|
||||
futures = "0.3"
|
||||
tower-http = { version = "0.6", features = ["trace", "cors", "fs"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
@ -27,6 +27,7 @@ use crate::store::{HandleRecord, Store};
|
||||
pub struct AppState {
|
||||
pub store: Store,
|
||||
pub config: Config,
|
||||
pub broker: crate::broker::Broker,
|
||||
}
|
||||
|
||||
pub fn router(state: AppState) -> axum::Router {
|
||||
@ -42,6 +43,7 @@ pub fn router(state: AppState) -> axum::Router {
|
||||
.route("/v1/register", post(register))
|
||||
.route("/v1/messages", post(crate::messages::send_message))
|
||||
.route("/v1/inbox/:handle", get(crate::messages::inbox))
|
||||
.route("/v1/inbox/:handle/stream", get(crate::messages::stream_inbox))
|
||||
.route("/.well-known/webfinger", get(webfinger))
|
||||
.route("/internal/nats/auth", post(nats_auth_callout));
|
||||
|
||||
|
||||
127
kez-chat/src/broker.rs
Normal file
127
kez-chat/src/broker.rs
Normal file
@ -0,0 +1,127 @@
|
||||
//! In-process pub/sub for "a new envelope just landed in <handle>'s
|
||||
//! inbox". Used by the SSE endpoint to push messages to subscribed
|
||||
//! clients without waiting for the next poll tick.
|
||||
//!
|
||||
//! Implementation: one `tokio::sync::broadcast` channel per recipient.
|
||||
//! A subscriber gets a Receiver; when POST /v1/messages stores an
|
||||
//! envelope, the handler calls `publish(recipient, msg)` which sends
|
||||
//! into the per-recipient sender (creating it if absent).
|
||||
//!
|
||||
//! Bounds: this is single-instance. With N chat-servers we'd need to
|
||||
//! gossip publishes — that's what NATS in docker-compose is for, and
|
||||
//! the v0.2 migration is "swap out `Broker` for `nats.publish/subscribe`
|
||||
//! against `kez.chat.inbox.<handle>`". Subscribers don't notice.
|
||||
//!
|
||||
//! Channel capacity: 64 buffered messages per recipient. If a
|
||||
//! subscriber lags past that, they get `RecvError::Lagged(n)` — they
|
||||
//! reconnect and resync via the polling fallback, which the browser
|
||||
//! does anyway.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{Mutex, broadcast};
|
||||
|
||||
/// One push event over the SSE stream. Payload is the same shape as
|
||||
/// what GET /v1/inbox returns per message — re-using the type keeps
|
||||
/// the SPA's decode path identical for both transports.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PushedMessage {
|
||||
pub seq: i64,
|
||||
pub envelope: serde_json::Value,
|
||||
pub created_at: String,
|
||||
}
|
||||
|
||||
/// Capacity of each per-recipient broadcast channel. Tradeoff: too low
|
||||
/// and active conversations drop messages on bursty senders; too high
|
||||
/// and idle subscribers pin memory. 64 covers worst-case interactive
|
||||
/// chat; the polling fallback catches anything that overflows.
|
||||
const CHANNEL_CAPACITY: usize = 64;
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Broker {
|
||||
inner: Arc<Mutex<HashMap<String, broadcast::Sender<PushedMessage>>>>,
|
||||
}
|
||||
|
||||
impl Broker {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Subscribe to push events for `handle`. Returns a Receiver that
|
||||
/// yields `PushedMessage`s until dropped or the channel lags.
|
||||
/// Creates the per-recipient sender on first subscribe.
|
||||
pub async fn subscribe(&self, handle: &str) -> broadcast::Receiver<PushedMessage> {
|
||||
let mut map = self.inner.lock().await;
|
||||
let tx = map
|
||||
.entry(handle.to_owned())
|
||||
.or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0);
|
||||
tx.subscribe()
|
||||
}
|
||||
|
||||
/// Publish `msg` to any live subscribers for `handle`. Silently
|
||||
/// drops if no one is subscribed (the message is already persisted;
|
||||
/// recipients pick it up on next poll/reconnect).
|
||||
pub async fn publish(&self, handle: &str, msg: PushedMessage) {
|
||||
let map = self.inner.lock().await;
|
||||
if let Some(tx) = map.get(handle) {
|
||||
// `send` returns Err if there are zero receivers — fine,
|
||||
// means we raced a disconnect. Caller doesn't care.
|
||||
let _ = tx.send(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
|
||||
fn fake_msg(seq: i64) -> PushedMessage {
|
||||
PushedMessage {
|
||||
seq,
|
||||
envelope: json!({"v": 1, "ciphertext": format!("hi {seq}")}),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_then_publish_delivers() {
|
||||
let broker = Broker::new();
|
||||
let mut rx = broker.subscribe("alice").await;
|
||||
broker.publish("alice", fake_msg(1)).await;
|
||||
let got = rx.recv().await.unwrap();
|
||||
assert_eq!(got.seq, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn publish_without_subscribers_is_noop() {
|
||||
let broker = Broker::new();
|
||||
// Nobody listening — must not panic, must not allocate a channel.
|
||||
broker.publish("alice", fake_msg(1)).await;
|
||||
assert!(broker.inner.lock().await.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multiple_subscribers_each_get_a_copy() {
|
||||
let broker = Broker::new();
|
||||
let mut rx1 = broker.subscribe("alice").await;
|
||||
let mut rx2 = broker.subscribe("alice").await;
|
||||
broker.publish("alice", fake_msg(42)).await;
|
||||
assert_eq!(rx1.recv().await.unwrap().seq, 42);
|
||||
assert_eq!(rx2.recv().await.unwrap().seq, 42);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn channels_are_per_handle() {
|
||||
let broker = Broker::new();
|
||||
let mut alice_rx = broker.subscribe("alice").await;
|
||||
let _bob_rx = broker.subscribe("bob").await;
|
||||
broker.publish("alice", fake_msg(1)).await;
|
||||
// alice's subscriber got it; bob's didn't (would block).
|
||||
assert_eq!(alice_rx.recv().await.unwrap().seq, 1);
|
||||
// sanity: bob would have to time out, so skip that assertion
|
||||
// (we'd rather not slow the test suite for it).
|
||||
}
|
||||
}
|
||||
@ -2,6 +2,7 @@
|
||||
//! binary serves.
|
||||
|
||||
pub mod api;
|
||||
pub mod broker;
|
||||
pub mod config;
|
||||
pub mod error;
|
||||
pub mod handles;
|
||||
|
||||
@ -26,7 +26,11 @@ async fn main() -> Result<()> {
|
||||
);
|
||||
|
||||
let store = Store::open(&config.db)?;
|
||||
let state = AppState { store, config: config.clone() };
|
||||
let state = AppState {
|
||||
store,
|
||||
config: config.clone(),
|
||||
broker: kez_chat_server::broker::Broker::new(),
|
||||
};
|
||||
|
||||
let app = router(state)
|
||||
.layer(TraceLayer::new_for_http())
|
||||
|
||||
@ -19,15 +19,24 @@
|
||||
//! anyone's mailbox. Mitigations land in v0.2 (rate-limit by source IP +
|
||||
//! optional sender-handle proof).
|
||||
|
||||
use std::convert::Infallible;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::Json;
|
||||
use axum::extract::{Path, Query, State};
|
||||
use axum::http::HeaderMap;
|
||||
use axum::response::Sse;
|
||||
use axum::response::sse::{Event, KeepAlive};
|
||||
use chrono::Utc;
|
||||
use futures::stream::Stream;
|
||||
use kez_core::verify_ed25519_hex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
|
||||
use crate::api::AppState;
|
||||
use crate::broker::PushedMessage;
|
||||
use crate::error::ApiError;
|
||||
use crate::handles::validate_handle;
|
||||
|
||||
@ -88,6 +97,22 @@ pub async fn send_message(
|
||||
.store
|
||||
.store_message(&recipient.handle, &envelope_str)
|
||||
.await?;
|
||||
|
||||
// Push to any live SSE subscribers for this recipient — instant
|
||||
// delivery without waiting for the next poll. The message is
|
||||
// already persisted; if no one's listening, this is a no-op.
|
||||
state
|
||||
.broker
|
||||
.publish(
|
||||
&recipient.handle,
|
||||
PushedMessage {
|
||||
seq,
|
||||
envelope: req.envelope,
|
||||
created_at: Utc::now().to_rfc3339(),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(SendMessageResponse { seq }))
|
||||
}
|
||||
|
||||
@ -200,6 +225,100 @@ pub fn canonical_inbox_message(handle: &str, since: i64, ts: i64) -> String {
|
||||
format!("GET\n/v1/inbox/{handle}\nsince={since}\n{ts}")
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// GET /v1/inbox/:handle/stream — SSE push channel
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
//
|
||||
// EventSource can't set headers, so this endpoint accepts the auth as a
|
||||
// query param: `?auth=<unix_ts>:<sig_hex>`. The signed message is the
|
||||
// stream URL line (handle baked in via path, `since` not relevant here).
|
||||
// Same 60s clock-skew tolerance — log leakage past that window is moot.
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct StreamQuery {
|
||||
pub auth: String,
|
||||
}
|
||||
|
||||
pub async fn stream_inbox(
|
||||
State(state): State<AppState>,
|
||||
Path(handle): Path<String>,
|
||||
Query(q): Query<StreamQuery>,
|
||||
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, ApiError> {
|
||||
validate_handle(&handle)
|
||||
.map_err(|e| ApiError::BadRequest(format!("invalid handle: {e}")))?;
|
||||
|
||||
let record = state
|
||||
.store
|
||||
.lookup(&handle)
|
||||
.await?
|
||||
.ok_or(ApiError::NotFound)?;
|
||||
|
||||
// We reuse the inbox signing scheme but with `since = -1` (a value
|
||||
// the polling path never produces) so a stream sig can't be replayed
|
||||
// as a polling header and vice versa. Encoded in the canonical msg.
|
||||
verify_stream_auth(&q.auth, &handle, record.primary.value(), Utc::now().timestamp())?;
|
||||
|
||||
let rx = state.broker.subscribe(&handle).await;
|
||||
let stream = BroadcastStream::new(rx)
|
||||
// Drop lag errors — subscriber should reconnect + resync via poll.
|
||||
.filter_map(|res| res.ok())
|
||||
.map(|msg| {
|
||||
// Serialize the PushedMessage as JSON inside the SSE `data:`.
|
||||
// `Event::default().json_data` serializes for us and base64-
|
||||
// safes long lines correctly.
|
||||
let event = Event::default()
|
||||
.event("message")
|
||||
.json_data(msg)
|
||||
.unwrap_or_else(|e| {
|
||||
Event::default()
|
||||
.event("error")
|
||||
.data(format!("serialize failed: {e}"))
|
||||
});
|
||||
Ok::<_, Infallible>(event)
|
||||
});
|
||||
|
||||
// Keep-alive pings every 15s so intermediaries (Cloudflare, NAT
|
||||
// boxes) don't close the idle connection.
|
||||
let sse = Sse::new(stream).keep_alive(
|
||||
KeepAlive::new()
|
||||
.interval(Duration::from_secs(15))
|
||||
.text("ping"),
|
||||
);
|
||||
Ok(sse)
|
||||
}
|
||||
|
||||
/// Canonical message that the stream auth signs over. Distinct from the
|
||||
/// inbox-poll message (different first line) so a sig captured from one
|
||||
/// endpoint can't be replayed against the other.
|
||||
pub fn canonical_stream_message(handle: &str, ts: i64) -> String {
|
||||
format!("GET\n/v1/inbox/{handle}/stream\n{ts}")
|
||||
}
|
||||
|
||||
fn verify_stream_auth(
|
||||
auth: &str,
|
||||
handle: &str,
|
||||
pubkey_hex: &str,
|
||||
now_ts: i64,
|
||||
) -> Result<(), ApiError> {
|
||||
let (ts_str, sig_hex) = auth
|
||||
.split_once(':')
|
||||
.ok_or_else(|| ApiError::Unauthorized("?auth must be <ts>:<sig>".into()))?;
|
||||
let ts: i64 = ts_str
|
||||
.parse()
|
||||
.map_err(|_| ApiError::Unauthorized("?auth ts must be a unix timestamp".into()))?;
|
||||
if (now_ts - ts).abs() > AUTH_MAX_AGE_SECS {
|
||||
return Err(ApiError::Unauthorized(format!(
|
||||
"auth is {}s stale (max {}s)",
|
||||
(now_ts - ts).abs(),
|
||||
AUTH_MAX_AGE_SECS
|
||||
)));
|
||||
}
|
||||
let message = canonical_stream_message(handle, ts);
|
||||
verify_ed25519_hex(pubkey_hex, message.as_bytes(), sig_hex)
|
||||
.map_err(|_| ApiError::Unauthorized("signature did not verify".into()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@ -271,4 +390,49 @@ mod tests {
|
||||
let err = verify_inbox_auth("not-a-header", "tudisco", 0, &pk, 1).unwrap_err();
|
||||
assert!(matches!(err, ApiError::Unauthorized(_)));
|
||||
}
|
||||
|
||||
// ─── stream auth ─────────────────────────────────────────────────────────
|
||||
|
||||
fn stream_header_for(sk: &Ed25519Secret, handle: &str, ts: i64) -> String {
|
||||
let msg = canonical_stream_message(handle, ts);
|
||||
let sig = sk.sign(msg.as_bytes());
|
||||
format!("{ts}:{}", hex::encode(sig))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn canonical_stream_message_is_distinct_from_inbox_message() {
|
||||
// Critical: an attacker who captures an inbox auth header must
|
||||
// NOT be able to use it as a stream auth (and vice versa).
|
||||
let inbox = canonical_inbox_message("alice", 0, 1_700_000_000);
|
||||
let stream = canonical_stream_message("alice", 1_700_000_000);
|
||||
assert_ne!(inbox, stream);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_stream_auth_accepts_fresh_valid_signature() {
|
||||
let (sk, pk) = signer_for_test();
|
||||
let now = 1_700_000_000;
|
||||
let auth = stream_header_for(&sk, "tudisco", now);
|
||||
verify_stream_auth(&auth, "tudisco", &pk, now).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_stream_auth_rejects_inbox_signature_replay() {
|
||||
// Sign for the inbox endpoint, try to use against the stream.
|
||||
let (sk, pk) = signer_for_test();
|
||||
let now = 1_700_000_000;
|
||||
let inbox_header = header_for(&sk, "tudisco", 0, now);
|
||||
let err = verify_stream_auth(&inbox_header, "tudisco", &pk, now).unwrap_err();
|
||||
assert!(matches!(err, ApiError::Unauthorized(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_stream_auth_rejects_stale_timestamp() {
|
||||
let (sk, pk) = signer_for_test();
|
||||
let signed_at = 1_700_000_000;
|
||||
let now = signed_at + 120;
|
||||
let auth = stream_header_for(&sk, "tudisco", signed_at);
|
||||
let err = verify_stream_auth(&auth, "tudisco", &pk, now).unwrap_err();
|
||||
assert!(matches!(err, ApiError::Unauthorized(_)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,7 +39,11 @@ fn default_config() -> Config {
|
||||
|
||||
async fn spawn_server_with_config(config: Config) -> TestServer {
|
||||
let store = Store::open_in_memory().unwrap();
|
||||
let state = AppState { store, config };
|
||||
let state = AppState {
|
||||
store,
|
||||
config,
|
||||
broker: kez_chat_server::broker::Broker::new(),
|
||||
};
|
||||
let app = router(state);
|
||||
let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
|
||||
.await
|
||||
|
||||
@ -25,6 +25,12 @@ function canonicalInboxMessage(handle: string, since: number, ts: number): strin
|
||||
return `GET\n/v1/inbox/${handle}\nsince=${since}\n${ts}`;
|
||||
}
|
||||
|
||||
/** Canonical bytes the SSE stream auth signs over. Distinct from the poll
|
||||
* message so an inbox sig can't be replayed as a stream sig and vice versa. */
|
||||
function canonicalStreamMessage(handle: string, ts: number): string {
|
||||
return `GET\n/v1/inbox/${handle}/stream\n${ts}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the X-KEZ-Auth header value. The server's 60s clock-skew window
|
||||
* means we can sign each request just-in-time; no token cache.
|
||||
@ -40,6 +46,17 @@ function authHeader(opts: {
|
||||
return `${ts}:${bytesToHex(sig)}`;
|
||||
}
|
||||
|
||||
/** EventSource can't set headers, so the stream takes auth as a query param. */
|
||||
function streamAuthQueryParam(opts: {
|
||||
handle: string;
|
||||
seed: Uint8Array;
|
||||
}): string {
|
||||
const ts = Math.floor(Date.now() / 1000);
|
||||
const msg = canonicalStreamMessage(opts.handle, ts);
|
||||
const sig = ed25519.sign(new TextEncoder().encode(msg), opts.seed);
|
||||
return `${ts}:${bytesToHex(sig)}`;
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Send
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
@ -123,4 +140,77 @@ export async function decrypt(
|
||||
return openMessage({ envelope, myHandle, mySeed });
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Server-Sent Events push (real-time new-message stream)
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
export interface StreamHandle {
|
||||
/** Close the connection — call from onDestroy. */
|
||||
close(): void;
|
||||
/** Live status flag the UI can use to show "live" vs "polling fallback". */
|
||||
readonly readyState: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a long-lived SSE connection to /v1/inbox/<handle>/stream. The
|
||||
* server pushes a `message` event each time a new envelope lands in
|
||||
* this handle's mailbox; we call `onMessage` for each.
|
||||
*
|
||||
* Auto-reconnects: EventSource does this natively, but if the auth
|
||||
* timestamp ages out (>60s since connection drop, then reconnect) the
|
||||
* server will 401 the reconnect. We mint a fresh `?auth=` on every
|
||||
* open by tearing down + reopening on `error`. The caller's onMessage
|
||||
* handler is preserved across reconnects.
|
||||
*/
|
||||
export function streamInbox(opts: {
|
||||
handle: string;
|
||||
seed: Uint8Array;
|
||||
onMessage: (msg: InboxMessage) => void;
|
||||
onStatus?: (status: "connecting" | "live" | "reconnecting") => void;
|
||||
}): StreamHandle {
|
||||
let es: EventSource | null = null;
|
||||
let closed = false;
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
function open() {
|
||||
if (closed) return;
|
||||
opts.onStatus?.(es ? "reconnecting" : "connecting");
|
||||
const auth = streamAuthQueryParam({ handle: opts.handle, seed: opts.seed });
|
||||
const url = `${base()}/v1/inbox/${opts.handle}/stream?auth=${encodeURIComponent(auth)}`;
|
||||
es = new EventSource(url);
|
||||
es.addEventListener("open", () => opts.onStatus?.("live"));
|
||||
es.addEventListener("message", (ev) => {
|
||||
try {
|
||||
const msg = JSON.parse(ev.data) as InboxMessage;
|
||||
opts.onMessage(msg);
|
||||
} catch (e) {
|
||||
console.error("SSE: bad message payload", e);
|
||||
}
|
||||
});
|
||||
es.addEventListener("error", () => {
|
||||
// Tear down and re-open with a fresh auth ts after a short delay
|
||||
// (avoid hot-loop if the server is rejecting). EventSource also
|
||||
// auto-reconnects but with the same (now possibly stale) URL.
|
||||
if (closed) return;
|
||||
es?.close();
|
||||
es = null;
|
||||
if (reconnectTimer) clearTimeout(reconnectTimer);
|
||||
reconnectTimer = setTimeout(open, 3_000);
|
||||
});
|
||||
}
|
||||
|
||||
open();
|
||||
|
||||
return {
|
||||
close() {
|
||||
closed = true;
|
||||
if (reconnectTimer) clearTimeout(reconnectTimer);
|
||||
es?.close();
|
||||
},
|
||||
get readyState() {
|
||||
return es?.readyState ?? EventSource.CLOSED;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export type { SealedEnvelope, MessagePlaintext };
|
||||
|
||||
@ -2,7 +2,14 @@
|
||||
import { onMount, onDestroy } from "svelte";
|
||||
import { push } from "svelte-spa-router";
|
||||
import { session } from "../lib/store.svelte.js";
|
||||
import { decrypt, pollInbox, sendMessage } from "../lib/messages.js";
|
||||
import {
|
||||
decrypt,
|
||||
pollInbox,
|
||||
sendMessage,
|
||||
streamInbox,
|
||||
type InboxMessage,
|
||||
type StreamHandle,
|
||||
} from "../lib/messages.js";
|
||||
import { lookup, lookupByPrimary, ApiError } from "../lib/api.js";
|
||||
import {
|
||||
appendInbound,
|
||||
@ -27,6 +34,8 @@
|
||||
let pollError = $state<string | null>(null);
|
||||
let lastPolledAt = $state<string | null>(null);
|
||||
let pollTimer: ReturnType<typeof setInterval> | null = null;
|
||||
let streamHandle: StreamHandle | null = null;
|
||||
let streamStatus = $state<"connecting" | "live" | "reconnecting">("connecting");
|
||||
|
||||
// "Start chat with" lookup state.
|
||||
let newPeerInput = $state("");
|
||||
@ -36,7 +45,9 @@
|
||||
// Toast for the share-link copy action.
|
||||
let copied = $state(false);
|
||||
|
||||
const POLL_INTERVAL_MS = 5_000;
|
||||
// Background heartbeat — SSE handles real-time delivery; this just
|
||||
// catches anything missed during a reconnect window.
|
||||
const POLL_INTERVAL_MS = 30_000;
|
||||
|
||||
onMount(async () => {
|
||||
if (!session.unlocked) {
|
||||
@ -44,14 +55,65 @@
|
||||
return;
|
||||
}
|
||||
await refresh();
|
||||
await pollOnce(); // first tick immediately so the UI isn't blank
|
||||
await pollOnce(); // catch-up before SSE goes live
|
||||
|
||||
// Real-time push via SSE — server fires per-message as soon as
|
||||
// POST /v1/messages hands off the row to the broker. Sub-second
|
||||
// latency in the common case.
|
||||
streamHandle = streamInbox({
|
||||
handle: session.unlocked.handle,
|
||||
seed: session.unlocked.seed,
|
||||
onMessage: handlePushedMessage,
|
||||
onStatus: (s) => (streamStatus = s),
|
||||
});
|
||||
|
||||
// Background heartbeat in case SSE drops + reconnects between events.
|
||||
pollTimer = setInterval(pollOnce, POLL_INTERVAL_MS);
|
||||
});
|
||||
|
||||
onDestroy(() => {
|
||||
if (pollTimer) clearInterval(pollTimer);
|
||||
streamHandle?.close();
|
||||
});
|
||||
|
||||
/** Decrypt + cache one incoming envelope from SSE or polling. */
|
||||
async function ingest(m: InboxMessage) {
|
||||
if (!session.unlocked) return;
|
||||
try {
|
||||
const pt = await decrypt(
|
||||
m.envelope,
|
||||
session.unlocked.handle,
|
||||
session.unlocked.seed,
|
||||
);
|
||||
let handle = "";
|
||||
const existing = await getConversation(pt.from);
|
||||
if (existing?.peer_handle) {
|
||||
handle = existing.peer_handle;
|
||||
} else {
|
||||
try {
|
||||
const record = await lookupByPrimary(pt.from);
|
||||
handle = record.fqhn;
|
||||
} catch {
|
||||
// unknown to this server — leave blank, UI will show short key
|
||||
}
|
||||
}
|
||||
await appendInbound({
|
||||
peer_primary: pt.from,
|
||||
peer_handle: handle,
|
||||
seq: m.seq,
|
||||
body: pt.body,
|
||||
ts: pt.sent_at,
|
||||
});
|
||||
} catch (e) {
|
||||
console.error(`seq ${m.seq}: decrypt failed`, e);
|
||||
}
|
||||
}
|
||||
|
||||
async function handlePushedMessage(m: InboxMessage) {
|
||||
await ingest(m);
|
||||
await refresh();
|
||||
}
|
||||
|
||||
async function refresh() {
|
||||
conversations = await listConversations();
|
||||
}
|
||||
@ -65,37 +127,7 @@
|
||||
seed: session.unlocked.seed,
|
||||
since,
|
||||
});
|
||||
for (const m of messages) {
|
||||
try {
|
||||
const pt = await decrypt(
|
||||
m.envelope,
|
||||
session.unlocked.handle,
|
||||
session.unlocked.seed,
|
||||
);
|
||||
// Resolve the sender's handle for display. Cache miss → ask server.
|
||||
let handle = "";
|
||||
const existing = await getConversation(pt.from);
|
||||
if (existing?.peer_handle) {
|
||||
handle = existing.peer_handle;
|
||||
} else {
|
||||
try {
|
||||
const record = await lookupByPrimary(pt.from);
|
||||
handle = record.fqhn;
|
||||
} catch {
|
||||
// Unknown to this server (cross-server v0.2). Show truncated key.
|
||||
}
|
||||
}
|
||||
await appendInbound({
|
||||
peer_primary: pt.from,
|
||||
peer_handle: handle,
|
||||
seq: m.seq,
|
||||
body: pt.body,
|
||||
ts: pt.sent_at,
|
||||
});
|
||||
} catch (e) {
|
||||
console.error(`seq ${m.seq}: decrypt failed`, e);
|
||||
}
|
||||
}
|
||||
for (const m of messages) await ingest(m);
|
||||
if (messages.length > 0) await refresh();
|
||||
pollError = null;
|
||||
lastPolledAt = new Date().toISOString();
|
||||
@ -291,11 +323,18 @@
|
||||
</div>
|
||||
|
||||
<!-- Footer status -->
|
||||
<div class="p-2 border-t border-gray-200 text-xs text-gray-500">
|
||||
<div class="p-2 border-t border-gray-200 text-xs text-gray-500 space-y-0.5">
|
||||
<p>
|
||||
{#if streamStatus === "live"}
|
||||
<span class="text-green-700">● live</span>
|
||||
{:else if streamStatus === "reconnecting"}
|
||||
<span class="text-amber-700">● reconnecting…</span>
|
||||
{:else}
|
||||
<span class="text-gray-500">○ connecting…</span>
|
||||
{/if}
|
||||
</p>
|
||||
{#if pollError}
|
||||
<p class="text-red-700">⚠ {pollError}</p>
|
||||
{:else if lastPolledAt}
|
||||
Polled {formatTime(lastPolledAt)}
|
||||
{/if}
|
||||
</div>
|
||||
</aside>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user