import type { SyncEvent, IndexDefinition, OpenOptions } from './types.js';
import type { IDBStore } from './idb-store.js';
import type { FolderStore } from './folder-store.js';
import type { Emitter } from './emitter.js';
import { canonicalJson, sha256Hex, eventFilename } from './utils.js';

/** Outcome of reconciling one incoming folder event with local IDB state. */
interface ReconcileResult {
  /** True when a live local record already existed for the event's key. */
  conflict: boolean;
  /** The event that was written to IDB, or `null` when the local record won. */
  applied: SyncEvent | null;
}

/** The fields of a local record that last-write-wins reconciliation needs. */
interface LocalVersion {
  ts: number;
  data: unknown;
}

/**
 * Knows about every registered collection's index definitions so
 * it can maintain indexes when replaying events during sync.
 *
 * Local writes are applied to IDB first and then mirrored to the folder as
 * event files; `sync()` imports event files that IDB has not yet marked as
 * applied, reconciling them with last-write-wins on `ts`.
 */
export class SyncEngine {
  private readonly collectionIndexes = new Map<string, IndexDefinition[]>();
  private syncing = false;

  constructor(
    private readonly idb: IDBStore,
    private readonly folder: FolderStore,
    private readonly emitter: Emitter,
    private readonly clientId: string,
    private readonly conflictResolver?: OpenOptions['conflictResolver'],
  ) {}

  // ── Collection index registration ──────────────────────────

  /** Records the index definitions for `store`, replacing any earlier set. */
  registerIndexes(store: string, indexes: IndexDefinition[]): void {
    this.collectionIndexes.set(store, indexes);
  }

  /** Returns the index definitions registered for `store`, or `[]` if none. */
  getIndexes(store: string): IndexDefinition[] {
    return this.collectionIndexes.get(store) ?? [];
  }

  // ── Local writes (write to IDB + folder) ───────────────────

  /** Persists a `put` event for `key` in the `kv` store. */
  async writeKV(
    key: string,
    value: unknown,
    ts: number,
    rev: number,
  ): Promise<void> {
    const event: SyncEvent = {
      type: 'put',
      store: 'kv',
      key,
      ts,
      clientId: this.clientId,
      data: value,
      rev,
    };
    await this.persistEvent(event);
  }

  /** Persists a `delete` event for `key` in the `kv` store. */
  async deleteKV(key: string, ts: number, rev: number): Promise<void> {
    const event: SyncEvent = {
      type: 'delete',
      store: 'kv',
      key,
      ts,
      clientId: this.clientId,
      rev,
    };
    await this.persistEvent(event);
  }

  /** Persists a `put` event for document `id` in collection `store`. */
  async writeDoc(
    store: string,
    id: string,
    data: unknown,
    ts: number,
    rev: number,
  ): Promise<void> {
    const event: SyncEvent = {
      type: 'put',
      store,
      key: id,
      id,
      ts,
      clientId: this.clientId,
      data,
      rev,
    };
    await this.persistEvent(event);
  }

  /** Persists a `delete` event for document `id` in collection `store`. */
  async deleteDocEvent(
    store: string,
    id: string,
    ts: number,
    rev: number,
  ): Promise<void> {
    const event: SyncEvent = {
      type: 'delete',
      store,
      key: id,
      id,
      ts,
      clientId: this.clientId,
      rev,
    };
    await this.persistEvent(event);
  }

  // ── Core persist: IDB + folder ─────────────────────────────

  /**
   * Applies a locally produced event to IDB, marks its filename as applied,
   * mirrors it to the folder when a handle with permission is available, and
   * finally emits `change`.
   *
   * The filename is derived from the event's `ts` and the SHA-256 of its
   * canonical JSON. Marking it applied up front means a later `sync()` skips
   * the file this method writes.
   */
  private async persistEvent(event: SyncEvent): Promise<void> {
    const hash = await sha256Hex(canonicalJson(event));
    const filename = eventFilename(event.ts, hash);

    // Write to IDB first (fast path).
    await this.applyEvent(event);
    await this.idb.markEventApplied(filename);

    // Then persist to the folder (if available).
    if (this.folder.hasHandle && (await this.folder.hasPermission())) {
      try {
        await this.folder.writeEvent(filename, event);
      } catch (err) {
        // Best effort: every write failure is surfaced as a permission loss
        // and the local write still counts as successful.
        this.emitter.emit('folder:lost-permission', err);
      }
    }

    this.emitter.emit('change', this.changePayload(event));
  }

  // ── Sync: import folder events into IDB ────────────────────

  /**
   * Imports every folder event file that IDB has not yet marked as applied.
   *
   * Does nothing when a sync is already running or no folder handle is set.
   * Emits `folder:lost-permission` and returns when access was revoked.
   * Otherwise emits `sync:start`, then per imported file `conflict` (when a
   * live local record existed) and `change` (when the event was written),
   * and finally `sync:end` with either `{ imported }` or `{ error }`.
   */
  async sync(): Promise<void> {
    if (this.syncing || !this.folder.hasHandle) return;

    // Claim the guard before the first await; otherwise a second call made
    // while the permission check is pending would also pass the guard and
    // replay the same event files concurrently.
    this.syncing = true;
    try {
      if (!(await this.folder.hasPermission())) {
        this.emitter.emit('folder:lost-permission');
        return;
      }

      this.emitter.emit('sync:start');
      try {
        const appliedSet = await this.idb.getAppliedSet();
        const filenames = await this.folder.scanEventFilenames();
        let importCount = 0;

        for (const name of filenames) {
          if (appliedSet.has(name)) continue;

          const event = await this.folder.readEventFile(name);
          if (!event) continue;

          const { conflict, applied } =
            await this.applyEventWithConflictCheck(event);
          // Mark the file even when the local record won so it is not
          // re-evaluated on every subsequent sync.
          await this.idb.markEventApplied(name);
          importCount++;

          if (conflict) {
            this.emitter.emit('conflict', { filename: name, event });
          }
          // Report only what was really written: nothing when local state
          // won, and the resolver's output when a tie was merged.
          if (applied) {
            this.emitter.emit('change', {
              ...this.changePayload(applied),
              source: 'sync',
            });
          }
        }

        this.emitter.emit('sync:end', { imported: importCount });
      } catch (err) {
        this.emitter.emit('sync:end', { error: err });
      }
    } finally {
      // Always release the guard, including when a listener or the
      // permission check throws.
      this.syncing = false;
    }
  }

  // ── Apply a single event to IDB ────────────────────────────

  /**
   * Writes one event to IDB unconditionally. `kv` events go to the key/value
   * store; all others go to the document store together with the
   * collection's registered indexes. A missing `rev` is stored as 0, and
   * documents fall back to `key` when `id` is absent.
   */
  private async applyEvent(event: SyncEvent): Promise<void> {
    const rev = event.rev ?? 0;

    if (event.store === 'kv') {
      if (event.type === 'put') {
        await this.idb.putKV({
          key: event.key,
          value: event.data,
          ts: event.ts,
          rev,
        });
      } else {
        await this.idb.deleteKV(event.key);
      }
      return;
    }

    const indexes = this.getIndexes(event.store);
    const id = event.id ?? event.key;
    if (event.type === 'put') {
      await this.idb.putDoc(
        { store: event.store, id, data: event.data, ts: event.ts, rev },
        indexes,
      );
    } else {
      await this.idb.deleteDoc(event.store, id, event.ts, rev, indexes);
    }
  }

  /**
   * Apply an incoming event, checking for conflicts using LWW on `ts`.
   *
   * - No live local record: the event is applied, no conflict.
   * - Incoming `ts` is newer: the event is applied, conflict reported.
   * - Equal `ts`: conflict reported; the event is applied, with its `data`
   *   replaced by the custom resolver's result when one is configured.
   * - Local `ts` is newer: the event is dropped, conflict reported.
   *
   * @returns whether a conflict was detected and the event that was written
   *   (`null` when the local record won).
   */
  private async applyEventWithConflictCheck(
    event: SyncEvent,
  ): Promise<ReconcileResult> {
    const local = await this.readLocalVersion(event);

    if (!local) {
      await this.applyEvent(event);
      return { conflict: false, applied: event };
    }

    if (event.ts < local.ts) {
      // Existing is newer — skip the incoming event.
      return { conflict: true, applied: null };
    }

    let winner = event;
    if (event.ts === local.ts && this.conflictResolver) {
      // NOTE(review): without a resolver a tie simply adopts the incoming
      // value; confirm replicas cannot end up swapping values on a tie.
      const resolved = this.conflictResolver(local.data, event.data);
      winner = { ...event, data: resolved };
    }

    await this.applyEvent(winner);
    return { conflict: true, applied: winner };
  }

  /**
   * Loads the timestamp and payload of the local record an event targets.
   * Returns `undefined` when there is no record, or when the document record
   * is flagged `deleted` (such records never block an incoming event).
   */
  private async readLocalVersion(
    event: SyncEvent,
  ): Promise<LocalVersion | undefined> {
    if (event.store === 'kv') {
      const existing = await this.idb.getKV(event.key);
      return existing ? { ts: existing.ts, data: existing.value } : undefined;
    }

    const existing = await this.idb.getRawDoc(
      event.store,
      event.id ?? event.key,
    );
    return existing && !existing.deleted
      ? { ts: existing.ts, data: existing.data }
      : undefined;
  }

  /** Builds the payload shared by every `change` emission. */
  private changePayload(event: SyncEvent) {
    return {
      type: event.type,
      store: event.store,
      key: event.key,
      id: event.id,
      data: event.data,
    };
  }
}