Chat was polling every 5s, which felt sluggish with two users online.
Switched to Server-Sent Events for push delivery. Polling now runs as
a 30s heartbeat just to catch anything missed during reconnect windows.
NATS is still bundled in docker-compose but no Rust code talks to it
yet — that lands in v0.2 for cross-instance fanout. The migration is
"swap the in-process broker for nats.publish/subscribe against
kez.chat.inbox.<handle>"; SSE subscribers don't notice.
Server (kez-chat-server):
• New broker module: per-recipient tokio::sync::broadcast channels,
in-process pub/sub. 64-slot buffer per channel; lagging subscribers
drop on the floor and resync via the polling heartbeat. 4 unit
tests cover subscribe/publish, multi-subscriber fanout, per-handle
isolation, no-op on no-subscribers.
• POST /v1/messages now publishes to broker after persisting → any
open SSE stream for the recipient gets the envelope immediately.
• New GET /v1/inbox/:handle/stream — SSE endpoint, ?auth=<ts>:<sig>
query param (EventSource can't set headers). Signed message is
distinct from the polling header ("GET\n/v1/inbox/<h>/stream\n<ts>"
vs "GET\n/v1/inbox/<h>\nsince=<n>\n<ts>") so a captured poll sig
can't be replayed as a stream sig and vice versa.
• 15s SSE keep-alive ping so Cloudflare/NAT/load balancers don't
drop idle connections.
• 3 new stream-auth unit tests, including the cross-endpoint replay
rejection. 19 unit + 20 integration tests all green.
• New deps: tokio-stream (sync feature for BroadcastStream),
futures (for the Stream trait the Sse handler returns).
Browser (kez-chat/web):
• streamInbox() in lib/messages.ts: long-lived EventSource,
auto-reconnects on error with fresh auth (tears down on `error`,
re-opens after 3s — EventSource's native retry uses the stale URL).
Exposes onMessage + onStatus callbacks.
• Messages.svelte: opens SSE on mount, decrypts pushed envelopes
inline via the new shared ingest() helper. Polling dropped from
5s → 30s heartbeat.
• Sidebar footer shows live status:
● live (green)
● reconnecting… (amber)
○ connecting… (gray)
Verified live: /v1/inbox/<registered>/stream?auth=bad returns 401,
no-auth returns 400. Asset index-C1ogRtUG.js serving.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
470 lines
15 KiB
Rust
470 lines
15 KiB
Rust
//! Integration tests: stand up the real router on a random local port,
|
|
//! drive it with `reqwest`. No mocks — exercises the full HTTP + SQLite +
|
|
//! kez-core signature path.
|
|
|
|
use std::net::SocketAddr;
|
|
use std::path::PathBuf;
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use kez_chat_server::{AppState, Config, Store, router};
|
|
use kez_chat_server::registration::{
|
|
ENVELOPE_TAG, FORMAT_VERSION, REGISTRATION_TYPE, RegistrationPayload, SignedRegistration,
|
|
};
|
|
use kez_core::{
|
|
Ed25519Secret, Identity, SignatureBlock, ED25519_SHA512_ALG, canonical_bytes,
|
|
};
|
|
use reqwest::StatusCode;
|
|
use serde_json::Value;
|
|
use sha2::{Digest, Sha256};
|
|
|
|
struct TestServer {
|
|
base: String,
|
|
#[allow(dead_code)]
|
|
handle: tokio::task::JoinHandle<()>,
|
|
}
|
|
|
|
async fn spawn_server() -> TestServer {
|
|
spawn_server_with_config(default_config()).await
|
|
}
|
|
|
|
fn default_config() -> Config {
|
|
Config {
|
|
bind: SocketAddr::from(([127, 0, 0, 1], 0)),
|
|
db: PathBuf::from(":memory:"), // unused (we open in-memory below)
|
|
server: "kez.test".to_owned(),
|
|
sig_server_url: "http://sig.test".to_owned(),
|
|
web_dir: None,
|
|
}
|
|
}
|
|
|
|
async fn spawn_server_with_config(config: Config) -> TestServer {
|
|
let store = Store::open_in_memory().unwrap();
|
|
let state = AppState {
|
|
store,
|
|
config,
|
|
broker: kez_chat_server::broker::Broker::new(),
|
|
};
|
|
let app = router(state);
|
|
let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
|
|
.await
|
|
.unwrap();
|
|
let addr = listener.local_addr().unwrap();
|
|
let handle = tokio::spawn(async move {
|
|
axum::serve(listener, app).await.unwrap();
|
|
});
|
|
TestServer {
|
|
base: format!("http://{addr}"),
|
|
handle,
|
|
}
|
|
}
|
|
|
|
fn sign_registration(
|
|
secret: &Ed25519Secret,
|
|
handle: &str,
|
|
server: &str,
|
|
created_at: DateTime<Utc>,
|
|
) -> SignedRegistration {
|
|
let primary = secret.identity().unwrap();
|
|
let payload = RegistrationPayload {
|
|
kind: REGISTRATION_TYPE.to_owned(),
|
|
version: FORMAT_VERSION,
|
|
handle: handle.to_owned(),
|
|
primary: primary.clone(),
|
|
server: server.to_owned(),
|
|
created_at,
|
|
};
|
|
let jcs = canonical_bytes(&payload).unwrap();
|
|
let sig = secret.sign(&jcs);
|
|
SignedRegistration {
|
|
kez: ENVELOPE_TAG.to_owned(),
|
|
payload,
|
|
signature: SignatureBlock {
|
|
alg: ED25519_SHA512_ALG.to_owned(),
|
|
key: primary,
|
|
sig: hex::encode(sig),
|
|
},
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn healthz_returns_ok() {
|
|
let server = spawn_server().await;
|
|
let resp = reqwest::get(format!("{}/v1/healthz", server.base))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::OK);
|
|
let body: Value = resp.json().await.unwrap();
|
|
assert_eq!(body["status"], "ok");
|
|
assert_eq!(body["server"], "kez.test");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn by_primary_round_trips() {
|
|
let server = spawn_server().await;
|
|
let secret = register_user(&server.base, "tudisco").await;
|
|
let primary = secret.identity().unwrap().to_string();
|
|
|
|
let resp = reqwest::get(format!("{}/v1/by-primary/{primary}", server.base))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::OK);
|
|
let body: Value = resp.json().await.unwrap();
|
|
assert_eq!(body["handle"], "tudisco");
|
|
assert_eq!(body["primary"], primary);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn by_primary_unknown_404() {
|
|
let server = spawn_server().await;
|
|
let unregistered = Ed25519Secret::generate().identity().unwrap().to_string();
|
|
let resp = reqwest::get(format!("{}/v1/by-primary/{unregistered}", server.base))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn by_primary_garbage_400() {
|
|
let server = spawn_server().await;
|
|
let resp = reqwest::get(format!("{}/v1/by-primary/not-a-primary", server.base))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn unknown_handle_returns_404() {
|
|
let server = spawn_server().await;
|
|
let resp = reqwest::get(format!("{}/v1/u/ghost", server.base))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn register_then_lookup_round_trip() {
|
|
let server = spawn_server().await;
|
|
let secret = Ed25519Secret::generate();
|
|
let req = sign_registration(&secret, "tudisco", "kez.test", Utc::now());
|
|
|
|
let client = reqwest::Client::new();
|
|
let post = client
|
|
.post(format!("{}/v1/register", server.base))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(post.status(), StatusCode::CREATED);
|
|
let posted: Value = post.json().await.unwrap();
|
|
assert_eq!(posted["handle"], "tudisco");
|
|
assert_eq!(posted["fqhn"], "tudisco@kez.test");
|
|
|
|
let get = reqwest::get(format!("{}/v1/u/tudisco", server.base))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(get.status(), StatusCode::OK);
|
|
let looked: Value = get.json().await.unwrap();
|
|
assert_eq!(looked["handle"], "tudisco");
|
|
assert_eq!(looked["primary"], secret.identity().unwrap().to_string());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_duplicate_handle() {
|
|
let server = spawn_server().await;
|
|
let a = Ed25519Secret::generate();
|
|
let b = Ed25519Secret::generate();
|
|
|
|
let req_a = sign_registration(&a, "tudisco", "kez.test", Utc::now());
|
|
let req_b = sign_registration(&b, "tudisco", "kez.test", Utc::now());
|
|
|
|
let client = reqwest::Client::new();
|
|
let r1 = client
|
|
.post(format!("{}/v1/register", server.base))
|
|
.json(&req_a)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(r1.status(), StatusCode::CREATED);
|
|
|
|
let r2 = client
|
|
.post(format!("{}/v1/register", server.base))
|
|
.json(&req_b)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(r2.status(), StatusCode::CONFLICT);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_wrong_server() {
|
|
let server = spawn_server().await;
|
|
let secret = Ed25519Secret::generate();
|
|
let req = sign_registration(&secret, "tudisco", "other.example", Utc::now());
|
|
let client = reqwest::Client::new();
|
|
let resp = client
|
|
.post(format!("{}/v1/register", server.base))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_reserved_handle() {
|
|
let server = spawn_server().await;
|
|
let secret = Ed25519Secret::generate();
|
|
let req = sign_registration(&secret, "admin", "kez.test", Utc::now());
|
|
let client = reqwest::Client::new();
|
|
let resp = client
|
|
.post(format!("{}/v1/register", server.base))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_tampered_signature() {
|
|
let server = spawn_server().await;
|
|
let secret = Ed25519Secret::generate();
|
|
let mut req = sign_registration(&secret, "tudisco", "kez.test", Utc::now());
|
|
// Tamper: flip the handle after signing. Signature still references
|
|
// the original handle, but payload now claims a different one.
|
|
req.payload.handle = "imposter".to_owned();
|
|
|
|
let client = reqwest::Client::new();
|
|
let resp = client
|
|
.post(format!("{}/v1/register", server.base))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_stale_timestamp() {
|
|
let server = spawn_server().await;
|
|
let secret = Ed25519Secret::generate();
|
|
let stale = Utc::now() - chrono::Duration::hours(1);
|
|
let req = sign_registration(&secret, "tudisco", "kez.test", stale);
|
|
|
|
let client = reqwest::Client::new();
|
|
let resp = client
|
|
.post(format!("{}/v1/register", server.base))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
|
let body: Value = resp.json().await.unwrap();
|
|
let msg = body["error"]["message"].as_str().unwrap();
|
|
assert!(msg.contains("created_at"), "got: {msg}");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn webfinger_finds_registered_user() {
|
|
let server = spawn_server().await;
|
|
let secret = Ed25519Secret::generate();
|
|
let req = sign_registration(&secret, "tudisco", "kez.test", Utc::now());
|
|
let client = reqwest::Client::new();
|
|
client
|
|
.post(format!("{}/v1/register", server.base))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let url = format!(
|
|
"{}/.well-known/webfinger?resource=acct:tudisco@kez.test",
|
|
server.base
|
|
);
|
|
let resp = reqwest::get(&url).await.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::OK);
|
|
let body: Value = resp.json().await.unwrap();
|
|
assert_eq!(body["subject"], "acct:tudisco@kez.test");
|
|
assert!(body["links"].is_array());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn webfinger_rejects_wrong_server() {
|
|
let server = spawn_server().await;
|
|
let resp = reqwest::get(format!(
|
|
"{}/.well-known/webfinger?resource=acct:tudisco@other.example",
|
|
server.base
|
|
))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn placeholder_index_renders() {
|
|
let server = spawn_server().await;
|
|
let resp = reqwest::get(format!("{}/", server.base)).await.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::OK);
|
|
let text = resp.text().await.unwrap();
|
|
assert!(text.contains("kez-chat"));
|
|
assert!(text.contains("kez.test"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn nats_auth_callout_stub_returns_not_implemented() {
|
|
let server = spawn_server().await;
|
|
let client = reqwest::Client::new();
|
|
let resp = client
|
|
.post(format!("{}/internal/nats/auth", server.base))
|
|
.json(&serde_json::json!({}))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::NOT_IMPLEMENTED);
|
|
}
|
|
|
|
// ─── messages: send + inbox round-trip ───────────────────────────────────────
|
|
|
|
/// Helper: register a handle, return (handle, secret) for use as recipient.
|
|
async fn register_user(base: &str, handle: &str) -> Ed25519Secret {
|
|
let secret = Ed25519Secret::generate();
|
|
let req = sign_registration(&secret, handle, "kez.test", Utc::now());
|
|
let client = reqwest::Client::new();
|
|
let resp = client
|
|
.post(format!("{base}/v1/register"))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::CREATED, "registration failed");
|
|
secret
|
|
}
|
|
|
|
/// Helper: build the X-KEZ-Auth header value for a given recipient + cursor.
|
|
fn auth_header(secret: &Ed25519Secret, handle: &str, since: i64, ts: i64) -> String {
|
|
let msg = kez_chat_server::messages::canonical_inbox_message(handle, since, ts);
|
|
let sig = secret.sign(msg.as_bytes());
|
|
format!("{ts}:{}", hex::encode(sig))
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn send_and_inbox_round_trip() {
|
|
let server = spawn_server().await;
|
|
let alice = register_user(&server.base, "alice").await;
|
|
let _bob = register_user(&server.base, "bob").await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// bob sends two opaque envelopes to alice (server doesn't introspect).
|
|
for body in ["hello alice", "this is the second one"] {
|
|
let req = serde_json::json!({
|
|
"to": "alice",
|
|
"envelope": { "v": 1, "ciphertext": body, "from": "bob" },
|
|
});
|
|
let resp = client
|
|
.post(format!("{}/v1/messages", server.base))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::OK);
|
|
}
|
|
|
|
// alice polls her inbox with a fresh auth header.
|
|
let now = Utc::now().timestamp();
|
|
let resp = client
|
|
.get(format!("{}/v1/inbox/alice", server.base))
|
|
.header("X-KEZ-Auth", auth_header(&alice, "alice", 0, now))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::OK);
|
|
let body: Value = resp.json().await.unwrap();
|
|
assert_eq!(body["messages"].as_array().unwrap().len(), 2);
|
|
assert_eq!(body["messages"][0]["envelope"]["ciphertext"], "hello alice");
|
|
assert_eq!(body["cursor"], 2);
|
|
|
|
// Polling again with cursor=2 yields nothing new.
|
|
let resp = client
|
|
.get(format!("{}/v1/inbox/alice?since=2", server.base))
|
|
.header("X-KEZ-Auth", auth_header(&alice, "alice", 2, now))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
let body: Value = resp.json().await.unwrap();
|
|
assert_eq!(body["messages"].as_array().unwrap().len(), 0);
|
|
assert_eq!(body["cursor"], 2);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn inbox_rejects_wrong_signer() {
|
|
let server = spawn_server().await;
|
|
let _alice = register_user(&server.base, "alice").await;
|
|
let mallory = Ed25519Secret::generate(); // not alice
|
|
let client = reqwest::Client::new();
|
|
|
|
let now = Utc::now().timestamp();
|
|
let resp = client
|
|
.get(format!("{}/v1/inbox/alice", server.base))
|
|
.header("X-KEZ-Auth", auth_header(&mallory, "alice", 0, now))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn inbox_rejects_missing_auth_header() {
|
|
let server = spawn_server().await;
|
|
let _alice = register_user(&server.base, "alice").await;
|
|
let resp = reqwest::get(format!("{}/v1/inbox/alice", server.base))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn send_to_unknown_handle_404s() {
|
|
let server = spawn_server().await;
|
|
let client = reqwest::Client::new();
|
|
let req = serde_json::json!({
|
|
"to": "ghost",
|
|
"envelope": { "v": 1 },
|
|
});
|
|
let resp = client
|
|
.post(format!("{}/v1/messages", server.base))
|
|
.json(&req)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
|
}
|
|
|
|
// Sanity: signing the same payload twice with the same Ed25519 key
|
|
// gives the same signature. Catches any accidental non-determinism in
|
|
// the JCS pipeline.
|
|
#[tokio::test]
|
|
async fn registration_signing_is_deterministic() {
|
|
let seed = "4242424242424242424242424242424242424242424242424242424242424242";
|
|
let secret = Ed25519Secret::from_seed_hex(seed).unwrap();
|
|
let payload = RegistrationPayload {
|
|
kind: REGISTRATION_TYPE.to_owned(),
|
|
version: FORMAT_VERSION,
|
|
handle: "tudisco".to_owned(),
|
|
primary: secret.identity().unwrap(),
|
|
server: "kez.lat".to_owned(),
|
|
created_at: DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")
|
|
.unwrap()
|
|
.with_timezone(&Utc),
|
|
};
|
|
let jcs1 = canonical_bytes(&payload).unwrap();
|
|
let jcs2 = canonical_bytes(&payload).unwrap();
|
|
assert_eq!(jcs1, jcs2);
|
|
|
|
let sig1 = secret.sign(&jcs1);
|
|
let sig2 = secret.sign(&jcs2);
|
|
assert_eq!(sig1, sig2);
|
|
|
|
// Hash for human eyeballing in CI logs.
|
|
let _ = Sha256::digest(&jcs1);
|
|
}
|