//! Nostr channel: fetches events from one or more relays and verifies that //! the event content is a KEZ proof for the requested `nostr:npub1...` //! identity. //! //! Spec §5: KEZ proofs on nostr are published as kind `30078` //! (parameterized replaceable) events. We query a relay for events with //! `authors == []` and `kinds == [30078]`, then run each //! event's `content` through the standard proof parser. //! //! Trust model for this minimal cut: a malicious relay could forge events, //! but the embedded KEZ proof carries its own signature over the primary //! key. As long as the proof's `primary == subject` (the npub case), the //! relay cannot mint a valid proof without the user's private key. Event //! signature verification is TODO for the cross-key case (e.g. an ed25519 //! primary claiming a nostr identity). use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; use kez_core::{Identity, NostrSecret, nostr_pubkey_hex}; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use sha2::{Digest, Sha256}; use tokio_tungstenite::{connect_async, tungstenite::Message}; use crate::{Channel, ChannelError, ChannelHit, ChannelResult, parse_and_verify_for}; pub const KEZ_NOSTR_KIND: u32 = 30078; const DEFAULT_RELAYS: &[&str] = &[ "wss://relay.damus.io", "wss://nos.lol", "wss://relay.primal.net", ]; const FETCH_TIMEOUT: Duration = Duration::from_secs(8); /// A nostr event in the wire shape we care about (a subset of NIP-01). #[derive(Debug, Clone, Deserialize, Serialize)] pub struct NostrEvent { pub id: String, pub pubkey: String, pub created_at: i64, pub kind: u32, #[serde(default)] pub tags: Vec>, pub content: String, pub sig: String, } /// Filter sent in a nostr REQ message. #[derive(Debug, Clone)] pub struct NostrFilter { pub authors: Vec, // lowercase hex pubkeys pub kinds: Vec, pub limit: Option, } /// Fetcher abstraction so tests can substitute canned events without /// touching the network. #[async_trait] pub trait NostrFetcher: Send + Sync { async fn fetch_events(&self, filter: &NostrFilter) -> ChannelResult>; } /// Real fetcher: queries each relay in turn (websocket), merges events, /// and times out if relays are unresponsive. pub struct RelayPoolFetcher { relays: Vec, } impl RelayPoolFetcher { pub fn new(relays: Vec) -> Self { Self { relays } } pub fn defaults() -> Self { Self::new(DEFAULT_RELAYS.iter().map(|s| (*s).to_owned()).collect()) } } #[async_trait] impl NostrFetcher for RelayPoolFetcher { async fn fetch_events(&self, filter: &NostrFilter) -> ChannelResult> { let mut last_error: Option = None; let mut events: Vec = Vec::new(); for relay in &self.relays { match query_relay(relay, filter).await { Ok(mut batch) => events.append(&mut batch), Err(err) => last_error = Some(err), } // First relay that returns anything is enough for a discovery hit; // we keep going only if we still have nothing. if !events.is_empty() { break; } } if events.is_empty() && let Some(err) = last_error { return Err(err); } Ok(events) } } async fn query_relay(url: &str, filter: &NostrFilter) -> ChannelResult> { let (mut ws, _) = connect_async(url) .await .map_err(|e| ChannelError::Unreachable(format!("connect {url}: {e}")))?; let sub_id = "kez-1"; let req = build_req_message(sub_id, filter); ws.send(Message::Text(req.into())) .await .map_err(|e| ChannelError::Unreachable(format!("send REQ {url}: {e}")))?; let mut events = Vec::new(); loop { let next = tokio::time::timeout(FETCH_TIMEOUT, ws.next()).await; let Ok(Some(msg)) = next else { break }; let msg = match msg { Ok(m) => m, Err(e) => return Err(ChannelError::Unreachable(format!("ws read {url}: {e}"))), }; let Message::Text(text) = msg else { continue }; match parse_relay_message(&text) { RelayMessage::Event(ev) => events.push(ev), RelayMessage::EndOfStored => break, RelayMessage::Other => continue, } } let _ = ws .send(Message::Text( json!(["CLOSE", sub_id]).to_string().into(), )) .await; let _ = ws.close(None).await; Ok(events) } #[derive(Clone)] pub struct NostrChannel { fetcher: Arc, } impl NostrChannel { pub fn new() -> Self { Self { fetcher: Arc::new(RelayPoolFetcher::defaults()), } } pub fn with_fetcher(fetcher: Arc) -> Self { Self { fetcher } } } impl Default for NostrChannel { fn default() -> Self { Self::new() } } #[async_trait] impl Channel for NostrChannel { fn system(&self) -> &'static str { "nostr" } async fn fetch_and_verify(&self, identity: &Identity) -> ChannelResult { let pubkey_hex = nostr_pubkey_hex(identity).map_err(|e| ChannelError::Other(e.into()))?; let filter = NostrFilter { authors: vec![pubkey_hex.clone()], kinds: vec![KEZ_NOSTR_KIND], limit: Some(20), }; let events = self.fetcher.fetch_events(&filter).await?; let mut last_error: Option = None; for event in events { if !event_matches_author(&event, &pubkey_hex) { continue; } match parse_and_verify_for(&event.content, identity) { Ok(hit) => return Ok(hit), Err(err) => last_error = Some(err), } } Err(last_error.unwrap_or_else(|| ChannelError::NotFound(identity.clone()))) } } /// Build and sign a NIP-01 event. The event id is `sha256` of the /// canonically-serialized array `[0, pubkey, created_at, kind, tags, /// content]`; the signature is Schnorr over that id. pub fn build_signed_event( signer: &NostrSecret, created_at: i64, kind: u32, tags: Vec>, content: String, ) -> ChannelResult { let pubkey_hex = signer.pubkey_hex(); let canonical = json!([0, pubkey_hex, created_at, kind, tags, content]); let canonical_str = serde_json::to_string(&canonical) .map_err(|e| ChannelError::Other(anyhow::anyhow!("event serialize: {e}")))?; let digest: [u8; 32] = Sha256::digest(canonical_str.as_bytes()).into(); let id_hex = hex::encode(digest); let sig = signer .sign_raw(&digest) .map_err(|e| ChannelError::Other(anyhow::anyhow!("schnorr sign: {e}")))?; Ok(NostrEvent { id: id_hex, pubkey: pubkey_hex, created_at, kind, tags, content, sig: hex::encode(sig), }) } /// Publish one event to a single relay over WebSocket. Returns Ok if the /// relay either acknowledges with `["OK", id, true, ...]` or closes the /// connection without rejecting. pub async fn publish_event_to_relay( relay_url: &str, event: &NostrEvent, ) -> ChannelResult<()> { let (mut ws, _) = connect_async(relay_url) .await .map_err(|e| ChannelError::Unreachable(format!("connect {relay_url}: {e}")))?; let msg = json!(["EVENT", event]).to_string(); ws.send(Message::Text(msg.into())) .await .map_err(|e| ChannelError::Unreachable(format!("send EVENT {relay_url}: {e}")))?; // Wait briefly for an OK / NOTICE response. Don't hang forever if the // relay never sends one — many relays accept and stay silent. let deadline = tokio::time::timeout(std::time::Duration::from_secs(5), async { while let Some(msg) = ws.next().await { let Ok(Message::Text(text)) = msg else { continue }; let Ok(arr) = serde_json::from_str::(&text) else { continue; }; let Some(arr) = arr.as_array() else { continue }; match arr.first().and_then(|v| v.as_str()) { Some("OK") => { // ["OK", , , ] if arr.get(2).and_then(|v| v.as_bool()) == Some(false) { let reason = arr.get(3).and_then(|v| v.as_str()).unwrap_or(""); return Err(ChannelError::Other(anyhow::anyhow!( "relay {relay_url} rejected event: {reason}" ))); } return Ok(()); } Some("NOTICE") => { // Informational; not failure on its own. Keep reading. continue; } _ => continue, } } Ok(()) }) .await; let _ = ws.close(None).await; match deadline { Ok(result) => result, Err(_) => Ok(()), // Timeout — assume accepted; we'll retry by GET later. } } /// Pure: build the JSON REQ message a nostr relay expects. pub fn build_req_message(sub_id: &str, filter: &NostrFilter) -> String { let mut spec = serde_json::Map::new(); spec.insert("authors".into(), json!(filter.authors)); spec.insert("kinds".into(), json!(filter.kinds)); if let Some(limit) = filter.limit { spec.insert("limit".into(), json!(limit)); } json!(["REQ", sub_id, Value::Object(spec)]).to_string() } /// Pure: defense against a relay that lies about authorship in the array /// envelope but sets a different `pubkey` inside the event JSON. pub fn event_matches_author(event: &NostrEvent, expected_hex: &str) -> bool { event.pubkey.eq_ignore_ascii_case(expected_hex) } /// Parsed shape of a single relay → client message. pub enum RelayMessage { Event(NostrEvent), EndOfStored, Other, } /// Pure: parse one inbound `["EVENT", sub, {…}]` / `["EOSE", sub]` / other /// frame into our enum. pub fn parse_relay_message(text: &str) -> RelayMessage { let Ok(value) = serde_json::from_str::(text) else { return RelayMessage::Other; }; let Some(arr) = value.as_array() else { return RelayMessage::Other; }; match arr.first().and_then(|v| v.as_str()) { Some("EVENT") => { let Some(ev_val) = arr.get(2) else { return RelayMessage::Other; }; match serde_json::from_value::(ev_val.clone()) { Ok(ev) => RelayMessage::Event(ev), Err(_) => RelayMessage::Other, } } Some("EOSE") => RelayMessage::EndOfStored, _ => RelayMessage::Other, } } #[cfg(test)] mod tests { use super::*; use serde_json::json; #[test] fn build_req_includes_filter_fields() { let filter = NostrFilter { authors: vec!["aa".into()], kinds: vec![30078], limit: Some(20), }; let req = build_req_message("sub-1", &filter); let parsed: Value = serde_json::from_str(&req).unwrap(); assert_eq!(parsed[0], "REQ"); assert_eq!(parsed[1], "sub-1"); assert_eq!(parsed[2]["authors"], json!(["aa"])); assert_eq!(parsed[2]["kinds"], json!([30078])); assert_eq!(parsed[2]["limit"], json!(20)); } #[test] fn build_req_omits_limit_when_none() { let filter = NostrFilter { authors: vec!["aa".into()], kinds: vec![1], limit: None, }; let req = build_req_message("s", &filter); let parsed: Value = serde_json::from_str(&req).unwrap(); assert!(parsed[2].get("limit").is_none()); } #[test] fn parse_event_message() { let frame = json!([ "EVENT", "sub-1", { "id": "0".repeat(64), "pubkey": "a".repeat(64), "created_at": 1700000000_i64, "kind": 30078, "tags": [["d", "kez"]], "content": "hello", "sig": "f".repeat(128), } ]) .to_string(); match parse_relay_message(&frame) { RelayMessage::Event(ev) => { assert_eq!(ev.kind, 30078); assert_eq!(ev.content, "hello"); assert_eq!(ev.tags, vec![vec!["d".to_owned(), "kez".to_owned()]]); } _ => panic!("expected Event"), } } #[test] fn parse_eose_message() { let frame = json!(["EOSE", "sub-1"]).to_string(); assert!(matches!( parse_relay_message(&frame), RelayMessage::EndOfStored )); } #[test] fn parse_garbage_message_is_other() { assert!(matches!(parse_relay_message("not json"), RelayMessage::Other)); assert!(matches!(parse_relay_message("{}"), RelayMessage::Other)); assert!(matches!( parse_relay_message(r#"["NOTICE","hi"]"#), RelayMessage::Other )); } #[test] fn build_signed_event_produces_valid_nip01_event() { let signer = kez_core::NostrSecret::generate(); let event = build_signed_event( &signer, 1_700_000_000, 30078, vec![vec!["d".into(), "kez-sigchain".into()]], "hello".into(), ) .unwrap(); // Basic shape: 32-byte id, 32-byte pubkey, 64-byte sig (all hex). assert_eq!(event.id.len(), 64); assert_eq!(event.pubkey.len(), 64); assert_eq!(event.sig.len(), 128); assert_eq!(event.pubkey, signer.pubkey_hex()); assert_eq!(event.kind, 30078); assert_eq!(event.content, "hello"); // The id MUST equal sha256 of the canonical serialization. let canonical = serde_json::json!([ 0, event.pubkey, event.created_at, event.kind, event.tags, event.content ]); let canonical_str = serde_json::to_string(&canonical).unwrap(); let expected_id = hex::encode(::digest(canonical_str.as_bytes())); assert_eq!(event.id, expected_id); } #[test] fn event_matches_author_is_case_insensitive() { let ev = NostrEvent { id: "x".into(), pubkey: "ABCDEF".into(), created_at: 0, kind: 30078, tags: vec![], content: String::new(), sig: String::new(), }; assert!(event_matches_author(&ev, "abcdef")); assert!(!event_matches_author(&ev, "ababab")); } }