Inventory-based set reconciliation for Nostr sync

Replace timestamp-based catch-up with proper set reconciliation.
When a peer joins a room it publishes its full filename inventory.
Other peers diff against their own set and re-publish only the
events the newcomer is missing, then send their own inventory so
the newcomer can do the same. Two rounds max, no infinite loops.

Also switched from custom #channel tag (rejected by relays as
unindexed) to standard #t topic tag, and kind 4078 to kind 1.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jason Tudisco 2026-03-17 23:09:39 -06:00
parent 63ce105114
commit fb3147f0c9
2 changed files with 188 additions and 65 deletions

View File

@ -1,6 +1,6 @@
import type {
OpenOptions, StoreOptions, CollectionApi, KVApi,
SyncDBEventName, SyncDBEventHandler,
SyncDBEventName, SyncDBEventHandler, SyncEvent,
} from './types.js';
import { generateClientId } from './utils.js';
import { Emitter } from './emitter.js';
@ -19,7 +19,7 @@ const META_NOSTR_ROOM = 'nostrRoom';
const DEFAULT_RELAYS = [
'wss://relay.damus.io',
'wss://nos.lol',
'wss://relay.nostr.band',
'wss://relay.primal.net',
];
export class FolderSyncDB {
@ -76,6 +76,9 @@ export class FolderSyncDB {
await this.idb.setMeta(META_CLIENT_ID, clientId);
}
// ── Nostr transport identity ─────────────────────────────
this.nostrTransport.setClientId(clientId);
// ── Sync engine (dual transport) ─────────────────────────
this.syncEngine = new SyncEngine(
this.idb, this.folderStore, this.nostrTransport,
@ -98,6 +101,43 @@ export class FolderSyncDB {
this.sync().catch(() => {});
});
// ── Inventory-based set reconciliation ────────────────────
this.nostrTransport.onInventory(
async (theirKnown: Set<string>, round: number, _fromClientId: string) => {
try {
const ourApplied = await this.idb.getAppliedSet();
// Find events WE have that THEY don't
const theyNeed: Array<{ filename: string; event: SyncEvent }> = [];
for (const filename of ourApplied) {
if (theirKnown.has(filename)) continue;
// Try Nostr cache first, then folder
let event = await this.nostrTransport.readEventFile(filename);
if (!event && this.folderStore.hasHandle) {
event = await this.folderStore.readEventFile(filename);
}
if (event) theyNeed.push({ filename, event });
}
// Send them what they're missing
if (theyNeed.length > 0) {
theyNeed.sort((a, b) => (a.event.ts ?? 0) - (b.event.ts ?? 0));
await this.nostrTransport.republishEvents(theyNeed);
console.log(`[nostr] sent ${theyNeed.length} events peer was missing`);
}
// If this was their initial inventory (round 0), send ours back
// so they can send us what WE'RE missing. Max 1 reply round.
if (round < 1) {
const ourFilenames = Array.from(ourApplied);
await this.nostrTransport.publishInventory(ourFilenames, round + 1);
}
} catch (err) {
console.warn('[nostr] inventory reconciliation failed:', err);
}
},
);
// ── Auto-sync ────────────────────────────────────────────
if (opts.autoSyncIntervalMs && opts.autoSyncIntervalMs > 0) {
this.autoSyncTimer = setInterval(() => {
@ -131,6 +171,10 @@ export class FolderSyncDB {
await this.idb.setMeta(META_NOSTR_ROOM, roomKey);
this.emitter.emit('nostr:connected', { roomKey });
await this.sync();
// Announce our inventory so peers can send us what we're missing
const appliedSet = await this.idb.getAppliedSet();
await this.nostrTransport.publishInventory(Array.from(appliedSet), 0);
}
leaveRoom(): void {

View File

@ -1,38 +1,54 @@
import type { SyncEvent } from './types.js';
/**
* Nostr relay transport layer.
* Nostr relay transport layer with inventory-based set reconciliation.
*
* Implements the same event I/O interface as FolderStore so the SyncEngine
* can treat it as just another transport.
* When a peer joins a room it publishes an "inventory" listing every
* event filename it already knows. Other peers diff against their own
* set and re-publish anything the newcomer is missing, then send their
* own inventory so the newcomer can do the same. Two rounds max.
*
* Uses raw WebSocket + minimal Nostr protocol (NIP-01) to avoid
* heavy dependencies. Works from file:// origins.
* Works from file:// origins via WebSocket.
*/
const NOSTR_EVENT_KIND = 4078; // custom regular kind (stored by relays)
// ── Minimal Nostr crypto (secp256k1 via nostr-tools) ─────────
const NOSTR_EVENT_KIND = 1;
const TOPIC_PREFIX = 'foldersync-';
import { generateSecretKey, getPublicKey, finalizeEvent } from 'nostr-tools/pure';
import { SimplePool, type SubCloser } from 'nostr-tools/pool';
import type { Filter } from 'nostr-tools/filter';
/** Internal message envelope. */
type NostrMessage =
| { _type: 'event'; filename: string; payload: SyncEvent }
| { _type: 'inventory'; clientId: string; known: string[]; round: number };
export class NostrTransport {
private relays: string[];
private pool: SimplePool;
private roomKey: string | null = null;
private topicTag: string | null = null;
private secretKey: Uint8Array | null = null;
private clientId: string = '';
/** Cached events keyed by our standard filename. */
/** Cached events keyed by filename. */
private eventCache = new Map<string, SyncEvent>();
/** Active relay subscription (closeable). */
/** Active relay subscription. */
private sub: SubCloser | null = null;
/** Callback fired when a new event arrives in real time. */
/** Callback: new sync event arrived. */
private _onNewEvent?: () => void;
/**
* Callback: peer inventory received.
* Handler should call respondToInventory() with events the peer is missing.
*/
private _onInventory?: (
theirKnown: Set<string>,
round: number,
fromClientId: string,
) => void;
constructor(relays: string[]) {
this.relays = relays;
this.pool = new SimplePool();
@ -40,13 +56,8 @@ export class NostrTransport {
// ── Key management ──────────────────────────────────────────
/**
* Set the keypair. Called by FolderSyncDB after loading from IDB
* (or generating a new one on first run).
*/
setKeypair(sk: Uint8Array): void {
this.secretKey = sk;
// Derive pubkey (used internally by finalizeEvent)
getPublicKey(sk);
}
@ -56,40 +67,64 @@ export class NostrTransport {
return sk;
}
setClientId(id: string): void {
this.clientId = id;
}
// ── Room management ─────────────────────────────────────────
async joinRoom(roomKey: string): Promise<void> {
if (this.roomKey === roomKey && this.sub) return; // already joined
if (this.roomKey === roomKey && this.sub) return;
this.leaveRoom();
this.roomKey = roomKey;
this.topicTag = TOPIC_PREFIX + roomKey;
// Fetch existing events from relays
// Fetch any events the relay still has
const fetchFilter: Filter = {
kinds: [NOSTR_EVENT_KIND],
'#channel': [roomKey],
'#t': [this.topicTag],
limit: 5000,
};
try {
const events = await this.pool.querySync(this.relays, fetchFilter as Filter);
const events = await this.pool.querySync(this.relays, fetchFilter);
console.log(`[nostr] fetched ${events.length} existing events from relay`);
for (const ev of events) {
this.cacheNostrEvent(ev);
this.handleIncoming(ev);
}
} catch {
// relay might be unreachable — continue with empty cache
} catch (err) {
console.warn('[nostr] relay fetch failed:', err);
}
// Subscribe for new real-time events
// Subscribe for real-time events
const subFilter: Filter = {
kinds: [NOSTR_EVENT_KIND],
'#channel': [roomKey],
since: Math.floor(Date.now() / 1000),
'#t': [this.topicTag],
since: Math.floor(Date.now() / 1000) - 1,
};
this.sub = this.pool.subscribeMany(this.relays, subFilter, {
onevent: (ev) => {
const isNew = this.cacheNostrEvent(ev);
if (isNew) this._onNewEvent?.();
},
onevent: (ev) => this.handleIncoming(ev),
});
console.log(`[nostr] joined room "${roomKey}" on ${this.relays.length} relays`);
}
/**
* Publish our inventory to the room.
* Called by FolderSyncDB after joining (it has the full applied set from IDB).
*/
async publishInventory(allKnownFilenames: string[], round = 0): Promise<void> {
const msg: NostrMessage = {
_type: 'inventory',
clientId: this.clientId,
known: allKnownFilenames,
round,
};
await this.publishToRelay(JSON.stringify(msg));
console.log(
`[nostr] sent inventory (round ${round}, ${allKnownFilenames.length} events known)`,
);
}
leaveRoom(): void {
@ -98,6 +133,7 @@ export class NostrTransport {
this.sub = null;
}
this.roomKey = null;
this.topicTag = null;
this.eventCache.clear();
}
@ -109,34 +145,44 @@ export class NostrTransport {
return this.roomKey;
}
// ── Callback for real-time push ─────────────────────────────
// ── Callbacks ───────────────────────────────────────────────
onNewEvent(cb: () => void): void {
this._onNewEvent = cb;
}
// ── Event I/O (same interface as FolderStore) ───────────────
onInventory(
cb: (theirKnown: Set<string>, round: number, fromClientId: string) => void,
): void {
this._onInventory = cb;
}
// ── Event I/O ───────────────────────────────────────────────
async writeEvent(filename: string, event: SyncEvent): Promise<void> {
if (!this.roomKey || !this.secretKey) return;
if (!this.topicTag || !this.secretKey) return;
this.eventCache.set(filename, event);
const nostrEvent = finalizeEvent({
kind: NOSTR_EVENT_KIND,
created_at: Math.floor(Date.now() / 1000),
tags: [
['channel', this.roomKey],
['filename', filename],
],
content: JSON.stringify(event),
}, this.secretKey);
const msg: NostrMessage = {
_type: 'event',
filename,
payload: event,
};
// Publish to all relays, don't wait for all to confirm
try {
await Promise.any(this.pool.publish(this.relays, nostrEvent as any));
} catch {
// All relays failed — event is still in local cache
await this.publishToRelay(JSON.stringify(msg));
console.log(`[nostr] published ${filename}`);
}
async republishEvents(
events: Array<{ filename: string; event: SyncEvent }>,
): Promise<void> {
if (!this.topicTag || !this.secretKey || events.length === 0) return;
console.log(`[nostr] re-publishing ${events.length} events for peer catch-up`);
for (const { filename, event } of events) {
const msg: NostrMessage = { _type: 'event', filename, payload: event };
await this.publishToRelay(JSON.stringify(msg));
}
}
@ -157,23 +203,56 @@ export class NostrTransport {
// ── Internals ───────────────────────────────────────────────
/**
* Parse a Nostr event and cache it. Returns true if the event
* was new (not already cached).
*/
private cacheNostrEvent(nostrEvent: any): boolean {
try {
const filename = nostrEvent.tags?.find(
(t: string[]) => t[0] === 'filename',
)?.[1];
if (!filename) return false;
if (this.eventCache.has(filename)) return false;
private async publishToRelay(content: string): Promise<void> {
if (!this.topicTag || !this.secretKey) return;
const syncEvent = JSON.parse(nostrEvent.content) as SyncEvent;
this.eventCache.set(filename, syncEvent);
return true;
const nostrEvent = finalizeEvent(
{
kind: NOSTR_EVENT_KIND,
created_at: Math.floor(Date.now() / 1000),
tags: [['t', this.topicTag]],
content,
},
this.secretKey,
);
try {
await Promise.any(this.pool.publish(this.relays, nostrEvent as any));
} catch {
return false;
/* relay publish failed */
}
}
private handleIncoming(nostrEvent: any): void {
try {
const hasTag = nostrEvent.tags?.some(
(t: string[]) => t[0] === 't' && t[1]?.startsWith(TOPIC_PREFIX),
);
if (!hasTag) return;
const msg: NostrMessage = JSON.parse(nostrEvent.content);
// ── Inventory message ───────────────────────────────────
if (msg._type === 'inventory') {
if (msg.clientId === this.clientId) return; // ignore our own
console.log(
`[nostr] peer ${msg.clientId.slice(0, 8)} sent inventory ` +
`(round ${msg.round}, ${msg.known.length} events)`,
);
this._onInventory?.(new Set(msg.known), msg.round, msg.clientId);
return;
}
// ── Sync event ──────────────────────────────────────────
if (msg._type === 'event' && msg.filename && msg.payload) {
if (this.eventCache.has(msg.filename)) return;
this.eventCache.set(msg.filename, msg.payload);
console.log(`[nostr] cached event ${msg.filename}`);
this._onNewEvent?.();
return;
}
} catch {
/* not a valid message */
}
}
}