Jason Tudisco bd8c8bf606 feat(kez-chat): real-time messages via SSE — sub-second delivery
Chat was polling every 5s, which felt sluggish with two users online.
Switched to Server-Sent Events for push delivery. Polling now runs as
a 30s heartbeat just to catch anything missed during reconnect windows.

NATS is still bundled in docker-compose but no Rust code talks to it
yet — that lands in v0.2 for cross-instance fanout. The migration is
"swap the in-process broker for nats.publish/subscribe against
kez.chat.inbox.<handle>"; SSE subscribers don't notice.

Server (kez-chat-server):
  • New broker module: per-recipient tokio::sync::broadcast channels,
    in-process pub/sub. 64-slot buffer per channel; lagging subscribers
    drop on the floor and resync via the polling heartbeat. 4 unit
    tests cover subscribe/publish, multi-subscriber fanout, per-handle
    isolation, no-op on no-subscribers.
  • POST /v1/messages now publishes to broker after persisting → any
    open SSE stream for the recipient gets the envelope immediately.
  • New GET /v1/inbox/:handle/stream — SSE endpoint, ?auth=<ts>:<sig>
    query param (EventSource can't set headers). Signed message is
    distinct from the polling header ("GET\n/v1/inbox/<h>/stream\n<ts>"
    vs "GET\n/v1/inbox/<h>\nsince=<n>\n<ts>") so a captured poll sig
    can't be replayed as a stream sig and vice versa.
  • 15s SSE keep-alive ping so Cloudflare/NAT/load balancers don't
    drop idle connections.
  • 3 new stream-auth unit tests, including the cross-endpoint replay
    rejection. 19 unit + 20 integration tests all green.
  • New deps: tokio-stream (sync feature for BroadcastStream),
    futures (for the Stream trait the Sse handler returns).

Browser (kez-chat/web):
  • streamInbox() in lib/messages.ts: long-lived EventSource,
    auto-reconnects on error with fresh auth (tears down on `error`,
    re-opens after 3s — EventSource's native retry uses the stale URL).
    Exposes onMessage + onStatus callbacks.
  • Messages.svelte: opens SSE on mount, decrypts pushed envelopes
    inline via the new shared ingest() helper. Polling dropped from
    5s → 30s heartbeat.
  • Sidebar footer shows live status:
        ● live          (green)
        ● reconnecting… (amber)
        ○ connecting…   (gray)

Verified live: /v1/inbox/<registered>/stream?auth=bad returns 401,
no-auth returns 400. Asset index-C1ogRtUG.js serving.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-26 23:06:17 -06:00

470 lines
15 KiB
Rust

//! Integration tests: stand up the real router on a random local port,
//! drive it with `reqwest`. No mocks — exercises the full HTTP + SQLite +
//! kez-core signature path.
use std::net::SocketAddr;
use std::path::PathBuf;
use chrono::{DateTime, Utc};
use kez_chat_server::{AppState, Config, Store, router};
use kez_chat_server::registration::{
ENVELOPE_TAG, FORMAT_VERSION, REGISTRATION_TYPE, RegistrationPayload, SignedRegistration,
};
use kez_core::{
Ed25519Secret, Identity, SignatureBlock, ED25519_SHA512_ALG, canonical_bytes,
};
use reqwest::StatusCode;
use serde_json::Value;
use sha2::{Digest, Sha256};
/// A server instance started for a single test, plus the URL used to reach it.
struct TestServer {
/// Root URL of the running server in the form `http://<addr>` (no trailing
/// slash); tests append request paths to it.
base: String,
/// Join handle of the spawned serving task. Nothing in this file reads it,
/// which is why the dead-code lint is silenced on this field.
#[allow(dead_code)]
handle: tokio::task::JoinHandle<()>,
}
/// Starts a test server using the settings from [`default_config`].
async fn spawn_server() -> TestServer {
    let config = default_config();
    spawn_server_with_config(config).await
}
/// Builds the configuration shared by the tests: loopback bind address with
/// port 0, server name `kez.test`, and no web directory.
fn default_config() -> Config {
    let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));
    Config {
        bind: loopback_any_port,
        // Placeholder only: `spawn_server_with_config` opens an in-memory
        // store itself and never reads this path.
        db: PathBuf::from(":memory:"),
        server: String::from("kez.test"),
        sig_server_url: String::from("http://sig.test"),
        web_dir: None,
    }
}
/// Serves the real router on a loopback port chosen by the OS and returns a
/// handle describing where to reach it.
///
/// `config` is handed to the application state as-is. The store is always a
/// fresh in-memory database and the listener always binds `127.0.0.1:0`,
/// independent of `config.db` and `config.bind`.
///
/// Panics if the store cannot be opened or the listener cannot be bound.
async fn spawn_server_with_config(config: Config) -> TestServer {
    let state = AppState {
        store: Store::open_in_memory().unwrap(),
        config,
        broker: kez_chat_server::broker::Broker::new(),
    };
    let service = router(state);

    // Port 0 lets the OS pick a free port; the real address is read back below.
    let loopback = SocketAddr::from(([127, 0, 0, 1], 0));
    let socket = tokio::net::TcpListener::bind(loopback).await.unwrap();
    let addr = socket.local_addr().unwrap();

    let task = tokio::spawn(async move {
        axum::serve(socket, service).await.unwrap();
    });

    TestServer {
        base: format!("http://{addr}"),
        handle: task,
    }
}
/// Builds a registration payload for `handle` on `server`, signs its canonical
/// byte form with `secret`, and wraps both in a [`SignedRegistration`].
///
/// The signer's identity is used both as the payload's `primary` and as the
/// signature block's `key`; the signature is hex-encoded.
///
/// Panics if the identity cannot be derived or canonicalisation fails.
fn sign_registration(
    secret: &Ed25519Secret,
    handle: &str,
    server: &str,
    created_at: DateTime<Utc>,
) -> SignedRegistration {
    let signer = secret.identity().unwrap();

    let payload = RegistrationPayload {
        kind: REGISTRATION_TYPE.to_owned(),
        version: FORMAT_VERSION,
        handle: handle.to_string(),
        primary: signer.clone(),
        server: server.to_string(),
        created_at,
    };

    // Sign the canonical bytes of the payload, not an ad-hoc serialisation.
    let canonical = canonical_bytes(&payload).unwrap();
    let raw_signature = secret.sign(&canonical);

    let signature = SignatureBlock {
        alg: ED25519_SHA512_ALG.to_owned(),
        key: signer,
        sig: hex::encode(raw_signature),
    };

    SignedRegistration {
        kez: ENVELOPE_TAG.to_owned(),
        payload,
        signature,
    }
}
/// `GET /v1/healthz` is expected to answer 200 with `status` = `"ok"` and
/// `server` = `"kez.test"` (the name set in the default test config).
#[tokio::test]
async fn healthz_returns_ok() {
    let server = spawn_server().await;
    let url = format!("{}/v1/healthz", server.base);

    let response = reqwest::get(url).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let health: Value = response.json().await.unwrap();
    assert_eq!(health["status"], "ok");
    assert_eq!(health["server"], "kez.test");
}
/// After registering `tudisco`, looking the user up by primary key returns
/// 200 with the same handle and primary.
#[tokio::test]
async fn by_primary_round_trips() {
    let server = spawn_server().await;
    let primary = register_user(&server.base, "tudisco")
        .await
        .identity()
        .unwrap()
        .to_string();

    let lookup_url = format!("{}/v1/by-primary/{primary}", server.base);
    let response = reqwest::get(lookup_url).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let record: Value = response.json().await.unwrap();
    assert_eq!(record["handle"], "tudisco");
    assert_eq!(record["primary"], primary);
}
/// Looking up a freshly generated, never-registered primary is expected to
/// yield 404.
#[tokio::test]
async fn by_primary_unknown_404() {
    let server = spawn_server().await;
    let unregistered = Ed25519Secret::generate().identity().unwrap().to_string();

    let lookup_url = format!("{}/v1/by-primary/{unregistered}", server.base);
    let response = reqwest::get(lookup_url).await.unwrap();

    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
/// A path segment that is not a primary (`not-a-primary`) is expected to be
/// rejected with 400.
#[tokio::test]
async fn by_primary_garbage_400() {
    let server = spawn_server().await;
    let lookup_url = format!("{}/v1/by-primary/not-a-primary", server.base);

    let response = reqwest::get(lookup_url).await.unwrap();

    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// `GET /v1/u/ghost` on a server with no registrations is expected to yield 404.
#[tokio::test]
async fn unknown_handle_returns_404() {
    let server = spawn_server().await;
    let profile_url = format!("{}/v1/u/ghost", server.base);

    let response = reqwest::get(profile_url).await.unwrap();

    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
/// Registers `tudisco`, checks the 201 response body, then fetches the handle
/// back via `GET /v1/u/tudisco` and checks it carries the registering key.
#[tokio::test]
async fn register_then_lookup_round_trip() {
    let server = spawn_server().await;
    let secret = Ed25519Secret::generate();
    let signed = sign_registration(&secret, "tudisco", "kez.test", Utc::now());

    // Step 1: register.
    let register_url = format!("{}/v1/register", server.base);
    let created = reqwest::Client::new()
        .post(register_url)
        .json(&signed)
        .send()
        .await
        .unwrap();
    assert_eq!(created.status(), StatusCode::CREATED);
    let receipt: Value = created.json().await.unwrap();
    assert_eq!(receipt["handle"], "tudisco");
    assert_eq!(receipt["fqhn"], "tudisco@kez.test");

    // Step 2: look the handle up again.
    let profile_url = format!("{}/v1/u/tudisco", server.base);
    let found = reqwest::get(profile_url).await.unwrap();
    assert_eq!(found.status(), StatusCode::OK);
    let profile: Value = found.json().await.unwrap();
    assert_eq!(profile["handle"], "tudisco");
    assert_eq!(profile["primary"], secret.identity().unwrap().to_string());
}
/// Two different keys claim the same handle: the first registration is
/// expected to succeed with 201, the second to be refused with 409.
#[tokio::test]
async fn rejects_duplicate_handle() {
    let server = spawn_server().await;
    let first_key = Ed25519Secret::generate();
    let second_key = Ed25519Secret::generate();
    let first_claim = sign_registration(&first_key, "tudisco", "kez.test", Utc::now());
    let second_claim = sign_registration(&second_key, "tudisco", "kez.test", Utc::now());

    let http = reqwest::Client::new();
    let register_url = format!("{}/v1/register", server.base);

    // Submitted in order; each attempt is paired with the status it must get.
    let attempts = [
        (&first_claim, StatusCode::CREATED),
        (&second_claim, StatusCode::CONFLICT),
    ];
    for (claim, expected) in attempts {
        let response = http
            .post(&register_url)
            .json(claim)
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), expected);
    }
}
/// A registration signed for `other.example` and sent to the `kez.test`
/// server is expected to be rejected with 400.
#[tokio::test]
async fn rejects_wrong_server() {
    let server = spawn_server().await;
    let secret = Ed25519Secret::generate();
    let misaddressed = sign_registration(&secret, "tudisco", "other.example", Utc::now());

    let register_url = format!("{}/v1/register", server.base);
    let response = reqwest::Client::new()
        .post(register_url)
        .json(&misaddressed)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// Registering the handle `admin` is expected to be refused with 403.
#[tokio::test]
async fn rejects_reserved_handle() {
    let server = spawn_server().await;
    let secret = Ed25519Secret::generate();
    let reserved = sign_registration(&secret, "admin", "kez.test", Utc::now());

    let register_url = format!("{}/v1/register", server.base);
    let response = reqwest::Client::new()
        .post(register_url)
        .json(&reserved)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::FORBIDDEN);
}
/// A registration whose payload is altered after signing is expected to be
/// rejected with 400.
#[tokio::test]
async fn rejects_tampered_signature() {
    let server = spawn_server().await;
    let secret = Ed25519Secret::generate();

    // Sign for "tudisco", then swap the handle: the signature no longer
    // covers what the payload claims.
    let mut forged = sign_registration(&secret, "tudisco", "kez.test", Utc::now());
    forged.payload.handle = "imposter".to_owned();

    let register_url = format!("{}/v1/register", server.base);
    let response = reqwest::Client::new()
        .post(register_url)
        .json(&forged)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// A registration whose `created_at` is one hour in the past is expected to
/// be rejected with 400 and an error message mentioning `created_at`.
#[tokio::test]
async fn rejects_stale_timestamp() {
    let server = spawn_server().await;
    let secret = Ed25519Secret::generate();
    let an_hour_ago = Utc::now() - chrono::Duration::hours(1);
    let outdated = sign_registration(&secret, "tudisco", "kez.test", an_hour_ago);

    let register_url = format!("{}/v1/register", server.base);
    let response = reqwest::Client::new()
        .post(register_url)
        .json(&outdated)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);

    // The error body must name the offending field.
    let error_body: Value = response.json().await.unwrap();
    let msg = error_body["error"]["message"].as_str().unwrap();
    assert!(msg.contains("created_at"), "got: {msg}");
}
/// Once `tudisco` is registered, a WebFinger query for
/// `acct:tudisco@kez.test` is expected to return 200 with a matching
/// `subject` and a `links` array.
#[tokio::test]
async fn webfinger_finds_registered_user() {
    let server = spawn_server().await;
    let secret = Ed25519Secret::generate();
    let req = sign_registration(&secret, "tudisco", "kez.test", Utc::now());
    let client = reqwest::Client::new();

    let registered = client
        .post(format!("{}/v1/register", server.base))
        .json(&req)
        .send()
        .await
        .unwrap();
    // Check the precondition explicitly: without this, a failed registration
    // would only show up as a confusing WebFinger failure further down.
    assert_eq!(
        registered.status(),
        StatusCode::CREATED,
        "registration failed"
    );

    let url = format!(
        "{}/.well-known/webfinger?resource=acct:tudisco@kez.test",
        server.base
    );
    let resp = reqwest::get(&url).await.unwrap();
    assert_eq!(resp.status(), StatusCode::OK);

    let body: Value = resp.json().await.unwrap();
    assert_eq!(body["subject"], "acct:tudisco@kez.test");
    assert!(body["links"].is_array());
}
/// A WebFinger query for an account on `other.example` is expected to yield
/// 404 from the `kez.test` server.
#[tokio::test]
async fn webfinger_rejects_wrong_server() {
    let server = spawn_server().await;
    let query_url = format!(
        "{}/.well-known/webfinger?resource=acct:tudisco@other.example",
        server.base
    );

    let response = reqwest::get(query_url).await.unwrap();

    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
/// `GET /` is expected to return 200 with a page mentioning both `kez-chat`
/// and the server name `kez.test`.
#[tokio::test]
async fn placeholder_index_renders() {
    let server = spawn_server().await;
    let root_url = format!("{}/", server.base);

    let response = reqwest::get(root_url).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let page = response.text().await.unwrap();
    assert!(page.contains("kez-chat"));
    assert!(page.contains("kez.test"));
}
/// Posting an empty JSON object to `/internal/nats/auth` is expected to
/// yield 501 Not Implemented.
#[tokio::test]
async fn nats_auth_callout_stub_returns_not_implemented() {
    let server = spawn_server().await;
    let callout_url = format!("{}/internal/nats/auth", server.base);
    let empty_body = serde_json::json!({});

    let response = reqwest::Client::new()
        .post(callout_url)
        .json(&empty_body)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::NOT_IMPLEMENTED);
}
// ─── messages: send + inbox round-trip ───────────────────────────────────────
/// Helper: registers `handle` on the server at `base` with a freshly
/// generated key and returns that key's secret.
///
/// The registration is signed for server `kez.test`. Panics with
/// "registration failed" unless the server answers 201 Created.
async fn register_user(base: &str, handle: &str) -> Ed25519Secret {
    let secret = Ed25519Secret::generate();
    let signed = sign_registration(&secret, handle, "kez.test", Utc::now());

    let response = reqwest::Client::new()
        .post(format!("{base}/v1/register"))
        .json(&signed)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), StatusCode::CREATED, "registration failed");

    secret
}
/// Helper: produces an `X-KEZ-Auth` header value of the form
/// `<ts>:<hex signature>`.
///
/// The signature is made with `secret` over the canonical inbox message for
/// `handle`, the `since` cursor and the timestamp `ts`.
fn auth_header(secret: &Ed25519Secret, handle: &str, since: i64, ts: i64) -> String {
    let canonical = kez_chat_server::messages::canonical_inbox_message(handle, since, ts);
    let signature_hex = hex::encode(secret.sign(canonical.as_bytes()));
    format!("{ts}:{}", signature_hex)
}
/// Two envelopes are posted to `alice`, then alice polls her inbox.
///
/// The first poll (cursor 0) must return both envelopes in send order with
/// cursor 2; a second poll with `since=2` must succeed and return nothing
/// new while leaving the cursor at 2.
#[tokio::test]
async fn send_and_inbox_round_trip() {
    let server = spawn_server().await;
    let alice = register_user(&server.base, "alice").await;
    let _bob = register_user(&server.base, "bob").await;
    let client = reqwest::Client::new();

    // bob sends two opaque envelopes to alice (server doesn't introspect).
    for body in ["hello alice", "this is the second one"] {
        let req = serde_json::json!({
            "to": "alice",
            "envelope": { "v": 1, "ciphertext": body, "from": "bob" },
        });
        let resp = client
            .post(format!("{}/v1/messages", server.base))
            .json(&req)
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
    }

    // alice polls her inbox with a fresh auth header.
    let now = Utc::now().timestamp();
    let resp = client
        .get(format!("{}/v1/inbox/alice", server.base))
        .header("X-KEZ-Auth", auth_header(&alice, "alice", 0, now))
        .send()
        .await
        .unwrap();
    assert_eq!(resp.status(), StatusCode::OK);
    let body: Value = resp.json().await.unwrap();
    assert_eq!(body["messages"].as_array().unwrap().len(), 2);
    // Both envelopes must come back intact and in the order they were sent.
    assert_eq!(body["messages"][0]["envelope"]["ciphertext"], "hello alice");
    assert_eq!(
        body["messages"][1]["envelope"]["ciphertext"],
        "this is the second one"
    );
    assert_eq!(body["cursor"], 2);

    // Polling again with cursor=2 yields nothing new. The signature covers the
    // cursor, so the header is rebuilt with `since = 2`.
    let resp = client
        .get(format!("{}/v1/inbox/alice?since=2", server.base))
        .header("X-KEZ-Auth", auth_header(&alice, "alice", 2, now))
        .send()
        .await
        .unwrap();
    // Check the status first so an auth or routing failure is reported as
    // such instead of as a panic while indexing the JSON body.
    assert_eq!(resp.status(), StatusCode::OK);
    let body: Value = resp.json().await.unwrap();
    assert_eq!(body["messages"].as_array().unwrap().len(), 0);
    assert_eq!(body["cursor"], 2);
}
/// Polling alice's inbox with a header signed by an unrelated key is
/// expected to be refused with 401.
#[tokio::test]
async fn inbox_rejects_wrong_signer() {
    let server = spawn_server().await;
    let _alice = register_user(&server.base, "alice").await;

    // mallory's key is freshly generated and is not the one alice registered.
    let mallory = Ed25519Secret::generate();
    let client = reqwest::Client::new();
    let forged_header = auth_header(&mallory, "alice", 0, Utc::now().timestamp());

    let inbox_url = format!("{}/v1/inbox/alice", server.base);
    let response = client
        .get(inbox_url)
        .header("X-KEZ-Auth", forged_header)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
}
/// Polling a registered user's inbox with no `X-KEZ-Auth` header at all is
/// expected to be refused with 401.
#[tokio::test]
async fn inbox_rejects_missing_auth_header() {
    let server = spawn_server().await;
    let _alice = register_user(&server.base, "alice").await;
    let inbox_url = format!("{}/v1/inbox/alice", server.base);

    let response = reqwest::get(inbox_url).await.unwrap();

    assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
}
/// Posting a message addressed to the unregistered handle `ghost` is
/// expected to yield 404.
#[tokio::test]
async fn send_to_unknown_handle_404s() {
    let server = spawn_server().await;
    let client = reqwest::Client::new();

    let undeliverable = serde_json::json!({
        "to": "ghost",
        "envelope": { "v": 1 },
    });
    let messages_url = format!("{}/v1/messages", server.base);
    let response = client
        .post(messages_url)
        .json(&undeliverable)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
// Sanity: signing the same payload twice with the same Ed25519 key
// gives the same signature. Catches any accidental non-determinism in
// the JCS pipeline.
//
// Nothing in this test awaits, so it runs as a plain synchronous test and
// does not need an async runtime.
#[test]
fn registration_signing_is_deterministic() {
    // Fixed seed and fixed timestamp keep the payload identical on every run.
    let seed = "4242424242424242424242424242424242424242424242424242424242424242";
    let secret = Ed25519Secret::from_seed_hex(seed).unwrap();
    let payload = RegistrationPayload {
        kind: REGISTRATION_TYPE.to_owned(),
        version: FORMAT_VERSION,
        handle: "tudisco".to_owned(),
        primary: secret.identity().unwrap(),
        server: "kez.lat".to_owned(),
        created_at: DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&Utc),
    };

    // Canonicalising the same payload twice must give identical bytes.
    let jcs1 = canonical_bytes(&payload).unwrap();
    let jcs2 = canonical_bytes(&payload).unwrap();
    assert_eq!(jcs1, jcs2);

    // Signing those identical bytes twice must give identical signatures.
    let sig1 = secret.sign(&jcs1);
    let sig2 = secret.sign(&jcs2);
    assert_eq!(sig1, sig2);

    // Hash for human eyeballing in CI logs. The test harness captures stdout
    // of passing tests, so run with `--nocapture` or `--show-output` to see it.
    let digest_hex: String = Sha256::digest(&jcs1)
        .iter()
        .map(|byte| format!("{byte:02x}"))
        .collect();
    println!("registration JCS sha256: {digest_hex}");
}