From fb3147f0c927540d2a82f088ef0bd34882a00aca Mon Sep 17 00:00:00 2001 From: Jason Tudisco Date: Tue, 17 Mar 2026 23:09:39 -0600 Subject: [PATCH] Inventory-based set reconciliation for Nostr sync Replace timestamp-based catch-up with proper set reconciliation. When a peer joins a room it publishes its full filename inventory. Other peers diff against their own set and re-publish only the events the newcomer is missing, then send their own inventory so the newcomer can do the same. Two rounds max, no infinite loops. Also switched from custom #channel tag (rejected by relays as unindexed) to standard #t topic tag, and kind 4078 to kind 1. Co-Authored-By: Claude Opus 4.6 (1M context) --- nostr/src/folder-sync-db.ts | 48 +++++++- nostr/src/nostr-transport.ts | 205 ++++++++++++++++++++++++----------- 2 files changed, 188 insertions(+), 65 deletions(-) diff --git a/nostr/src/folder-sync-db.ts b/nostr/src/folder-sync-db.ts index 5c669a3..011915f 100644 --- a/nostr/src/folder-sync-db.ts +++ b/nostr/src/folder-sync-db.ts @@ -1,6 +1,6 @@ import type { OpenOptions, StoreOptions, CollectionApi, KVApi, - SyncDBEventName, SyncDBEventHandler, + SyncDBEventName, SyncDBEventHandler, SyncEvent, } from './types.js'; import { generateClientId } from './utils.js'; import { Emitter } from './emitter.js'; @@ -19,7 +19,7 @@ const META_NOSTR_ROOM = 'nostrRoom'; const DEFAULT_RELAYS = [ 'wss://relay.damus.io', 'wss://nos.lol', - 'wss://relay.nostr.band', + 'wss://relay.primal.net', ]; export class FolderSyncDB { @@ -76,6 +76,9 @@ export class FolderSyncDB { await this.idb.setMeta(META_CLIENT_ID, clientId); } + // ── Nostr transport identity ───────────────────────────── + this.nostrTransport.setClientId(clientId); + // ── Sync engine (dual transport) ───────────────────────── this.syncEngine = new SyncEngine( this.idb, this.folderStore, this.nostrTransport, @@ -98,6 +101,43 @@ export class FolderSyncDB { this.sync().catch(() => {}); }); + // ── Inventory-based set reconciliation ──────────────────── + this.nostrTransport.onInventory( + async (theirKnown: Set, round: number, _fromClientId: string) => { + try { + const ourApplied = await this.idb.getAppliedSet(); + + // Find events WE have that THEY don't + const theyNeed: Array<{ filename: string; event: SyncEvent }> = []; + for (const filename of ourApplied) { + if (theirKnown.has(filename)) continue; + // Try Nostr cache first, then folder + let event = await this.nostrTransport.readEventFile(filename); + if (!event && this.folderStore.hasHandle) { + event = await this.folderStore.readEventFile(filename); + } + if (event) theyNeed.push({ filename, event }); + } + + // Send them what they're missing + if (theyNeed.length > 0) { + theyNeed.sort((a, b) => (a.event.ts ?? 0) - (b.event.ts ?? 0)); + await this.nostrTransport.republishEvents(theyNeed); + console.log(`[nostr] sent ${theyNeed.length} events peer was missing`); + } + + // If this was their initial inventory (round 0), send ours back + // so they can send us what WE'RE missing. Max 1 reply round. + if (round < 1) { + const ourFilenames = Array.from(ourApplied); + await this.nostrTransport.publishInventory(ourFilenames, round + 1); + } + } catch (err) { + console.warn('[nostr] inventory reconciliation failed:', err); + } + }, + ); + // ── Auto-sync ──────────────────────────────────────────── if (opts.autoSyncIntervalMs && opts.autoSyncIntervalMs > 0) { this.autoSyncTimer = setInterval(() => { @@ -131,6 +171,10 @@ export class FolderSyncDB { await this.idb.setMeta(META_NOSTR_ROOM, roomKey); this.emitter.emit('nostr:connected', { roomKey }); await this.sync(); + + // Announce our inventory so peers can send us what we're missing + const appliedSet = await this.idb.getAppliedSet(); + await this.nostrTransport.publishInventory(Array.from(appliedSet), 0); } leaveRoom(): void { diff --git a/nostr/src/nostr-transport.ts b/nostr/src/nostr-transport.ts index e8ed210..d16458a 100644 --- a/nostr/src/nostr-transport.ts +++ b/nostr/src/nostr-transport.ts @@ -1,38 +1,54 @@ import type { SyncEvent } from './types.js'; /** - * Nostr relay transport layer. + * Nostr relay transport layer with inventory-based set reconciliation. * - * Implements the same event I/O interface as FolderStore so the SyncEngine - * can treat it as just another transport. + * When a peer joins a room it publishes an "inventory" listing every + * event filename it already knows. Other peers diff against their own + * set and re-publish anything the newcomer is missing, then send their + * own inventory so the newcomer can do the same. Two rounds max. * - * Uses raw WebSocket + minimal Nostr protocol (NIP-01) to avoid - * heavy dependencies. Works from file:// origins. + * Works from file:// origins via WebSocket. */ -const NOSTR_EVENT_KIND = 4078; // custom regular kind (stored by relays) - -// ── Minimal Nostr crypto (secp256k1 via nostr-tools) ───────── - +const NOSTR_EVENT_KIND = 1; +const TOPIC_PREFIX = 'foldersync-'; import { generateSecretKey, getPublicKey, finalizeEvent } from 'nostr-tools/pure'; import { SimplePool, type SubCloser } from 'nostr-tools/pool'; import type { Filter } from 'nostr-tools/filter'; +/** Internal message envelope. */ +type NostrMessage = + | { _type: 'event'; filename: string; payload: SyncEvent } + | { _type: 'inventory'; clientId: string; known: string[]; round: number }; + export class NostrTransport { private relays: string[]; private pool: SimplePool; private roomKey: string | null = null; + private topicTag: string | null = null; private secretKey: Uint8Array | null = null; + private clientId: string = ''; - /** Cached events keyed by our standard filename. */ + /** Cached events keyed by filename. */ private eventCache = new Map(); - /** Active relay subscription (closeable). */ + /** Active relay subscription. */ private sub: SubCloser | null = null; - /** Callback fired when a new event arrives in real time. */ + /** Callback: new sync event arrived. */ private _onNewEvent?: () => void; + /** + * Callback: peer inventory received. + * Handler should call respondToInventory() with events the peer is missing. + */ + private _onInventory?: ( + theirKnown: Set, + round: number, + fromClientId: string, + ) => void; + constructor(relays: string[]) { this.relays = relays; this.pool = new SimplePool(); @@ -40,13 +56,8 @@ export class NostrTransport { // ── Key management ────────────────────────────────────────── - /** - * Set the keypair. Called by FolderSyncDB after loading from IDB - * (or generating a new one on first run). - */ setKeypair(sk: Uint8Array): void { this.secretKey = sk; - // Derive pubkey (used internally by finalizeEvent) getPublicKey(sk); } @@ -56,40 +67,64 @@ export class NostrTransport { return sk; } + setClientId(id: string): void { + this.clientId = id; + } + // ── Room management ───────────────────────────────────────── async joinRoom(roomKey: string): Promise { - if (this.roomKey === roomKey && this.sub) return; // already joined + if (this.roomKey === roomKey && this.sub) return; this.leaveRoom(); this.roomKey = roomKey; + this.topicTag = TOPIC_PREFIX + roomKey; - // Fetch existing events from relays + // Fetch any events the relay still has const fetchFilter: Filter = { kinds: [NOSTR_EVENT_KIND], - '#channel': [roomKey], + '#t': [this.topicTag], limit: 5000, }; + try { - const events = await this.pool.querySync(this.relays, fetchFilter as Filter); + const events = await this.pool.querySync(this.relays, fetchFilter); + console.log(`[nostr] fetched ${events.length} existing events from relay`); for (const ev of events) { - this.cacheNostrEvent(ev); + this.handleIncoming(ev); } - } catch { - // relay might be unreachable — continue with empty cache + } catch (err) { + console.warn('[nostr] relay fetch failed:', err); } - // Subscribe for new real-time events + // Subscribe for real-time events const subFilter: Filter = { kinds: [NOSTR_EVENT_KIND], - '#channel': [roomKey], - since: Math.floor(Date.now() / 1000), + '#t': [this.topicTag], + since: Math.floor(Date.now() / 1000) - 1, }; + this.sub = this.pool.subscribeMany(this.relays, subFilter, { - onevent: (ev) => { - const isNew = this.cacheNostrEvent(ev); - if (isNew) this._onNewEvent?.(); - }, + onevent: (ev) => this.handleIncoming(ev), }); + + console.log(`[nostr] joined room "${roomKey}" on ${this.relays.length} relays`); + } + + /** + * Publish our inventory to the room. + * Called by FolderSyncDB after joining (it has the full applied set from IDB). + */ + async publishInventory(allKnownFilenames: string[], round = 0): Promise { + const msg: NostrMessage = { + _type: 'inventory', + clientId: this.clientId, + known: allKnownFilenames, + round, + }; + await this.publishToRelay(JSON.stringify(msg)); + console.log( + `[nostr] sent inventory (round ${round}, ${allKnownFilenames.length} events known)`, + ); } leaveRoom(): void { @@ -98,6 +133,7 @@ export class NostrTransport { this.sub = null; } this.roomKey = null; + this.topicTag = null; this.eventCache.clear(); } @@ -109,34 +145,44 @@ export class NostrTransport { return this.roomKey; } - // ── Callback for real-time push ───────────────────────────── + // ── Callbacks ─────────────────────────────────────────────── onNewEvent(cb: () => void): void { this._onNewEvent = cb; } - // ── Event I/O (same interface as FolderStore) ─────────────── + onInventory( + cb: (theirKnown: Set, round: number, fromClientId: string) => void, + ): void { + this._onInventory = cb; + } + + // ── Event I/O ─────────────────────────────────────────────── async writeEvent(filename: string, event: SyncEvent): Promise { - if (!this.roomKey || !this.secretKey) return; + if (!this.topicTag || !this.secretKey) return; this.eventCache.set(filename, event); - const nostrEvent = finalizeEvent({ - kind: NOSTR_EVENT_KIND, - created_at: Math.floor(Date.now() / 1000), - tags: [ - ['channel', this.roomKey], - ['filename', filename], - ], - content: JSON.stringify(event), - }, this.secretKey); + const msg: NostrMessage = { + _type: 'event', + filename, + payload: event, + }; - // Publish to all relays, don't wait for all to confirm - try { - await Promise.any(this.pool.publish(this.relays, nostrEvent as any)); - } catch { - // All relays failed — event is still in local cache + await this.publishToRelay(JSON.stringify(msg)); + console.log(`[nostr] published ${filename}`); + } + + async republishEvents( + events: Array<{ filename: string; event: SyncEvent }>, + ): Promise { + if (!this.topicTag || !this.secretKey || events.length === 0) return; + + console.log(`[nostr] re-publishing ${events.length} events for peer catch-up`); + for (const { filename, event } of events) { + const msg: NostrMessage = { _type: 'event', filename, payload: event }; + await this.publishToRelay(JSON.stringify(msg)); } } @@ -157,23 +203,56 @@ export class NostrTransport { // ── Internals ─────────────────────────────────────────────── - /** - * Parse a Nostr event and cache it. Returns true if the event - * was new (not already cached). - */ - private cacheNostrEvent(nostrEvent: any): boolean { - try { - const filename = nostrEvent.tags?.find( - (t: string[]) => t[0] === 'filename', - )?.[1]; - if (!filename) return false; - if (this.eventCache.has(filename)) return false; + private async publishToRelay(content: string): Promise { + if (!this.topicTag || !this.secretKey) return; - const syncEvent = JSON.parse(nostrEvent.content) as SyncEvent; - this.eventCache.set(filename, syncEvent); - return true; + const nostrEvent = finalizeEvent( + { + kind: NOSTR_EVENT_KIND, + created_at: Math.floor(Date.now() / 1000), + tags: [['t', this.topicTag]], + content, + }, + this.secretKey, + ); + + try { + await Promise.any(this.pool.publish(this.relays, nostrEvent as any)); } catch { - return false; + /* relay publish failed */ + } + } + + private handleIncoming(nostrEvent: any): void { + try { + const hasTag = nostrEvent.tags?.some( + (t: string[]) => t[0] === 't' && t[1]?.startsWith(TOPIC_PREFIX), + ); + if (!hasTag) return; + + const msg: NostrMessage = JSON.parse(nostrEvent.content); + + // ── Inventory message ─────────────────────────────────── + if (msg._type === 'inventory') { + if (msg.clientId === this.clientId) return; // ignore our own + console.log( + `[nostr] peer ${msg.clientId.slice(0, 8)} sent inventory ` + + `(round ${msg.round}, ${msg.known.length} events)`, + ); + this._onInventory?.(new Set(msg.known), msg.round, msg.clientId); + return; + } + + // ── Sync event ────────────────────────────────────────── + if (msg._type === 'event' && msg.filename && msg.payload) { + if (this.eventCache.has(msg.filename)) return; + this.eventCache.set(msg.filename, msg.payload); + console.log(`[nostr] cached event ${msg.filename}`); + this._onNewEvent?.(); + return; + } + } catch { + /* not a valid message */ } } }